Exactly Once Data Processing with Amazon Kinesis and Spark Streaming
[et_pb_section admin_label=”section”][et_pb_row admin_label=”row”][et_pb_column type=”4_4″][et_pb_text admin_label=”Text” background_layout=”light” text_orientation=”left” text_font_size=”16″ use_border_color=”off” border_color=”#ffffff” border_style=”solid”]
April 25th, 2017
The Kinesis Client Library provides convenient abstractions for interacting with Amazon Kinesis. Consumer checkpoints are automatically tracked in DynamoDB (Kinesis checkpointing) and it’s easy to spawn workers to consume data from each shard (Kinesis term for a partition) in parallel. For those unfamiliar with checkpointing in streaming applications, it is the process of tracking which messages have been successfully read from the stream.
Spark Streaming implements a receiver using the Kinesis Client Library to read messages from Kinesis. Spark also provides a utility called checkpointing (Spark checkpointing; not to be confused with Kinesis checkpointing in DynamoDB) which helps make applications fault-tolerant. Using Spark checkpointing in combination with Kinesis checkpointing provides at-least-once semantics.
When we tried to implement the recommended solution using Spark checkpointing, it was very difficult to develop any code without breaking our checkpoints. When Spark saves checkpoints, it serializes the classes which define the transformations and then uses that to restart a stopped stream. If you then change the structure of one of the transformation classes, checkpoints become invalid and cannot be used for recovery. (There are ways to make code changes without breaking your application’s checkpoints, however in my opinion they add unnecessary complexity and risk to the development process as cited in this example). This challenge, in combination with a sub-optimal at-least-once guarantee, led us to abandon Spark checkpointing to pursue a simpler, albeit somewhat hacky, alternative.
Every message sent to Kinesis is given a partition key. The partition key determines the shard to which the message is assigned. Once a message is assigned to a shard, it is given a sequence number. Sequence numbers within a shard are unique and increase over time. (If the producer is leveraging message aggregation, it is possible for multiple consumed messages to have the same sequence number)
When starting up a Spark Streaming Kinesis application, there are two positions you can start consuming from the stream: InitialPositionInStream.LATEST will consume any messages received while your application is running, and InitialPositionInStream.TRIM_HORIZON which will consume from the oldest message in the streams retention window (24hr by default). The subtlety here is that if a kinesis-checkpoint table is present, the stream will start consuming from the existing offsets even when TRIM_HORIZON is specified.
Setting the state of DynamoDB before starting the streaming application will ensure the KCL workers start consuming from the last successfully saved offset. To do that we’ll need the shard-id and the sequence number of the last message successfully persisted from that shard. There are a couple ways offset information could be maintained. We opted to persist offsets as part of the message itself on HDFS. This approach has a small amount of disk overhead and doesn’t require additional external dependencies. A similar approach using zookeeper is described here.
The record processor used by the KCL workers is a function (Record => T). We can cast Record to a UserRecord which provides the partition key, sequence number, and sub-sequence number (necessary when a producer uses message aggregation)
The new problem is that we have no way to know which shard on the stream a message came from. The KCL assigns shards to the workers under the covers and the shard-id is not exposed. Luckily, we can use the partition key to determine the shard-id.
Now that each record is explicitly linked with the shard to which it was assigned, we can persist this information on HDFS along with the message.
In the event of a failure, we load the max saved sequence number per shard…
and then set the state of DynamoDB before restarting the stream so that the application will start consuming at the latest offset.
The major benefit of this approach is that data from Kinesis can be processed exactly-once using spark streaming. This process could be easily modified to work with Kafka as well. If you’re interested in data solutions like this one, follow us @b23llc and visit us at http://www.b23.io/.
About the Author: David Kegley is a Data Engineer with B23 LLC working with our clients to provide scalable cloud data solutions. He has a B.S. in Computer Science from James Madison University and enjoys listening to live music and spending time outdoors in his free time.