Transactions

KafkaProducerService.transactional(work) runs work inside one Kafka transaction. It commits when work resolves. When work throws, it aborts the transaction, so nothing is delivered, and re-raises the original error. The package wraps only what the Confluent client provides; it does not add exactly-once helpers beyond Confluent's transactions.

Enabling Transactions

Configure a transactionalId to make the shared producer transactional. Confluent's client then also enables idempotence:

KafkaModule.forRoot({
  client: {brokers: ['localhost:9092']},
  producer: {transactionalId: 'orders-producer'}, // unique per producer instance
});
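Kafka uses the transactional id to fence off an older producer that reuses the same id, so the id is usually kept unique per instance yet stable across restarts of that instance. A minimal sketch of one way to derive such an id follows; the helper name transactionalIdFor and the idea of an instance label (for example a pod ordinal) are assumptions for illustration, not part of this package:

```typescript
// Hypothetical helper, not part of the package: builds a transactionalId that is
// unique per producer instance but identical across restarts of that instance.
function transactionalIdFor(base: string, instance: string | number): string {
  const label = String(instance).trim();
  if (base.trim().length === 0 || label.length === 0) {
    throw new Error('transactionalId needs a non-empty base and instance label');
  }
  return `${base.trim()}-${label}`;
}

// Example: the second replica of the orders producer.
const exampleTransactionalId = transactionalIdFor('orders-producer', 1);
```

The result could then be passed as producer.transactionalId in the KafkaModule.forRoot call shown above. A random id per start would defeat fencing, which is why the sketch takes a stable label instead of generating one.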

Atomic Multi-Topic Write

Both writes land, or neither does:

await this.producer.transactional(async tx => {
  await tx.send({topic: 'orders.placed', messages: [{value: id}]});
  await tx.sendBatch({
    topicMessages: [{topic: 'orders.audit', messages: [{value: `placed ${id}`}]}],
  });
});

Read-Process-Write With sendOffsets

For the consume-process-produce ("read-process-write") pattern, commit the consumer's offset inside the same transaction with sendOffsets, so the produced message and the consumed offset commit atomically. This gives exactly-once processing across the consume → produce step:

await this.producer.transactional(async tx => {
  await tx.send({topic: 'receipts.issued', messages: [{value: receipt}]});
  await tx.sendOffsets({
    consumer, // the live consumer object (see the migration note below)
    topics: [
      {
        topic: 'payments.captured',
        // commit "next offset to read" = consumed offset + 1
        partitions: [{partition, offset: String(Number(offset) + 1)}],
      },
    ],
  });
});

The KafkaTransactionOffsets / KafkaTopicOffsets / KafkaPartitionOffset types model Confluent's shape.
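The "consumed offset + 1" step can be factored into a small helper. Kafka offsets are 64-bit integers carried as strings, and Number is exact only up to 2^53 - 1, so the sketch below does the increment with BigInt instead. The interfaces are local stand-ins that mirror the topics entry in the example above; they are assumptions for illustration, not the package's exported KafkaTopicOffsets or KafkaPartitionOffset types:

```typescript
// Local illustrative types (assumed shape, mirroring the sendOffsets example).
interface PartitionOffsetSketch {
  partition: number;
  offset: string;
}

interface TopicOffsetsSketch {
  topic: string;
  partitions: PartitionOffsetSketch[];
}

// Returns the "next offset to read" for a consumed offset, without losing
// precision on offsets above Number.MAX_SAFE_INTEGER.
function nextOffset(consumedOffset: string): string {
  return (BigInt(consumedOffset) + BigInt(1)).toString();
}

// Builds one topics entry for a single consumed message.
function toCommitOffsets(
  topic: string,
  partition: number,
  consumedOffset: string,
): TopicOffsetsSketch {
  return {topic, partitions: [{partition, offset: nextOffset(consumedOffset)}]};
}
```

Under these assumptions, topics: [toCommitOffsets('payments.captured', partition, offset)] would take the place of the inline object literal in the sendOffsets call.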

:::note Migration note (kafkajs → Confluent)
sendOffsets takes the live consumer object in @confluentinc/kafka-javascript, not the consumerGroupId string kafkajs used. The KafkaTransactionOffsets type models the Confluent shape. See the Migration Guide.
:::

Abort Error Semantics

transactional commits on success and aborts on throw. If the abort itself fails while unwinding a failed work callback, the original error still surfaces, with the abort failure attached as its cause, so neither error is lost.
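The control flow described here can be sketched in isolation. This is a simplified model under stated assumptions, not the package's implementation: SketchTransaction and runTransactional are hypothetical names, and commit failures are simply propagated because the text above does not specify how they are handled.

```typescript
// Hypothetical stand-in for a transaction handle; not the package's real type.
interface SketchTransaction {
  commit(): Promise<void>;
  abort(): Promise<void>;
}

// Commit when work resolves; abort and re-raise the original error when it throws.
async function runTransactional<T>(
  tx: SketchTransaction,
  work: (tx: SketchTransaction) => Promise<T>,
): Promise<T> {
  let result: T;
  try {
    result = await work(tx);
  } catch (workError) {
    try {
      await tx.abort();
    } catch (abortError) {
      // Keep the original error as the one that surfaces and attach the abort
      // failure as its cause. A thrown non-Error value is re-raised unchanged.
      if (workError instanceof Error) {
        (workError as Error & {cause?: unknown}).cause = abortError;
      }
    }
    throw workError;
  }
  // Assumption: a commit failure is not followed by an abort in this sketch.
  await tx.commit();
  return result;
}
```

A caller that catches the error can inspect its cause to tell a clean abort (no cause set by this sketch) from an abort that also failed.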

Sample

Sample 05-transactions isolates the helper — atomic multi-topic write, abort-on-throw, and sendOffsets. See the Sample Catalog.