SwiftKafka

A Swift library for producing messages to and consuming messages from Kafka event streams.

This works by wrapping the librdkafka C library.

Swift version

The latest version of SwiftKafka requires Swift 5.0 or later. You can download this version of the Swift binaries from swift.org/download.

Usage

Swift Package Manager

Add dependencies

Add the SwiftKafka package to the dependencies within your application’s Package.swift file. Replace "x.x.x" with the latest SwiftKafka release.

.package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")

Add SwiftKafka to your target's dependencies:

.target(name: "example", dependencies: ["SwiftKafka"]),
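
Put together, a minimal Package.swift might look like the sketch below (the package and target name "example" are placeholders, and "x.x.x" should be replaced as described above):

// swift-tools-version:5.0
import PackageDescription

let package = Package(
    name: "example",
    dependencies: [
        // Substitute "x.x.x" with the latest SwiftKafka release.
        .package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x"),
    ],
    targets: [
        .target(name: "example", dependencies: ["SwiftKafka"]),
    ]
)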

Import package

import SwiftKafka

Getting Started

To use SwiftKafka you will need to install the librdkafka package:

macOS

brew install librdkafka

Linux

Install librdkafka from the Confluent APT repositories: follow steps 1 and 2 of the Confluent installation instructions to add the Confluent package signing key and APT repository, then install librdkafka:

sudo apt install librdkafka

Running a Kafka instance locally

To experiment locally, you can set up your own Kafka server to produce/consume from.

On macOS you can follow this guide on Kafka Installation using Homebrew to run a local server.

On Linux, you can follow this guide for a manual install on Ubuntu.

KafkaConfig

The KafkaConfig class contains your configuration settings for a KafkaConsumer/KafkaProducer.

The class is initialized with default values which can then be changed using the helper functions. For example, to enable all logging you would set the debug variable:

let config = KafkaConfig()
config.debug = [.all]

Alternatively, you can access the configuration dictionary directly on the KafkaConfig object:

let config = KafkaConfig()
config["debug"] = "all"

The list of configuration keys and descriptions can be found in the librdkafka CONFIGURATION.md.
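
For example, any key listed in CONFIGURATION.md can be set as a string through the subscript (the values below are only illustrative):

let config = KafkaConfig()
// Both keys come from librdkafka's CONFIGURATION.md.
config["bootstrap.servers"] = "localhost:9092"
config["socket.timeout.ms"] = "6000"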

When you pass this class to a producer/consumer, a copy is made so further changes to the instance will not affect existing configurations.
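
A short sketch of this behaviour, assuming the producer initializer accepts a config: parameter as the consumer's does in the example further below (the client.id values are only illustrative):

let config = KafkaConfig()
config["client.id"] = "first-producer"
do {
    // The producer takes its own copy of the configuration here.
    let producer = try KafkaProducer(config: config)
    // This later change affects only the local KafkaConfig instance;
    // the producer above keeps "first-producer" as its client.id.
    config["client.id"] = "second-producer"
    _ = producer
} catch {
    print("Error creating producer: \(error)")
}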

KafkaProducer:

The KafkaProducer class produces messages to a Kafka server.

You can initialize a KafkaProducer using a KafkaConfig instance or with the default configuration.

The producer sends a KafkaProducerRecord with the following fields:

  • topic: The topic where the record will be sent. If this topic doesn't exist, the producer will try to create it.
  • value: The message body that will be sent with the record.
  • partition: The topic partition the record will be sent to. If this is not set, the partition will be assigned automatically.
  • key: If the partition is not set, records with the same key will be sent to the same partition. Since order is guaranteed within a partition, these records will be read in the order they were produced (see the second producer example below).

The send() function is asynchronous. The result is returned in a callback which contains a KafkaConsumerRecord on success or a KafkaError on failure.

The following example produces a message with the value "Hello World" to a "test" topic of a Kafka server running on localhost.

do {
    let producer = try KafkaProducer()
    guard producer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    producer.send(producerRecord: KafkaProducerRecord(topic: "test", value: "Hello world", key: "Key")) { result in
        switch result {
        case .success(let message):
            print("Message at offset \(message.offset) successfully sent")
        case .failure(let error):
            print("Error producing: \(error)")
        }
    }
} catch {
    print("Error creating producer: \(error)")
}
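
Building on the example above, the following sketch sends two records with the same key, so they are routed to the same partition and will be read back in the order they were produced (the key and values are only illustrative):

do {
    let producer = try KafkaProducer()
    guard producer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    // Both records share the key "order-42", so they land on the same partition.
    let first = KafkaProducerRecord(topic: "test", value: "step 1", key: "order-42")
    let second = KafkaProducerRecord(topic: "test", value: "step 2", key: "order-42")
    producer.send(producerRecord: first) { _ in }
    producer.send(producerRecord: second) { _ in }
} catch {
    print("Error producing: \(error)")
}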

KafkaConsumer:

The KafkaConsumer class consumes messages from a Kafka server.

You can initialize a KafkaConsumer using a KafkaConfig instance or with the default configuration.

You can then subscribe to topics using subscribe(). This will distribute the topic partitions evenly between consumers with the same group id. If you do not set a group id, a random UUID will be used.
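
For example, every process that runs the sketch below joins the same consumer group, so the partitions of the "test" topic are divided between them (the group id is only illustrative):

do {
    let config = KafkaConfig()
    config.groupId = "shared-group"
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    // Run the same code in a second process: the partitions of "test" will be
    // divided between the consumers that share "shared-group".
    try consumer.subscribe(topics: ["test"])
} catch {
    print("Error creating consumer: \(error)")
}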

Alternatively, you can use assign() to set the partition and offset for the consumer manually.

Both subscribe() and assign() are asynchronous and return immediately; however, it may take up to twice sessionTimeoutMs (10 seconds by default) before the consumer is fully connected.

To consume messages from Kafka you call poll(timeout:). This will poll Kafka, blocking for timeout seconds. When it completes, it returns an array of KafkaConsumerRecord with the following fields:

  • value: The message value if it can be UTF8 decoded to a String.
  • valueData: The message value as raw data.
  • key: The message key if it can be UTF8 decoded to a String.
  • keyData: The message key as raw data.
  • offset: The message offset.
  • topic: The topic that the message was consumed from.
  • partition: The partition that the message was consumed from.
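
Given a connected, subscribed consumer like the one in the example below, you might read these fields as follows (a sketch; value is treated as optional because it may not be UTF8-decodable):

let records = try consumer.poll()
for record in records {
    // Print where each message came from along with its decoded value.
    print("\(record.topic)[\(record.partition)] offset \(record.offset): \(record.value ?? "<non-UTF8 value>")")
}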

When you have finished consuming, you can call close() to close the connection and unassign the consumer. The unassigned partitions will then be rebalanced among the other consumers in the group. If close() is not called, the consumer will be closed when the class is deallocated.

The following example consumes and prints all unread messages from the "test" topic of the Kafka server.

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])
    while true {
        let records = try consumer.poll()
        print(records)
    }
} catch {
    print("Error creating consumer: \(error)")
}

API Documentation

For more information visit our API reference.

Community

We love to talk server-side Swift, and Kitura. Join our Slack to meet the team!

License

This library is licensed under Apache 2.0. Full license text is available in LICENSE.

Issues

Compression / Buffer issue

Hi, I've been trying to use SwiftKafka for a project of mine, but ran into an issue when sending messages of around 1.5KB. Instead of the JSON I try to send, it sends some malformed data like this:

2t_id":"16315612"??????

followed by the rest of the JSON in the correct format. Because of this the JSON becomes invalid and unusable.

I've tried multiple config options to increase buffer sizes to allow for bigger messages, but so far nothing has seemed to work. The options I've tried are:

config.messageMaxBytes = 1000000000
config.socketSendBufferBytes = 1000000000

If I do this, however, the message is still malformed and I get the following error in my console:

Failed to set socket send buffer size to 10000000: No buffer space available

Any and all help is much appreciated!

Config properties not reflecting in rdkafka

I have added the following config property in my Swift file.

config["bootstrap.servers"] = "localhost:9092"

I tested the property with a breakpoint, and it is reflected successfully:

(lldb) po config["bootstrap.servers"]
▿ Optional<String>
  - some : "localhost:9092"

However, I still get the following warning and producer messages do not pass through.

%5|1667541464.556|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster

Xcode 12.2 fails to link rdkafka

Attempting to run SwiftKafkaTests on a macOS target results in a build error:

ld: warning: Could not find or use auto-linked library 'rdkafka'

...(all the librdkafka C API calls listed here)...

ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)

What's interesting is that Xcode's package loading seems to be failing to find rdkafka even before I attempt to compile, showing this warning message:

You may be able to install rdkafka using your system package manager: brew install librdkafka

This is despite the fact that I'm absolutely positive librdkafka is installed.

rami@machine: ~ % brew info librdkafka
librdkafka: stable 1.5.2 (bottled), HEAD
Apache Kafka C/C++ library
https://github.com/edenhill/librdkafka
/usr/local/Cellar/librdkafka/1.5.2 (36 files, 4MB) *
  Poured from bottle on 2020-11-24 at 21:53:28
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/librdkafka.rb
License: BSD-2-Clause
==> Dependencies
Build: pkg-config ✔, [email protected] ✔
Required: lz4 ✔, lzlib ✔, [email protected] ✔, zstd ✔
==> Options
--HEAD
	Install HEAD version
==> Analytics
install: 9,191 (30 days), 19,972 (90 days), 71,599 (365 days)
install-on-request: 5,667 (30 days), 11,336 (90 days), 35,858 (365 days)
build-error: 0 (30 days)

Config details

macOS Big Sur 11.0.1 (20B29)
Xcode 12.2 (12B45b) + Command Line Tools 12B45b
Swift version 5.3.1
clang version 12.0.0

Homebrew 2.5.11
Homebrew/homebrew-core (git revision 699c9; last commit 2020-11-25)
Homebrew/homebrew-cask (git revision 18071; last commit 2020-11-25)
librdkafka stable 1.5.2

Using SwiftKafka for watchOS

I am developing a watchOS app that uses data coming from a Kafka stream. I want to use this library for this purpose, but it keeps ignoring my local librdkafka library.

Detailed error description:

  • Warnings:
    • ignoring file /usr/local/Cellar/librdkafka/1.3.0/lib/librdkafka.dylib, building for watchOS-arm64_32 but attempting to link with file built for macOS-x86_64
  • Errors:
    Undefined symbol: _rd_kafka_produce
    Undefined symbol: _rd_kafka_poll
    Undefined symbol: _rd_kafka_conf_dump
    Undefined symbol: _rd_kafka_topic_name
    Undefined symbol: _rd_kafka_consumer_close
    Undefined symbol: _rd_kafka_consumer_poll
    Undefined symbol: _rd_kafka_flush
    Undefined symbol: _rd_kafka_brokers_add
    Undefined symbol: _rd_kafka_commit
    Undefined symbol: _rd_kafka_topic_partition_list_new
    Undefined symbol: _rd_kafka_topic_partition_list_destroy
    Undefined symbol: _rd_kafka_message_destroy
    Undefined symbol: _rd_kafka_topic_new
    Undefined symbol: _rd_kafka_topic_partition_list_add
    Undefined symbol: _rd_kafka_subscribe
    Undefined symbol: _rd_kafka_conf_set_dr_msg_cb
    Undefined symbol: _rd_kafka_conf_set
    Undefined symbol: _rd_kafka_conf_dup
    Undefined symbol: _rd_kafka_conf_new
    Undefined symbol: _rd_kafka_conf_destroy
    Undefined symbol: _rd_kafka_conf_dump_free
    Undefined symbol: _rd_kafka_poll_set_consumer
    Undefined symbol: _rd_kafka_topic_destroy
    Undefined symbol: _rd_kafka_assign
    Undefined symbol: _rd_kafka_new
    Undefined symbol: _rd_kafka_destroy

What I did:

  1. Installed librdkafka:
     • brew install librdkafka
  2. Created the app:
     • Created a new standalone watchOS app in Xcode
     • Added a new Swift package with Xcode's built-in SPM support, using the URL of your SwiftKafka GitHub project
     • Added SwiftKafka to "Link Binary With Libraries" in my WatchKit Extension
  3. Built the app -> the build failed with the above errors

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.