Batch & Concurrency

This page covers batch consumption, per-topic concurrency (nestjs/nest#12703), rebalance-safe offsets (nestjs/nest#12355), and backpressure.

Batch Consumption

Opt a handler into batch mode to process a whole fetched topic-partition batch at once instead of one message at a time. @KafkaMessage() then resolves to the array of deserialized payloads, and @KafkaBatch() resolves to the raw KafkaConsumerBatch:

import {KafkaBatch, KafkaConsumer, KafkaConsumerBatch, KafkaHandler, KafkaMessage} from '@nest-native/kafka';

@KafkaConsumer('metrics', {groupId: 'aggregator', concurrency: 2})
export class MetricsConsumer {
  @KafkaHandler(undefined, {batch: true}) // inherits the consumer's topic
  aggregate(
    @KafkaMessage() metrics: Metric[],
    @KafkaBatch() batch: KafkaConsumerBatch,
  ) {
    // runs once per fetched batch; batch.partition is the source partition
  }
}

Per-message and batch handlers in the same group always run on separate Kafka consumers, because a single consumer runs with either an eachMessage callback or an eachBatch callback, never both.

Per-Topic Concurrency (#12703)

The official transport processes a topic sequentially. Here, the concurrency option sets the consumer's partitionsConsumedConcurrently:

  • The default is 1, which gives strict per-partition ordering.
  • Raising it processes partitions concurrently while preserving order within each partition.
  • Resolution is handler → consumer → KafkaModule.forRoot({concurrency}), so a single handler can opt in or out of the module-wide default.

KafkaModule.forRoot({
  client: {brokers: ['localhost:9092']},
  concurrency: 4, // module-wide default
});
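
The handler → consumer → module resolution can be pictured as a first-defined-value lookup. The following is a minimal sketch of that idea, not the library's implementation: LevelOptions and resolveOption are hypothetical names introduced for illustration, and only the option names concurrency and maxInFlight come from this page.

```typescript
// Hypothetical option shape; only the two option names appear on this page.
interface LevelOptions {
  concurrency?: number;
  maxInFlight?: number;
}

// Return the first defined value, checking handler, then consumer, then module.
// Nullish coalescing keeps an explicit 0 instead of skipping it.
function resolveOption<K extends keyof LevelOptions>(
  key: K,
  handler: LevelOptions,
  consumer: LevelOptions,
  moduleDefaults: LevelOptions,
  fallback: number,
): number {
  return handler[key] ?? consumer[key] ?? moduleDefaults[key] ?? fallback;
}

const moduleDefaults: LevelOptions = { concurrency: 4 };
const consumerOptions: LevelOptions = { concurrency: 2 };

// Handler sets nothing, so the consumer-level value wins.
const inherited = resolveOption('concurrency', {}, consumerOptions, moduleDefaults, 1);

// Handler opts out of concurrency regardless of the outer levels.
const optedOut = resolveOption('concurrency', { concurrency: 1 }, consumerOptions, moduleDefaults, 1);

// Neither handler nor consumer sets a value, so the module default applies.
const moduleWide = resolveOption('concurrency', {}, {}, moduleDefaults, 1);
```

Using ?? rather than || matters for options where 0 is meaningful: a handler that sets a value of 0 keeps it instead of falling through to an outer level.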

Rebalance-Safe Offsets (#12355)

Batch consumers resolve each message's offset as the batch is processed; the client's all-or-nothing auto-resolve is disabled. If a partition is revoked mid-batch during a rebalance, the consumer keeps the progress already made instead of replaying the whole batch or hanging. Combined with the rule that offsets commit only after a handler returns successfully, in-flight messages either complete or are explicitly accounted for.
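
Per-message resolution resembles the manual-resolve loop that KafkaJS documents for eachBatch when eachBatchAutoResolve is false. The sketch below assumes the library does something comparable internally; this page does not show its implementation. BatchPayload is a hand-written subset of the KafkaJS payload, message values are simplified to strings, and processBatch and handle are hypothetical names.

```typescript
// Simplified message: real client messages carry binary values and more fields.
interface BatchMessage {
  offset: string;
  value: string;
}

// Subset of the callbacks a KafkaJS-style eachBatch payload exposes.
interface BatchPayload {
  batch: { topic: string; partition: number; messages: BatchMessage[] };
  resolveOffset(offset: string): void;
  heartbeat(): Promise<void>;
  isRunning(): boolean;
  isStale(): boolean;
}

// Process messages in order, resolving each offset only after its handler succeeds.
// Returns how many messages were resolved before the loop ended.
async function processBatch(
  payload: BatchPayload,
  handle: (message: BatchMessage) => Promise<void>,
): Promise<number> {
  let processed = 0;
  for (const message of payload.batch.messages) {
    // Stop early if the consumer is shutting down or the batch is no longer valid.
    if (!payload.isRunning() || payload.isStale()) break;

    await handle(message); // a throw here leaves this offset unresolved
    payload.resolveOffset(message.offset); // progress is kept message by message
    await payload.heartbeat(); // keep the group session alive during long batches
    processed += 1;
  }
  return processed;
}
```

Because each offset is resolved individually, stopping partway through leaves the earlier messages marked as done, so only the unprocessed tail is delivered again.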

Backpressure

maxInFlight caps how many messages or batches a consumer processes at once, so a fast broker cannot overwhelm slow handlers:

  • The default is 0, which means uncapped.
  • It resolves handler → consumer → module, the same way as concurrency.

@KafkaConsumer('metrics', {groupId: 'aggregator', maxInFlight: 100})
export class MetricsConsumer {}
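
An in-flight cap of this kind can be illustrated with a small counting limiter. This is a sketch of the general technique only, assuming nothing about how the library enforces maxInFlight; the InFlightLimiter class is hypothetical and treats 0 as uncapped to match the default described above.

```typescript
// Hypothetical limiter: at most `maxInFlight` tasks run at once; 0 means uncapped.
class InFlightLimiter {
  private active = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly maxInFlight: number;

  constructor(maxInFlight: number) {
    this.maxInFlight = maxInFlight;
  }

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.maxInFlight > 0 && this.active >= this.maxInFlight) {
      // At the cap: park until a finishing task hands over its slot.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.active += 1;
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // slot is transferred directly, so `active` stays the same
      } else {
        this.active -= 1;
      }
    }
  }
}

// Usage: wrap each message or batch handler call in the limiter.
const limiter = new InFlightLimiter(100);
const pending = limiter.run(async () => 'handled');
```

Handing the slot directly to the next waiter, instead of decrementing and re-checking, prevents a newly arriving task from slipping in ahead of one that was already queued.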

Sample

Sample 04-batch-concurrency demonstrates batch consumption, per-topic concurrency, and rebalance-safe offset resolution. See the Sample Catalog.