42¢ glxn.net

Chaos Engineering for Kafka Consumers: Building a Retry Validator

When consuming a Kafka topic—especially when using it as an event source—it’s crucial to ensure your consumer logic is idempotent. To validate this, you can introduce a chaos monkey (inspired by Netflix’s Chaos Monkey) that periodically resets the offsets for each partition, forcing the consumer to retry messages. This approach helps you test and guarantee the resilience and correctness of your consumer code.

Why Idempotency Matters

In event-driven architectures, consumers may receive the same message more than once due to retries, rebalances, or failures. If your consumer is not idempotent, duplicate processing can lead to data corruption or inconsistent state. By simulating offset resets, you can proactively test your consumer’s ability to handle such scenarios.
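As a sketch of what idempotent handling can look like (the event type and store here are hypothetical, not taken from the consumer below), an upsert keyed on a stable event id makes reprocessing a no-op:

```kotlin
// Hypothetical event carrying a stable, unique id.
data class PaymentEvent(val id: String, val amount: Long)

// Idempotent handler: an upsert keyed on the event id means that
// reprocessing the same event overwrites the entry with identical data
// instead of, say, incrementing a counter twice.
class PaymentStore {
    private val payments = mutableMapOf<String, Long>()

    fun handle(event: PaymentEvent) {
        payments[event.id] = event.amount // upsert, safe to repeat
    }

    fun total(): Long = payments.values.sum()
}
```

Delivering the same event twice leaves the store in exactly the same state as delivering it once, which is the property the chaos monkey below is designed to exercise.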

The Chaos Monkey Mechanism

The core idea is to periodically seek the consumer’s offsets back to the beginning (or another point), making the consumer re-read and reprocess messages. This tests the idempotency and error handling of your consumer logic under repeated delivery.
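Stripped of the Kafka client API (where this is a `seek` on each assigned partition), the mechanism reduces to rewinding a per-partition position; the map-based model here is a stand-in for the real consumer state:

```kotlin
// A toy model of per-partition consumer positions; in the real consumer
// these live in the Kafka client and are moved with Consumer.seek().
class Positions(private val offsets: MutableMap<Int, Long>) {
    // Rewind every partition by the given number of records, clamping
    // at offset 0 so we never seek before the start of the log.
    fun rewind(records: Long) {
        for ((partition, offset) in offsets) {
            offsets[partition] = (offset - records).coerceAtLeast(0)
        }
    }

    fun position(partition: Int): Long = offsets.getValue(partition)
}
```

After a rewind, the next poll re-delivers the rewound records, so any non-idempotent processing shows up as duplicated side effects.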

Example: Coroutine-Based Kafka Consumer in Kotlin

Below is a simplified version of a CoroutineKafkaConsumer class that demonstrates this approach. The consumer can be configured to seek to the beginning of the topic on assignment, or periodically using a PeriodicReplayer. If processing a record fails, the consumer seeks back to the failed offset, pauses the partition, and schedules a retry with exponential backoff. Metrics are tracked using Micrometer.

class CoroutineKafkaConsumer<K, V> private constructor(
    topic: String,
    groupId: String,
    keyDeserializer: Class<*>,
    valueDeserializer: Class<*>,
    replayPeriodically: Boolean = false,
) {
    // the underlying Kafka consumer, built from the constructor
    // arguments (construction details omitted)
    private val consumer: Consumer<K, V> =
        createConsumer(topic, groupId, keyDeserializer, valueDeserializer)

    private val replayer = PeriodicReplayer(
        consumer,
        // big leap once a day at 05:00, small leap at the top of every hour
        isBigLeap = { t -> t.hour == 5 && t.minute == 0 },
        isSmallLeap = { t -> t.minute == 0 },
        bigLeap = 4_000,
        smallLeap = 100,
        enabled = replayPeriodically,
    )
    
    suspend fun forEach(
        body: suspend (ConsumerRecord<K, V>) -> Unit
    ) = withContext(kafkaContext) {
        while (isActive && !stop.get() && !Health.terminating) {
            replayer.replayWhenLeap()
            consumer.resume(resumeQueue.pollAll())
            val records = try {
                consumer.poll(Duration.ofMillis(1000))
            } catch (e: Exception) {
                // a failing poll is unrecoverable here: rethrow and
                // let the surrounding scope tear the consumer down
                throw e
            }
            if (!records.isEmpty) {
                forEachRecord(records, body)
            }
        }
    }
    // ...
}
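The retry scheduling mentioned above can be sketched as a pure backoff calculation (the base delay and cap are illustrative assumptions, not values from the original class):

```kotlin
// Exponential backoff: the delay doubles per attempt and is capped so a
// persistently failing record does not stall its partition forever.
fun backoffMillis(attempt: Int, baseMillis: Long = 1_000L, maxMillis: Long = 60_000L): Long {
    require(attempt >= 0)
    // shifting by a large attempt count would overflow; cap the exponent first
    val exponent = attempt.coerceAtMost(16)
    return (baseMillis shl exponent).coerceAtMost(maxMillis)
}
```

In the consumer, the failed partition stays paused for this delay and is then handed back via the resume queue, so one bad record only slows down its own partition.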

class PeriodicReplayer<K, V>(
    private val consumer: Consumer<K, V>,
    private val isBigLeap: Predicate<LocalDateTime>,
    private val isSmallLeap: Predicate<LocalDateTime>,
    private val bigLeap: Long,
    private val smallLeap: Long,
    private val enabled: Boolean,
) {
    private val log = logger()
    private val lastTick = AtomicReference<LocalDateTime>()

    fun replayWhenLeap() {
        if (!enabled) {
            return
        }

        val now = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES)
        if (now == lastTick.getAndSet(now)) {
            // noop
        } else if (isBigLeap.test(now)) {
            log.info("replaying big leap")
            consumer.replay(bigLeap)

        } else if (isSmallLeap.test(now)) {
            log.info("replaying small leap")
            consumer.replay(smallLeap)
        }
    }
}
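With the predicates wired in above (hour == 5 && minute == 0 for the big leap, minute == 0 for the small one), every wall-clock hour triggers one small replay and 05:00 additionally triggers the big one. The decision logic in replayWhenLeap, rewritten as a pure function for testing (the enum and function names are mine, not from the original):

```kotlin
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

enum class Leap { NONE, SMALL, BIG }

// Pure version of the decision in replayWhenLeap: truncate to the minute,
// skip if this minute was already handled, otherwise classify the tick.
fun classifyTick(now: LocalDateTime, lastTick: LocalDateTime?): Leap {
    val minute = now.truncatedTo(ChronoUnit.MINUTES)
    if (minute == lastTick) return Leap.NONE // same minute: noop
    return when {
        minute.hour == 5 && minute.minute == 0 -> Leap.BIG
        minute.minute == 0 -> Leap.SMALL
        else -> Leap.NONE
    }
}
```

Truncating to the minute and comparing against the previous tick is what keeps a leap from firing more than once per minute, even though replayWhenLeap runs on every poll loop iteration.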

/**
 * Replays the given number of records for all partitions that are not
 * paused, by seeking each partition to (current position - numberOfRecords),
 * clamped at 0.
 */
private fun <K, V> Consumer<K, V>.replay(numberOfRecords: Long) {
    (assignment() - paused()).forEach {
        val currentPosition = position(it)
        val newPosition = (currentPosition - numberOfRecords).coerceAtLeast(0)

        seek(it, newPosition)
    }
}

Key Takeaways

Periodic offset replays act as a built-in chaos monkey: they continuously force the consumer to prove that its processing is idempotent, under production-like conditions rather than in a one-off test. Combined with per-partition pause-and-retry with exponential backoff on failures, these patterns help you build robust, production-ready event-driven applications.

Published 11 Oct 2021