Skip to main content

Parameter Decorators

Instead of the positional (payload, context) arguments, decorate individual parameters — mirroring @Payload() / @Ctx() from @nestjs/microservices. The decorators participate in the enhancer pipeline, so param-level pipes run just as they do on an HTTP controller argument.

import {ParseIntPipe} from '@nestjs/common';
import {
KafkaConsumer,
KafkaContext,
KafkaCtx,
KafkaHandler,
KafkaHeaders,
KafkaMessage,
KafkaMessageHeaders,
} from '@nest-native/kafka';

@KafkaConsumer('orders.placed')
export class OrdersConsumer {
@KafkaHandler()
handle(
@KafkaMessage() order: OrderPlaced, // whole parsed payload
@KafkaMessage('id') id: string, // one payload property
@KafkaHeaders() headers: KafkaMessageHeaders, // all headers (empty if none)
@KafkaHeaders('trace-id') traceId: string | Buffer, // one header by key
@KafkaCtx() context: KafkaContext, // topic, partition, raw message, headers
): void {}
}

The Decorators

DecoratorResolves to
@KafkaMessage()The whole deserialized payload.
@KafkaMessage('prop')One property of the payload.
@KafkaHeaders()All headers as a KafkaMessageHeaders map (empty if none).
@KafkaHeaders('key')One header value (string | Buffer).
@KafkaCtx()The raw KafkaContext.
@KafkaBatch()The raw KafkaConsumerBatch (batch handlers only).

@KafkaCtx() is the parameter form; KafkaContext is also the type of the second positional argument and is returned by ExecutionContext.switchToRpc().getContext().

The Context Object

KafkaContext exposes the transport details without leaking Confluent client internals:

  • getTopic() — the source topic.
  • getPartition() — the source partition.
  • getMessage() — the raw incoming message (key, value, headers, offset).
  • getHeaders() — the message headers.

Header Conventions Stay Neutral

The package does not standardize traceId / correlationId / messageType keys. Read whatever keys your producers set with @KafkaHeaders('your-key'). Treat header values as untrusted input and validate them like any payload — secrets must never travel in headers shown in samples, logs, or docs.

Batch Handlers

In batch mode, @KafkaMessage() resolves to the array of deserialized payloads and @KafkaBatch() resolves to the raw KafkaConsumerBatch. See Batch & Concurrency.