
kafkajs's Introduction


KafkaJS

A modern Apache Kafka® client for Node.js

About the Project

KafkaJS is a modern Apache Kafka client for Node.js. It is compatible with Kafka 0.10+ and offers native support for 0.11 features.

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by KafkaJS. KafkaJS has no affiliation with and is not endorsed by The Apache Software Foundation.

Sponsors ❤️

Upstash: Serverless Kafka

  • True Serverless Kafka with per-request-pricing
  • Managed Apache Kafka, works with all Kafka clients
  • Built-in REST API designed for serverless and edge functions
  • Start for free in 30 seconds!

Get help directly from a KafkaJS developer

  • Become a GitHub Sponsor to have a video call with a KafkaJS developer
  • Receive personalized support, validate ideas or accelerate your learning
  • Save time and get productive sooner, while supporting KafkaJS!
  • See support options!

To become a sponsor, reach out in our Slack community to get in touch with one of the maintainers. Also consider becoming a GitHub Sponsor by following any of the links under "Sponsor this project" in the sidebar.

Features

  • Producer
  • Consumer groups with pause, resume, and seek
  • Transactional support for producers and consumers
  • Message headers
  • GZIP compression
    • Snappy, LZ4 and ZSTD compression through pluggable codecs
  • Plain, SSL and SASL_SSL implementations
  • Support for SCRAM-SHA-256 and SCRAM-SHA-512
  • Support for AWS IAM authentication
  • Admin client

Getting Started

npm install kafkajs
# yarn add kafkajs

Usage

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

Learn more about using KafkaJS on the official site!

Read something on the website that didn't work with the latest stable version?
Check the pre-release versions - the website is updated on every merge to master.

Contributing

KafkaJS is an open-source project where development takes place in the open on GitHub. Although the project is maintained by a small group of dedicated volunteers, we are grateful to the community for bug fixes, feature development and other contributions.

See Developing KafkaJS for information on how to run and develop KafkaJS.

Help wanted 🤝

We welcome contributions to KafkaJS, but we also want to see a thriving third-party ecosystem. If you would like to create an open-source project that builds on top of KafkaJS, please get in touch and we'd be happy to provide feedback and support.

Here are some projects that we would like to build, but haven't yet been able to prioritize:

Contact 💬

Join our Slack community

License

See LICENSE for more details.

Acknowledgements

Apache Kafka and Kafka are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries. KafkaJS has no affiliation with the Apache Software Foundation.

kafkajs's People

Contributors

adalbert-homa, ankon, arszen123, artysidorenko, brianphillips, dariofilkovic, dashie, dependabot[bot], flucivja, goriunov, goriunovphl, ianwsperber, jaaprood, jonathanhaviv, julienvincent, klippx, markgaylard, nevon, nirga, paambaati, pimpelsang, priitkaard, rob3000, ronfarkash, shubhanilbag, slava, t-d-d, tulios, uwburn, waleedashraf


kafkajs's Issues

React Native Support

Hi There @tulios,

This library looks very promising!
I would like to ask whether it supports React Native as a client.

Thank you in advance!

Allow API version lock

Currently, KafkaJS uses the API versions call to decide which APIs to enable. This approach is the way forward, but to allow a smooth transition between the stable APIs and the new APIs introduced by this initiative, the project should be able to lock the API versions it uses.

const { Kafka, BROKER_VERSIONS } = require('kafkajs')

new Kafka({
  brokerVersion: BROKER_VERSIONS.V0_10
})

Eventually, the default value should be set to BROKER_VERSIONS.AUTO, which effectively uses the best implemented version (the current behavior).

Validate message format in producer

KafkaJS won't raise any errors if the message doesn't have a value attribute and this might be hard to debug.

Messages without value (e.g: { key: 'key1'}) should raise an error.
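
A minimal sketch of the kind of check described above. `validateMessages` is a hypothetical helper, not part of the KafkaJS API, and the error wording is an assumption. A `null` value is allowed here because Kafka uses null values as tombstones on compacted topics.

```javascript
// Hypothetical helper: rejects messages that have no `value` attribute.
// `value: null` is accepted (tombstone); only a missing attribute is an error.
const validateMessages = messages => {
  if (!Array.isArray(messages)) {
    throw new TypeError('messages must be an array')
  }

  messages.forEach((message, index) => {
    if (message == null || message.value === undefined) {
      throw new TypeError(`Invalid message at index ${index}: missing "value" attribute`)
    }
  })

  return messages
}
```

Calling `validateMessages([{ key: 'key1' }])` throws immediately, which points at the offending message instead of failing later inside the protocol encoder.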

How to shutdown consumer when using eachBatch

The eachBatch callback offers a more advanced use of the consumer, but the current implementation has a drawback that prevents it from being fully used. Currently, the offset of the last message is resolved after the callback runs. When the consumer is shutting down, there is no way to stop consuming the batch without committing the last offset, so the consumer has to wait for the full batch to be consumed before it can shut down. This might take a while and get the consumer killed by the OS, Docker, etc.

The callback already receives isRunning and resolveOffset, but it doesn't offer any mechanism to prevent the last offset from being committed.

Suggestions:

  1. Expose a function that should be called for each message. This function interrupts batch processing and allows the consumer to commit the last resolved offset instead of the last offset in the batch.
async function eachBatch({ batch, resolveOffset, stopIfNotRunning }) {
  for (let message of batch.messages) {
    await stopIfNotRunning()
    await processMessage(message)
    await resolveOffset(message.offset)
  }
}
  2. Expose a function to register a callback to run after eachBatch is complete.
async function eachBatch({ batch, resolveOffset, after }) {
  try {
    for (let message of batch.messages) {
      await processMessage(message)
      await resolveOffset(message.offset)
    }
  } catch (e) {
    after(() => {
      // just log
      // or resolveOffset(batch.lastOffset())
    })
  }
}
  3. Add an option to disable the auto-resolve from eachBatch.
consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, isRunning }) => {
    try {
      for (let message of batch.messages) {
        // Stop early on shutdown; only the offsets resolved so far are committed
        if (!isRunning()) break
        await processMessage(message)
        await resolveOffset(message.offset)
      }
    } catch (e) {
      if (e instanceof SkipToTheLastOffsetOfTheBatchError) {
        await resolveOffset(batch.lastOffset())
      }
    }
  }
})

Commit resolved offsets on group rebalance

We should commit the already resolved offsets when we get REBALANCE_IN_PROGRESS. Currently, we are dropping the offsets to perform the rebalance.

this.consumerGroup.resolveOffset({ topic, partition, offset: message.offset })
await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })

consumer#heartbeat can throw the error and skip the commit phase, which is done by the fetch function.

await this.consumerGroup.commitOffsets()

I think it should be something like:

// ...
try {
  await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
  if (e.type === 'REBALANCE_IN_PROGRESS') {
    await this.consumerGroup.commitOffsets()
  }
  throw e
}

In this way, we "save" the work already done before we rebalance

Flaky test in connection.spec.js

The test relies on a local network setup, which is not guaranteed to end up in connection timeout for a specific hardcoded IP address.

 ● Network > Connection › #connect › PLAINTEXT › rejects the Promise in case of errors

   expect(object).toHaveProperty(path, value)

   Expected the object:
     [Error: Connection error: connect ECONNREFUSED 99.99.99.99:9092]
   To have a nested property:
     "message"
   With a value of:
     "Connection timeout"


     at Object.<anonymous> (node_modules/jest-matchers/build/index.js:176:51)
         at Generator.throw (<anonymous>)
         at <anonymous>
     at process._tickCallback (internal/process/next_tick.js:188:7)

Validate batch checksum

A lot of the errors we've seen have been caused by reading the wrong number of bytes for each message or batch. All of those errors would have been caught if we were validating the checksum that we get with each batch/message.

As it could imply a performance hit, we might want to conditionally use that validation.


There's a pretty good reference on how to perform the checksum validation on the batch level in the Java client:
https://github.com/apache/kafka/blob/647afeff6a2e3fd78328f6989e8d9f96bcde5121/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L37-L76

On the message level, things look more dire:

https://github.com/apache/kafka/blob/cea319a4ad9c55d3d3263cf7a4224c25772d0e11/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L197-L204

For v2 and above, there is no checksum, because the checksum that was computed by the producer might not match what the broker sends, intentionally.

So basically we could do verification on each batch, and throw a retriable error if the checksum doesn't match.
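
A rough sketch of what batch-level verification could look like, assuming the batch checksum is CRC-32C (Castagnoli), as in the Kafka v2 record batch format. `crc32c` and `verifyChecksum` are hypothetical names, the `retriable` flag on the error is an assumption about how such an error would be surfaced, and slicing out the bytes covered by the checksum is left to the caller.

```javascript
// Lookup table for CRC-32C (reflected Castagnoli polynomial 0x82F63B78)
const CRC32C_TABLE = (() => {
  const table = new Uint32Array(256)
  for (let i = 0; i < 256; i++) {
    let crc = i
    for (let k = 0; k < 8; k++) {
      crc = crc & 1 ? (crc >>> 1) ^ 0x82f63b78 : crc >>> 1
    }
    table[i] = crc >>> 0
  }
  return table
})()

// Computes the CRC-32C of a Buffer (or any iterable of bytes) as an unsigned 32-bit integer
const crc32c = buffer => {
  let crc = 0xffffffff
  for (const byte of buffer) {
    crc = CRC32C_TABLE[(crc ^ byte) & 0xff] ^ (crc >>> 8)
  }
  return (crc ^ 0xffffffff) >>> 0
}

// Hypothetical check: `payload` is assumed to be exactly the bytes covered by the checksum
const verifyChecksum = ({ expectedCrc, payload }) => {
  const expected = expectedCrc >>> 0 // the wire value is a signed INT32
  const actual = crc32c(payload)
  if (actual !== expected) {
    const error = new Error(`Checksum mismatch: expected ${expected}, got ${actual}`)
    error.retriable = true
    throw error
  }
}
```

Making the check optional, for example behind a consumer option, would address the performance concern raised above.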

How to correctly disconnect multiple consumers?

I have some trouble disconnecting multiple consumers that I have started with different groupIds.
Hopefully you can help me.

This is my code for disconnecting...

/**
 * Service stopped lifecycle event handler
 */
async stopped() {

    await Promise.all(this.subscriptions.map(async (subscription) => {
        try {
            await subscription.consumer.pause( [ { topic: subscription.topic } ]);
            this.logger.info(`Consumer for subscription ${subscription.id} paused`);
        } catch(err) {
            this.logger.warn(`Pause consumer for subscription ${subscription.id} failed`);
        }
        Promise.resolve();
    }));
    await Promise.all(this.subscriptions.map(async (subscription) => {
        try {
            await subscription.consumer.disconnect();
            this.logger.info(`Consumer for subscription ${subscription.id} disconnected`);
        } catch(err) {
            this.logger.warn(`Disconnecting consumer for subscription ${subscription.id} failed`);
        }
        Promise.resolve();
    }));
    this.logger.info(`All consumers disconnected`);

}

I'm getting the following log:

  console.log lib\flow.subscriber.js:139
    ConsumerGroup INFO Pausing fetching from 1 topics { timestamp: '2018-05-29T19:10:29.954Z',
      logger: 'kafkajs',
      message: 'Pausing fetching from 1 topics',
      topics: [ 'events' ] }

  console.log lib\flow.subscriber.js:139
    ConsumerGroup INFO Pausing fetching from 1 topics { timestamp: '2018-05-29T19:10:29.959Z',
      logger: 'kafkajs',
      message: 'Pausing fetching from 1 topics',
      topics: [ 'events' ] }
  console.log lib\flow.subscriber.js:139
    Runner DEBUG stop consumer group { timestamp: '2018-05-29T19:10:29.965Z',
      logger: 'kafkajs',
      message: 'stop consumer group',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db',
      memberId: 'flow.subscriber1527621028637-9bef2c45-b84f-40f5-b670-c8b687240b51' }

  console.log lib\flow.subscriber.js:139
    Runner DEBUG stop consumer group { timestamp: '2018-05-29T19:10:29.981Z',
      logger: 'kafkajs',
      message: 'stop consumer group',
      groupId: '6bb861e2-e250-4f79-af08-3e9acecd2d45',
      memberId: 'flow.subscriber1527621028637-80fc7d53-6b5f-471c-b9de-504d750e8804' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:29.983Z',
      logger: 'kafkajs',
      message: 'Request LeaveGroup(key: 13, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 12,
      size: 147 }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:29.985Z',
      logger: 'kafkajs',
      message: 'Request LeaveGroup(key: 13, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 12,
      size: 147 }

  console.info node_modules\moleculer\src\logger.js:112
    [2018-05-29T19:10:29.987Z] INFO  nbtpt510-al-7796/FLOW.PUBLISHER: Producer disconnectied

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response Fetch(key: 1, version: 2) { timestamp: '2018-05-29T19:10:34.452Z',
      logger: 'kafkajs',
      message: 'Response Fetch(key: 1, version: 2)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 11,
      size: 42,
      data: '[filtered]' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.481Z',
      logger: 'kafkajs',
      message: 'Request Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 13,
      size: 151 }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:34.487Z',
      logger: 'kafkajs',
      message: 'Response LeaveGroup(key: 13, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 12,
      size: 6,
      data: { errorCode: 0 } }

  console.log lib\flow.subscriber.js:139
    Consumer DEBUG consumer has stopped, disconnecting { timestamp: '2018-05-29T19:10:34.491Z',
      logger: 'kafkajs',
      message: 'consumer has stopped, disconnecting',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG disconnecting... { timestamp: '2018-05-29T19:10:34.495Z',
      logger: 'kafkajs',
      message: 'disconnecting...',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG disconnected { timestamp: '2018-05-29T19:10:34.499Z',
      logger: 'kafkajs',
      message: 'disconnected',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637' }

  console.log lib\flow.subscriber.js:139
    Consumer INFO Stopped { timestamp: '2018-05-29T19:10:34.500Z',
      logger: 'kafkajs',
      message: 'Stopped',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db' }

  console.info node_modules\moleculer\src\logger.js:112
    [2018-05-29T19:10:34.502Z] INFO  nbtpt510-al-7796/FLOW.SUBSCRIBER: Consumer for subscription f0c54a7b-1cbd-4248-8471-ae788c3566db disconnected

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response Fetch(key: 1, version: 2) { timestamp: '2018-05-29T19:10:34.503Z',
      logger: 'kafkajs',
      message: 'Response Fetch(key: 1, version: 2)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 11,
      size: 42,
      data: '[filtered]' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.505Z',
      logger: 'kafkajs',
      message: 'Request Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 13,
      size: 151 }

Now something went wrong...

  console.log lib\flow.subscriber.js:139
    Connection ERROR Response Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.508Z',
      logger: 'kafkajs',
      message: 'Response Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      error: 'The coordinator is not aware of this member',
      correlationId: 13,
      size: 6 }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.511Z',
      logger: 'kafkajs',
      message: 'Response Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      error: 'The coordinator is not aware of this member',
      correlationId: 13,
      payload: <Buffer 00 19> }

  console.log lib\flow.subscriber.js:139
    Runner ERROR The coordinator is not aware of this member, re-joining the group { timestamp: '2018-05-29T19:10:34.515Z',
      logger: 'kafkajs',
      message: 'The coordinator is not aware of this member, re-joining the group',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db',
      memberId: 'flow.subscriber1527621028637-9bef2c45-b84f-40f5-b670-c8b687240b51',
      error: 'The coordinator is not aware of this member',
      retryCount: 0,
      retryTime: 51 }

Move retry on low level network errors to connection

Currently, all errors are bubbled up to high-level entities. KafkaJS should handle low-level errors such as request timeout inside the connection automatically.

Log example:

{
  broker: "kafka:9094",
  clientId: "myClientId",
  correlationId: 2785,
  error: "The request timed out",
  level: "ERROR",
  logger: "kafkajs",
  message: "[Connection] Response OffsetCommit(key: 8, version: 2)",
  size: 70,
  timestamp: "2018-03-26T16:17:48.521Z"
}

Method to stop consumers without disconnecting the cluster

Currently, the disconnect method stops the consumer and disconnects the cluster, which causes issues for other consumers sharing the same connection. Example:

const kafka = new Kafka(...)
const consumers = [
  kafka.consumer({ groupId: 'g1' }),
  kafka.consumer({ groupId: 'g2' }),
  kafka.consumer({ groupId: 'g3' })
]

await Promise.all(consumers.map(c => c.connect()))

// This will work fine for the first consumers that manage to disconnect, but it will cause issues
// to the other ones
await Promise.all(consumers.map(c => c.disconnect()))

The suggestion is to add a new stop method to the consumer, which stops the consumer but keeps the cluster connected, and to add a disconnect method to the client, which disconnects the cluster.

Example:

// ...
await Promise.all(consumers.map(c => c.connect()))
await Promise.all(consumers.map(c => c.stop()))
await kafka.disconnect()

Improve response object from produce#send

produce#send returns the result from broker#produce, which is a bad user experience and bad for future designs. It should return its own object with the necessary data (e.g. partition, topic).

Include snappy compression (or working example)

Hi,

I tried to do it myself according to the example in readme.md, but I'm hitting an "Invalid input" error.

It would be useful to have either built-in support (auto-detecting the presence of the snappy module, if you don't want to include a C++ dependency) or at least a working example in the /examples folder.

My code:

//snappy-compression.js
var snappy = require('snappy');

const { promisify } = require('util');
const snappyCompress = promisify(snappy.compress)
const snappyDecompress = promisify(snappy.uncompress)

module.exports = {
  async compress(encoder) {
    return await snappyCompress(encoder.buffer)
  },

  async decompress(buffer) {
      return await snappyDecompress(buffer)
  }
}

//index.js
const { Kafka, logLevel, CompressionTypes, CompressionCodecs } = require('kafkajs')

CompressionCodecs[CompressionTypes.Snappy] = () => require('./snappy-compression');

// Create the client with the broker list
const kafka = new Kafka({...

I see data arriving in the INFO level logs. The snappy decompress function runs but is unable to decompress the buffer.

{"level":"DEBUG","timestamp":"2018-08-24T19:39:21.578Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 3)","broker":"IP.IP.IP.IP:9092","clientId":"kafkajs-app","error":"Invalid input","correlationId":3,"payload":{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,19,100,101,98,101,122,105,117,109,45,104,101,97,114...

Using kafka 1.1, producing data with java client

Implement a means to react on retries

Context

Currently we're facing a situation where we either swallow an exception from a message handler immediately after it's raised, or wait for the consumer to die because the heartbeats time out after too many retries.

We need something like a hook or callback that is invoked once all the retries for a given message are used up, in which we can decide to prevent the client from crashing by committing the event. Alternatively, a counter that tells us how deep we are in the retry cycle would let us act with our own logic.

Proposal

  • Implement a retry option, that would be called with the last or all exceptions during retrying after all the retries are exceeded.
  • Feed the retry cycle counter to the eachMessage handler, or the batch message handler.
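
The two proposals can be sketched together as follows. This is an illustration only: `handleWithRetries` and `onRetriesExhausted` are hypothetical names, not KafkaJS options, and the sketch omits any backoff between attempts.

```javascript
// Hypothetical wrapper around a message handler.
// - `handler` receives the current retry count on every attempt.
// - `onRetriesExhausted` receives all collected errors once every attempt has failed,
//   and decides whether to skip the message (return) or crash (throw).
const handleWithRetries = async ({ handler, message, retries, onRetriesExhausted }) => {
  const errors = []

  for (let retryCount = 0; retryCount <= retries; retryCount++) {
    try {
      return await handler({ message, retryCount })
    } catch (e) {
      errors.push(e)
    }
  }

  return onRetriesExhausted({ message, errors })
}
```

With a hook like this, the caller could log the errors and return normally so that the offset is committed, or rethrow the last error to keep the current crashing behavior.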

Additional methods on admin API

Currently the Admin API has the functionality to create topics and adjust topic offsets. It would be nice to also be able to do the following to avoid having to use multiple libraries / clients.

  • Remove topics
  • Update topic partitions
  • Update topic replication factor

Consumer receiving topic metadata without being subscribed

We saw this behavior when a service was re-using the same group id as a different service. It was getting metadata for a completely different topic than the one it was subscribed to, causing KafkaJS to crash the consumer when trying to find the leader for a partition: Cannot read property 'partitionMetadata' of undefined.

I.e.

  1. Service A with group id "consumer" subscribes to topic-1 and consumes messages.
  2. Service B with group id "consumer" subscribes to topic-2 and tries to consume messages.
  3. Service B consumer crashes because it only gets topic metadata for topic-1.

Project logo

I was thinking about an octopus tentacle, drawn in a way that looks like the Kafka logo. I'm no designer but I made an attempt using Sketch and I quite like it 😆

@sebastiannorde has the proper skill set to solve this problem and has helped with other projects in the past, can you give us a hand here?

Maybe we can tweak the colors to use the Node.js green, @matthiasfeist @Nevon WDYT?

Constantly refresh metadata for consumers and producers

Add support to metadata max age. From the Kafka docs:

metadata.max.age.ms | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. | long | 300000

On producers:

kafka.producer({ metadataMaxAgeInMs: 300000 })

On consumers:

kafka.consumer({ groupId: 'my-group', metadataMaxAgeInMs: 300000 })

Kafka default is 5 minutes; we should keep the same.
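
As an illustration of how the max age could be tracked, here is a small sketch. `createMetadataAgeTracker`, `markRefreshed`, and `isStale` are hypothetical names, and the injectable `now` clock exists only to make the example testable.

```javascript
// Same default as Kafka's metadata.max.age.ms (5 minutes)
const DEFAULT_METADATA_MAX_AGE_MS = 300000

// Hypothetical tracker: reports when metadata is older than the configured max age
const createMetadataAgeTracker = ({
  metadataMaxAgeInMs = DEFAULT_METADATA_MAX_AGE_MS,
  now = Date.now,
} = {}) => {
  let lastRefreshedAt = null

  return {
    // Call after every successful metadata refresh
    markRefreshed: () => {
      lastRefreshedAt = now()
    },
    // True if metadata was never fetched or has reached the max age
    isStale: () => lastRefreshedAt === null || now() - lastRefreshedAt >= metadataMaxAgeInMs,
  }
}
```

A producer or consumer could consult `isStale()` before each request and refresh the metadata first when it returns true.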

Handle missing username/password during authentication

If username or password is missing during SASL SCRAM authentication, a very unhelpful error is thrown: [BrokerPool] KafkaJSSASLAuthenticationError: SASL SCRAM SHA256 authentication failed: Cannot read property 'replace' of undefined

It would be better if we validated the username and password early and threw a more descriptive error.
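
A minimal sketch of such an early check. `validateSaslCredentials` is a hypothetical helper and the error message is an assumption; the real client would likely throw one of its own error types.

```javascript
// Hypothetical early validation, run before the SCRAM exchange starts
const validateSaslCredentials = ({ mechanism, username, password } = {}) => {
  const missing = []
  if (typeof username !== 'string' || username.length === 0) missing.push('username')
  if (typeof password !== 'string' || password.length === 0) missing.push('password')

  if (missing.length > 0) {
    throw new Error(
      `SASL ${mechanism} authentication: missing or invalid ${missing.join(' and ')}`
    )
  }
}
```

Running this when the SASL configuration is first read would replace the "Cannot read property 'replace' of undefined" failure with a message that names the missing field.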

Allow producer to use v3+ API

Make sure the broker abstraction can use the new API, and the producer can be used with all supported versions of Kafka

Handle parallel calls during SCRAM authentication flow

If you are producing to multiple topics in parallel, the broker gets confused during the SCRAM authentication flow. From the broker perspective, it looks something like this:

  1. Client -> I want to authenticate as Nevon
  2. Broker <- Sure, send me the proof
  3. Client -> I want to authenticate as Nevon
  4. Broker <- Uh... What?
  5. Client -> Here's the proof!
  6. Broker <- ... Are you okay?

When the client is in the SCRAM authentication flow, we should set some kind of flag so that we don't start it again if another request causes us to try to authenticate again.
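
One way to implement such a flag is to remember the in-flight authentication promise and hand it to every concurrent caller. The sketch below assumes a hypothetical `authenticate` function that performs the full SCRAM exchange; `createAuthenticator` is not an existing KafkaJS function.

```javascript
// Wraps `authenticate` so that concurrent callers share a single in-flight attempt
const createAuthenticator = authenticate => {
  let inFlight = null

  return () => {
    if (inFlight === null) {
      inFlight = Promise.resolve()
        .then(authenticate)
        // Clear the flag whether the attempt succeeded or failed
        .finally(() => {
          inFlight = null
        })
    }
    return inFlight
  }
}
```

Two parallel produce calls would then trigger a single authentication exchange. A real connection would additionally remember that it is already authenticated, so that later calls do not start the flow again.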

Map new error codes

Currently, KafkaJS has protocol errors mapped up to code 60.

https://github.com/tulios/kafkajs/blob/master/src/protocol/error.js

New errors:

ERROR | CODE | RETRIABLE | DESCRIPTION
DELEGATION_TOKEN_AUTH_DISABLED | 61 | False | Delegation Token feature is not enabled.
DELEGATION_TOKEN_NOT_FOUND | 62 | False | Delegation Token is not found on server.
DELEGATION_TOKEN_OWNER_MISMATCH | 63 | False | Specified Principal is not valid Owner/Renewer.
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED | 64 | False | Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels.
DELEGATION_TOKEN_AUTHORIZATION_FAILED | 65 | False | Delegation Token authorization failed.
DELEGATION_TOKEN_EXPIRED | 66 | False | Delegation Token is expired.
INVALID_PRINCIPAL_TYPE | 67 | False | Supplied principalType is not supported
NON_EMPTY_GROUP | 68 | False | The group is not empty
GROUP_ID_NOT_FOUND | 69 | False | The group id does not exist
FETCH_SESSION_ID_NOT_FOUND | 70 | True | The fetch session ID was not found
INVALID_FETCH_SESSION_EPOCH | 71 | True | The fetch session epoch is invalid

Produce to multiple topics at once

Currently, each call can produce to only a single topic, so producing to multiple topics requires multiple calls.

Instead of doing this, we should provide the ability to produce to multiple topics with one call. For example:

await producer.sendBatch([{
  topic: 'foo',
  messages: [ ... ]
},
{
  topic: 'bar',
  messages: [ ... ]
}])

Where sendBatch would take an array of topic-messages (same object that Producer.send currently receives).
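
As a sketch of one preparatory step such a method might need, the hypothetical helper below merges entries that target the same topic, so that each topic appears once in the resulting request. `groupByTopic` is an illustrative name only and says nothing about how sendBatch would actually be implemented.

```javascript
// Hypothetical helper: merges topic-messages entries that share a topic,
// preserving the order in which topics and messages were first seen
const groupByTopic = topicMessages => {
  const grouped = new Map()

  for (const { topic, messages } of topicMessages) {
    grouped.set(topic, [...(grouped.get(topic) || []), ...messages])
  }

  return Array.from(grouped, ([topic, messages]) => ({ topic, messages }))
}
```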

Support private CAs similar to how node does

I suggest that we handle CAs similar to how Node does, if possible. Node uses standard CAs, and you can add more by setting NODE_EXTRA_CA_CERTS to extra certs. Doing it this way makes it much easier to support private CAs without copying a bunch of boilerplate code.

Add support to fetch protocol versions v4 to v6

Fetch Request (Version: 4) => replica_id max_wait_time min_bytes max_bytes isolation_level [topics] 
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  isolation_level => INT8
  topics => topic [partitions] 
    topic => STRING
    partitions => partition fetch_offset max_bytes 
      partition => INT32
      fetch_offset => INT64
      max_bytes => INT32
Fetch Response (Version: 4) => throttle_time_ms [responses] 
  throttle_time_ms => INT32
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition_header record_set 
      partition_header => partition error_code high_watermark last_stable_offset [aborted_transactions] 
        partition => INT32
        error_code => INT16
        high_watermark => INT64
        last_stable_offset => INT64
        aborted_transactions => producer_id first_offset 
          producer_id => INT64
          first_offset => INT64
      record_set => RECORDS

More information at:
https://kafka.apache.org/protocol.html#The_Messages_Fetch

Investigate API v7

Don't retry on all brokers when producing

return await sendMessages({
  topic,
  messages,
  acks,
  timeout,
  compression,
})

If one fails, we currently retry all, which could lead to duplicate messages.

const requests = Object.keys(partitionsPerLeader).map(async nodeId => {
  const partitions = partitionsPerLeader[nodeId]
  const topicData = createTopicData({ topic, partitions, messagesPerPartition })
  const broker = await cluster.findBroker({ nodeId })
  const response = await broker.produce({ acks, timeout, compression, topicData })
  return responseSerializer(response)
})
return flatten(await Promise.all(requests))

Instead, we should only retry the ones that fail.
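
A sketch of what retrying only the failed requests could look like, using `Promise.allSettled` to keep the successful responses. `produceWithSelectiveRetry` and `sendToBroker` are hypothetical stand-ins for the per-broker produce call shown above, and the sketch omits backoff and metadata refresh between attempts.

```javascript
// Sends one request per broker and retries only the brokers whose request failed.
// `sendToBroker(nodeId)` is assumed to return a promise for that broker's response.
const produceWithSelectiveRetry = async ({ nodeIds, sendToBroker, retries }) => {
  const results = new Map()
  let pending = [...nodeIds]

  for (let attempt = 0; attempt <= retries && pending.length > 0; attempt++) {
    const settled = await Promise.allSettled(pending.map(async nodeId => sendToBroker(nodeId)))
    const failed = []

    settled.forEach((outcome, index) => {
      const nodeId = pending[index]
      if (outcome.status === 'fulfilled') {
        results.set(nodeId, outcome.value)
      } else {
        failed.push({ nodeId, reason: outcome.reason })
      }
    })

    // Out of attempts: surface the first failure instead of retrying again
    if (failed.length > 0 && attempt === retries) {
      throw failed[0].reason
    }

    pending = failed.map(({ nodeId }) => nodeId)
  }

  // Responses in the same order as the requested brokers
  return nodeIds.map(nodeId => results.get(nodeId))
}
```

Brokers that already acknowledged their messages are never contacted again, which avoids the duplicates described above.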

Assign partitions based on lag

In cases where the offset lag is unevenly spread across partitions, it makes sense to assign partitions across consumers based on the current offset lag, rather than completely round robin.

Essentially:

  1. For each topic
    1.1. Calculate the offset lag for each partition in the topic
    1.2. Sort the partitions by offset lag in descending order
    1.3. For each partition
    1.3.1. Assign the partition to the consumer with the lowest number of assigned partitions (across all topics)
    1.3.2. If two or more consumers have the same number of assigned partitions, assign the partition to the one where the sum of the offset lag for the assigned partitions is the lowest
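
The steps above can be sketched as follows. `assignByLag` and the shape of its input are assumptions made for illustration: `lagByTopic` maps each topic to an object of partition number to offset lag, and computing that lag (step 1.1) is left to the caller.

```javascript
// Assigns partitions to consumers, highest lag first.
// consumers: array of member ids
// lagByTopic: { [topic]: { [partition]: offsetLag } }
const assignByLag = ({ consumers, lagByTopic }) => {
  const state = consumers.map(memberId => ({ memberId, partitions: [], totalLag: 0 }))

  for (const [topic, lagByPartition] of Object.entries(lagByTopic)) {
    // 1.2: sort the topic's partitions by lag, descending
    const sorted = Object.entries(lagByPartition)
      .map(([partition, lag]) => ({ partition: Number(partition), lag }))
      .sort((a, b) => b.lag - a.lag)

    for (const { partition, lag } of sorted) {
      // 1.3.1: fewest assigned partitions wins; 1.3.2: ties go to the lowest total lag
      const target = state.reduce((best, candidate) => {
        if (candidate.partitions.length !== best.partitions.length) {
          return candidate.partitions.length < best.partitions.length ? candidate : best
        }
        return candidate.totalLag < best.totalLag ? candidate : best
      })

      target.partitions.push({ topic, partition })
      target.totalLag += lag
    }
  }

  return state
}
```

Because the partition count is compared first, the assignment stays balanced in size, and the lag only decides between otherwise equal consumers.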

Add ability to pause consumer

Consumer should expose pause and resume methods, accepting a list of topic-partitions. When a topic-partition is "paused", no data will be fetched from that topic-partition.

One use case is to be able to add a kill switch to a consumer to quickly be able to stop consuming from that partition.

Q: How should this affect partition reassignment?

Verify buffer size before reading

#11 was caused by attempting to read the offset and size from the buffer without checking that there were enough bytes in the buffer. The fix was to simply do that check first:

if (!decoder.canReadInt64()) {
  throw new KafkaJSPartialMessageError(
    `Tried to decode a partial message: There isn't enough bytes to read the offset`
  )
}
const offset = decoder.readInt64().toString()

However, we don't want to have to do these checks manually everywhere we try to read from a buffer. We could check the buffer size in the decoder itself, but it doesn't have the knowledge about messages etc, so it would just be able to throw very generic errors.

The suggestion is to never use the Decoder directly, but rather to create specific decoders for what we are trying to decode, for example a MessageDecoder, which can call the primitive methods on the Decoder, catch the generic errors and re-throw more specific ones.
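
A sketch of that layering. `SimpleDecoder` is a simplified stand-in for the real Decoder, `PartialMessageError` stands in for KafkaJSPartialMessageError, and the use of `RangeError` as the generic error is an assumption.

```javascript
// Stand-in for KafkaJSPartialMessageError
class PartialMessageError extends Error {}

// Simplified primitive decoder: checks the buffer size itself and throws a generic error
class SimpleDecoder {
  constructor(buffer) {
    this.buffer = buffer
    this.offset = 0
  }

  ensureAvailable(bytes) {
    if (this.offset + bytes > this.buffer.length) {
      throw new RangeError(`Not enough bytes: needed ${bytes}`)
    }
  }

  readInt32() {
    this.ensureAvailable(4)
    const value = this.buffer.readInt32BE(this.offset)
    this.offset += 4
    return value
  }

  readInt64() {
    this.ensureAvailable(8)
    const value = this.buffer.readBigInt64BE(this.offset)
    this.offset += 8
    return value
  }
}

// Message-aware decoder: translates generic errors into specific ones
class MessageDecoder {
  constructor(decoder) {
    this.decoder = decoder
  }

  readField(name, read) {
    try {
      return read()
    } catch (e) {
      if (e instanceof RangeError) {
        throw new PartialMessageError(
          `Tried to decode a partial message: not enough bytes to read the ${name}`
        )
      }
      throw e
    }
  }

  readOffset() {
    return this.readField('offset', () => this.decoder.readInt64().toString())
  }

  readSize() {
    return this.readField('size', () => this.decoder.readInt32())
  }
}
```

The size check lives in one place, and callers still get an error that says which message field could not be read.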

Add support to RecordBatch protocol

In Kafka 0.11, the structure of the 'MessageSet' and 'Message' were significantly changed. Not only were new fields added to support new features like exactly once semantics and record headers, but the recursive nature of the previous versions of the message format was eliminated in favor of a flat structure. A 'MessageSet' is now called a 'RecordBatch', which contains one or more 'Records' (and not 'Messages'). When compression is enabled, the RecordBatch header remains uncompressed, but the Records are compressed together. Further, multiple fields in the 'Record' are varint encoded, which leads to significant space savings for larger batches.

The new message format has a Magic value of 2. Its structure is as follows:

RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32
  Magic => int8 
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32
  FirstTimestamp => int64
  MaxTimestamp => int64
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  Records => [Record]
  
Record =>
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header]
  
Header => HeaderKey HeaderVal
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data

More information can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
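
Since several Record fields above are varints, a decoder needs a primitive for reading them. The sketch below reads one zigzag-encoded varint from a buffer; `readVarInt` is a hypothetical helper name, and it is limited to values that fit in 32 bits, so wider fields would need a BigInt-based variant.

```javascript
// Reads one zigzag-encoded varint starting at `offset`.
// Returns the decoded value and the number of bytes consumed.
function readVarInt(buffer, offset = 0) {
  let result = 0
  let shift = 0
  let position = offset

  while (true) {
    if (position >= buffer.length) {
      throw new RangeError('Truncated varint')
    }
    const byte = buffer[position++]
    // The low 7 bits carry data; the high bit signals that more bytes follow.
    result |= (byte & 0x7f) << shift
    if ((byte & 0x80) === 0) break
    shift += 7
    if (shift > 28) {
      throw new RangeError('Varint is longer than 32 bits')
    }
  }

  // Undo zigzag encoding: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
  const value = (result >>> 1) ^ -(result & 1)
  return { value, bytesRead: position - offset }
}
```

For example, the two bytes `0xac 0x02` carry the raw value 300, which zigzag-decodes to 150.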

Support plugins

In order to support klarna/phobos_db_checkpoint, we would need to expose some kind of plugin interface.

In order to support our first use case, the plugin would need to be able to inject arguments into eachMessage and eachBatch, and be responsible for actually invoking the user's eachMessage/eachBatch in order to be able to catch errors.
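
One possible shape for such an interface is sketched below. Everything here is an assumption made for illustration: `applyPlugins`, the `wrap` hook and the `checkpoint` argument are hypothetical and do not describe an existing KafkaJS API.

```javascript
// Hypothetical plugin contract: `wrap` receives the next handler and returns
// the handler that actually runs, so the plugin controls the invocation.
function applyPlugins(handler, plugins) {
  return plugins.reduce((wrapped, plugin) => plugin.wrap(wrapped), handler)
}

// Example plugin: injects a `checkpoint` argument into the user's handler
// and catches any error the handler throws.
const checkpointPlugin = {
  failures: [],
  wrap(next) {
    return async payload => {
      const { topic, partition, message } = payload
      const checkpoint = { key: `${topic}:${partition}:${message.offset}` }
      try {
        await next({ ...payload, checkpoint })
      } catch (error) {
        // A real plugin might persist the failure; here it is only recorded.
        this.failures.push({ checkpoint, error })
      }
    }
  },
}
```

The wrapped function would then be passed wherever the user's `eachMessage` is expected, for example `applyPlugins(userEachMessage, [checkpointPlugin])`.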

Add support to producer protocol versions v3 to v5

Produce Request (Version: 3) => transactional_id acks timeout [topic_data] 
  transactional_id => NULLABLE_STRING
  acks => INT16
  timeout => INT32
  topic_data => topic [data] 
    topic => STRING
    data => partition record_set 
      partition => INT32
      record_set => RECORDS
Produce Response (Version: 3) => [responses] throttle_time_ms 
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time 
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
  throttle_time_ms => INT32

More information here:
https://kafka.apache.org/protocol.html#The_Messages_Produce
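
To make the new leading field concrete, the sketch below encodes only the first three fields of the version 3 request (`transactional_id`, `acks`, `timeout`) with Node's `Buffer`. The function name `encodeProduceV3Header` is hypothetical, and `topic_data` is deliberately left out.

```javascript
// Encodes transactional_id (NULLABLE_STRING), acks (INT16) and timeout (INT32)
// in big-endian order, as the leading fields of a Produce v3 request.
function encodeProduceV3Header({ transactionalId = null, acks, timeout }) {
  const idBytes = transactionalId === null ? null : Buffer.from(transactionalId, 'utf8')
  const buffer = Buffer.alloc(2 + (idBytes ? idBytes.length : 0) + 2 + 4)
  let offset = 0

  // NULLABLE_STRING: INT16 length followed by the UTF-8 bytes; -1 means null.
  offset = buffer.writeInt16BE(idBytes ? idBytes.length : -1, offset)
  if (idBytes) {
    offset += idBytes.copy(buffer, offset)
  }

  offset = buffer.writeInt16BE(acks, offset)
  buffer.writeInt32BE(timeout, offset)
  return buffer
}
```

A non-transactional producer would pass no `transactionalId`, which encodes as a length of -1.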

Handle changes in brokers when retrying producing

TypeError: Cannot read property 'map' of undefined
  File "/opt/app/node_modules/kafkajs/src/producer/createTopicData.js", line 4, col 28, in module.exports
    partitions: partitions.map(partition => ({
  File "/opt/app/node_modules/kafkajs/src/producer/sendMessages.js", line 46, col 27, in brokersWithoutResponse.map
    const topicData = createTopicData({ topic, partitions, messagesPerPartition })
  ?, in Array.map
  File "/opt/app/node_modules/kafkajs/src/producer/sendMessages.js", line 44, col 37, in createProducerRequests
    return brokersWithoutResponse.map(async broker => {
  ?, in null.<anonymous>

Calling `seek` within `eachMessage` does not abort the current batch

In my dead-letter queue plugin, when I fail to send a message to the dead-letter queue, I need to seek back to the failed message in order to re-process it. This works, in that the message is re-processed on the subsequent fetch. However, the current batch is completed first, which means that any messages in the batch after the one that failed get processed twice.

https://github.com/Nevon/kafkajs-dlq/blob/5a6f00c3de75abf71640a88ff564ab5a5691f7a6/src/consumer.integration.spec.js#L149-L163

I would like a way to abort the current batch. Either immediately as an effect of calling seek within eachMessage, or in some other way (perhaps by throwing a specific type of error).
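
The first option could be sketched as follows. This is a simplified, hypothetical batch runner (`runBatch` and the `seek` argument passed to the handler are invented for the example) and not how the KafkaJS consumer is implemented.

```javascript
// Hypothetical batch runner: stops iterating as soon as the handler seeks.
async function runBatch(messages, eachMessage) {
  let seekOffset = null
  const seek = offset => {
    seekOffset = offset
  }

  for (const message of messages) {
    await eachMessage({ message, seek })
    if (seekOffset !== null) {
      // Abandon the rest of the batch; the next fetch would start at seekOffset.
      return { aborted: true, nextOffset: seekOffset }
    }
  }

  return { aborted: false, nextOffset: null }
}
```

With this behavior, messages after the failed one are not processed until they are fetched again, so they run only once.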

Allow consumer to use v4+ API

Make sure the broker abstraction can use the new API and that the consumer works with all supported versions of Kafka.

docker-compose for Kafka 0.11

Add a new docker-compose file with 0.11 Kafka. Ideally, Travis will run both versions to make sure the project is working with all supported versions, but we can start by supporting this behavior locally. Something like:

KAFKA_VERSION=0.11 yarn test

This would start docker-compose.0_11.yml and then run the tests.

The default docker-compose will change after we flag this version as stable.
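
One way a test script could pick the compose file from `KAFKA_VERSION` is sketched below. The helper name `composeFileFor` and the default file name `docker-compose.yml` are assumptions; only `docker-compose.0_11.yml` is named in this proposal.

```javascript
// Maps a Kafka version such as '0.11' to a compose file name.
// Falls back to an assumed default file when no version is given.
function composeFileFor(version) {
  if (!version) {
    return 'docker-compose.yml'
  }
  return `docker-compose.${version.replace(/\./g, '_')}.yml`
}

// Typical use inside a test runner script.
const composeFile = composeFileFor(process.env.KAFKA_VERSION)
```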

Create Topic

Is there a plan to add a method for creating new topics?
(I'm currently running Kafka with auto.create.topics.enable.)

Expose logger

I'm currently building a "plugin" that wraps around consumer.eachMessage. In some cases, I would like to log some information. At the moment, the user needs to provide the logger to the plugin, but I would rather be able to get KafkaJS's logger from the consumer or producer, so that I can log "as" kafkajs (in the same format, following the same log level settings).

For example:

const kafka = new Kafka({ ... })

const consumer = kafka.consumer({ ... })
const consumerLogger = consumer.logger()

const producer = kafka.producer({ ... })
const producerLogger = producer.logger()

`client.logger` is not a function

The method and the field on the root client both have the same name, so if you try to call logger to get the current logger instance, you will get an error since it will have been reassigned to the logger instance on construction.
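
The collision can be reproduced with a small stand-in class. This is a reduced illustration, not the actual KafkaJS source, and renaming the field to `_logger` is only one possible fix.

```javascript
// Reproduction: the instance field assigned in the constructor shadows the
// prototype method of the same name.
class Client {
  constructor() {
    this.logger = { info: message => message }
  }

  logger() {
    return this.logger
  }
}

const client = new Client()
// client.logger is now the logger object, so client.logger() would throw.
const loggerIsCallable = typeof client.logger === 'function'

// One possible fix: store the instance under a different field name.
class FixedClient {
  constructor() {
    this._logger = { info: message => message }
  }

  logger() {
    return this._logger
  }
}
```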

SSL Connection Error: Hostname/IP doesn't match certificate's altnames

I'm using the Heroku Kafka add-on. The hosts are just EC2 hosts (e.g. ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com), but the cert's CN is a random alphanumeric string. Setting "rejectUnauthorized": false works, but then the client no longer verifies that the cert is signed by the provided CA. Is there any way to skip the hostname match but keep the rest of the verification?
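
One approach worth trying, assuming the client forwards its `ssl` options to Node's `tls.connect`, is to keep `rejectUnauthorized` enabled and override `checkServerIdentity`, which is the `tls.connect` option responsible for the hostname check. The helper name below is hypothetical, and this is a sketch rather than a verified fix for the Heroku setup.

```javascript
// Builds TLS options that keep CA verification but skip the hostname check.
// `ca` is the PEM-encoded CA certificate supplied by the provider.
function sslOptionsWithoutHostnameCheck(ca) {
  return {
    rejectUnauthorized: true,
    ca: [ca],
    // Returning undefined tells Node that the server identity is acceptable,
    // whatever hostname the certificate was issued for.
    checkServerIdentity: () => undefined,
  }
}
```

The result would be passed as the `ssl` option when constructing the client. Note that any certificate signed by the provided CA is then accepted for any host, so this only makes sense when that CA is dedicated to the cluster.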
