When consuming a Kafka topic, especially when using it as an event source, it’s crucial to ensure your consumer logic is idempotent. To validate this, you can introduce a chaos monkey (inspired by Netflix’s Chaos Monkey) that periodically rewinds the offsets for each partition, forcing the consumer to re-read and reprocess messages it has already handled. This approach helps you test the resilience and correctness of your consumer code before duplicates bite you in production.
In event-driven architectures, consumers may receive the same message more than once due to retries, rebalances, or failures. If your consumer is not idempotent, duplicate processing can lead to data corruption or inconsistent state. By simulating offset resets, you can proactively test your consumer’s ability to handle such scenarios.
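To make the duplicate handling concrete, here is a minimal sketch of an idempotent handler: it keys every write on a stable identifier from the event, so reprocessing the same record overwrites the same state instead of duplicating it. The OrderEvent type and OrderStore are hypothetical stand-ins, not part of the consumer shown below.

import org.apache.kafka.clients.consumer.ConsumerRecord
import java.util.concurrent.ConcurrentHashMap

// Hypothetical event type and store, for illustration only.
data class OrderEvent(val orderId: String, val status: String)

object OrderStore {
    private val orders = ConcurrentHashMap<String, String>()

    // Upsert keyed by order id: applying the same event twice yields the same state.
    fun upsert(orderId: String, status: String) {
        orders[orderId] = status
    }
}

suspend fun handleRecord(record: ConsumerRecord<String, OrderEvent>) {
    val event = record.value()
    OrderStore.upsert(event.orderId, event.status)
}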
The core idea is to periodically seek each partition’s offset back, either to the beginning or by a fixed number of records, so the consumer re-reads and reprocesses messages it has already handled. This exercises the idempotency and error handling of your consumer logic under repeated delivery.
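In its simplest form this is just a ConsumerRebalanceListener that seeks to the beginning of each newly assigned partition. The sketch below uses the plain Kafka client API and is independent of the consumer class shown later:

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// Re-read the topic from the start whenever partitions are assigned,
// so every message is delivered at least one more time.
fun <K, V> Consumer<K, V>.subscribeFromBeginning(topic: String) {
    subscribe(listOf(topic), object : ConsumerRebalanceListener {
        override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
            this@subscribeFromBeginning.seekToBeginning(partitions)
        }

        override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) {
            // nothing to clean up in this sketch
        }
    })
}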
Below is a simplified version of a CoroutineKafkaConsumer class that demonstrates this approach. The consumer can be configured to seek to the beginning of the topic on assignment, or periodically using a PeriodicReplayer. If processing a record fails, the consumer seeks back to the failed offset, pauses the partition, and schedules a retry with exponential backoff. Metrics are tracked using Micrometer.
class CoroutineKafkaConsumer<K, V> private constructor(
    topic: String,
    groupId: String,
    keyDeserializer: Class<*>,
    valueDeserializer: Class<*>,
    replayPeriodically: Boolean = false,
) {
    // the underlying KafkaConsumer (consumer), kafkaContext dispatcher, stop flag,
    // resumeQueue, and forEachRecord (per-record error handling with retries and
    // Micrometer metrics) are initialized elsewhere and elided here for brevity

    private val replayer = PeriodicReplayer(
        consumer,
        isBigLeap = { t -> t.hour == 5 && t.minute == 0 }, // once a day, at 05:00
        isSmallLeap = { t -> t.minute == 0 },              // once an hour, on the hour
        bigLeap = 4_000,
        smallLeap = 100,
        enabled = replayPeriodically,
    )
    
    suspend fun forEach(
        body: suspend (ConsumerRecord<K, V>) -> Unit
    ) = withContext(kafkaContext) {
        while (isActive && !stop.get() && !Health.terminating) {
            replayer.replayWhenLeap()
            consumer.resume(resumeQueue.pollAll())
            val records = try {
                consumer.poll(Duration.ofMillis(1000))
            } catch (e: Exception) {
                // a failed poll() is treated as unrecoverable: rethrow and let the caller restart the consumer
                throw e
            }
            if (!records.isEmpty) {
                forEachRecord(records, body)
            }
        }
    }
    // ...
}
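forEachRecord is elided above. One way the failure path described earlier (seek back to the failed offset, pause the partition, resume after an exponential backoff) might be sketched is shown below; the resumeQueue, scope, and backoff numbers are assumptions for illustration, not the exact implementation:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import java.util.concurrent.ConcurrentLinkedQueue

// Sketch of the failure path: seek back so the record is re-delivered, pause the
// partition, and schedule it for resumption after an exponential backoff.
// Called from the polling thread; only the queue add happens off that thread.
fun <K, V> retryLater(
    consumer: Consumer<K, V>,
    record: ConsumerRecord<K, V>,
    attempt: Int,
    resumeQueue: ConcurrentLinkedQueue<TopicPartition>,
    scope: CoroutineScope,
) {
    val partition = TopicPartition(record.topic(), record.partition())
    consumer.seek(partition, record.offset())   // next poll re-reads the failed record
    consumer.pause(listOf(partition))           // but not until the partition is resumed
    scope.launch {
        val backoffMillis = 1_000L shl attempt.coerceAtMost(6) // 1s, 2s, 4s, ... capped at 64s
        delay(backoffMillis)
        // a Micrometer retry counter could be incremented here
        resumeQueue.add(partition)              // drained by consumer.resume(...) in the poll loop
    }
}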
class PeriodicReplayer<K, V>(
    private val consumer: Consumer<K, V>,
    private val isBigLeap: Predicate<LocalDateTime>,
    private val isSmallLeap: Predicate<LocalDateTime>,
    private val bigLeap: Long,
    private val smallLeap: Long,
    private val enabled: Boolean,
) {
    private val log = logger()
    private val lastTick = AtomicReference<LocalDateTime>()
    fun replayWhenLeap() {
        if (!enabled) {
            return
        }
        // truncate to the minute so the leap predicates fire at most once per
        // minute, even though replayWhenLeap() runs on every poll loop iteration
        val now = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES)
        if (now == lastTick.getAndSet(now)) {
            // already handled this minute; noop
        } else if (isBigLeap.test(now)) {
            log.info("replaying big leap")
            consumer.replay(bigLeap)
        } else if (isSmallLeap.test(now)) {
            log.info("replaying small leap")
            consumer.replay(smallLeap)
        }
    }
}
/**
 * Replays [numberOfRecords] records on every assigned partition that is not paused,
 * by seeking each partition back to (current position - numberOfRecords), floored at 0.
 */
private fun <K, V> Consumer<K, V>.replay(numberOfRecords: Long) {
    (assignment() - paused()).forEach {
        val currentPosition = position(it)
        val newPosition = (currentPosition - numberOfRecords).coerceAtLeast(0)
        seek(it, newPosition)
    }
}
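However the consumer instance is created (the factory behind the private constructor is not shown here), the processing loop then just passes an idempotent handler to forEach. With replayPeriodically enabled, the replayed records flow through the same handler again, which is exactly what this chaos-monkey setup is meant to exercise:

// Illustrative wiring: handleRecord is the idempotent handler sketched earlier.
suspend fun runConsumer(consumer: CoroutineKafkaConsumer<String, OrderEvent>) {
    consumer.forEach { record ->
        // re-delivered records overwrite the same state, so duplicates are harmless
        handleRecord(record)
    }
}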
By integrating these patterns into your Kafka consumers, you can build more robust, production-ready event-driven applications.