confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client

Home Page: https://www.npmjs.com/package/@confluentinc/kafka-javascript

License: MIT License

Makefile 0.21% JavaScript 57.59% Shell 0.31% Python 0.45% C++ 23.45% PowerShell 0.01% TypeScript 17.98%
confluent consumer javascript kafka kafka-client librdkafka producer

confluent-kafka-javascript's Introduction

Confluent's JavaScript Client for Apache Kafka™

confluent-kafka-javascript is Confluent's JavaScript client for Apache Kafka and the Confluent Platform. This is an early access library. The goal is to provide a highly performant, reliable, and easy-to-use JavaScript client that is based on node-rdkafka yet also API compatible with KafkaJS, providing flexibility to users and streamlining migrations from other clients.

This library leverages the work and concepts from two popular Apache Kafka JavaScript clients: node-rdkafka and KafkaJS. The core is heavily based on the node-rdkafka library, which uses our own librdkafka library for core client functionality. However, we leverage a promisified API and a more idiomatic interface, similar to the one in KafkaJS, making it easy for developers to migrate and adopt this client depending on the patterns and interface they prefer. This library currently uses librdkafka based on the master branch.

This library is currently in early access and not meant for production use

This library is in active development, pre-1.0.0, and it is likely to have many breaking changes.

For this early-access release, we aim to get feedback from JavaScript developers within the Apache Kafka community to help meet your needs. Some areas of feedback we are looking for include:

  • Usability of the API compared to other clients
  • Migration experience from node-rdkafka and KafkaJS
  • Overall quality and reliability

We invite you to raise issues to highlight any feedback you may have.

Within the early access release, only basic produce and consume functionality, along with the ability to create and delete topics, is supported. All other admin client functionality is coming in future releases. See INTRODUCTION.md for more details on what is supported.
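A hedged sketch of topic creation and deletion follows, assuming the KafkaJS-style admin surface (kafka.admin(), createTopics, deleteTopics) carries over from the KafkaJS API-compatibility goal; the exact names exposed in early access should be verified against INTRODUCTION.md.

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

async function adminExample() {
    // Assumption: the KafkaJS-compatible admin client is obtained via kafka.admin().
    const kafka = new Kafka({ kafkaJS: { brokers: ['<fill>'] } });
    const admin = kafka.admin();
    await admin.connect();

    // Create a topic; name, partition count, and replication factor are placeholders.
    await admin.createTopics({
        topics: [{ topic: 'test-topic', numPartitions: 1, replicationFactor: 1 }],
    });

    // Delete it again.
    await admin.deleteTopics({ topics: ['test-topic'] });

    await admin.disconnect();
}

adminExample();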

To use Schema Registry, use the existing kafkajs/confluent-schema-registry library that is compatible with this library. For a simple schema registry example, see sr.js. DISCLAIMER: Although it is compatible with confluent-kafka-javascript, Confluent does not own or maintain kafkajs/confluent-schema-registry, and the use and functionality of the library should be considered "as is".
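As a minimal sketch of that pairing, assuming the @kafkajs/confluent-schema-registry package and a schema that is already registered (the registry URL and schema id below are placeholders); sr.js remains the canonical example.

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;
const { SchemaRegistry } = require("@kafkajs/confluent-schema-registry");

async function produceEncoded() {
    const registry = new SchemaRegistry({ host: "http://<fill>:8081" }); // placeholder registry URL
    const kafka = new Kafka({ kafkaJS: { brokers: ["<fill>"] } });
    const producer = kafka.producer();
    await producer.connect();

    const schemaId = 1; // placeholder: id of an already-registered schema
    const value = await registry.encode(schemaId, { hello: "world" });

    await producer.send({ topic: "test-topic", messages: [{ value }] });
    await producer.disconnect();
}

produceEncoded();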

Requirements

The following configurations are supported for this early access preview:

  • Any supported version of Node.js (the two LTS versions, 18 and 20, and the latest versions, 21 and 22).
  • Linux (x64 and arm64) - both glibc and musl/alpine.
  • macOS - arm64/m1.
  • Windows - x64 (experimentally available in EA).

Installation on any of these platforms is meant to be seamless, without any C/C++ compilation required.

If your system configuration is not among the supported ones, a supported version of Python must be available on the system for the installation process, as it is required by the node-gyp build tool.

$ npm install @confluentinc/kafka-javascript

Yarn and pnpm support is experimental.

Getting Started

Below is a simple produce example for users migrating from KafkaJS.

// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

async function producerStart() {
    const kafka = new Kafka({
        kafkaJS: {
            brokers: ['<fill>'],
            ssl: true,
            sasl: {
                mechanism: 'plain',
                username: '<fill>',
                password: '<fill>',
            },
        }
    });

    const producer = kafka.producer();

    await producer.connect();

    console.log("Connected successfully");

    const res = []
    for (let i = 0; i < 50; i++) {
        res.push(producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'v222', partition: 0 },
                { value: 'v11', partition: 0, key: 'x' },
            ]
        }));
    }
    await Promise.all(res);

    await producer.disconnect();

    console.log("Disconnected successfully");
}

producerStart();
  1. If you're migrating from kafkajs, you can use the migration guide.
  2. If you're migrating from node-rdkafka, you can use the migration guide.
  3. If you're starting afresh, you can use the quickstart guide.

An in-depth reference may be found at INTRODUCTION.md.
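For the consuming side, here is a minimal sketch in the same KafkaJS-compatible style as the produce example above; broker and group values are placeholders.

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

async function consumerStart() {
    const kafka = new Kafka({
        kafkaJS: {
            brokers: ['<fill>'],
        }
    });

    const consumer = kafka.consumer({ kafkaJS: { groupId: 'test-group' } });
    await consumer.connect();
    await consumer.subscribe({ topic: 'test-topic' });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log(topic, partition, message.value.toString());
        },
    });
}

consumerStart();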

Contributing

Bug reports and early-access feedback are appreciated in the form of GitHub issues. For guidelines on contributing, please see CONTRIBUTING.md.

confluent-kafka-javascript's People

Contributors

alexander-alvarez, ankon, battlecow, cjlarose, claimundefine, codeburke, confluentjenkins, dchesterton, edoardocomar, emasab, fabianschmitthenner, garywilber, geoffreyhervet, iradul, jaaprood, jpdstan, macabu, mccaig, milindl, mimaison, nhaq-confluent, pchelolo, pthm, rayokota, rusty0412, sam-github, sgenoud, tvainika, webmakersteve, yunnysunny


confluent-kafka-javascript's Issues

Help with SSL mapping

Since SSL mapping is not provided as part of the KafkaJS config migration, I have a question on how to migrate a KafkaJS SSL configuration.

We have 3rd party SSL-based connections that are configured today in KafkaJS as such:

brokers:
  - kafka-dev1.some-domain.local:9096
ssl:
  ca: ${secret.kafka.ca}
  key: ${secret.kafka.key}
  cert: ${secret.kafka.cert}
  passphrase: ${secret.kafka.passphrase}
  checkServerIdentity: false  # disables hostname verification

...where ${secret.kafka.ca}, ${secret.kafka.key}, and ${secret.kafka.cert} are the string contents of .pem files (and the .pem files are not accessible at runtime).

I see a very wide range of ssl options in GlobalConfig, some prefixed with ssl_ and others with ssl.

I'm not sure how I should be mapping these to incorporate the passphrase and disable hostname verification.

ssl_ca: ${secret.kafka.ca}
ssl_key: ${secret.kafka.key}
ssl_certificate: ${secret.kafka.cert}
???

-or-

ssl.ca.pem: ${secret.kafka.ca}
ssl.key.pem: ${secret.kafka.key}
ssl.key.password: ${secret.kafka.passphrase}
ssl.certificate.pem: ${secret.kafka.cert}
???

-or-
other?

Thanks!
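For what it is worth, a hedged sketch of the second option using librdkafka-style property names; ssl.ca.pem, ssl.key.pem, ssl.key.password, and ssl.certificate.pem are standard librdkafka properties, and hostname verification is normally disabled with ssl.endpoint.identification.algorithm set to 'none'. Whether these are exactly the names this client forwards is an assumption to verify against its configuration docs; the environment-variable names below are placeholders for the same secrets as above.

// Hedged sketch: librdkafka-style SSL configuration.
const config = {
    'bootstrap.servers': 'kafka-dev1.some-domain.local:9096',
    'security.protocol': 'ssl',
    'ssl.ca.pem': process.env.KAFKA_CA,               // ${secret.kafka.ca}
    'ssl.key.pem': process.env.KAFKA_KEY,             // ${secret.kafka.key}
    'ssl.key.password': process.env.KAFKA_PASSPHRASE, // ${secret.kafka.passphrase}
    'ssl.certificate.pem': process.env.KAFKA_CERT,    // ${secret.kafka.cert}
    // Assumed equivalent of checkServerIdentity: false (disables hostname verification).
    'ssl.endpoint.identification.algorithm': 'none',
};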

Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE

Hello, I noticed that COOPERATIVE rebalance (incremental assign/unassign bindings) was implemented in confluent-kafka-javascript library, which is currently not supported by node-rdkafka. I made a small POC to see how it works in Node.js, but encountered a problem where the following error occurs during rebalance:

Consumer [1|poc_test]: rebalance.error Error: Local: Erroneous state
        at KafkaConsumer.assign (/Users/s.franchuk/github/confluent-kafka-javascript/lib/kafka-consumer.js:266:16)
        at KafkaConsumer.conf.rebalance_cb (/Users/s.franchuk/github/confluent-kafka-javascript/lib/kafka-consumer.js:65:16)
[2024-04-01T13:05:27.192Z]  WARN: poc-confluent-kafka/36080 on s-franchuk:
    Consumer [1|poc_test]: event.log {
      severity: 4,
      fac: 'ASSIGN',
      message: '[thrd:main]: Group "poc_test": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE'
    }

Debugging showed that the KafkaConsumer::IncrementalAssign method in kafka-consumer.cc is indeed called, and the consumer method consumer->incremental_assign(partitions) is also called with the correct arguments. What happened next in C++ code of librdkafka is hard for me to say, but rko->rko_u.assign.method does not return RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN or RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN here:
https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_cgrp.c#L4842

Kafka consumer:

const { CODES, KafkaConsumer } = require('@confluentinc/kafka-javascript');

this._consumer = new KafkaConsumer({
      'bootstrap.servers': KAFKA_BROKERS,
      'client.id': clientId,
      'group.id': groupId,
      'auto.offset.reset': 'latest',
      'session.timeout.ms': 10000,
      'heartbeat.interval.ms': 100,
      'enable.auto.commit': false,
      'partition.assignment.strategy': 'cooperative-sticky',
      'fetch.wait.max.ms': 100,
      rebalance_cb: true,
      // debug: 'consumer,topic',
    });

Rebalance callback:

this._consumer.on('rebalance', (err, assignments) => {
      const partitions = assignments.map((assignment) => assignment.partition);
      const type = RebalanceEventType[err.code] || 'ERROR';

      logger.info(`${this._who()}: rebalance happened - ${type} | [${partitions.join(',')}]`);

      switch (err.code) {
        case CODES.ERRORS.ERR__ASSIGN_PARTITIONS:
          this._consumer.incrementalAssign(assignments);
          break;
        case CODES.ERRORS.ERR__REVOKE_PARTITIONS:
          this._consumer.incrementalUnassign(assignments);
          break;
        default:
          logger.error(`${this._who()}: rebalance error`, err);
      }
});

this._consumer.on('rebalance.error', (err) => {
      logger.error(`${this._who()}: rebalance.error`, err);
});

For testing, I used library version 0.1.11-devel from npm and a manual build of the project from the dev_early_access_development_branch branch.

What could be the reason for this problem?

Environment Information

  • OS: Mac
  • Node Version: 18.17.0
  • NPM Version: 9.6.7
  • confluent-kafka-javascript version: 0.1.11-devel / dev_early_access_development_branch
  • Docker Image: confluentinc/cp-server:7.2.1

KafkaJS.Consumer: Pausing topics during rebalance_cb not supported

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
  • Node Version [e.g. 8.2.1]: v18.17.0
  • NPM Version [e.g. 5.4.2]: 9.6.7
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
  • confluent-kafka-javascript version [e.g. 2.3.3]: "@confluentinc/kafka-javascript": "0.1.16-devel"

Steps to Reproduce

const LibrdKafka = require("@confluentinc/kafka-javascript")

async function main() {
	/** @type {import("@confluentinc/kafka-javascript").KafkaJS.Consumer} */
	let consumer

	const config = {
		kafkaJS: {
			clientId: `Repro-Client-${Date.now()}`,
			brokers: ["127.0.0.1:9092"],
			connectionTimeout: 7000,
			requestTimeout: 30000,
			retry: { initialRetryTime: 300, retries: 50, maxRetryTime: 600 },
			logLevel: LibrdKafka.KafkaJS.logLevel.ERROR,
			groupId: `Repro-Client-${Date.now()}-CG`,
			rebalanceTimeout: 60000,
			sessionTimeout: 45000,
			allowAutoTopicCreation: true,
			autoCommit: false,
			partitionAssigners: ["cooperative-sticky"],
		},
		"group.instance.id":
			`Repro-Client-${Date.now()}-Instance`,
		rebalance_cb: (err, assignments) => {
			const unflattened = []
			for (const { topic, partition } of assignments) {
				unflattened.push({ topic, partitions: [partition] })
			}
			consumer.pause(unflattened)
			console.log("rebalance_cb: paused during callback:", consumer.paused())
		},
	}

	const kafka = new LibrdKafka.KafkaJS.Kafka(config)
	consumer = kafka.consumer(config)

	await consumer.connect()
	// Fill in some topics you have on hand
	await consumer.subscribe({
		topics: [/^.*something.*/],
	})

	await consumer.run({
		eachMessage: async message => {
			console.log("Expected to never be called:", message)
		},
	})

	await new Promise(resolve => setTimeout(resolve, 50_000))
}

main()

Here's the console output I get:

rebalance_cb: paused during callback: [
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
    partitions: [ 0 ]
  }
]


INTERNALS: incrementalAssign [
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
    partition: 0
  }
]
Expected to never be called: {
  topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
  partition: 0,
  message: {
    key: <Buffer 7b 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 5f 5f 64 62 7a 5f 5f 70 68 79 73 69 63 61 6c 54 61 62 6c 65 49 64 65 6e 74 ... 96 more bytes>,
    value: <Buffer 7b 22 73 74 61 74 75 73 22 3a 22 42 45 47 49 4e 22 2c 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 65 76 65 6e 74 5f 63 6f ... 56 more bytes>,
    timestamp: '1723065962194',
    attributes: 0,
    offset: '1543',
    size: 106,
    leaderEpoch: 21,
    headers: undefined
  },
  heartbeat: [AsyncFunction: heartbeat],
  pause: [Function: bound pause]
}

Headers (outbound and inbound) do not comply with KafkaJS type definitions

The header implementation for both producers and consumers does not comply with the type definitions offered up in kafkajs.d.ts (which are unmodified from the KafkaJS originals).

Below is a comparison between KafkaJS and Confluent.

KafkaJS

import {Kafka} from "kafkajs";

const topic = "test-kafkajs-topic";
let receivedCount = 0;

const kafka = new Kafka({brokers: ["localhost:9092"]});

const consumer = kafka.consumer({groupId:`${topic}-group`});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    log.info(JSON.stringify(message.headers, null, 2));
    receivedCount++;
  }
});

const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic,
  messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});

await until(async () => receivedCount == 1);

await producer.disconnect();
await consumer.disconnect();
{
  "header1": {
    "type": "Buffer",
    "data": [
      97,
      108,
      112,
      104,
      97
    ]
  },
  "header2": {
    "type": "Buffer",
    "data": [
      98,
      101,
      116,
      97
    ]
  }
}

Confluent

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const topic = "test-confluent-topic";
let receivedCount = 0;

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});

const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    log.info(JSON.stringify(message.headers, null, 2));
    receivedCount++;
  }
});

await until(async () => consumer.assignment().length > 0);

const producer = kafka.producer({"linger.ms": 0});
await producer.connect();
await producer.send({
  topic,
  messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});

await until(async () => receivedCount == 1);

await producer.disconnect();
await consumer.disconnect();
{
  "0": {
    "key": {
      "type": "Buffer",
      "data": [
        104,
        101,
        97,
        100,
        101,
        114,
        49
      ]
    }
  },
  "1": {
    "key": {
      "type": "Buffer",
      "data": [
        104,
        101,
        97,
        100,
        101,
        114,
        50
      ]
    }
  }
}

Two (maybe three) notable issues:

  1. The headers header1=alpha and header2=beta were sent to Kafka as key=header1 and key=header2
  2. When that message was received, the headers object does not match the IHeaders type definition:
export interface IHeaders {
  [key: string]: Buffer | string | (Buffer | string)[] | undefined
}
  3. If I had actually sent key=header1 and key=header2, KafkaJS compatibility would dictate a string key of "key" and a string[] value of ["header1","header2"]

Why is binding better than native implementation?

In our project, we plan to use Kafka for cross-service data exchange, but we are put off by the available choice of clients. Many clients look abandoned, including previously popular ones (for example, kafkajs or node-kafka). The only solutions are node-rdkafka or confluent-kafka-javascript based on librdkafka. There is currently no popular client for Kafka in the NodeJS community with active support without third-party dependencies, unlike RabbitMQ or NATS. Using bindings creates additional complexity when using applications on different platforms, and also requires studying the librdkafka documentation, which can be difficult for NodeJS developers without knowledge of other programming languages.

Is there a chance that an official client based on NodeJS will be introduced in the future?

Request: TypeScript example of KafkaJS

I'm not understanding how to construct an instance of KafkaJS.Kafka since KafkaJS is exported as a type and not a namespace. Are there any examples of this?

Unable to construct viable Docker image using `node:20-alpine`

Our build uses an ubuntu-latest Github runner to build a Docker image.

Our Dockerfile follows the example provided in this repo.

FROM node:20-alpine
COPY ./dist /app/
WORKDIR /app
RUN apk --no-cache add \
  bash \
  g++ \
  ca-certificates \
  lz4-dev \
  musl-dev \
  cyrus-sasl-dev \
  openssl-dev \
  make \
  python3 \
  gcompat # added to provide missing ld-linux-x86-64.so.2
RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools bash
RUN npm install --omit=dev

EXPOSE 4000
CMD [ "node", "app.js" ]

The deployed pod is hosted in AKS, and both the runners and host nodes are amd64 arch.

Without the @confluentinc/kafka-javascript dependency in the package.json, the application will start without issue on the container.

With the @confluentinc/kafka-javascript dependency in the package.json (and no reference from the application), the application will immediately fail with:

Segmentation fault (core dumped)

While troubleshooting, we discovered that if we reinstalled the package on the running container, the application would then startup normally.

Initial thought was that the wrong flavor of librdkafka was being downloaded.

By adding the following to the Dockerfile, I was able to capture the node-pre-gyp output:

WORKDIR /app/node_modules/@confluentinc/kafka-javascript
RUN npx node-pre-gyp install --update-binary
WORKDIR /app
#12 0.816 node-pre-gyp info using [email protected]
#12 0.816 node-pre-gyp info using [email protected] | linux | x64
#12 0.906 node-pre-gyp http GET https://github.com/confluentinc/confluent-kafka-javascript/releases/download/v0.1.15-devel/confluent-kafka-javascript-v0.1.15-devel-node-v115-linux-musl-x64.tar.gz

Again, launching this container results in the segmentation fault on startup.

Starting the container, and running the following:

cd node_modules/\@confluentinc/kafka-javascript/
npx node-pre-gyp install --update-binary
cd /app

...seemingly performs the same operation we saw during the Docker image construction:

node-pre-gyp info using [email protected]
node-pre-gyp info using [email protected] | linux | x64
http GET https://github.com/confluentinc/confluent-kafka-javascript/releases/download/v0.1.15-devel/confluent-kafka-javascript-v0.1.15-devel-node-v115-linux-musl-x64.tar.gz

...yet after this operation is performed, the application starts without issue.

Please help us to understand what is going on here, and how we can solve this problem.

Confluent vs KafkaJS performance feedback

I ran a test sending and receiving 1000 messages individually (no batching) using the KafkaJS library, and then ran the same test using the Confluent library (following the migration instructions).

KafkaJS: 455ms
Confluent: 501951ms

That's not a typo. In this case, the Confluent test took 1000x as long to complete.

I'm presuming there is some tuning that can be done via configuration; but this was an "out of the box" conversion, and my attempts at "tuning" the configuration did not yield any noticeable differences.

Notes

  • Topic already exists and is empty at start of test.
  • Run on an M3 Mac, with local docker-hosted Kafka (confluentinc/cp-kafka:7.6.0)

KafkaJS

import {Kafka} from "kafkajs";

const topic = "test-kafkajs-topic";
const total = 1000;
const start = Date.now();
let sentCount = 0;
let receivedCount = 0;

const kafka = new Kafka({brokers: ["localhost:9092"]});

const consumer = kafka.consumer({groupId: `${topic}-group`});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    receivedCount++;
    if (receivedCount % 100 === 0) {
      log.info(`Rec'd ${String(receivedCount).padStart(4, " ")} : ${Date.now() - start}ms`);
    }
  }
});

const producer = kafka.producer();
await producer.connect();
for (let i = 0; i < total; i++) {
  await producer.send({
    topic,
    messages: [{value: "one"}]
  });
  if (++sentCount % 100 === 0) {
    log.info(`Sent  ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
}

await until(async () => receivedCount == total, {timeout: 5000});

await producer.disconnect();
await consumer.disconnect();
Sent   100 : 133ms
Rec'd  100 : 133ms
Sent   200 : 163ms
Rec'd  200 : 163ms
Sent   300 : 193ms
Rec'd  300 : 193ms
Sent   400 : 229ms
Rec'd  400 : 229ms
Sent   500 : 271ms
Rec'd  500 : 271ms
Sent   600 : 331ms
Rec'd  600 : 332ms
Sent   700 : 371ms
Rec'd  700 : 371ms
Sent   800 : 398ms
Rec'd  800 : 399ms
Sent   900 : 427ms
Rec'd  900 : 428ms
Sent  1000 : 454ms
Rec'd 1000 : 455ms

Confluent

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const topic = "test-confluent-topic";
const total = 1000;
const start = Date.now();
let sentCount = 0;
let receivedCount = 0;

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});

const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    receivedCount++;
    if (receivedCount % 100 === 0) {
      log.info(`Rec'd ${String(receivedCount).padStart(4, " ")} : ${Date.now() - start}ms`);
    }
  }
});

const producer = kafka.producer();
await producer.connect();
for (let i = 0; i < total; i++) {
  await producer.send({
    topic,
    messages: [{value: "one"}]
  });
  if (++sentCount % 100 === 0) {
    log.info(`Sent  ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
}

await until(async () => receivedCount == total, {timeout: 5000});

await producer.disconnect();
await consumer.disconnect();
Sent   100 : 50630ms
Rec'd  100 : 63159ms
Sent   200 : 100720ms
Rec'd  200 : 127273ms
Sent   300 : 150805ms
Rec'd  300 : 191382ms
Sent   400 : 200890ms
Sent   500 : 250985ms
Rec'd  400 : 255503ms
Rec'd  500 : 255504ms
Sent   600 : 301079ms
Sent   700 : 351164ms
Rec'd  600 : 383739ms
Rec'd  700 : 383740ms
Sent   800 : 401253ms
Sent   900 : 451346ms
Sent  1000 : 501434ms
Rec'd  800 : 501949ms
Rec'd  900 : 501950ms
Rec'd 1000 : 501951ms
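Regarding the tuning remark above: this client accepts librdkafka property overrides passed directly to producer(), and the headers comparison earlier on this page constructs its producer with {"linger.ms": 0}. A hedged sketch of that pattern for the per-message-send case, assuming a kafka instance built as in the Confluent listing above; whether it accounts for the gap measured here is untested.

// Hedged sketch: pass librdkafka overrides directly to producer(), mirroring the
// {"linger.ms": 0} usage in the headers comparison earlier on this page.
const producer = kafka.producer({ "linger.ms": 0 }); // do not wait to batch; send immediately
await producer.connect();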

Understanding the roadmap

I wanted to get a better understanding of what are the roadmap items that are left before we enter Beta or GA.
It would be good to have a bit of visibility into the project, beyond the issues that are filed.

Thanks

Feature request: Support for DescribeClientQuotas/AlterClientQuotas

Currently, we are evaluating Kafka client libraries for JavaScript and were delighted to learn that with confluent-kafka-javascript an offering supported by Confluent is in the works.

For our use case, support for the Admin API functions

  • DescribeClientQuotas (API Key 48)
  • AlterClientQuotas (API Key 49)

would be required.

We realise that confluent-kafka-javascript depends on librdkafka to provide the missing functions. Can you share any insights into plans/timelines if and when librdkafka might be enhanced to support these API keys?

Cheers, Achim

How to use logging callback?

With KafkaJS, we used the logCreator config field when creating the Kafka instance. That has been removed in this library.

I see the underlying rdkafka supports a log_cb logging callback, but I'm unable to determine how to make use of it and was unable to find any examples on the internet.

The type definition for the callback is any (not terribly helpful). I have tried various incarnations, but all of them have the same result.

const kafka = new Confluent.Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"]
  },
  log_cb: () => console.log("log_cb")
});

Results in:

Error: Invalid callback type
    at Client.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/client.js:253:16)
    at /Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:802:28
    at new Promise (<anonymous>)
    at Consumer.connect (/Users/peloquina/src/agilysys-inc/stay/backplane-base/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_consumer.js:800:12)
    at Context.<anonymous> (itest/confluentKafkaAccessor.test.ts:149:20)

Please help.
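One shape that does appear to work elsewhere on this page (see the SSL-interference test further down) is a KafkaJS-style logger object passed inside the kafkaJS block, rather than a librdkafka log_cb. A minimal sketch of that shape, with the logger methods reduced to console.log; whether all librdkafka log lines are routed through it is an assumption to verify.

const { Kafka, logLevel } = require("@confluentinc/kafka-javascript").KafkaJS;

// Hedged sketch: an object implementing the KafkaJS logger interface
// (info/warn/error/debug plus namespace and setLogLevel).
const consoleLogger = {
    info:  (message, extra) => console.log("[info]", message, extra ?? ""),
    warn:  (message, extra) => console.log("[warn]", message, extra ?? ""),
    error: (message, extra) => console.log("[error]", message, extra ?? ""),
    debug: (message, extra) => console.log("[debug]", message, extra ?? ""),
    namespace() { return this; },
    setLogLevel(_level) {},
};

const kafka = new Kafka({
    kafkaJS: {
        brokers: ["localhost:9092"],
        logLevel: logLevel.INFO,
        logger: consoleLogger,
    },
});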

question: Correct way to check consumer/producer health using KafkaJS API

First of all, I just want to say how happy I am to see confluent release an official JS client. Love the idea to make it compatible with KafkaJS/node-rdkafka. 👍

Now for the issue

I don't see a way to get the current state / health of a consumer or producer using the KafkaJS api.

When using the node-rdkafka api, it looks like I can hook into lifecycle events using consumer.on('ready',...) etc.

But when using the KafkaJS API, the .on method is not implemented, and the state is marked private. With KafkaJS we were previously hooking into these lifecycle events to keep track of the consumer state, which we used for our Kubernetes readiness probes.

Is there any plan to implement events on the KafkaJS api?
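For reference, the lifecycle hooks mentioned above are available on the non-promisified (node-rdkafka-style) API, as also used in the segfault reproduction further down this page; a minimal sketch follows. The 'disconnected' event name is carried over from node-rdkafka and is an assumption for this client.

const { KafkaConsumer } = require("@confluentinc/kafka-javascript");

const consumer = new KafkaConsumer({
    'group.id': 'health-check-group',
    'bootstrap.servers': 'localhost:9092',
}, {});

consumer.on('ready', () => console.log('consumer ready'));          // connected and ready
consumer.on('disconnected', () => console.log('consumer stopped')); // assumption: node-rdkafka event name
consumer.on('event.error', (err) => console.error(err));            // transport/client errors

consumer.connect();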

[Enhancement] Roadmap or more info on path to production ready

Hi there!

Looking into this library for potential use at my company - it would be great if there was some kind of roadmap, rough timeline, or even MVP scope for this project, so we can evaluate if we'll be able to use it in the near term, or use another library initially and migrate here longer term.

I understand giving estimates on projects like this is a bit of a nightmare, but even something like "maybe this year, next year H1, next year H2" would be a massive help!

Thanks 🚀

Segfault while closing the consumer while consume loop is running

Reproduction code:


const RdKafka = require('@confluentinc/kafka-javascript');

function runConsumer() {
    const consumer = new RdKafka.KafkaConsumer({
        'group.id': 'test-group' + Math.random(),
        'bootstrap.servers': 'localhost:9092',
    }, {
        'auto.offset.reset': 'earliest',
    });

    consumer.connect();

    consumer.on('ready', () => {
        console.log("Consumer is ready");
        consumer.subscribe(['test-topic']);
        consumer.consume(); // consume loop
    });

    consumer.on('data', (data) => {
        console.log("Received data");
        console.log(data);
        consumer.disconnect();
    });

    consumer.on('event.error', (err) => {
        console.error(err);
    });
}

Cause: NodeKafka::Workers::KafkaConsumerConsumeLoop::HandleMessageCallback is called after KafkaConsumerConsumeLoop::Close, and the callback has been cleared by that time, so callback->Call causes a segfault.

Confluent client unexpectedly mutates function arguments

The KafkaJS consumer unexpectedly mutates the inputs to some of its methods. For example, kafkaJSClient.pause(topics) will mutate the input topics by adding the .partitions key if it's undefined. I generally expect that JavaScript functions treat their inputs as immutable, so I don't need to defensively clone function arguments before passing them to a library. Mutation can lead to unexpected results.

In this case, I want to call .pause(assignments) during the rebalance callback for some assignments. This is mutating the assignment objects, leading to unexpected side effects in the assignment logic.
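Until that changes, a minimal defensive-copy sketch for the rebalance-callback case described above: hand pause() a copy of the assignment objects so the in-place mutation cannot leak into the assignment logic. The consumer variable and doAssignmentLogic are hypothetical stand-ins.

// Hedged sketch: clone the { topic, partition } pairs before pausing so the
// library mutates the copies, not the originals.
function onRebalance(err, assignments) {
    const copy = assignments.map(({ topic, partition }) => ({ topic, partition }));
    consumer.pause(copy);           // may add .partitions to the copies only
    doAssignmentLogic(assignments); // originals still look like { topic, partition }
}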

Producer property warnings on consumer creation

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

const kafka = new Kafka({
  kafkaJS: {
    brokers: ["localhost:9092"]
  }
});
const consumer = kafka.consumer({
  kafkaJS: {groupId: GROUP_ID}
});
await consumer.connect();

Results in the following log entries, even though I have clearly not specified retry.backoff.ms or retry.backoff.max.ms.

{
  message: '[thrd:app]: Configuration property retry.backoff.ms is a producer property and will be ignored by this consumer instance',
  fac: 'CONFWARN',
  timestamp: 1715289786815
}
{
  message: '[thrd:app]: Configuration property retry.backoff.max.ms is a producer property and will be ignored by this consumer instance',
  fac: 'CONFWARN',
  timestamp: 1715289786816
}

Confluent connections interfere with KafkaJS connections when SSL is involved

This is an odd one.

We're attempting to phase-in use of the Confluent library to our app for new functionality. This app has existing functionality using KafkaJS, and the existing functionality is in production. Given the "early access" nature of the Confluent library, it makes sense to adopt it for the new functionality, but retain KafkaJS for the existing functionality for now.

The new functionality was added, integration tests showed everything working, but as soon as we deployed to an environment using external Kafka cluster(s) the KafkaJS connections immediately started failing with:

Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established

I narrowed it down to starting multiple consumers for both KafkaJS and Confluent concurrently... with SSL connections.

I created a test that replicates the scenario by creating consumers for 5 topics each with KafkaJS and Confluent concurrently on a Confluent Cloud cluster. There are also tests that run just 5 KafkaJS consumers or just 5 Confluent consumers to prove there is no issue when run independently.

Notes:

  • Topics must already exist.
  • This connects both Confluent and KafkaJS to the same cluster; but I see the same behavior when Confluent and KafkaJS are used to connect to different clusters.
  • 5 seems like a magic number, in that for me I can repro consistently with 5. If I run the test with less than 5, results are inconsistent (sometimes there is no issue). Our app has 10's of consumers on both sides.
  • Used XXXX in place of identifying details/credentials.
  • Tests should be run with an extended timeout (I was using 60s).
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";
import {Logger} from "@confluentinc/kafka-javascript/types/kafkajs.js";
import {fail} from "assert";
import {Consumer, ConsumerGroupJoinEvent, Kafka, logLevel} from "kafkajs";

const KAFKA_JS_TOPICS: string[] = [
  "test-kafkajs-topic",
  "test-kafkajs-topic-2",
  "test-kafkajs-topic-3",
  "test-kafkajs-topic-4",
  "test-kafkajs-topic-5"
];

const CONFLUENT_TOPICS: string[] = [
  "test-confluent-topic",
  "test-confluent-topic-2",
  "test-confluent-topic-3",
  "test-confluent-topic-4",
  "test-confluent-topic-5"
];

describe("Supports KafkaJS and Confluent consumers", async () => {
  let confluentConsumers: Confluent.Consumer[] = [];
  let kafkaJSConsumers: Consumer[] = [];

  afterEach(async () => {
    const promises: Promise<void>[] = [];
    for (const consumer of kafkaJSConsumers) {
      promises.push(consumer.disconnect());
    }
    for (const consumer of confluentConsumers) {
      promises.push(consumer.disconnect());
    }
    await Promise.all(promises);
    confluentConsumers = [];
    kafkaJSConsumers = [];
  });

  it("Handles concurrent startup of multiple KafkaJS consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, []);
  });

  it("Handles concurrent startup of multiple Confluent consumers", async () => {
    await doTest([], CONFLUENT_TOPICS);
  });

  it("Handles concurrent startup of multiple KafkaJS and Confluent consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, CONFLUENT_TOPICS);
  });

  async function doTest(kafkaJSTopics: string[], confluentTopics: string[]) {
    const kafkaJSKafka = new Kafka({
      brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: "XXXX",
        password: "XXXX"
      },
      logLevel: logLevel.INFO,
      logCreator: kafkaLevel => {
        return entry => {
          const {timestamp, logger, message, stack, ...others} = entry.log;
          console.log(`[KafkaJS:${entry.namespace}] ${message} ${JSON.stringify(others)}${stack ? `: ${stack}` : ""}`);
        };
      }
    });

    const confluentKafka = new Confluent.Kafka({
      kafkaJS: {
        brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
        ssl: true,
        sasl: {
          mechanism: "plain",
          username: "XXXX",
          password: "XXXX"
        },
        logLevel: Confluent.logLevel.INFO,
        logger: new ConfluentLogger()
      }
    });

    kafkaJSConsumers = [];
    let kafkaJSConnected: number = 0;
    setImmediate(async () => {
      for (const topic of kafkaJSTopics) {
        const kafkaJSConsumer = kafkaJSKafka.consumer({groupId: `${topic}-group`});
        kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, (event: ConsumerGroupJoinEvent) => {
          kafkaJSConnected++;
        });
        await kafkaJSConsumer.connect();
        await kafkaJSConsumer.subscribe({topic});
        await kafkaJSConsumer.run({
          eachMessage: async ({message}) => {}
        });
        kafkaJSConsumers.push(kafkaJSConsumer);
      }
    });

    confluentConsumers = [];
    let confluentConnected: number = 0;
    setImmediate(async () => {
      for (const topic of confluentTopics) {
        const confluentConsumer = confluentKafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
        await confluentConsumer.connect();
        confluentConnected++;
        await confluentConsumer.subscribe({topic});
        await confluentConsumer.run({
          eachMessage: async ({message}) => {}
        });
        confluentConsumers.push(confluentConsumer);
      }
    });

    await until(async () => confluentTopics.length == confluentConnected);
    for (const consumer of confluentConsumers) {
      await until(async () => consumer.assignment().length > 0);
    }
    await until(async () => kafkaJSTopics.length == kafkaJSConnected);
  }
});

class ConfluentLogger implements Logger {
  private logLevel: Confluent.logLevel;

  constructor() {
    this.logLevel = Confluent.logLevel.INFO;
  }

  setLogLevel(logLevel: Confluent.logLevel) {
    this.logLevel = logLevel;
  }

  info = (message: string, extra?: object) => this.doLog(Confluent.logLevel.INFO, message, extra);
  error = (message: string, extra?: object) => this.doLog(Confluent.logLevel.ERROR, message, extra);
  warn = (message: string, extra?: object) => this.doLog(Confluent.logLevel.WARN, message, extra);
  debug = (message: string, extra?: object) => this.doLog(Confluent.logLevel.DEBUG, message, extra);

  namespace() {
    return this;
  }

  private doLog(level: Confluent.logLevel, message: string, extra?: object) {
    if (this.logLevel >= level) {
      console.log(`[ConfluentKafka] ${message}${extra ? ` ${JSON.stringify(extra)}` : ""}`);
    }
  }
}

async function until(condition: () => Promise<boolean>) {
  const timeout = 30000;
  const finish = Date.now() + timeout;
  while (Date.now() <= finish) {
    const result = await condition();
    if (result) return;
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  fail(`Failed within ${timeout!}ms`);
}

The test for both ultimately fails to connect all the consumers and on the KafkaJS side produces many occurrences of this error (which is not present when running KafkaJS only):

[KafkaJS:Connection] Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established
    at connResetException (node:internal/errors:787:14)
    at TLSSocket.onConnectEnd (node:_tls_wrap:1727:19)
    at TLSSocket.emit (node:events:530:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)

It would be helpful to understand what is conflicting here and if it can be prevented on the Confluent side or if there is a way to work around it.

I have confirmed that if I start the KafkaJS consumers before the Confluent consumers, the KafkaJS connections succeed. This is not viable in a real-world scenario however, because if later on the connection is dropped and the consumer tries to reconnect it will encounter this same issue.

Errors are not sent back to the caller

I'm using the KafkaJS variant with producer.send, and I was observing logs like this that were not sent back to the caller.

{
  timestamp: 1717074105728,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests',
}

KafkaJS.Consumer: subscribe({ topics: [/someRegex$/] }) unexpected behavior

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
  • Node Version [e.g. 8.2.1]: v18.17.0
  • NPM Version [e.g. 5.4.2]: 9.6.7
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
  • confluent-kafka-javascript version [e.g. 2.3.3]: "@confluentinc/kafka-javascript": "0.1.16-devel"

Steps to Reproduce

When subscribing to topics via a regex, a start-of-string anchor is silently prepended to the regex:

consumer.subscribe({ topics: [/someSuffix$/] })

This is equivalent to:

consumer.subscribe({ topics: [/^someSuffix$/] })

This happens because the underlying rdkafka library only recognizes regex subscriptions if they start with a ^ character. Instead of silently prepending a ^, I would expect the KafkaJS client behavior to either:

  • Throw an error if the regex provided doesn't start with ^, so the developer understands the constraint and can adjust their code accordingly
  • Prepend ^.* to ensure we match all topics that match the user's input regex of /someSuffix$/, i.e. /^.*someSuffix$/ (see the sketch below).
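A minimal sketch of the second recommendation above, anchoring the pattern explicitly so no silent rewriting takes place; the suffix is a placeholder.

// Explicitly anchored: matches any topic ending in "someSuffix".
consumer.subscribe({ topics: [/^.*someSuffix$/] });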

KafkaJS.Consumer: unpause function returned by consumer.pause() incorrectly unpauses all partitions

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
  • Node Version [e.g. 8.2.1]: v18.17.0
  • NPM Version [e.g. 5.4.2]: 9.6.7
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
  • confluent-kafka-javascript version [e.g. 2.3.3]: "@confluentinc/kafka-javascript": "0.1.16-devel"

Steps to Reproduce

There is a logic error in the Consumer.pause() method. See the WARNING and ERROR comments I left below.

  pause(topics) {
    if (this.#state !== ConsumerState.CONNECTED) {
      throw new error.KafkaJSError('Pause can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
    }

    for (let topic of topics) {
      if (typeof topic.topic !== 'string') {
        throw new error.KafkaJSError('Topic must be a string.', { code: error.ErrorCodes.ERR__INVALID_ARG });
      }

      if (!topic.partitions) {
        // WARNING: incorrectly passing [{ topic: string, partition: number }] will result
        // in unpausing all assigned partitions for that topic!
        topic.partitions = this.#getAllAssignedPartition(topic.topic);
      }
    }
    
    // ERROR: topics was Array<{ topic: string, partitions: number[] }>,
    // but now is Array<{ topic: string, partition: number }> since it's been flattened
    // Recommendation: don't change the types of local variables
    topics = this.#flattenTopicPartitions(topics);
    if (topics.length === 0) {
      return;
    }
    this.#internalClient.pause(topics);

    /* Mark the messages in the cache as stale, runInternal* will deal with
     * making it unusable. */
    this.#messageCache.markStale(topics);

    /* If anyone's using eachBatch, mark the batch as stale. */
    topics.map(partitionKey)
      .filter(key => this.#topicPartitionToBatchPayload.has(key))
      .forEach(key => this.#topicPartitionToBatchPayload.get(key)._stale = true);

    topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));

    // ERROR: this.resume(topics) expects Array<{ topic: string, partitions: number[] }>,
    // but is now Array<{ topic: string, partition: number }> since it's been flattened
    // Given the branch `if (!topic.partitions)` above, we expect to unpause all partitions
    return () => this.resume(topics);
  }

Facing issues with client connection

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: MacOS Sonama 14.3.1
  • Node Version [e.g. 8.2.1]: Node.js v20.11.0
  • NPM Version [e.g. 5.4.2]: 10.2.4
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: gcc --version
    Apple clang version 15.0.0 (clang-1500.1.0.2.5)
    Target: arm64-apple-darwin23.3.0
    Thread model: posix
    InstalledDir: /Library/Developer/CommandLineTools/usr/bin
  • confluent-kafka-javascript version [e.g. 2.3.3]: "^0.1.10-devel"

Steps to Reproduce
Installed the package using npm: npm install @confluentinc/kafka-javascript
Used the original example provided on the welcome page for producing a message.

confluent-kafka-javascript Configuration Settings

const  { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS
const kafka = new Client({
    KafkaJS: {
      brokers: ["xxxxxxxxxx"],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: "xxxxxxx",
        password:
          "yyyyyyyyyyyyy",
      },
    },
  });

The team informed me about the early access version. I am trying to see how the package works, but I was not able to start the client at all; it gives me the following errors.

const kafka = new Client({
                ^

ReferenceError: Client is not defined
    at producerStart (/Users/sai/Documents/Development/messaging/confluent.js:9:17)
    at Object.<anonymous> (/Users/sai/Documents/Development/messaging/confluent.js:47:1)
    at Module._compile (node:internal/modules/cjs/loader:1376:14)
    at Module._extensions..js (node:internal/modules/cjs/loader:1435:10)
    at Module.load (node:internal/modules/cjs/loader:1207:32)
    at Module._load (node:internal/modules/cjs/loader:1023:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:135:12)
    at node:internal/main/run_main_module:28:49

I did some extra checks to see if the package is working as expected, and I was able to see the features and librdkafka version.

const details = require("@confluentinc/kafka-javascript").features;

Result:

[
  'gzip',             'snappy',
  'ssl',              'sasl',
  'regex',            'lz4',
  'sasl_plain',       'sasl_scram',
  'plugins',          'zstd',
  'sasl_oauthbearer', 'http',
  'oidc'
]
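For reference, the class exported by the KafkaJS compatibility layer is Kafka, not Client, and the wrapping config key is lowercase kafkaJS, as in the produce example at the top of this page; a corrected sketch of the construction with the same placeholder credentials.

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

const kafka = new Kafka({
    kafkaJS: {                      // note: lowercase "kafkaJS"
        brokers: ["xxxxxxxxxx"],
        ssl: true,
        sasl: {
            mechanism: "plain",
            username: "xxxxxxx",
            password: "yyyyyyyyyyyyy",
        },
    },
});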

Mismatches between `types/kafkajs.d.ts` and documentation

While working on a conversion from KafkaJS, I've encountered the following:

  • Per MIGRATION.md: acks, compression and timeout are not set on a per-send basis, yet ProducerRecord interface still contains acks, compression, and timeout leading one to believe these can be set per message.
  • According to MIGRATION.md retry option from existing KafkaJS config is supported, yet it is missing from KafkaConfig interface.
  • No mention is made of supporting batch message handling in MIGRATION.md and ConsumerRunConfig type only includes eachMessage?: EachMessageHandler and not eachBatch?: EachBatchHandler, yet EachBatchHandler and its supporting types are still present.
  • compression in producer config (and in kafkajs.d.ts) says to use the CompressionTypes enum. Doing so results in an error message like: Error: Invalid value "3" for configuration property "compression.codec". To get this working, I had to use the native values from compression.codec ('none' | 'gzip' | 'snappy' | 'lz4' | 'zstd') and, in code, as unknown as CompressionTypes to avoid compile-time errors.
  • the consumer on(event) methods are present in kafkajs.d.ts; they are not mentioned in MIGRATION.md, and use of them throws a "not implemented" error. Is the goal to provide implementations for these?

ConfluentJavascript.KafkaJS.Consumer["assignment"] typings incorrect

Environment Information

  • OS [e.g. Mac, Arch, Windows 10]: macOS 14.5 (23F79)
  • Node Version [e.g. 8.2.1]: v18.17.0
  • NPM Version [e.g. 5.4.2]: 9.6.7
  • C++ Toolchain [e.g. Visual Studio, llvm, g++]: Xcode 14
  • confluent-kafka-javascript version [e.g. 2.3.3]: "@confluentinc/kafka-javascript": "0.1.16-devel"

Steps to Reproduce

The typings for ConfluentJavascript.KafkaJS.Consumer["assignment"] are wrong for me.

The typing in the JavaScript source code comment gives the return type as import("../../types/kafkajs").TopicPartition[], aka Array<{ topic: string, partition: number }>, but the typing in the type declaration file used by TypeScript and my IDE is TopicPartitions[], aka Array<{ topic: string, partitions: number[] }>. This caused a runtime error in my code.

confluent-kafka-javascript Configuration Settings

n/a

Additional context

n/a
