Replaying Kafka Messages — An Implementation in Golang

Pranay Singhal
8 min read · Jul 24, 2019

Apache Kafka is widely used in event-driven architectures for asynchronous, messaging-based integration between applications and services. Most use cases deal with the consumption of messages as they are produced: once a message is successfully processed, consumers move on to newer messages and don’t have to deal with the old ones. However, there are situations in which revisiting older messages and “replaying” them becomes necessary. For example, a bug in the consumer might require deploying a fixed version and re-processing some or all of the messages. In another scenario, some of the data derived from consumed messages may be lost, and the only path to recovery might be to re-process the messages from which that data came. Because Kafka messages are persistent and durable, replaying them is possible. Let’s take a look at some of these “replay” scenarios, and explore a Go implementation of a potential solution for performing Kafka message replays.

First, some basics

Before we dive into the details of the replay implementation, it might help to revisit some fundamental Kafka concepts that form the building blocks of the solution.

Figure 1: Topics, Partitions and Consumer Groups

Figure 1 shows a Kafka topic with 5 partitions. Messages published to this topic are distributed across the partitions. Each partition holds an ordered sequence of messages, with each message identified by a unique offset. The oldest message in a partition sits at the earliest offset, while the most recent one sits at the latest offset. Messages are retained in each partition according to the topic’s retention period; messages older than the retention period are periodically removed.

Each topic can have one or more consumers, and consumers are typically part of a consumer group. Within a group, each partition is assigned to one, and only one, consumer. If there are fewer consumers than partitions, some consumers are assigned multiple partitions. Every time a consumer joins or leaves the group, the partitions are re-balanced across the consumer group. The Kafka broker keeps track of the committed offsets of each partition for each consumer group, and thereby of which messages from the topic that group has consumed. A topic can have multiple consumer groups, each with its own independently tracked committed offsets, so each consumer group can be at a different point of consumption of the same topic.

As you will shortly see, all of the above concepts have some bearing on the design and behavior of our replay implementation.

A simple replay strategy

The committed offsets for each consumer group of a topic are separate and independent from those of other consumer groups. Also, the committed offsets of a new group are initially unset. Given this, a simple replay strategy could be as follows:

  1. Create a new consumer group with a new, unique group ID.
  2. Bring up this consumer group and attach it to the topic that contains the messages to be replayed.
  3. The consumers in the consumer group will start consuming messages from the earliest offsets in the partitions, effectively replaying the messages (a minimal sketch of this approach follows the list).
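
For instance, assuming the same confluent-kafka-go client used later in this article, a bare-bones replay consumer following this strategy might look roughly like the sketch below. The broker address, group ID, and topic name are placeholders:

package main

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // A brand-new group ID means no committed offsets exist for this group yet,
    // so "auto.offset.reset" = "earliest" makes consumption start at the
    // earliest retained message in each partition.
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "replay-group-2019-07-24", // new, unique group ID
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatalf("failed to create consumer: %v", err)
    }
    defer c.Close()

    if err := c.SubscribeTopics([]string{"orders"}, nil); err != nil {
        log.Fatalf("failed to subscribe: %v", err)
    }

    for {
        msg, err := c.ReadMessage(-1) // block until the next message arrives
        if err != nil {
            log.Printf("consumer error: %v", err)
            continue
        }
        log.Printf("replayed message at %v: %s", msg.TopicPartition, string(msg.Value))
    }
}

Because the new group has never committed any offsets, the “earliest” reset policy is what causes every retained message to be consumed again.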

For the simplest of use-cases, this might actually be a reasonable and practical replay strategy. However, this approach suffers from a major drawback: it replays ALL of the retained messages in the topic. Depending on the retention period for the topic, this might be a much bigger message volume than what you need to replay. What you need is a way to control the point from which the messages are replayed.

Controlled replay

In most practical scenarios, you will want to control the point of time from which messages are replayed. For example, consider the “replay to recover data” scenario. The database where you store the results of message consumption gets corrupted, and the administrator is able to restore it back to the point of the last backup. In order to recover the remaining data, you will need to replay the messages from the point in time when the last backup was taken.

Fortunately, Kafka (starting with version 0.10.1.0) allows you to search message offsets by timestamp. Most Kafka client libraries provide a corresponding function or API to perform such a search. Using this API, you can program your consumer to determine the offset of the message that appeared in its assigned partition at or immediately after a certain point in time. When configured for replaying, the consumer can thus determine the offset for a given timestamp and simply flag it as the “committed” offset. Once this is done, the consumer will start consuming messages from that point onwards, until the message at the latest offset is consumed. This effectively achieves the “rewind and replay” objective.
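
To make this concrete, here is a rough sketch of that idea, assuming the confluent-kafka-go and time packages. It looks up the offsets for a timestamp and commits them for the group; the sample implementation shown later takes a slightly different route and passes the rewound offsets directly to Assign instead of committing them. The function name and the 5-second lookup timeout are illustrative choices, not part of the sample repo:

// Rough sketch: rewind a consumer group to a point in time by looking up the
// offsets for that timestamp and committing them. Assumes the partitions slice
// holds this consumer's currently assigned partitions.
func rewindToTimestamp(c *kafka.Consumer, partitions []kafka.TopicPartition, at time.Time) error {
    tsMillis := at.UnixNano() / int64(time.Millisecond)

    lookup := make([]kafka.TopicPartition, 0, len(partitions))
    for _, p := range partitions {
        // For OffsetsForTimes, the Offset field carries the timestamp (in ms) to search for.
        lookup = append(lookup, kafka.TopicPartition{Topic: p.Topic, Partition: p.Partition, Offset: kafka.Offset(tsMillis)})
    }

    // Each returned entry holds the earliest offset whose message timestamp is >= tsMillis.
    offsets, err := c.OffsetsForTimes(lookup, 5000)
    if err != nil {
        return err
    }

    // Record these offsets as the group's committed offsets, so consumption resumes from there.
    _, err = c.CommitOffsets(offsets)
    return err
}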

Implementation

Let’s now look at a concrete implementation of a consumer that can be configured and deployed to replay messages. At the core of this replay consumer is its ability to rewind the committed message offsets. Following is a high-level outline of the steps it performs when it detects an event to assign/reassign partitions:

  1. Determine the assigned partitions.
  2. For each assigned partition, determine the offset that corresponds to the supplied timestamp.
  3. Reset the committed offsets for each assigned partition.

The following Go code snippet illustrates the implementation of this offset-rewinding logic (the complete source code is available in the GitHub repo referenced at the end of this article):

case kafka.AssignedPartitions:
    partitionsToAssign := e.Partitions
    if len(partitionsToAssign) == 0 {
        log.Printf("No partitions assigned\n")
        continue
    }

    log.Printf("Assigned/Re-assigned Partitions: %s\n", getPartitionNumbers(partitionsToAssign))
    // If the consumer was launched in replay mode, it needs to figure out which offset
    // to replay from in each assigned partition, and then reset the offset to that point
    // for each partition.
    if cfg.ReplayMode {
        switch cfg.ReplayType {
        case replayFromBeginning:
            log.Println("Replay from beginning, resetting offsets to beginning")
            // Reset offsets of all assigned partitions to "beginning".
            partitionsToAssign, err = resetPartitionOffsetsToBeginning(c, e.Partitions)
            if err != nil {
                log.Fatalf("error trying to reset offsets to beginning: %v\n", err)
            }
        case replayFromTimestamp:
            log.Printf("Replay from timestamp %s, resetting offsets to that point\n", cfg.ReplayFrom)
            t, err := time.Parse(time.RFC3339Nano, cfg.ReplayFrom)
            if err != nil {
                log.Fatalf("failed to parse replay timestamp %s due to error %v", cfg.ReplayFrom, err)
            }
            // Reset offsets of all assigned partitions to the specified timestamp in the past.
            partitionsToAssign, err = resetPartitionOffsetsToTimestamp(c, e.Partitions, t.UnixNano()/int64(time.Millisecond))
            if err != nil {
                log.Fatalf("error trying to reset offsets to timestamp: %v\n", err)
            }
        }
    }

    c.Assign(partitionsToAssign)
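
The cfg value referenced above is what drives the replay behavior. Its exact shape in the sample repo may differ, but conceptually it needs fields along these lines (the constant values shown here are illustrative):

// Hypothetical configuration shape for the replay consumer; the actual struct
// and constants in the sample repo may differ.
const (
    replayFromBeginning = "beginning"
    replayFromTimestamp = "timestamp"
)

type config struct {
    ReplayMode bool   // run the consumer in replay mode?
    ReplayType string // replayFromBeginning or replayFromTimestamp
    ReplayFrom string // RFC3339 timestamp to replay from, when ReplayType is replayFromTimestamp
}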

This implementation utilizes Confluent’s Kafka client for Go (confluent-kafka-go, which wraps librdkafka). As you can see, the consumer can be made configurable, so that you can control whether it runs in replay mode and, if so, whether the replay should start from the earliest retained messages or from a certain point in time. For the case of time-based replay, the “resetPartitionOffsetsToTimestamp” function determines the appropriate offsets as follows:

func resetPartitionOffsetsToTimestamp(c *kafka.Consumer, partitions []kafka.TopicPartition, timestamp int64) ([]kafka.TopicPartition, error) {
    var prs []kafka.TopicPartition
    for _, par := range partitions {
        prs = append(prs, kafka.TopicPartition{Topic: par.Topic, Partition: par.Partition, Offset: kafka.Offset(timestamp)})
    }

    updtPars, err := c.OffsetsForTimes(prs, 5000)
    if err != nil {
        log.Printf("Failed to reset offsets to supplied timestamp due to error: %v\n", err)
        return partitions, err
    }

    return updtPars, nil
}

The “OffsetsForTimes” function provided by the confluent-kafka-go library performs the task of determining partition offsets corresponding to a given timestamp. Details of this function are available here: https://godoc.org/github.com/confluentinc/confluent-kafka-go/kafka#Consumer.OffsetsForTimes
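
For completeness, the “replay from beginning” case referenced in the earlier snippet can be handled in a similar way by pinning each assigned partition to the special beginning offset. The helper below is only a sketch of what that might look like; the actual resetPartitionOffsetsToBeginning function in the sample repo may differ:

// Sketch of the companion helper for "replay from beginning": pin each assigned
// partition to the OffsetBeginning sentinel so that consumption restarts at the
// earliest retained message once the partitions are assigned.
func resetPartitionOffsetsToBeginning(c *kafka.Consumer, partitions []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
    var prs []kafka.TopicPartition
    for _, par := range partitions {
        prs = append(prs, kafka.TopicPartition{Topic: par.Topic, Partition: par.Partition, Offset: kafka.OffsetBeginning})
    }
    return prs, nil
}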

Consequences of resetting offsets upon partition assignment

We have designed the replay consumer to reset the offsets of its assigned partitions at the time of partition assignment. This makes sense, since it’s through this event that the consumer becomes aware of its assigned partitions. The partition assignment is received as the “kafka.AssignedPartitions” event type. However, this is the same event that is generated when partitions are re-assigned among consumers in a consumer group. Recall that any time a consumer joins or leaves the consumer group, partitions are re-balanced across all active consumers. This has some interesting consequences for the consumer, which must be carefully considered when designing the handling of replayed messages.

When multiple replay consumers are being used, the following behavior would occur:

  1. A consumer instance comes online, gets partitions assigned to it, resets committed message offsets for those partitions, and starts replaying messages from those partitions.
  2. Another consumer comes online, triggering the rebalancing of partitions. Each consumer instance (including the ones that are already online) once again resets the committed offsets of its assigned partitions, and begins replaying messages.

You can easily simulate this behavior using the sample replay implementation. Simply bring up multiple consumer instances in the replay mode, and observe the program output of each instance as you add a new consumer instance.

Depending on the number of consumers and how far apart in time they join the group, this leads to certain messages being replayed multiple times. Given that most production configurations will utilize multiple consumer instances per consumer group, there is a high likelihood of some messages being consumed multiple times in replay mode.

Idempotent message consumption — guarding against duplicate replayed messages

So, how do we handle the potential for duplicate consumption of messages during replay? While it is possible to design some form of stateful tracking of replayed messages to address duplicates, such a solution can quickly become overly complicated. The simplest and most reliable strategy is to ensure that message consumption and handling are idempotent. The consumer should be designed such that, for each incoming message, it can detect whether the effects of that message are already present. If so, it can simply drop that message without any action (other than logging, if needed). How exactly such a check is performed (and whether idempotent message handling is even possible) depends on the specifics of message consumption. For example, if the messages are used to update a database record, the consumer could check whether the database already has updates for the same record that are more recent than the message, and disregard the message if it does.
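
As an illustration of that last example, here is a hypothetical sketch of idempotent handling for a consumer that applies message updates to a database row. The table, columns, message fields, and Postgres-style placeholders are all made up for illustration; it assumes the standard database/sql, log, and time packages:

// applyUpdate writes a message's data to the database only if the message is
// newer than what is already stored, making replayed (or out-of-order)
// messages harmless no-ops.
func applyUpdate(db *sql.DB, accountID string, balance float64, eventTime time.Time) error {
    // The conditional WHERE clause is what makes the update idempotent: a stale
    // or duplicate message matches zero rows and changes nothing.
    res, err := db.Exec(
        `UPDATE accounts SET balance = $1, updated_at = $2
         WHERE id = $3 AND updated_at < $2`,
        balance, eventTime, accountID,
    )
    if err != nil {
        return err
    }
    if n, _ := res.RowsAffected(); n == 0 {
        log.Printf("skipping stale or duplicate message for account %s", accountID)
    }
    return nil
}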

Even apart from the issues introduced by replay, idempotent message handling is a desirable strategy for dealing with duplicate and out-of-sequence messages in general.

Replay deployment architecture

Figure 2 below depicts a potential operational deployment architecture for replay.

Figure 2: Operational Architecture for “Replay” Consumers

With this approach, you can keep your replay consumer cluster on standby, separate from your “live” consumers. In the event a replay of messages is needed, the replay cluster can be brought online. The replay-based recovery can then take place in parallel, while the consumption of real-time messages continues unobstructed. Once the replay is complete, the replay consumers can simply be terminated, returning the cluster to its dormant state. Standing up and maintaining such a deployment architecture is straightforward on a cloud-native platform such as Kubernetes, where the consumers are deployed as container replicas.

Source code for sample implementation

The source code for the sample implementation is available in the following Github repo: https://github.com/psinghal04/kafka-replay-sample
