Quick Start
This walkthrough wires a producer and a consumer into a Nest application.
Install
npm i @nest-native/kafka @confluentinc/kafka-javascript
Install the required Nest peers if your project does not already have them:
npm i @nestjs/common @nestjs/core @nestjs/microservices reflect-metadata rxjs
The published package keeps "dependencies": {} in its package.json. The
Confluent client and the NestJS packages are peer dependencies, so applications
install only the ecosystems they use.
See the Support Policy for the supported version lines.
Register The Module
import {Module} from '@nestjs/common';
import {KafkaModule} from '@nest-native/kafka';

@Module({
  imports: [
    KafkaModule.forRoot({
      clientId: 'orders-service',
      client: {brokers: ['localhost:9092']},
    }),
  ],
})
export class AppModule {}
forRoot returns a global DynamicModule. For configuration resolved from other
providers (a ConfigService, for example), use forRootAsync — see
Module.
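When the options come from the environment rather than a literal, it can help to build them in a plain function that a forRootAsync factory could call. The sketch below is not part of the package. It assumes the async factory returns the same options shape that forRoot takes, that KAFKA_BROKERS holds a comma-separated list, and it uses KAFKA_CLIENT_ID as a hypothetical variable name. How the factory is registered is covered in Module and is not assumed here.

```typescript
// Hypothetical shape: mirrors the options literal passed to forRoot above.
interface KafkaOptionsSketch {
  clientId: string;
  client: {brokers: string[]};
}

// Builds module options from environment-style key/value pairs.
// Assumptions: KAFKA_BROKERS is comma-separated; KAFKA_CLIENT_ID is a
// made-up variable name used only for this example.
function kafkaOptionsFromEnv(
  env: Record<string, string | undefined>,
): KafkaOptionsSketch {
  const brokers = (env.KAFKA_BROKERS ?? 'localhost:9092')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);

  // Fail at startup instead of connecting with an empty broker list.
  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must list at least one broker');
  }

  return {
    clientId: env.KAFKA_CLIENT_ID ?? 'orders-service',
    client: {brokers},
  };
}
```

Keeping this logic in a pure function means it can be unit-tested by passing a plain object, and called with process.env (or values read from a ConfigService) in the application.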
Produce A Message
Inject KafkaProducerService. It connects when the module initializes and
disconnects on graceful shutdown:
import {Injectable} from '@nestjs/common';
import {KafkaProducerService} from '@nest-native/kafka';

@Injectable()
export class OrdersService {
  constructor(private readonly producer: KafkaProducerService) {}

  async placeOrder(id: string): Promise<void> {
    await this.producer.send({
      topic: 'orders.placed',
      messages: [{key: id, value: JSON.stringify({id})}],
    });
  }
}
See Producer for sendBatch, transactional, and direct producer
access.
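The record passed to send can be built by a small pure helper, which keeps serialization testable without a broker. This is a minimal sketch: OrderPlaced, OutgoingMessage, and toOrderMessage are hypothetical names for illustration, and the key/value shape simply mirrors the example above.

```typescript
// Hypothetical event and message shapes, matching the placeOrder example.
interface OrderPlaced {
  id: string;
}

interface OutgoingMessage {
  key: string;
  value: string;
}

// Serializes one order into a keyed message with a JSON string value.
function toOrderMessage(order: OrderPlaced): OutgoingMessage {
  return {key: order.id, value: JSON.stringify(order)};
}

// Several orders map to the messages array of a single send call.
const messages: OutgoingMessage[] = [{id: 'o-1'}, {id: 'o-2'}].map(toOrderMessage);
```

Using the order id as the key means Kafka's default partitioning routes every event for one order to the same partition, which preserves their relative order.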
Consume A Message
Mark a class with @KafkaConsumer and a method with @KafkaHandler. The parsed
payload is the first argument and the raw KafkaContext is the second:
import {Injectable} from '@nestjs/common';
import {KafkaConsumer, KafkaContext, KafkaHandler} from '@nest-native/kafka';

@Injectable()
@KafkaConsumer('orders.placed', {groupId: 'orders-service'})
export class OrdersConsumer {
  @KafkaHandler()
  handle(order: {id: string}, context: KafkaContext): void {
    console.log(`order on ${context.getTopic()}`, order);
  }
}
Register the consumer as a provider, either in a module's providers or through
KafkaModule.forFeature([OrdersConsumer]):
import {Module} from '@nestjs/common';
import {KafkaModule} from '@nest-native/kafka';
import {OrdersConsumer} from './orders.consumer';
import {OrdersService} from './orders.service';

@Module({
  imports: [KafkaModule.forFeature([OrdersConsumer])],
  providers: [OrdersService],
})
export class OrdersModule {}
The payload is JSON-parsed by default, falling back to the decoded string for
non-JSON values. Header conventions stay neutral — the package does not
standardize traceId / correlationId / messageType keys.
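The parse-with-fallback behaviour described above can be pictured with a few lines of plain TypeScript. This is an illustrative sketch only, not the package's actual code: parsePayload is a hypothetical name, and UTF-8 decoding is an assumption.

```typescript
// Illustrative only: mirrors the described default, not the package source.
const decoder = new TextDecoder(); // decodes as UTF-8 by default

// Returns the parsed JSON value, or the decoded string when the bytes
// are not valid JSON.
function parsePayload(value: Uint8Array): unknown {
  const text = decoder.decode(value);
  try {
    return JSON.parse(text);
  } catch (error) {
    return text; // not JSON: hand the handler the decoded string
  }
}
```

In this sketch a bare scalar such as 42 is valid JSON and would arrive as a number rather than a string, so handlers that accept mixed payloads may want to check the type of their first argument.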
Run It Without A Broker
In tests, swap KafkaModule for KafkaTestModule to run the whole transport
against an in-memory broker — no librdkafka, no KAFKA_BROKERS. See
Testing.
Next Steps
- Consumers: guards, interceptors, pipes, and filters on handlers.
- Error Mapping: what happens when a handler throws.
- Batch & Concurrency: process partitions concurrently.
- Migration Guide: port an app off the @nestjs/microservices Kafka transport.