
xk6-kafka's Introduction

xk6-kafka


The xk6-kafka project is a k6 extension that enables k6 users to load test Apache Kafka with a message producer and, for debugging purposes, a consumer.

The real purpose of this extension is to test the system you meticulously designed to use Apache Kafka. By auto-generating messages and sending them to your system via Apache Kafka, you can load test your consumers and, by extension, the rest of your system.

You can send many messages over each connection to Kafka. Messages are passed as arrays of objects containing a key and a value, along with configuration objects. Several serialization formats are supported, including strings, JSON, binary (byte arrays), Avro, and JSON Schema. Avro and JSON Schema schemas can either be fetched from Schema Registry or hard-coded directly in the script. SASL PLAIN/SCRAM authentication and message compression are also supported.
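
For example, a message key and value can be serialized with different schema types before being produced. The following is a minimal, illustrative sketch; SCHEMA_TYPE_STRING is used later in this README, while SCHEMA_TYPE_JSON is assumed here and should be verified against api-docs/index.d.ts:

    import { SchemaRegistry, SCHEMA_TYPE_STRING, SCHEMA_TYPE_JSON } from "k6/x/kafka";

    const schemaRegistry = new SchemaRegistry();

    // Serialize a plain string key
    const key = schemaRegistry.serialize({
        data: "order-1234",
        schemaType: SCHEMA_TYPE_STRING,
    });

    // Serialize a JSON value
    const value = schemaRegistry.serialize({
        data: { orderId: 1234, status: "CREATED" },
        schemaType: SCHEMA_TYPE_JSON,
    });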

For debugging and testing purposes, a consumer is available to make sure you send the correct data to Kafka.

If you want to learn more about the extension, read the article (outdated) explaining how to load test your Kafka producers and consumers using k6 on the k6 blog. You can also watch this recording of the k6 Office Hours about this extension.

Supported Features

  • Producing and consuming messages using the Writer, Reader, Connection, and SchemaRegistry classes
  • Serialization and deserialization of strings, JSON, byte arrays, Avro, and JSON Schema
  • Schema Registry support, including basic authentication
  • SASL PLAIN/SCRAM authentication
  • Message compression
  • Creating, listing, and deleting topics
  • Custom k6 metrics emitted by the producer and consumer

Download Binaries

The Official Docker Image

The official Docker image is available on Docker Hub. Before running your script, make the script available to the container by mounting a volume (a directory) or passing it via stdin.

docker run --rm -i mostafamoradian/xk6-kafka:latest run - <scripts/test_json.js

The Official Binaries

The binaries are generated by the build process and published on the releases page. Currently, binaries are available for GNU/Linux, macOS, and Windows on amd64 (x86_64).

Note: If you want to see an official build for your machine, please build and test xk6-kafka from source and then create an issue with details. I'll add the specific binary to the build pipeline and publish it with the next release.

Build from Source

You can build the k6 binary on various platforms, each with its own requirements. The following shows how to build the k6 binary with this extension on GNU/Linux distributions.

Prerequisites

You must have a recent Go version installed to build the k6 binary; it should be compatible with the versions required by k6 and xk6. I recommend gvm because it eases version management.

  • gvm for easier installation and management of Go versions on your machine
  • Git for cloning the project
  • xk6 for building k6 binary with extensions

Install and build the latest tagged version

Feel free to skip the first two steps if you already have Go installed.

  1. Install gvm by following its installation guide.

  2. Install the latest version of Go using gvm. You need Go 1.4 installed for bootstrapping into higher Go versions, as explained here.

  3. Install xk6:

    go install go.k6.io/xk6/cmd/xk6@latest
  4. Build the binary:

    xk6 build --with github.com/mostafa/xk6-kafka@latest

Note: You can always use the latest version of k6 to build the extension, but the earliest version of k6 that supports extensions via xk6 is v0.32.0. xk6 is constantly evolving, so some APIs may not be backward compatible.

Build for development

If you want to add a feature or make a fix, clone the project and build it using the following commands. Passing @latest=. forces xk6 to use the local clone instead of fetching the latest version from the repository, which enables you to update the code and test it locally.

git clone git@github.com:mostafa/xk6-kafka.git && cd xk6-kafka
xk6 build --with github.com/mostafa/xk6-kafka@latest=.

Example scripts

There are many examples in the scripts directory that show how to use various features of the extension.

How to Test Your Kafka Setup

You can start testing your setup immediately, but developing the script takes some time, so it is better to develop and validate your script against a development environment first, and only then start testing the environment you actually care about.

Development environment

I recommend the fast-data-dev Docker image by Lenses.io, a Kafka setup for development that includes Kafka, Zookeeper, Schema Registry, Kafka Connect, Landoop tools, and 20+ connectors. It is relatively easy to set up if you have Docker installed. Monitor the Docker logs and wait for a working setup before attempting to test, because the initial setup, leader election, and test data ingestion take time.

  1. Run the Kafka environment and expose the ports:

    sudo docker run \
        --detach --rm \
        --name lensesio \
        -p 2181:2181 \
        -p 3030:3030 \
        -p 8081-8083:8081-8083 \
        -p 9581-9585:9581-9585 \
        -p 9092:9092 \
        -e ADV_HOST=127.0.0.1 \
        -e RUN_TESTS=0 \
        lensesio/fast-data-dev:latest
  2. After running the command, visit localhost:3030 to get into the fast-data-dev environment.

  3. You can run the following command to see the container logs:

    sudo docker logs -f -t lensesio

Note: If you have errors running the Kafka development environment, refer to the fast-data-dev documentation.

The xk6-kafka API

All the exported functions are available by importing the module object from k6/x/kafka. The exported objects, constants and other data structures are available in the index.d.ts file, and they always reflect the latest changes on the main branch. You can access the generated documentation at api-docs/docs/README.md.

⚠️ Warning: The JavaScript API is subject to change in future versions unless a new major version is released.

k6 Test Scripts

The example scripts are available as test_<format/feature>.js files with more code and commented sections in the scripts directory. Since this project extends the functionality of k6, its scripts go through the same four stages of the k6 test life cycle.

  1. To use the extension, you need to import it in your script, like any other JS module:

    // Either import the module object
    import * as kafka from "k6/x/kafka";
    
    // Or individual classes and constants
    import {
        Writer,
        Reader,
        Connection,
        SchemaRegistry,
        SCHEMA_TYPE_STRING,
    } from "k6/x/kafka";
  2. You need to instantiate the classes in the init context. All the k6 options are also configured here:

    // Creates a new Writer object to produce messages to Kafka
    const writer = new Writer({
        // WriterConfig object
        brokers: ["localhost:9092"],
        topic: "my-topic",
    });
    
    const reader = new Reader({
        // ReaderConfig object
        brokers: ["localhost:9092"],
        topic: "my-topic",
    });
    
    const connection = new Connection({
        // ConnectionConfig object
        address: "localhost:9092",
    });
    
    const schemaRegistry = new SchemaRegistry();
    // Can accept a SchemaRegistryConfig object
    
    if (__VU == 0) {
        // Create a topic on initialization (before producing messages)
        connection.createTopic({
            // TopicConfig object
            topic: "my-topic",
        });
    }
  3. In the VU code, you can produce messages to Kafka or consume messages from it:

    export default function () {
        // Fetch the list of all topics
        const topics = connection.listTopics();
        console.log(topics); // list of topics
    
        // Produces message to Kafka
        writer.produce({
            // ProduceConfig object
            messages: [
                // Message object(s)
                {
                    key: schemaRegistry.serialize({
                        data: "my-key",
                        schemaType: SCHEMA_TYPE_STRING,
                    }),
                    value: schemaRegistry.serialize({
                        data: "my-value",
                        schemaType: SCHEMA_TYPE_STRING,
                    }),
                },
            ],
        });
    
        // Consume messages from Kafka
        let messages = reader.consume({
            // ConsumeConfig object
            limit: 10,
        });
    
        // your messages
        console.log(messages);
    
        // You can use checks to verify the contents,
        // length, and other properties of the message(s);
        // a minimal sketch of such a check appears after this list.
    
        // To deserialize the data back into a string, you should use
        // the deserialize method of the Schema Registry client. You
        // can use it inside a check, as shown in the example scripts.
        let deserializedValue = schemaRegistry.deserialize({
            data: messages[0].value,
            schemaType: SCHEMA_TYPE_STRING,
        });
    }
  4. In the teardown function, close all the connections and possibly delete the topic:

    export function teardown(data) {
        // Delete the topic
        connection.deleteTopic("my-topic");
    
        // Close all connections
        writer.close();
        reader.close();
        connection.close();
    }
  5. You can now run k6 with the extension using the following command:

    ./k6 run --vus 50 --duration 60s scripts/test_json.js
  6. And here's the test result output:

            /\      |‾‾| /‾‾/   /‾‾/
       /\  /  \     |  |/  /   /  /
      /  \/    \    |     (   /   ‾‾\
     /          \   |  |\  \ |  (‾)  |
    / __________ \  |__| \__\ \_____/ .io
    
    execution: local
        script: scripts/test_json.js
        output: -
    
    scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
            * default: 50 looping VUs for 1m0s (gracefulStop: 30s)
    
    
    running (1m04.4s), 00/50 VUs, 20170 complete and 0 interrupted iterations
    default ✓ [======================================] 50 VUs  1m0s
    
        ✓ 10 messages are received
        ✓ Topic equals to xk6_kafka_json_topic
        ✓ Key contains key/value and is JSON
        ✓ Value contains key/value and is JSON
        ✓ Header equals {'mykey': 'myvalue'}
        ✓ Time is past
        ✓ Partition is zero
        ✓ Offset is gte zero
        ✓ High watermark is gte zero
    
        █ teardown
    
        checks.........................: 100.00% ✓ 181530       ✗ 0
        data_received..................: 0 B     0 B/s
        data_sent......................: 0 B     0 B/s
        iteration_duration.............: avg=153.45ms min=6.01ms med=26.8ms  max=8.14s   p(90)=156.3ms p(95)=206.4ms
        iterations.....................: 20170   313.068545/s
        kafka_reader_dial_count........: 50      0.776075/s
        kafka_reader_dial_seconds......: avg=171.22µs min=0s     med=0s      max=1.09s   p(90)=0s      p(95)=0s
        ✓ kafka_reader_error_count.......: 0       0/s
        kafka_reader_fetch_bytes_max...: 1000000 min=1000000    max=1000000
        kafka_reader_fetch_bytes_min...: 1       min=1          max=1
        kafka_reader_fetch_wait_max....: 200ms   min=200ms      max=200ms
        kafka_reader_fetch_bytes.......: 58 MB   897 kB/s
        kafka_reader_fetch_size........: 147167  2284.25179/s
        kafka_reader_fetches_count.....: 107     1.6608/s
        kafka_reader_lag...............: 1519055 min=0          max=2436190
        kafka_reader_message_bytes.....: 40 MB   615 kB/s
        kafka_reader_message_count.....: 201749  3131.446006/s
        kafka_reader_offset............: 4130    min=11         max=5130
        kafka_reader_queue_capacity....: 1       min=1          max=1
        kafka_reader_queue_length......: 1       min=0          max=1
        kafka_reader_read_seconds......: avg=96.5ms   min=0s     med=0s      max=59.37s  p(90)=0s      p(95)=0s
        kafka_reader_rebalance_count...: 0       0/s
        kafka_reader_timeouts_count....: 57      0.884725/s
        kafka_reader_wait_seconds......: avg=102.71µs min=0s     med=0s      max=85.71ms p(90)=0s      p(95)=0s
        kafka_writer_acks_required.....: 0       min=0          max=0
        kafka_writer_async.............: 0.00%   ✓ 0            ✗ 2017000
        kafka_writer_attempts_max......: 0       min=0          max=0
        kafka_writer_batch_bytes.......: 441 MB  6.8 MB/s
        kafka_writer_batch_max.........: 1       min=1          max=1
        kafka_writer_batch_size........: 2017000 31306.854525/s
        kafka_writer_batch_timeout.....: 0s      min=0s         max=0s
        ✓ kafka_writer_error_count.......: 0       0/s
        kafka_writer_message_bytes.....: 883 MB  14 MB/s
        kafka_writer_message_count.....: 4034000 62613.709051/s
        kafka_writer_read_timeout......: 0s      min=0s         max=0s
        kafka_writer_retries_count.....: 0       0/s
        kafka_writer_wait_seconds......: avg=0s       min=0s     med=0s      max=0s      p(90)=0s      p(95)=0s
        kafka_writer_write_count.......: 4034000 62613.709051/s
        kafka_writer_write_seconds.....: avg=523.21µs min=4.84µs med=14.48µs max=4.05s   p(90)=33.85µs p(95)=42.68µs
        kafka_writer_write_timeout.....: 0s      min=0s         max=0s
        vus............................: 7       min=7          max=50
        vus_max........................: 50      min=50         max=50
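
As mentioned in the comments of step 3, consumed messages can be verified with k6 checks after deserializing them. The following is a minimal sketch (not taken verbatim from the example scripts) that runs inside the default function and reuses the reader and schemaRegistry objects created in the init context:

    import { check } from "k6";
    import { SCHEMA_TYPE_STRING } from "k6/x/kafka";

    // Inside export default function () { ... }
    let messages = reader.consume({ limit: 10 });

    check(messages, {
        "10 messages are received": (msgs) => msgs.length === 10,
        "value is my-value": (msgs) =>
            schemaRegistry.deserialize({
                data: msgs[0].value,
                schemaType: SCHEMA_TYPE_STRING,
            }) === "my-value",
    });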

Emitted Metrics

Metric Type Description
kafka_reader_dial_count Counter Total number of times the reader tries to connect.
kafka_reader_fetches_count Counter Total number of times the reader fetches batches of messages.
kafka_reader_message_count Counter Total number of messages consumed.
kafka_reader_message_bytes Counter Total bytes consumed.
kafka_reader_rebalance_count Counter Total number of rebalances of a topic in a consumer group (deprecated).
kafka_reader_timeouts_count Counter Total number of timeouts that occurred while reading.
kafka_reader_error_count Counter Total number of errors that occurred while reading.
kafka_reader_dial_seconds Trend The time it takes to connect to the leader in a Kafka cluster.
kafka_reader_read_seconds Trend The time it takes to read a batch of messages.
kafka_reader_wait_seconds Trend Waiting time before reading a batch of messages.
kafka_reader_fetch_size Counter Total messages fetched.
kafka_reader_fetch_bytes Counter Total bytes fetched.
kafka_reader_offset Gauge Number of messages read after the given offset in a batch.
kafka_reader_lag Gauge The lag between the last message offset and the current read offset.
kafka_reader_fetch_bytes_min Gauge Minimum number of bytes fetched.
kafka_reader_fetch_bytes_max Gauge Maximum number of bytes fetched.
kafka_reader_fetch_wait_max Gauge The maximum time it takes to fetch a batch of messages.
kafka_reader_queue_length Gauge The queue length while reading a batch of messages.
kafka_reader_queue_capacity Gauge The queue capacity while reading a batch of messages.
kafka_writer_write_count Counter Total number of times the writer writes batches of messages.
kafka_writer_message_count Counter Total number of messages produced.
kafka_writer_message_bytes Counter Total bytes produced.
kafka_writer_error_count Counter Total number of errors that occurred while writing.
kafka_writer_batch_seconds Trend The time it takes to write a batch of messages.
kafka_writer_batch_queue_seconds Trend The time it takes to queue a batch of messages.
kafka_writer_write_seconds Trend The time it takes to write messages.
kafka_writer_wait_seconds Trend Waiting time before writing messages.
kafka_writer_retries_count Counter Total number of retries when writing messages.
kafka_writer_batch_size Counter Total batch size.
kafka_writer_batch_bytes Counter Total number of bytes in a batch of messages.
kafka_writer_attempts_max Gauge Maximum number of attempts at writing messages.
kafka_writer_batch_max Gauge Maximum batch size.
kafka_writer_batch_timeout Gauge Batch timeout.
kafka_writer_read_timeout Gauge Batch read timeout.
kafka_writer_write_timeout Gauge Batch write timeout.
kafka_writer_acks_required Gauge Required Acks.
kafka_writer_async Rate Async writer.
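
These metrics can also be used in k6 thresholds, which is how the checkmarks next to kafka_reader_error_count and kafka_writer_error_count in the output above are produced. A minimal sketch, assuming you want the test to fail on any producer or consumer error:

    export const options = {
        thresholds: {
            // Fail the test run if any error is recorded while producing or consuming
            kafka_writer_error_count: ["count == 0"],
            kafka_reader_error_count: ["count == 0"],
        },
    };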

FAQ

  1. Why do I receive Error writing messages?

    There are a few reasons why this might happen. The most prominent one is that the topic might not exist, which causes the producer to fail to send messages to a non-existent topic. You can use the Connection.createTopic method to create the topic in Kafka, as shown in scripts/test_topics.js, or you can set autoCreateTopic on the WriterConfig object (see the configuration sketch after this FAQ). You can also create a topic using the kafka-topics command:

    $ docker exec -it lensesio bash
    (inside container)$ kafka-topics --create --topic xk6_kafka_avro_topic --bootstrap-server localhost:9092
    (inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092
  2. Why does the reader.consume keep hanging?

    If the reader.consume keeps hanging, it might be because the topic doesn't exist or is empty.

  3. I want to test SASL authentication. How should I do that?

    If you want to test SASL authentication, look at this commit message, in which I describe how to run a test environment for testing SASL authentication. The configuration sketch after this FAQ also shows where the SASL settings go.

  4. Why doesn't the consumer group consume messages from the topic?

    As explained in issue #37, multiple inits by k6 cause multiple consumer group instances to be created in the init context, which sometimes causes random partitions to be selected by each instance. This, in turn, causes confusion when consuming messages from different partitions. This can be solved by using a UUID when naming the consumer group, thereby guaranteeing that the consumer group object is assigned to all partitions in a topic.

  5. Why do I receive a MessageTooLargeError when I produce messages bigger than 1 MB?

    Kafka has a maximum message size of 1 MB by default, which is set by message.max.bytes, and this limit is also applied to the Writer object.

    There are two ways to produce larger messages: 1) Change the default value of your Kafka instance to a larger number. 2) Use compression.

    Remember that the Writer object will reject messages larger than the default Kafka message size limit (1 MB). Hence, you need to set batchBytes to a larger value, for example, 1024 * 1024 * 2 (2 MB). The batchBytes setting refers to the raw, uncompressed size of all the keys and values (data) in the array of messages you pass to the Writer object. You can calculate the raw data size of your messages using this example script.

  6. Can I consume messages from a consumer group in a topic with multiple partitions?

    Yes, you can. Just pass a groupID to your Reader object; you must not specify the partition when a consumer group is used. Visit this documentation article to learn more about Kafka consumer groups.

    Remember to set sessionTimeout on your Reader object if the consume function terminates abruptly and fails to consume messages.

  7. Why does Reader.consume produce an unable to read message error?

    For performance testing reasons, the maxWait of the Reader is set to 200ms. If you keep receiving this error, consider increasing it to a larger value.

  8. How can I consume from multiple partitions on a single topic?

    You can configure your reader to consume from a (list of) topic(s) and their partitions using a consumer group. This can be achieved by setting groupTopics, groupID, and a few other options for timeouts, intervals, and lags. Have a look at the test_consumer_group.js example script and the configuration sketch after this FAQ.

  9. How can I use autocompletion in IDEs?

    Copy api-docs/index.d.ts into your project directory and reference it at the top of your JavaScript file:

    /// <reference path="index.d.ts" />
    
    ...
  10. Why do timeouts give up sooner than expected?

    There are many ways to configure timeouts for the Reader and Writer objects. They follow Go's time conventions, where durations are expressed in nanoseconds, so one second is equal to 1000000000 (one billion). For ease of use, I added constants that can be imported from the module:

    import { SECOND } from "k6/x/kafka";
    
    console.log(2 * SECOND); // 2000000000
    console.log(typeof SECOND); // number
  11. Can I catch errors returned by the consume function?

    Yes. You can catch errors by using a try-catch block. If the consume function throws, the error object passed to the catch block will be populated with the error message.

    try {
        let messages = reader.consume({
            limit: 10,
        });
    } catch (error) {
        console.error(error);
    }
  12. I am using a nested Avro schema and getting unknown errors. How can I debug them?

    If you have a nested Avro schema that you want to test against your data, I created a small tool for it called nested-avro-schema. This tool helps you find discrepancies and errors in your schema and data, so that you can fix them before running xk6-kafka tests. Refer to this comment for more information.
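
Several FAQ answers above refer to WriterConfig and ReaderConfig options. The sketch below combines them for reference; the option names are the ones mentioned in this README, but the values are only illustrative, and names such as sasl and SASL_PLAIN should be verified against api-docs/index.d.ts and scripts/test_sasl_auth.js:

    import { Writer, Reader, SASL_PLAIN, SECOND } from "k6/x/kafka";

    // Hypothetical credentials, for illustration only (FAQ 3)
    const saslConfig = {
        username: "test-user",
        password: "test-password",
        algorithm: SASL_PLAIN,
    };

    const writer = new Writer({
        brokers: ["localhost:9092"],
        topic: "my-topic",
        autoCreateTopic: true,        // FAQ 1: create the topic if it does not exist
        batchBytes: 1024 * 1024 * 2,  // FAQ 5: allow ~2 MB of raw key/value data per batch
        sasl: saslConfig,
    });

    const reader = new Reader({
        brokers: ["localhost:9092"],
        groupID: "my-consumer-group", // FAQ 4 & 6: consider a unique (e.g. UUID) name; do not set partition
        groupTopics: ["my-topic"],    // FAQ 8: consume from all partitions of these topics
        sessionTimeout: 6 * SECOND,   // FAQ 6: illustrative value
        maxWait: 1 * SECOND,          // FAQ 7: increase if "unable to read message" errors occur
        sasl: saslConfig,
    });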

Contributions, Issues and Feedback

I'd be thrilled to receive contributions and feedback on this project. You're always welcome to create an issue if you find one (or many). I would do my best to address the issues. Also, feel free to contribute by opening a PR with changes, and I'll do my best to review and merge it as soon as I can.

Backward Compatibility Notice

If you want to keep up to date with the latest changes, please follow the project board. Also, since v0.9.0, the main branch is the development branch; it usually has the latest changes and might be unstable. If you want to use the latest features, you might need to build your own binary by following the build-from-source instructions. In contrast, the tagged releases and the Docker images are more stable.

Unless I release a major version, I make no guarantee to keep the API stable, as this project is in active development. The best way to keep up with the changes is to follow the xk6-kafka API and look at the scripts directory.

The Release Process

The main branch is the development branch, and pull requests are squashed and merged into it. When a commit is tagged with a version, for example, v0.10.0, the build pipeline builds the main branch at that commit. The build process creates the binaries and the Docker image. If you want to test the latest unreleased features, you can clone the main branch and instruct xk6 to use the locally cloned repository instead of @latest, which refers to the latest tagged version, as explained in the build for development section.

The CycloneDX SBOM

CycloneDX SBOMs in JSON format are generated for go.mod (as of v0.9.0) and the Docker image (as of v0.14.0), and they can be accessed from the release assets.

Disclaimer

This project started as a proof of concept, but it seems to be used by some companies nowadays. However, it isn't supported by the k6 team, but rather by me personally, and the APIs may change in the future. USE AT YOUR OWN RISK!

This project was AGPL3-licensed up until 7 October 2021, and then we relicensed it under the Apache License 2.0.

xk6-kafka's People

Contributors

aermolaev, chrisdev83, codebien, davidgomesdev, doxsch, eduardowitter, enamrik, floord, fmck3516, hildebrandttk, jasmineca, mostafa, mstoykov, rgordill, simskij, thmshmm


xk6-kafka's Issues

Export all constants to JS

This removes the need to define these constants in the code, which is error-prone:

consumer: {
    keyDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
},
producer: {
    keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
    valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
},

Exportable constants

String

xk6-kafka/string.go (lines 9 to 14 in ad9fe72):

const (
    String             srclient.SchemaType = "STRING"
    StringSerializer   string              = "org.apache.kafka.common.serialization.StringSerializer"
    StringDeserializer string              = "org.apache.kafka.common.serialization.StringDeserializer"
)

ByteArray

xk6-kafka/bytearray.go (lines 9 to 14 in ad9fe72):

const (
    ByteArray             srclient.SchemaType = "BYTEARRAY"
    ByteArraySerializer   string              = "org.apache.kafka.common.serialization.ByteArraySerializer"
    ByteArrayDeserializer string              = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
)

Avro

xk6-kafka/avro.go (lines 9 to 12 in ad9fe72):

const (
    AvroSerializer   string = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    AvroDeserializer string = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)

JSONSchema

xk6-kafka/jsonschema.go (lines 11 to 14 in ad9fe72):

const (
    JsonSchemaSerializer   string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer"
    JsonSchemaDeserializer string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer"
)

Protobuf (when implemented)

xk6-kafka/serde.go (lines 12 to 16 in ad9fe72):

const (
    // TODO: move these to their own package
    ProtobufSerializer   string = "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"
    ProtobufDeserializer string = "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"
)

Compression codecs

xk6-kafka/producer.go (lines 15 to 20 in 9a5b118):

CompressionCodecs = map[string]compress.Codec{
    "Gzip":   &compress.GzipCodec,
    "Snappy": &compress.SnappyCodec,
    "Lz4":    &compress.Lz4Codec,
    "Zstd":   &compress.ZstdCodec,
}


Error to connect kafka when I change version to v0.9.0

My script worked in version v0.8.0. After I switched to version v0.9.0, I started getting this error:

Failed to write message: read tcp ip: read: connection reset by peerFailed to write message.

import { check } from "k6";
import {
    writer,
    produceWithConfiguration,
    createTopic,
} from "k6/x/kafka"; 
import http from 'k6/http';
import encoding from 'k6/encoding';

const bootstrapServers = ["xxx:9092"];
const topic = "payin";

const auth = JSON.stringify({
    username: "xxx",
    password: "xxx",
    algorithm: "SHA512",
});

const producer = writer(bootstrapServers, topic, auth);

var configuration = JSON.stringify({
    producer: {
        keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
        valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
    },
    schemaRegistry: {
        url: "xxxx",
        basicAuth: {
            credentialsSource: "USER_INFO",
            userInfo: "xxx:xxx",
        },
    },
});

export default function () {
    const credentials = `xxx:xxx`;
    const encodedCredentials = encoding.b64encode(credentials);
    const options = {
        headers: {
            Authorization: `Basic ${encodedCredentials}`,
        },
    };

    const result = http.get('xxxx', options);
    const schema = JSON.parse(result.body).schema;

    let messages = [
        {
            value: JSON.stringify({
                "status": "PROPOSED"
            }),
        },
    ];

    let error = produceWithConfiguration(
        producer,
        messages,
        configuration,
        null,
        schema
    );

    check(error, {
        "is sent": (err) => err == undefined,
    });
}

Use syft to generate SBOMs for the Docker image

Note to self:

  1. I should find a better way to store the SBOMs, as they're only retained for 90 days on GitHub artifacts.
  2. How to run syft:
syft mostafamoradian/xk6-kafka --scope all-layers -o syft-json=sbom.syft.json

Publishing message encoding issues with confluentinc/cp-kafka:6.2.0

There is a serialization issue when publishing one of the sample scripts, samples/test_avro.js, to Confluent Kafka. The kafka-avro-console-consumer command crashes as soon as it attempts to deserialize this message. It seems there is an encoding issue.

% kafka-avro-console-consumer --bootstrap-server localhost:29092 --topic mytopic.v0.avro --from-beginning --property print.key=true
Processed a total of 1 messages
parse error: Invalid numeric literal at line 1, column 12

We also have an Elasticsearch sink connector configured, and it also cannot deserialize the Kafka data.
When we generate Kafka messages using other tools like kafka-avro-console-producer or kafkajs, things seem to work OK; it's only when we run a k6 test that seeds data.

Screen grabs from Confluent Control Center showing the data (screenshots omitted).

Stack trace from kafka es connect
[2021-07-30 21:21:59,217] ERROR WorkerSinkTask{id=local_elasticsearch-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)

at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)

at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mytopic.v0.avro to Avro:

at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)

at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)

at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)

... 13 more

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

[2021-07-30 21:21:59,229] INFO [Consumer clientId=connector-consumer-local_elasticsearch-0, groupId=connect-local_elasticsearch] Revoke previously assigned partitions mytopic.v0.avro-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

[2021-07-30 21:21:59,229] INFO [Consumer clientId=connector-consumer-local_elasticsearch-0, groupId=connect-local_elasticsearch] Member connector-consumer-local_elasticsearch-0-c062d254-c5d7-47b3-9c55-9b5b2962b76d sending LeaveGroup request to coordinator kafkabroker:9092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2021-07-30 21:21:59,241] INFO Publish thread interrupted for client_id=connector-consumer-local_elasticsearch-0 client_type=CONSUMER session= cluster=33W9Ct-mRbiv2QrIXn2TwA group=connect-local_elasticsearch (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

[2021-07-30 21:21:59,262] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-local_elasticsearch-0 client_type=CONSUMER session= cluster=33W9Ct-mRbiv2QrIXn2TwA group=connect-local_elasticsearch (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

[2021-07-30 21:21:59,263] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-local_elasticsearch-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)

[2021-07-30 21:21:59,278] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,279] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,280] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,283] INFO App info kafka.producer for confluent.monitoring.interceptor.connector-consumer-local_elasticsearch-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)

[2021-07-30 21:21:59,283] INFO Closed monitoring interceptor for client_id=connector-consumer-local_elasticsearch-0 client_type=CONSUMER session= cluster=33W9Ct-mRbiv2QrIXn2TwA group=connect-local_elasticsearch (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

[2021-07-30 21:21:59,284] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,284] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,284] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,297] INFO App info kafka.consumer for connector-consumer-local_elasticsearch-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)


Add docker-compose file for Kafka test environments

This ticket serves two purposes:

  1. A docker-compose file for testing in the CI pipeline.
  2. The same docker-compose file should be used for demo in the README.

Currently, the project uses a Zookeeperless instance of Apache Kafka to test one script, test_json.js, but that isn't enough. All the scripts should be tested with their scenarios, like SASL, compression, schema registry, etc.

While trying to test SASL authentication, I tried many different ready-made platforms like bitnami-docker-kafka, kafka-docker-playground and fast-data-dev. So far, each of them has served a different purpose and helped me test various features and fixes, but I believe the bitnami-docker-kafka is the easiest to set up.

Update:
While adding tests in #60, I replaced the Zookeeperless Kafka instance with the lensesio/fast-data-dev Docker image, which is also mentioned in the README. Yet, I couldn't make it work with SASL, so I might replace it with the bitnami image.

Related:

Consumer waits 10 minutes when no message are coming

Hello,
I run a script where I produce a message to topic A and then wait for another message on topic B.
Sometimes the message is not posted on topic B, and I would like the consumer to wait only a few seconds, no more. Currently, it waits around 10 minutes.
I don't really understand why, as I see that MaxWait is configured to 200ms in the Go consumer class.

Failed to write message: EOF

Hello,

Using version 0.5.0 with a Kafka 2.8.0 cluster deployed with Strimzi on OpenShift 4.7, I try to execute a run using the SASL test script (https://github.com/mostafa/xk6-kafka/blob/master/scripts/test_sasl_auth.js).
I changed the topic name and the bootstrap URL. And when I execute this run:

$ oc rsh dc/k6
Defaulting container name to k6-inter-0-34-1.
Use 'oc describe pod/k6-1-lj7pw -n blz' to see all of the containers in this pod.
/home $ k6_run --stage 6m:15 --stage 6m:15  k6-scripts/benchs/k6aas/test_sasl_auth.js 

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: k6-scripts/benchs/k6aas/test_sasl_auth.js
     output: InfluxDBv1 (http://influxdb:8086)

  scenarios: (100.00%) 1 scenario, 15 max VUs, 12m30s max duration (incl. graceful stop):
           * default: Up to 15 looping VUs for 12m0s over 2 stages (gracefulRampDown: 30s, gracefulStop: 30s)

Failed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFailed to write message: EOFFail^CThere was an error fetching messages: context canceled
running (00m00.8s), 00/15 VUs, 0 complete and 1 interrupted iterations
default ✗ [--------------------------------------] 01/15 VUs  00m00.8s/12m00.0s

     ✗ is sent
      ↳  0% — ✓ 0 / ✗ 100

     █ teardown

     checks.........................: 0.00% ✓ 0   ✗ 100
     data_received..................: 0 B   0 B/s
     data_sent......................: 0 B   0 B/s
     iteration_duration.............: avg=3.66ms min=3.66ms med=3.66ms max=3.66ms p(90)=3.66ms p(95)=3.66ms
     kafka.writer.dial.count........: 0     0/s
     kafka.writer.error.count.......: 0     0/s
     kafka.writer.message.bytes.....: 0 B   0 B/s
     kafka.writer.message.count.....: 0     0/s
     kafka.writer.rebalance.count...: 0     0/s
     kafka.writer.write.count.......: 0     0/s


/home $ 

In the Kafka log, it reports that the messages are too big at the network level:

2021-11-22 13:55:40,683 WARN [SocketServer listenerType=ZK_BROKER, nodeId=0] Unexpected error from /10.128.26.10; closing connection (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(PLAIN-9092)-PLAINTEXT-8]
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369295617 larger than 104857600)

Is it normal to send huge messages (350 MB) ?

I have no issue with the test https://github.com/mostafa/xk6-kafka/blob/master/scripts/test_json.js (about 1 MB/s and 3.8K msg/s).

Missing error handling in SchemaRegistry-Communication in schemaRegistry.go

Ignoring the error response makes it hard to track down issues caused by an outdated configuration.

// Default version of the schema is the latest version
// If CacheSchemas is true, the client will cache the schema
if version == 0 {
    schemaInfo, _ = srClient.GetLatestSchema(subject)
} else {
    schemaInfo, _ = srClient.GetSchemaByVersion(subject, version)
}

Refactor extension

Use the latest API changes in k6 to refactor the extension. The whole module structure, functions, and metrics should also be refactored. This issue is a placeholder for all the refactoring that needs to be done. These changes might break some APIs, but it's okay.

Should serdes functionality be exported to JS?

Instead of implicitly calling the serdes functions via produce and consume, we can explicitly call them from JS and give the user control over them. This way, we can have a more flexible serializer/deserializer system, and we don't need to worry about API changes and backward compatibility. Also, we won't have to decide when to use the serializer/deserializer based on the schema and given parameters.

Support to consume from all partitions

Hi - are there plans to add support to consume from all partitions? I took a quick glance at consumer.go, and it looks like the current implementation only allows consuming from one partition.

Recommended approach for using this alongside the k6 docker image?

I'm leveraging the k6 Docker image as the base within my pipeline and am interested in adding this k6 extension. I see the k6 base image doesn't have Go installed, so I'm curious about the recommended approach for using this.

Example Dockerfile:

# This is a comment
FROM ubuntu:latest

# Setup k6 installation
RUN apt-get update 
RUN DEBIAN_FRONTEND="noninteractive" apt-get -y install tzdata
RUN apt-get install golang -y
RUN apt-get install ca-certificates gnupg2 -y
RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys C5AD17C747E3415A3642D57D77C6C491D6AC1D69
RUN echo "deb https://dl.k6.io/deb stable main" | tee /etc/apt/sources.list.d/k6.list
RUN apt-get update 
RUN apt-get install k6
RUN apt-get install wget -y

# Get newer golang version
RUN wget https://dl.google.com/go/go1.16.4.linux-amd64.tar.gz
RUN tar -xvf go1.16.4.linux-amd64.tar.gz
RUN mv go /usr/local

# Setup ENV
ENV GOROOT=/usr/local/go
ENV GOPATH=$HOME/go
ENV PATH=$GOPATH/bin:$GOROOT/bin:$PATH

# Verification of go
RUN go version

RUN go get github.com/k6io/xk6/cmd/xk6@latest
RUN xk6 build --with github.com/mostafa/xk6-kafka@bb0ceb8

ENTRYPOINT [ "/bin/bash", "-l", "-c" ]

I'm getting the following error when running sample script from readme:

time="2021-07-27T18:48:49Z" level=debug msg="Initializing the runner..."
time="2021-07-27T18:48:49Z" level=debug msg=Loading... moduleSpecifier="file:///home/scripts/test_json.js" originalModuleSpecifier=/home/scripts/test_json.js
time="2021-07-27T18:48:50Z" level=debug msg="Babel: Transformed" t=192.005697ms
time="2021-07-27T18:48:50Z" level=error msg="unknown module: k6/x/kafka\n\tat reflect.methodValueCall (native)\n\tat file:///home/scripts/test_json.js:65:65(26)\n" hint="script exception"

Thanks

unknown module Error

Getting the below error while running the k6 script mentioned in the README using this command:
./k6 run script.js

Build k6 with xk6 using below command:
./xk6 build --with github.com/mostafa/xk6-kafka

Error:

ERRO[0001] GoError: unknown module: k6/x/kafka
	at reflect.methodValueCall (native)
	at file:///home/ashish_22484/xk6_compiled/rtddkafka.js:55:47(22)  hint="script exception"

OS: Amazon Linux 2
go version: go1.15.8 linux/amd64

and the same on:

OS: macOS Big Sur
go version: go1.14.15 darwin/amd64

Schema gets updated with every message produce

I have noticed an issue with produceWithConfiguration when using a schema and a schema serializer. For some reason, the defaults (which can be null) are removed from the server schema as soon as I run the tests.
It seems the goavro library that is being used does this as part of serialization. The problem is that this schema is later used in a POST request to fetch the schema ID from the server, and that POST request updates the server schema in this case. I could not find a way to provide the schema ID to bypass this logic/behavior.

Add an "official" docker image

Hello,
I understand that I can manually download and build the plugin, but it would be a MUCH better developer experience if you provided a Docker image that already does that.

A feature request for access Kafka write & read metric counters

Hello,
I'm using k6 for testing Kafka clusters ahead of a migration plan.
I need to be able to set a limit on the test based on one of the k6 Kafka metrics that are used in the summary report.
For example, setting up a threshold when kafka.writer.message.bytes reaches 1 GB; enabling access to these metrics would help me make that happen.

Update for k6 org change

I now get this error when building my docker image:

Step 4/12 : RUN go get -u github.com/k6io/xk6/cmd/xk6 &&     xk6 build v0.29.0 --with github.com/mostafa/xk6-kafka
 ---> Running in 3ded9e36f3a5
2021/03/30 18:33:27 [INFO] Temporary folder: /tmp/buildenv_2021-03-30-1833.563133095
2021/03/30 18:33:27 [INFO] Writing main module: /tmp/buildenv_2021-03-30-1833.563133095/main.go
2021/03/30 18:33:27 [INFO] Initializing Go module
2021/03/30 18:33:27 [INFO] exec (timeout=10s): /usr/local/go/bin/go mod init k6 
go: creating new go.mod: module k6
2021/03/30 18:33:27 [INFO] Pinning versions
2021/03/30 18:33:27 [INFO] exec (timeout=0s): /usr/local/go/bin/go get -d -v github.com/k6io/[email protected] 
go: downloading github.com/k6io/k6 v0.29.0
go get: github.com/k6io/[email protected]: parsing go.mod:
        module declares its path as: github.com/loadimpact/k6
                but was required as: github.com/k6io/k6
2021/03/30 18:33:30 [FATAL] exit status 1

I believe this is related to k6 moving from the loadimpact organization to k6io?

Export all metrics from producer and consumer

Currently, only a select few metrics are exported from the writer and reader, even though many others are defined in their stats. The other metrics are equally important for measuring how well our system performs under load, so it would be good to also have them reported in the k6 results output.

Metrics from srclient should also be taken into account, whenever this issue is addressed:

Update:
Created a ticket for srclient metrics:

Should Schema Registry client be exported to JS?

It could be in line with #50. In my opinion, explicit is better than implicit, and it makes things easier for the user, too. Yet, the script becomes bigger and needs more initialization, and I suspect it might be less performant, but it must be tested. These are related:

Update:
I am working on this issue in this branch along with #50, #53, and #54, and the results are not as promising as I initially thought. The performance degraded because most of the conversions (serdes) are being handled in JS, which slows things down. As shown below, the number of messages produced dropped by 14%. However, the introduction of the SchemaRegistry client seems to make creating and retrieving schemas much easier, which comes at a cost. Maybe I should find a better way, or develop the extension as is while exposing more configuration of the schema registry client and the serdes. 🤷

This is the result, as of now:

import { SchemaRegistry } from "k6/x/kafka";

const schemaRegistry = SchemaRegistry({
    url: "...",
    basicAuth: {...},
    tls: {...},
});

schemaRegistry.getSchema({ version: 0, subject: "..." });
schemaRegistry.createSchema({ version: 0, subject: "...", schema: "...", schemaType: "AVRO" });
schemaRegistry.getSubjectName({ element: "...", topic: "...", schema: "...", subjectNameStrategy: "..." });
// Automatically figures out data type, whether the schema is passed or not.
// You can pass a string, byte array, and JSON objects as data.
// If the schema is passed, either Avro, JSONSchema, or others, the data will be (de)serialized using that schema.
schemaRegistry.serialize({ schema: "...", data: "...", schemaType: "AVRO" });
schemaRegistry.deserialize({ schema: "...", data: "...", schemaType: "AVRO" });

The following is the result of running the test_json.js for a minute with 50 VUs.

$ ./k6 run -d 60s --vus 50 scripts/test_json.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: scripts/test_json.js
     output: -

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)


running (1m02.3s), 00/50 VUs, 11835 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

     ✓ 10 messages are received
     ✓ Topic equals to xk6_kafka_json_topic
     ✓ Key is correct
     ✓ Value is correct
     ✓ Header equals {'mykey': 'myvalue'}
     ✓ Time is past
     ✓ Partition is zero
     ✓ Offset is gte zero
     ✓ High watermark is gte zero

     █ teardown

     checks.........................: 100.00% ✓ 106515       ✗ 0
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=257.04ms min=8.38ms med=35.86ms max=6.88s    p(90)=355.36ms p(95)=811.4ms
     iterations.....................: 11835   190.026501/s
     kafka.reader.dial.count........: 51      0.818872/s
     kafka.reader.dial.seconds......: avg=331.69µs min=0s     med=0s      max=214.01ms p(90)=0s       p(95)=0s

   ✗ kafka.reader.error.count.......: 1       0.016056/s
     kafka.reader.fetch_bytes.max...: 1000000 min=1000000    max=1000000
     kafka.reader.fetch_bytes.min...: 1       min=1          max=1
     kafka.reader.fetch_wait.max....: 200ms   min=200ms      max=200ms
     kafka.reader.fetch.bytes.......: 7.7 MB  124 kB/s
     kafka.reader.fetch.size........: 19737   316.903511/s
     kafka.reader.fetches.count.....: 101     1.621688/s
     kafka.reader.lag...............: 107492  min=-1         max=1004616
     kafka.reader.message.bytes.....: 23 MB   373 kB/s
     kafka.reader.message.count.....: 118398  1901.035712/s
     kafka.reader.offset............: 2360    min=9          max=3370
     kafka.reader.queue.capacity....: 1       min=1          max=1
     kafka.reader.queue.length......: 1       min=0          max=1
     kafka.reader.read.seconds......: avg=43.31ms  min=0s     med=0s      max=34.49s   p(90)=0s       p(95)=0s

     kafka.reader.rebalance.count...: 0       0/s
     kafka.reader.timeouts.count....: 50      0.802816/s
     kafka.reader.wait.seconds......: avg=668.76µs min=0s     med=0s      max=229.48ms p(90)=0s       p(95)=0s

     kafka.writer.acks.required.....: 0       min=0          max=0
     kafka.writer.async.............: 0.00%   ✓ 0            ✗ 1183500
     kafka.writer.attempts.max......: 0       min=0          max=0
     kafka.writer.batch.bytes.......: 259 MB  4.2 MB/s
     kafka.writer.batch.max.........: 1       min=1          max=1
     kafka.writer.batch.size........: 1183500 19002.65009/s
     kafka.writer.batch.timeout.....: 0s      min=0s         max=0s
   ✓ kafka.writer.error.count.......: 0       0/s
     kafka.writer.message.bytes.....: 518 MB  8.3 MB/s
     kafka.writer.message.count.....: 2367000 38005.300179/s
     kafka.writer.read.timeout......: 0s      min=0s         max=0s
     kafka.writer.retries.count.....: 0       0/s
     kafka.writer.wait.seconds......: avg=0s       min=0s     med=0s      max=0s       p(90)=0s       p(95)=0s

     kafka.writer.write.count.......: 2367000 38005.300179/s
     kafka.writer.write.seconds.....: avg=880.66µs min=4.16µs med=12.01µs max=3.43s    p(90)=31.9µs   p(95)=130.16µs
     kafka.writer.write.timeout.....: 0s      min=0s         max=0s
     vus............................: 6       min=6          max=50
     vus_max........................: 50      min=50         max=50

ERRO[0063] some thresholds have failed

Message and offset consumption problem

Hi

I am currently facing a scenario in which consuming messages presents random behavior. Within the test, I produce to a topic, a system takes care of processing that request, and it writes the result of that operation to another topic. The problem is that I am not always able to consume the message from the response topic, and the test gets stuck there. I think it is because of a problem with the reading offset.

If I read the topic from the beginning, it always works, but there are more and more messages to read, which makes it take longer and longer. If I read from the end, I think that is when the behavior becomes random, because the answer is sometimes present before I start consuming. Using a numerical offset is not an alternative because I don't know how many messages are in the topic.

Does this make sense, and is there any way to address this problem? I had in mind the use of a temporal offset but I don't know if that would solve the problem.

Thank you!

xk6-kafka supports SASL_SSL authentication to confluent cloud?

I'm connecting to a kafka topic on confluent with this config details.

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

I'm getting the error
There was an error fetching messages: could not successfully authenticate to pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 with SASL: SASL handshake failed: EOF

Test Script

import { check } from 'k6';
import { reader, consume } from 'k6/x/kafka'; // import kafka extension

const bootstrapServers = ['subdomain.us-east-2.aws.confluent.cloud:9092'];
const kafkaTopic = 'topicName';

const auth = JSON.stringify({
  username: 'apiclient',
  password: 'apiSecret',
  algorithm: 'plain'
});

const offset = 0;
// partition and groupID are mutually exclusive
const partition = '';
const groupID = '';

const consumer = reader(bootstrapServers, kafkaTopic, partition, groupID, offset, auth);

export default function () {
  // Read 1 message only
  let messages = consume(consumer, 1);
  check(messages, {
    '1 message returned': (msgs) => msgs.length == 1
  });
}

export function teardown(data) {
  consumer.close();
}

Does xk6-kafka not support SASL_SSL or am I missing something in my test script?

EOF error when sending messages to Confluent Cloud

xk6-kafka on master does not support Confluent Cloud:

Configuration:

const bootstrapServers = ["${secret}.us-east-1.aws.confluent.cloud:9092"];
const kafkaTopic = "xk6_kafka_json_topic";
const auth = JSON.stringify({
    username: "${secret}",
    password: "${secret}",
    algorithm: "plain"
})
const offset = 0;
const partition = 1;

const producer = writer(bootstrapServers, kafkaTopic, auth);
const consumer = reader(bootstrapServers, kafkaTopic, offset, partition, auth);

Result:

> ./k6 run --vus 1 --duration 1s test_sasl_auth.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: test_sasl_auth.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 31s max duration (incl. graceful stop):
           * default: 1 looping VUs for 1s (gracefulStop: 30s)

Failed to write message: EOFFailed to write message: EOFFailed to write message: EOF...

Fix:
This can be fixed by providing a TLS configuration to Dialer:

func getDialer(creds *Credentials) (dialer *kafkago.Dialer) {
	dialer = &kafkago.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS: &tls.Config{},
	}
	...
}

Refactor Avro serdes and Schema Registry

While writing tests (issue: #58, PR: #60), I found that the Avro serde is flaky and lacks features and flexibility for testing. Also, issues #53 and #54 were created to deal with these problems. I suppose these should be done before the tests, because a few functions like getSchema cannot be mocked, and any failure inside them causes a domino effect.

Addresses: #44.

Compression support?

We're getting the following error when pulling data from our compressed topic. Is there any way to specify that we are using snappy compression?

There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0012]  source=console
There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0012]  source=console
There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0013]  source=console
There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0014]  source=console
There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0015]  source=console
There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0015]  source=console
There was an error fetching messages: the compression code is invalid or its codec has not been imported  INFO[0015]  source=console

Consumer group not consuming due to multiple k6 init

When using the consumer group feature to subscribe to all partitions in a topic, I got some very strange behaviour: some runs of the code consumed messages, and some runs were just stuck and did not get any messages back.

After a lot of debugging I found that k6 runs the init code (code outside of methods) multiple times: https://community.k6.io/t/why-init-called-4-times/973

This caused multiple consumer group instances to be created, which in turn meant that the partitions were distributed between them. Thus, sometimes the consumer group created during the last init contained the partitions where my data was, and sometimes it did not, because those partitions had been assigned to consumers created in a previous run of init.

I solved this by using a UUID when naming my consumer group, thereby guaranteeing that my consumer group instance was assigned all partitions in the topic.
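
A minimal sketch of that workaround, reusing the reader()/consume() signature from the scripts above and the uuidv4 helper from the k6 utils jslib; the brokers, topic, and credentials below are placeholders:

import { reader, consume } from 'k6/x/kafka';
import { uuidv4 } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const bootstrapServers = ['localhost:9092'];
const kafkaTopic = 'myTopic';

const auth = JSON.stringify({
  username: 'user',
  password: 'secret',
  algorithm: 'plain'
});

// A fresh group ID per init call, so each consumer instance created by a
// repeated init run gets its own group and is assigned all partitions,
// instead of sharing them with instances from earlier init runs.
const groupID = 'xk6-kafka-' + uuidv4();
const partition = '';
const offset = 0;

const consumer = reader(bootstrapServers, kafkaTopic, partition, groupID, offset, auth);

export default function () {
  const messages = consume(consumer, 1);
  // ... checks on messages go here
}

export function teardown(data) {
  consumer.close();
}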

Although this is not per se a bug in xk6-kafka, when I followed the documentation here, there was no mention of this, and the examples all used consumers directly in the init code.
I would suggest either making this clear in the documentation or somehow altering the code to avoid this problem.

Does Schema Registry support TLS?

It should be possible to configure the Schema Registry client to use HTTPS.

The current HTTP client does not support any kind of certificate configuration.

Create topic method is unable to determine Controller broker.

When I connect to Kafka using the bootstrap address, I get Error = [41] Not Controller during topic creation. When I put all broker addresses in manually and pick the proper leader, it is able to complete the task. Could you take a look at this doc and elaborate a bit on whether the issue can be solved?

Fix test pipeline and add test for all scripts

The reason is explained in the description of the #46 PR.

As a side note, the tests might look flaky (working sometimes and failing other times), but the actual reason is that I introduced thresholds on custom Kafka metrics and also added checks to verify produced and consumed messages. So, this behavior is expected. I am going to fix the tests later in #48.

Error uploading test to k6 cloud

I run the command ./k6 cloud usercollection.js and this is the output I get:

ERRO[0044] unknown module: k6/x/faker
Run [at go.k6.io/k6/js.(*InitContext).Require-fm (native)
at file:///Users/nobody/Documents/UserCollection/usercollection.js:2:0(20)

I'm awaiting your answer. Thanks a lot!

Refactor logging and errors

I'll use logrus, because k6 uses it, so there are no extra dependencies.

Update:
While working on issue #58 in PR #60, I found that the logging should be refactored first. Also, the current way errors are reported (using fmt.Printf) makes it really hard to test the output of the functions. So, this issue takes precedence over issue #58.

Addresses: #44.

Support writing serialized protobuf objects

Hi @mostafa!
How hard would it be to add support for writing serialized protobuf objects to Kafka?

export default function () {
  // build the Protobuf object
  const protobufObject = new protobufs.ProtobufObject();
  protobufObject.setIdentifier(identifier);
  protobufObject.setEvent(eventSchema);

  // Write to Kafka
  const error = produce(producer, signalEnvelope.serializeBinary()); // xk6-kafka only supports []map[string]string
  check(error, {
    'is sent': (err) => err == undefined,
  });
}

The error I get is:

ERRO[0001] could not convert 10,12,10,10,97,117,116,111,109,97,116,105,111,110,58,175,2,10,51,10,35,103,101,110,101,114,97,108,95,105,110,102,111,114,109,97,116,105,111,110,46,108,111,103,95,115,111,117,114,99,101,46,110,97,109,101,18,12,85,110,105,100,101,110,116,105,102,105,101,100,10,70,10,30,103,101,110,101,114,97,108,95,105,110,102,111,114,109,97,116,105,111,110,46,109,101,115,115,97,103,101,95,105,100,18,36,98,102,97,48,51,101,57,53,45,49,97,102,100,45,52,51,52,53,45,57,97,102,55,45,51,102,48,102,99,49,51,53,97,102,50,101,10,54,10,31,103,101,110,101,114,97,108,95,105,110,102,111,114,109,97,116,105,111,110,46,114,97,119,95,109,101,115,115,97,103,101,18,19,78,101,109,111,45,87,114,105,116,101,45,84,111,45,75,97,102,107,97,10,75,10,41,103,101,110,101,114,97,108,95,105,110,102,111,114,109,97,116,105,111,110,46,115,116,97,110,100,97,114,100,95,109,101,115,115,97,103,101,95,116,105,109,101,18,30,50,48,50,49,45,48,54,45,51,48,84,49,52,58,53,57,58,52,55,46,54,56,53,49,52,50,50,48,48,90,10,43,10,29,103,101,110,101,114,97,108,95,105,110,102,111,114,109,97,116,105,111,110,46,116,101,110,97,110,116,95,105,100,18,10,97,117,116,111,109,97,116,105,111,110 to []map[string]string
        at go.k6.io/k6/js/common.Bind.func1 (native)
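
For context, produce() in this version of the extension expects the messages as an array of objects with string key and value fields (the []map[string]string mentioned in the error), so raw serialized bytes are rejected. A rough illustration of the accepted shape, with placeholder broker, topic, and payload values and no authentication:

import { check } from 'k6';
import { writer, produce } from 'k6/x/kafka';

const producer = writer(['localhost:9092'], 'my_topic', null);

export default function () {
  // produce() only accepts { key, value } string pairs, which is why
  // passing the output of serializeBinary() directly fails to convert.
  const messages = [
    {
      key: 'some-key',
      value: JSON.stringify({ hello: 'world' }),
    },
  ];
  const error = produce(producer, messages);
  check(error, {
    'is sent': (err) => err == undefined,
  });
}

export function teardown(data) {
  producer.close();
}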
