Migration Guide
This guide ports a Kafka application off @nestjs/microservices's official Kafka
transport (built on kafkajs) onto @nest-native/kafka (built on Confluent's
officially supported @confluentinc/kafka-javascript). The migration is mostly a
mechanical rename: handler bodies, DI, and the Nest enhancer pipeline carry over
unchanged. The behavioral deltas — the parts that are not a rename — are called
out explicitly.
A runnable end-to-end version lives in sample/06-microservice-migration, whose
smoke test proves the ported consumer works with KafkaTestModule. The canonical,
field-by-field guide is kept in the repository at
docs/migration-from-nestjs-microservices.md.
At A Glance
@nestjs/microservices Kafka | @nest-native/kafka |
|---|---|
ClientsModule.register([{ transport: Transport.KAFKA, ... }]) | KafkaModule.forRoot({ ... }) / forRootAsync |
@Controller() on the consumer class | @KafkaConsumer('topic'?, options?) |
@MessagePattern('topic') / @EventPattern('topic') | @KafkaHandler('topic'?, options?) |
@Payload() | @KafkaMessage() |
@Ctx() ctx: KafkaContext | @KafkaCtx() ctx: KafkaContext |
| (read headers off the raw message) | @KafkaHeaders() |
ClientKafka + client.emit() / client.send() | KafkaProducerService or @InjectKafkaProducer() |
app.connectMicroservice(...) + app.startAllMicroservices() | nothing — consumers start on application bootstrap |
| Custom test harness / real broker | KafkaTestModule + createMockKafkaProducer() |
@UseGuards, @UseInterceptors, @UsePipes, and @UseFilters work exactly as
before, and handlers still run under the 'rpc' execution-context type.
The Consumer Class
Before:
import {Controller} from '@nestjs/common';
import {Ctx, EventPattern, KafkaContext, Payload} from '@nestjs/microservices';
@Controller()
export class OrdersController {
@EventPattern('orders.placed')
handleOrderPlaced(@Payload() order: OrderPlaced, @Ctx() context: KafkaContext) {
// ...
}
}
After:
import {Injectable} from '@nestjs/common';
import {KafkaConsumer, KafkaContext, KafkaCtx, KafkaHandler, KafkaMessage} from '@nest-native/kafka';
@Injectable()
@KafkaConsumer('orders.placed', {groupId: 'orders-consumer'})
export class OrdersConsumer {
@KafkaHandler()
handleOrderPlaced(@KafkaMessage() order: OrderPlaced, @KafkaCtx() context: KafkaContext) {
// identical body
}
}
The consumer group moves from the transport options onto @KafkaConsumer (or
@KafkaHandler). There is no separate "start the microservice" step — the
consumer explorer subscribes during onApplicationBootstrap. See
Module and Consumers.
@MessagePattern vs @EventPattern
@nest-native/kafka models Kafka as the event log it is: @KafkaHandler is
fire-and-forget, like @EventPattern. If you relied on the transport's built-in
request/reply correlation, implement it explicitly by producing to a reply topic
with KafkaProducerService and correlating with a header you own — the package
stays neutral on header keys.
Producing Messages
Before, you injected a ClientKafka and called client.emit(topic, object).
After, inject KafkaProducerService:
constructor(private readonly producer: KafkaProducerService) {}
await this.producer.send({
topic: 'orders.placed',
messages: [{key: order.id, value: JSON.stringify(order)}],
});
Key delta: serialization is explicit. The official transport serialized
objects for you; here the message value is a string / Buffer / null. On the
way in, the consumer JSON-parses by default and falls back to the decoded string.
See Producer.
Behavioral Deltas
These are the parts that are not a rename.
- Exception handling (
#9679). Unhandled handler errors map to commit/retry instead of being swallowed. See Error Mapping. - Sequential per-topic processing (
#12703).concurrencysetspartitionsConsumedConcurrently; default1. See Batch & Concurrency. - Rebalance safety (
#12355). Batch consumers resolve each offset as it is processed. See Batch & Concurrency. sendOffsetsshape. Takes the live consumer object, not aconsumerGroupIdstring. See Transactions.- Backpressure.
maxInFlightcaps in-flight work; default uncapped.
Testing
Replace any real-broker or hand-rolled test harness with KafkaTestModule, and
unit-test producer-injecting services with createMockKafkaProducer(). See
Testing.