lua-resty-kafka

Name

lua-resty-kafka - Lua kafka client driver for the ngx_lua based on the cosocket API

Table of Contents

Status

This library is still under early development and is considered experimental.

Description

This Lua library is a Kafka client driver for the ngx_lua nginx module:

http://wiki.nginx.org/HttpLuaModule

This Lua library takes advantage of ngx_lua's cosocket API, which ensures 100% nonblocking behavior.

Note that at least ngx_lua 0.9.3 or openresty 1.4.3.7 is required, and unfortunately only LuaJIT is supported (--with-luajit).

Note that for SSL connections, at least ngx_lua 0.9.11 or openresty 1.7.4.1 is required, and unfortunately only LuaJIT is supported (--with-luajit).

Synopsis

    lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- optional auth
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- usually we do not use this library directly
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata failed, err:", partitions)
                end
                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))


                -- sync producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success, offset: ", tonumber(offset))

                -- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("send err:", err)
                    return
                end

                ngx.say("send success, ok:", ok)
            ';
        }
    }

Back to TOC

Modules

resty.kafka.client

To load this module, just do this

    local client = require "resty.kafka.client"

Back to TOC

Methods

new

syntax: c = client:new(broker_list, client_config)

The broker_list is a list of brokers, like the one below

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // optional auth
        "sasl_config": {
            // supported mechanisms: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
  • sasl_config

    Supported mechanisms: PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512.

    Warning: SCRAM-SHA-256 and SCRAM-SHA-512 require lua-resty-jit-uuid and lua-resty-openssl to be installed.
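
For example, a broker entry using SCRAM might look like the sketch below; the host and credentials are placeholders:

    local broker_list = {
        {
            host = "127.0.0.1",  -- placeholder broker address
            port = 9092,
            sasl_config = {
                -- SCRAM mechanisms require lua-resty-jit-uuid and lua-resty-openssl
                mechanism = "SCRAM-SHA-256",
                user = "USERNAME",      -- placeholder
                password = "PASSWORD",  -- placeholder
            },
        },
    }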

An optional client_config table can be specified. The following options are supported:

client config

  • socket_timeout

    Specifies the network timeout threshold in milliseconds. SHOULD be larger than the request_timeout.

  • keepalive_timeout

    Specifies the maximal idle timeout (in milliseconds) for the keepalive connection.

  • keepalive_size

    Specifies the maximal number of connections allowed in the connection pool per Nginx worker.

  • refresh_interval

    Specifies the interval, in milliseconds, at which the metadata is automatically refreshed. The metadata is not refreshed automatically if this is nil.

  • ssl

    Specifies if client should use ssl connection. Defaults to false. See: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Specifies if client should perform SSL verification. Defaults to false. See: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    Specifies a function for host resolution that returns an IP string or nil, overriding the system default resolver. Defaults to nil (no resolving performed). Example: function(host) if host == "some_host" then return "10.11.12.13" end end. See the sketch below.
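
A minimal sketch combining several of these options; the hostname-to-IP mapping is hypothetical:

    local client = require "resty.kafka.client"

    local cli = client:new(broker_list, {
        socket_timeout = 3000,       -- 3s, larger than the default request_timeout
        keepalive_timeout = 600000,  -- keep idle connections for 10 min
        refresh_interval = 30000,    -- auto refresh metadata every 30s
        -- hypothetical mapping for a broker advertised by hostname
        resolver = function (host)
            if host == "kafka-broker-1" then
                return "10.11.12.13"
            end
            -- returning nil: no resolving performed for other hosts
        end,
    })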

Back to TOC

fetch_metadata

syntax: brokers, partitions = c:fetch_metadata(topic)

In case of success, returns all brokers and all partitions of the topic. In case of errors, returns nil with a string describing the error.

Back to TOC

refresh

syntax: brokers, partitions = c:refresh()

This will refresh the metadata of all topics which have been fetched by fetch_metadata. In case of success, return all brokers and all partitions of all topics. In case of errors, returns nil with a string describing the error.

Back to TOC

choose_api_version

syntax: api_version = c:choose_api_version(api_key, min_version, max_version)

This helps the client select the correct version of the API identified by api_key.

When min_version and max_version are provided, they act as limits: the selected version will not exceed them, no matter how high or low the API versions supported by the broker are. When they are not provided, the selection follows the range of versions supported by the broker.

Tip: The version selection strategy is to choose the maximum version within the allowed range.
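
For example, a sketch that constrains the Produce API (api_key 0 in the Kafka protocol) to versions 0 through 2; the call shape follows the syntax above:

    local client = require "resty.kafka.client"
    local cli = client:new(broker_list)

    -- api_key 0 is the Kafka Produce API; allow versions 0 through 2 only
    local api_version = cli:choose_api_version(0, 0, 2)
    ngx.say("selected produce api version: ", api_version)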

Back to TOC

resty.kafka.producer

To load this module, just do this

    local producer = require "resty.kafka.producer"

Back to TOC

Methods

new

syntax: p = producer:new(broker_list, producer_config?, cluster_name?)

It is recommended to use the async producer_type.

broker_list is the same as in client.

An optional producer_config table can be specified. The options are as follows:

socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl and ssl_verify are the same as in client_config.

Producer config, mostly matching http://kafka.apache.org/documentation.html#producerconfigs:

  • producer_type

    Specifies the producer.type. "async" or "sync"

  • request_timeout

    Specifies the request.timeout.ms. Default 2000 ms

  • required_acks

    Specifies the request.required.acks, SHOULD NOT be zero. Default 1.

  • max_retry

    Specifies the message.send.max.retries. Default 3.

  • retry_backoff

    Specifies the retry.backoff.ms. Default 100.

  • api_version

    Specifies the produce API version. Default 0. If you use Kafka 0.10.0.0 or higher, api_version can use 0, 1 or 2. If you use Kafka 0.9.x, api_version should be 0 or 1. If you use Kafka 0.8.x, api_version should be 0.

  • partitioner

    Specifies the partitioner that chooses a partition from the key and the partition number. Syntax: partitioner = function (key, partition_num, correlation_id) end, where correlation_id is an auto-increment id in the producer. The default partitioner is:

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id is continuous and start from 0
        return id % num
    end
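
A custom partitioner is passed via producer_config. The sketch below is hypothetical: it pins an "audit" key to partition 0 and hashes everything else with ngx.crc32_long:

    local producer = require "resty.kafka.producer"

    local function my_partitioner(key, num, correlation_id)
        if key == "audit" then
            return 0  -- hypothetical: keep all "audit" messages on partition 0
        end
        local id = key and ngx.crc32_long(key) or correlation_id
        return id % num
    end

    local p = producer:new(broker_list, { partitioner = my_partitioner })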

Buffer config (only works when producer_type = "async"):

  • flush_time

    Specifies the queue.buffering.max.ms. Default 1000.

  • batch_num

    Specifies the batch.num.messages. Default 200.

  • batch_size

    Specifies the send.buffer.bytes. Default 1M (may reach 2M). Be careful: SHOULD be smaller than the socket.request.max.bytes / 2 - 10k configured in the Kafka server.

  • max_buffering

    Specifies the queue.buffering.max.messages. Default 50,000.

  • error_handle

    Specifies the error handler, called when sending buffered messages to Kafka fails. Syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end. The failed messages in message_queue are laid out as { key1, msg1, key2, msg2 }; a key in message_queue is the empty string "" even if the original key was nil. index is the length of message_queue, so do not use #message_queue. When retryable is true, the Kafka server definitely did not commit these messages and you can safely retry sending them; otherwise it may have, so it is recommended to log them somewhere. See the sketch after this list.

  • wait_on_buffer_full

    Specifies whether to wait when the buffer queue is full. Default false. When the buffer queue is full and this option is true, a semaphore wait is used to block the coroutine until it times out or the buffer queue has drained; otherwise, false is returned together with a "buffer overflow" error. Notice: this cannot be used in phases that do not support yielding, e.g. the log phase.

  • wait_buffer_timeout

    Specifies the max wait time when the buffer is full. Default 5 seconds.
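
A minimal sketch of an error_handle that just logs the failed messages; the queue layout and index semantics follow the error_handle description above:

    local producer = require "resty.kafka.producer"

    -- the queue is laid out as { key1, msg1, key2, msg2, ... } and
    -- index is its length, so we step by 2
    local function error_handle(topic, partition_id, queue, index, err, retryable)
        for i = 1, index, 2 do
            local key, msg = queue[i], queue[i + 1]
            ngx.log(ngx.ERR, "failed to send to kafka, topic: ", topic,
                    ", partition_id: ", partition_id,
                    ", retryable: ", tostring(retryable),
                    ", err: ", err, ", key: ", key, ", msg: ", msg)
        end
    end

    local bp = producer:new(broker_list, {
        producer_type = "async",
        error_handle = error_handle,
    })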

Compression is not supported yet.

The third optional argument, cluster_name, specifies the name of the cluster; default 1 (yes, it's a number). You can specify different names when you have two or more Kafka clusters. This only works with the async producer_type.

Back to TOC

send

syntax: ok, err = p:send(topic, key, message)

  1. In sync mode

    In case of success, returns the offset (cdata: LL) of the current broker and partition. In case of errors, returns nil with a string describing the error.

  2. In async mode

    The message will be written to the buffer first. It will be sent to the Kafka server when the buffer exceeds batch_num, or when flush_time expires and the buffer is flushed.

    In case of success, returns true. In case of errors, returns nil with a string describing the error (e.g. "buffer overflow").

Back to TOC

offset

syntax: sum, details = p:offset()

Returns the sum of all topic-partition offsets (as returned by the ProduceRequest API), and the details for each topic-partition.
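
A small usage sketch (the sum may be cdata, so tonumber is used for printing; cjson is only used for readable output):

    local cjson = require "cjson"

    -- p is a producer created earlier
    local sum, details = p:offset()
    ngx.say("total offset: ", tonumber(sum))
    ngx.say("details: ", cjson.encode(details))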

Back to TOC

flush

syntax: ok = p:flush()

Always returns true.

Back to TOC

resty.kafka.basic-consumer

To load this module, just do this

    local bconsumer = require "resty.kafka.basic-consumer"

This module is a minimalist implementation of a consumer. It provides the list_offset API for querying offsets by time (or getting the start and end offsets), and the fetch API for getting messages from a topic.

A single call can only fetch data for a single partition of a single topic; batch fetching is not supported for now. The basic consumer does not support the consumer-group related APIs, so you need to fetch messages after getting the offset through the list_offset API, or have your service manage offsets itself.

Back to TOC

Methods

new

syntax: c = bconsumer:new(broker_list, client_config)

The broker_list is a list of brokers, like the one below

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // optional auth
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

An optional client_config table can be specified. The options are the same as in client config above.

Back to TOC

list_offset

syntax: offset, err = c:list_offset(topic, partition, timestamp)

The parameter timestamp can be a UNIX timestamp, or one of the constants defined in resty.kafka.protocol.consumer (LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX) used to get the earliest and latest offsets, etc., with the same semantics as the ListOffsets API in Apache Kafka. See: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

In case of success, returns the requested offset. In case of errors, returns nil with a string describing the error.

Back to TOC

fetch

syntax: result, err = c:fetch(topic, partition, offset)

In case of success, returns a result table. In case of errors, returns nil with a string describing the error.

The result contains the fetched messages along with other information, as in the sketch below.
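
Putting list_offset and fetch together, a minimal sketch; it assumes the LIST_OFFSET_* constants are exposed as fields of resty.kafka.protocol.consumer, and simply dumps the raw result:

    local cjson = require "cjson"
    local bconsumer = require "resty.kafka.basic-consumer"
    -- assumption: the constants live on this module per the list_offset description
    local pconsumer = require "resty.kafka.protocol.consumer"

    local c = bconsumer:new(broker_list)

    -- earliest offset of partition 0 in topic "test"
    local offset, err = c:list_offset("test", 0, pconsumer.LIST_OFFSET_TIMESTAMP_FIRST)
    if not offset then
        ngx.say("list_offset err: ", err)
        return
    end

    local result, err = c:fetch("test", 0, offset)
    if not result then
        ngx.say("fetch err: ", err)
        return
    end

    -- dump the whole result, messages included
    ngx.say("fetch result: ", cjson.encode(result))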

Back to TOC

Errors

When you call the modules provided by this library, you may get errors. Depending on the source, they can be divided into the following categories.

  • Network errors: such as connection refused, connection timeout, etc. You need to check the connection status of each service in your environment.

  • Metadata-related errors: such as Metadata or ApiVersion data cannot be retrieved properly; the specified topic or partition does not exist, etc. You need to check the Kafka Broker and client configuration.

  • Errors returned by Kafka: sometimes Kafka includes an err_code in the response data. When this happens, the err in the return value looks like OFFSET_OUT_OF_RANGE: all uppercase characters separated by underscores. This library provides an error list mapping these codes to textual descriptions. To learn more about these errors, see the descriptions in the Kafka documentation.

Installation

You need to configure the lua_package_path directive to add the path of your lua-resty-kafka source tree to ngx_lua's LUA_PATH search path, as in

    # nginx.conf
    http {
        lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
        ...
    }

Ensure that the system account running your Nginx worker processes has enough permission to read the .lua files.

Back to TOC

TODO

  1. Fetch API
  2. Offset API
  3. Offset Commit/Fetch API

Back to TOC

Author

Dejiang Zhu (doujiang24) [email protected].

Back to TOC

Copyright and License

This module is licensed under the BSD license.

Copyright (C) 2014-2020, by Dejiang Zhu (doujiang24) [email protected].

All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  • Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

  • Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Back to TOC

See Also

Back to TOC


lua-resty-kafka's Issues

too many pending timers

If I send messages fast, the error log shows:

[lua] producer.lua:280: _flush_buffer(): failed to create timer at _flush_buffer, err: too many pending timers.

But I used the ngx.timer.pending_count() and ngx.timer.running_count() functions to watch the pending and running values. The pending value maxes out at 2, and the running value at 1.

This is my nginx.conf setting:

lua_max_pending_timers 1024;
lua_max_running_timers 256;

Thanks!

no offset and no error

I run the following code:

  local p = producer:new(broker_list)
  local offset, err = p:send(topic, key, body)
  if not offset then
    ngx.log(ngx.ERR, err)
    return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
  end

I've got some 500 errors, but when I log the error I don't get anything either.

2017/01/20 16:33:02 [error] 10#10: *719 [lua] script.lua:33: , client: 10.10.10.10, server: api.example.com, request: "POST /some/path HTTP/1.1", host: "api.example.com"

I can't figure out how to debug this. Any idea?

buffered messages send to kafka err: MessageSizeTooLarge

2016/12/22 22:32:33 [error] 13413#0: *378703 [lua] producer.lua:258: buffered messages send to kafka err: MessageSizeTooLarge, retryable: false, topic: ns-source-wimonitor, partition_id: 2, length: 111, context: ngx.timer, client: 120.237.52.234, server: 0.0.0.0:443

I got this error in error.log. Should I increase the message size? What should I do?

Memory leak when using async method for producing Kafka messages

Hi - We are running a portion of our network through OpenResty / Kafka using your driver, in order to test it out. We are producing about 100-200 messages per second onto Kafka.

When we set producer_type = async, memory utilization increases until the following error is output to our logs:

*emergency(): Unable to connect to kafka: buffer overflow

Can you please help us understand what is causing this memory leak? Thanks!

producer: how to retry to send message in error_handle method

I mean, how do I access the producer instance I created later?

local producer_config = {
    producer_type = "async",
    error_handle = function(topic, partition_id, message_queue, index, err, retryable)
        if retryable then
            -- index is the queue length; each message occupies two slots
            for i = 1, index / 2 do
                local key = message_queue[i * 2 - 1]
                local message = message_queue[i * 2]
                -- how to retry ?
            end
        else
            ngx.log(ngx.ERR, "send to kafka error and retryable is false: ", err)
        end
    end
}

local bp = producer:new(broker_list, producer_config)
local topic = "nginx_log"

return empty brokers

local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata(kafka.topic)
if not brokers then
    ngx.log(ngx.ERR, "fetch_metadata failed, err:", partitions)
end
ngx.log(ngx.ERR, "brokers: ", #brokers, "; partitions: ", cjson.encode(partitions))

I use the code above in my macOS environment, and I can ensure the broker_list is available. Is it a bug that it returns empty brokers? The partition info is OK.

send bytes to kafka throws an error

I'm trying to send a byte array to Kafka:
local ok, err = bp:send("m5", key, msg_bytes)
where msg_bytes is a table of bytes.
But it throws an error:
/lua-resty-kafka-master/lib/resty/kafka/request.lua:158: invalid value (table) at index 6 in table for 'concat'

Then I checked request.lua, and I found:


    local req = {
        -- MagicByte
        str_int8(0),
        -- XX hard code no Compression
        str_int8(0),
        str_int32(key_len),
        key,
        str_int32(len),
        msg,
    }

    local str = concat(req)

It seems I can't send a byte table to Kafka, because the client needs to concat the key and the message as strings.

Is there any way I can send the bytes to Kafka directly, without encoding them with Base64?

Chinese text from ngx.req.get_body_data() is garbled when sent through kafka

local producer = require "resty.kafka.producer"

local broker_list = {
    { host = "10.7.13.112", port = 9092 },
    { host = "10.7.13.114", port = 9092 },
    { host = "10.7.13.115", port = 9092 },
}
ngx.req.read_body()
local post = ngx.req.get_body_data()
local bp = producer:new(broker_list, { producer_type = "async" })
if post then
    local ok, err = bp:send("footPrint_eventlog", nil, post)
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
    ngx.say('{"code":200,"description":"ok"}')
else
    ngx.say('{"code":400,"description":"no content"}')
end

Sending the POST data to Kafka succeeds and the message is received, but the Chinese text is garbled. When the POST body is written out via ngx.log, there is no garbling. Could anyone tell me the reason?

Kafka consumer?

Can lua-resty-kafka be used as a Kafka data consumer, or just a Kafka data producer?

nginx cannot resolve the kafka hostname: ERROR no resolver defined to resolve "hdp5.novalocal"

I use ZooKeeper to manage Kafka, and the broker information in ZooKeeper is like this:
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1492749719841","endpoints":["PLAINTEXT://hdp5.novalocal:6667"],"host":"hdp5.novalocal","version":3,"port":6667}

When my nginx connects to Kafka, it cannot connect.
The error message is:
no resolver defined to resolve "hdp5.novalocal"

I tried to configure my hosts file and added the IP and nameserver, but it did not work.

How can I resolve this issue?

is this production ready?

The readme says it is still under early development, but that note is 3 years old.

buffered messages send to kafka err timeout

nginx error log:
[error] 30952#0: *30248161 lua tcp socket read timed out, context: ngx.timer, client: xxxx, server: xxxx
[error] 30952#0: *30248161 [lua] producer.lua:258: buffered messages send to kafka err: timeout, retryable: nil, topic:xxxx, partition_id: 1, length: 1, context: ngx.timer, client: xxxx, server: xxxx

But it does not happen frequently; Kafka works fine and everything is OK. I have set request_timeout to 10000 ms like this:

local bp = producer:new(broker_list, { producer_type = "async", request_timeout = 10000 })

but it does not seem to work.
Could you help me?

Usage/installation?

Hi,
Could you provide a small example of how to install and use the library?

Messages aren't dispatched to Kafka, but no error is shown.

Hi,
I tried your producer example, but it's not working. I tried to send a message, but Kafka doesn't receive it.
Lua gives me no errors, and both the "err" and the "resp" variables are nil.
I'm sure I can connect to Kafka from that machine, because I tested the connection with kafkacat and sending messages via the CLI works.

no resolver defined to resolve "localhost"

It returns:
brokers: [{"host":"localhost","port":9092}]; partitions: {"0":{"id":0,"errcode":0,"replicas":[1],"isr":[1],"leader":1},"1":{"id":1,"errcode":0,"replicas":[1],"isr":[1],"leader":1},"errcode":0,"num":2}
send err1:no resolver defined to resolve "localhost"

Code:

local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"

local broker_list = {
    -- { host = "192.168.1.1", port = 9092 },
    { host = "127.0.0.1", port = 9092 }
}

local key = "key"
local message = "halo world"

-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
    ngx.say("fetch_metadata failed, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

-- sync producer_type
local p = producer:new(broker_list)

local offset, err = p:send("test", key, message)
if not offset then
    ngx.say("send err1:", err)
    return
end
ngx.say("send success, offset: ", tonumber(offset))

How can I solve it?

kafka throws an exception

java.lang.IllegalArgumentException: A metric named 'MetricName [name=throttle-time, group=Produce, description=Tracking average throttle-time per client, tags={client-id=worker:26556}]' already exists, can't register another one

How to make it work with proxy_pass

Hi

Can this driver run in phases other than content_by_lua? If it can only run in content_by_lua, this conflicts with proxy_pass, which also runs in the content phase of nginx.
How can I make this Kafka producer work well when I have an upstream configuration?
My main purpose is to output upstream_response_time to Kafka.

Thank you

lua-resty kafka

I used a hostname, and the error is:
[error] 13484#0: *105 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, client: 127.0.0.1, server: localhost, request: "GET /test HTTP/1.1", host: "localhost"

local broker_list = {
    { host = "storm01", port = 9092 }
}

I hope you can help me.

Scope of the producer and bp

In the readme example, a broker is instantiated in the scope of a location. Is it possible to do the require statement in an init_by_lua_block? I have the same question for bp.

So will the following work as expected, or do you believe it is not thread-safe?

http {
  lua_package_path "/data/programs/lua-resty-kafka/lib/?.lua;;";

  init_by_lua_block {
    cjson = require("cjson")

    local kafka_broker_list = {{host = "kafka1.mydomain.com", port = 9092}}
    local kafka_producer_config = {producer_type = "async"}

    -- can this be instantiated here ?
    local producer = require "resty.kafka.producer"
    -- edited this line: made bp global VM level 
   bp = producer:new(kafka_broker_list, kafka_producer_config)
  }

  server {
    location /test {
      content_by_lua_block {
        local args, err = ngx.req.get_uri_args()
        local message = {
          test_arg1 = args.test_arg1 or nil
        }
        message = cjson.encode(message)
        local ok, err = bp:send("test_topic", "test_key", message)
        if not ok then
          ngx.say("send err:", err)
          return
        end

        ngx.say("send success, ok:", ok)
      }
    }
  }
}

Getting 'no request' error with producer.

I've attempted to use the Kafka producer to send messages to a Kafka instance on the same local private network. Here's an example of the relevant kafka producer code that I'm using:

_kafka_broker_list = {
  {host = "10.10.250.10", port = 9092}
}
_kafka_producer = _kafka:new(_kafka_broker_list, {producer_type = "async"})

-- Other work gets done here

local kafka_send_ok, kafka_send_err = _kafka_producer:send(list, "nginx", '{"message": "this is a test", "type": "test"}')
if not kafka_send_ok then
   ngx.log(ngx.ERR, "Could not send message to Kafka broker:", kafka_send_err)
   return
end

but unfortunately I get this error when I start nginx:

2017/05/09 18:07:17 [error] 8#8: init_by_lua error: /usr/local/openresty/site/lualib/resty/kafka/producer.lua:282: no request
stack traceback:
	[C]: in function 'timer_at'
	/usr/local/openresty/site/lualib/resty/kafka/producer.lua:282: in function '_flush_buffer'
	/usr/local/openresty/site/lualib/resty/kafka/producer.lua:291: in function '_timer_flush'
	/usr/local/openresty/site/lualib/resty/kafka/producer.lua:333: in function 'new'
	init_by_lua:51: in main chunk

I can't seem to figure out why I'm getting this. Would you have any ideas? Thanks for your help.

all brokers failed in fetch topic metadata

2016/07/26 20:54:58 [error] 17646#0: *34 connect() failed (113: No route to host), client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
2016/07/26 20:54:58 [error] 17646#0: *34 [lua] client.lua:209: _fetch_metadata(): all brokers failed in fetch topic metadata, client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
2016/07/26 20:54:58 [error] 17646#0: *34 lua entry thread aborted: runtime error: content_by_lua(nginx.conf:71):16: attempt to index local 'brokers' (a nil value)
stack traceback:
coroutine 0:
    content_by_lua(nginx.conf:71): in function <content_by_lua(nginx.conf:71):1>, client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"

kafka restart, lua-resty-kafka producer send err, nginx must be reloaded manually

Hi all,

When I restart my Kafka cluster, my resty-kafka producer can't send messages anymore. I must reload my nginx, and then it is OK. What should I do so that the resty-kafka producer automatically re-checks the Kafka cluster?

producer.lua:258: buffered messages send to kafka err: not found broker, retryable: true, topic: test, partition_id: 0, length: 219, context: ngx.timer

no resolver defined to resolve

Nginx.conf
ngx.say("[start lua kafka]")
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "127.0.0.1", port = 9094},
}
local topic = "hellokafka";
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata(topic)

ngx.say("[fetch_metadata]");
if not brokers then
    ngx.say("fetch_metadata failed, err: ", partitions)

end
ngx.say("brokers: ", cjson.encode(brokers), ";\npartitions: ", cjson.encode(partitions), ";")
ngx.say("[send message]");

local key = nil
local message = "hello nginx kafka lua"
local bp = producer:new(broker_list)
local ok, err = bp:send(topic, nil, message)
if not ok then
    ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)

Output:
[start lua kafka]
[fetch_metadata]
brokers: [null,{"host":"cp01-rdqa-dev340.cp01.baidu.com","port":9094}];
partitions: {"errcode":0,"num":1,"0":{"id":0,"errcode":0,"replicas":[2],"isr":[2],"leader":2}};
[send message]
send err:no resolver defined to resolve "cp01-rdqa-dev340.cp01.baidu.com"

Why can't it resolve the hostname? Please help me...

104: Connection reset by peer

The error log is:

2016/07/27 14:02:56 [error] 7253#0: *25 recv() failed (104: Connection reset by peer), client: 192.168.1.111, server: 192.168.1.160, request: "GET /collect HTTP/1.1", host: "192.168.1.160:4002"

and the config is:

location /collect {
            lua_need_request_body on;
            client_max_body_size 5M;
            client_body_buffer_size 256k;
            default_type application/json;

            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    { host = "192.168.1.151", port = 9092 }
                }
                local key = "key"
                local message = "halo world"

                -- sync producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("demo2", key, message)
                if not offset then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success, offset: ", tonumber(offset))

                ngx.say("send success, ok:", ok)
            ';
        }

Manually select Kafka partition

Hi,
Is it possible to manually select the Kafka partition when sending messages?
I noticed that in the current implementation all messages are sent to partition 1; is that correct?

send err:not foundd topic

Hi,

Not sure why I'm getting this error; the topic does exist. I also tested it with a Python client using the same topic, and that worked.

....
....

local broker_list = {
{ host = "127.0.0.1", port = 9092 },
}

local key = "key1"
local message = "hello world"
local p = producer:new(broker_list)

local offset, err = p:send("test", key, message)
if not offset then
    ngx.say("send err:", err)
    return
end
ngx.say("send success, offset: ", tostring(offset))

Using:
Openresty 1.7.2.1
kafka server 2.10-0.8.2.0

Also, I tested on 0.8.1 and 0.8.1.1
Thanks,

runtime error: content_by_lua(lua-kafka.conf:50):21: Cannot serialise table

1. Deployment environment
openresty-1.9.15.1.tar.gz + the latest lua-resty-kafka + CDH 5.4.0 (kafka 1.3.0) + CentOS 6.4 64-bit

2. The nginx configuration for lua-resty-kafka

server {
    listen 8081;
    server_name 192.168.15.84;

    location /test {
        content_by_lua '
            local cjson = require "cjson"
            local client = require "resty.kafka.client"
            local producer = require "resty.kafka.producer"

            local broker_list = {
                { host = "192.168.15.81", port = 9092 },
                { host = "192.168.15.82", port = 9092 },
                { host = "192.168.15.83", port = 9092 }
            }

            local key = "key"
            local message = "halo world"

            -- usually we do not use this library directly
            local cli = client:new(broker_list)
            local brokers, partitions = cli:fetch_metadata("test")
            if not brokers then
                ngx.say("fetch_metadata failed, err:", partitions)
            end
            ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

            -- sync producer_type
            local p = producer:new(broker_list)

            local offset, err = p:send("test", key, message)
            if not offset then
                ngx.say("send err:", err)
                return
            end
            ngx.say("send success, offset: ", tonumber(offset))

            -- this is async producer_type and bp will be reused in the whole nginx worker
            local bp = producer:new(broker_list, { producer_type = "async" })

            local ok, err = bp:send("test", key, message)
            if not ok then
                ngx.say("send err:", err)
                return
            end

            ngx.say("send success, ok:", ok)
        ';
    }
}

3. Request example
curl -X POST -H ' charset=UTF-8' -d 'sssss' http://192.168.15.84:8081/test

4. The problem (data cannot be written to Kafka); the log details are as follows:
2016/06/21 18:10:30 [error] 22381#0: *1 lua entry thread aborted: runtime error: content_by_lua(lua-kafka.conf:50):21: Cannot serialise table: excessively sparse array
stack traceback:
coroutine 0:
[C]: in function 'encode'
content_by_lua(lua-kafka.conf:50):21: in function <content_by_lua(lua-kafka.conf:50):1>, client: 192.168.15.84, server: 192.168.15.84, request: "POST /test HTTP/1.1", host: "192.168.15.84:8081"

5. Many thanks; looking forward to your answer.

async producer fails when first kafka instance in the list is down

I am using the async Kafka producer in our Lua code, and we recently had a situation where one of the Kafka machines went down. Unfortunately, the async client was continuously failing in this case, and I guess it was the metadata request failing.

The way we configure a kafka cluster in our lua config:

local config = {
  ["kafka"]={
    { host = "down", port = 9092 },
    { host = "foo", port = 9092 },
    { host = "bar", port = 9092 },
  },
}

Host 'down' obviously went down during the incident. The others were up. All hosts are configured by IP addresses, not hostnames.

The way we use async producer:

      ....
local function send_messages_to_kafka(messages)
        local error_handle = function (topic, partition_id, queue, index, err, retryable)
          ngx.log(ngx.ERR, "failed to send to kafka: " .. err)
        end

        local producer_kafka, err = kafkaproducer:new(config.kafka, { producer_type = "async", refresh_interval = 5000, error_handle = error_handle })
        .....

And the result we observed in logs:


2016/04/24 12:04:06 [error] 22322#0: lua tcp socket connect timed out, context: ngx.timer, client: some.client.ip.here, server: 0.0.0.0:80
2016/04/24 12:04:06 [error] 22322#0: [lua] dataeye_logging_helper.lua:12: failed to send to kafka: timeout, context: ngx.timer, client: some.client.ip.here, server: 0.0.0.0:80

So my question here: is there any good workaround for this problem? I can't really tell from the resty-kafka code whether it falls back across multiple Kafka instances.

Errcode and offset field of producer response are both -1

When I connected lua-resty-kafka to 0.9.0.1, the errcode and offset fields in the response were both -1. There was an error message in nginx error.log (line 258, producer.lua).

After I changed line 153 in producer.lua as follows, everything works fine.
if errcode == 0 or errcode == -1 then

Is there any plan to upgrade the protocol implementation to support Kafka api version 2?

BTW, @doujiang24 great project. Thanks

broker_list parameter didn't take effect when adding producer_type = "async" to the producer constructor

I have multiple routes in my nginx config; each route registers a Kafka topic to produce messages into its own Kafka cluster. The problem I came across is that when I added a new route to send one topic into a new Kafka cluster with the producer_type = "async" parameter, I saw the new topic's messages still being sent into the Kafka cluster I had configured in the old route. I solved this problem by removing the producer_type = "async" parameter.

Maybe this will help.

lua-resty-kafka require error

When I require resty.kafka.producer, the error log shows:

2016/08/26 14:10:54 [error] 11222#0: *284 lua entry thread aborted: runtime error: error loading module 'resty.kafka.response' from file '/usr/local/lib/lua/resty/kafka/response.lua':
/usr/local/lib/lua/resty/kafka/response.lua:76: malformed number near '4294967296LL'
stack traceback:
coroutine 0:
[C]: ?
[C]: in function 'require'
/usr/local/lib/lua/resty/kafka/producer.lua:4: in main chunk
[C]: in function 'require'

message sends successfully but kafka does not receive it

Hi @doujiang24:
I have a problem: a message sends successfully (sent status), but Kafka does not receive the message. This problem occurs a day or two after starting my nginx. If I run "nginx -s reload" or restart, the problem goes away (the message sends successfully and Kafka also receives it).
Below are my Lua script and nginx configuration:
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
    { host = "192.168.100.29", port = 9092 },
    { host = "192.168.100.30", port = 9092 },
    { host = "192.168.100.31", port = 9092 },
    -- { host = "sz-space3.novalocal", port = 9092 },
    -- { host = "sz-space4.novalocal", port = 9092 },
    -- { host = "sz-space5.novalocal", port = 9092 }
}
local key = os.date("%s")
local body = ngx.req.get_body_data()
local message = body
local cli = client:new(broker_list)
-- local brokers, partitions = cli:fetch_metadata("cleancenter_spaceplus_cleantrack_test_topic")
local brokers, partitions = cli:fetch_metadata("cleancenter_spaceplus_cleantrack_topic0")
if not brokers then
    ngx.say("fetch_metadata failed, err:", partitions)
    return
end
-- ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- local p = producer:new(broker_list)
-- local offset, err = p:send("cleancenter_spaceplus_cleantrack_topic0", key, message)
-- if not offset then
--     ngx.say("send err:", err)
--     return
-- end
-- ngx.say("send success, offset: ", tonumber(offset))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
-- local ok, err = bp:send("cleancenter_spaceplus_cleantrack_test_topic", key, message)
local ok, err = bp:send("cleancenter_spaceplus_cleantrack_topic0", key, message)
if not ok then
    ngx.say("send err:", err)
    return
end
ngx.log(ngx.ERR, "test", "ssss")
ngx.say("send success, ok:", ok)
-- ngx.say("send success, ok:", message)

# spot point log config begin
lua_package_path "/data/softwares/openresty-1.9.15.1/lualib/?.lua;;";
lua_need_request_body on;
server {
    # listen port
    listen 8080;
    server_name 127.0.0.1;
    location /clean/track {
        content_by_lua_file /data/softwares/openresty-1.9.15.1/nginx/conf/lua/spot_point_log1.lua;
        # log_by_lua /data/softwares/openresty-1.9.15.1/nginx/conf/lua/spot_point_log1.lua;
    }
}

send reports success, but nothing in the consumer

This is my code; the result shows: send success, offset: true
But in the consumer I can't get anything:
location ^~ /1 {
    default_type 'text/html';
    access_log off;
    access_by_lua '
        local producer = require "resty.kafka.producer"
        local broker_list = {
            { host = "192.168.210.24", port = 6667 },
            { host = "192.168.210.25", port = 6667 },
        }
        local producer_config = { producer_type = "async" }
        local p = producer:new(broker_list, producer_config)
        local offset, err = p:send("usertrajectory", nil, "1111111111111111111112222222222")
        if not offset then
            ngx.say("send err:", err)
            return
        end
        ngx.say("send success, offset: ", offset)
        return
    ';
}

cli:fetch_metadata: result has error "Cannot serialise table: excessively sparse array"

I added the request handler as in the README example. However, fetch_metadata() seemed to return an empty result, which caused the error.
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("exp_khuang")
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions)) -- Error thrown here.

I noticed the traceback is like client:fetch_metadata -> broker:send_receive(). I added some dumps in send_receive(). The request:packet() sent to the Kafka broker node is "true", and the received data is nil. I am not sure where it goes wrong. Could anyone kindly suggest? Thanks a lot!

In async mode, buffer overflow occurs frequently

We use lua-resty-kafka to send our logs to Kafka.
The QPS is 6K+, and the size per request is 0.6K.
However, we see many buffer overflow errors in the error log.
And I found this error in ringbuffer.lua:

function _M.add(self, topic, key, message)
    local num = self.num
    local size = self.size

    if num >= size then
        return nil, "buffer overflow"
    end

    local index = (self.start + num) % size
    local queue = self.queue

    queue[index] = topic
    queue[index + 1] = key
    queue[index + 2] = message

    self.num = num + 3

    return true
end

What config should I set?
And what does this error mean?

ERROR: malformed number

2015/08/14 15:03:34 [error] 17858#0: *122 failed to run log_by_lua*: error loading module 'resty.kafka.response' from file '/usr/local/sh
    /usr/local/share/lua/5.1/resty/kafka/response.lua:76: malformed number near '4294967296LL'
stack traceback:
    [C]: ?
    [C]: in function 'require'
    /usr/local/share/lua/5.1/resty/kafka/producer.lua:4: in main chunk
    [C]: in function 'require'
    /etc/nginx/lua/log/log_into_kafka.lua:1: in function </etc/nginx/lua/log/log_into_kafka.lua:1> while logging request, client: 14.153.
2015/08/14 15:03:35 [error] 17858#0: *122 failed to run log_by_lua*: /etc/nginx/lua/log/log_into_kafka.lua:1: loop or previous error load
stack traceback:
    [C]: in function 'require'

lua-resty-kafka error

Failed to connect to Kafka: [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata
The returned result is: fetch_metadata failed, err:not foundd topic
But the topic is already created.
Why?

In code demo:

-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = p:send("test", key, message)

Here p should be bp; this should be:

bp:send("test", key, message)

no resolver defined to resolve "name"

Hello! Maybe you can help me.
I have this producer:

local broker_list = {
    { host = "127.0.0.1", port = 9092 }
}
local p = producer:new(broker_list)
local key = "key"
local mess = "hi"
local offset, err = p:send('test', key, mess)

I get the response:
no resolver defined to resolve "esha"
I read about the nginx resolver, but it did not solve my problem.
In my /etc/resolv.conf:
nameserver 127.0.1.1
In my /etc/hosts:

127.0.0.1   localhost
::1         localhost
27.0.1.1   esha

Where "esha" in error? If i have "localhost" in broker list ?

For now, I work around the problem with a "stub" in producer.lua:

129     local config = brokers[leader]
130     
131     config.host = "127.0.0.1"
132     local bk = broker:new(config.host, config.port, self.socket_config)
133     self.producer_brokers[leader] = bk
134     
135     return bk

What would you recommend?

Getting error when sending message

Hi there,
I'm trying to use this module but get errors like the one below. Do you have any idea why this is? The "domain-test" topic does exist on my broker.
2014/12/17 10:37:55 [error] 32594#0: *31 lua entry thread aborted: runtime error: /usr/share/lualib/resty/kafka/request.lua:167: 'for' limit must be a number
stack traceback:
coroutine 0:
        /usr/share/lualib/resty/kafka/request.lua: in function 'message_set'
        /usr/share/lualib/resty/kafka/producer.lua:76: in function 'produce_encode'
        /usr/share/lualib/resty/kafka/producer.lua:248: in function 'send'
        /etc/nginx/lua/info_tests.lua:42: in function </etc/nginx/lua/info_tests.lua:1>, 

with info_tests.lua being something like this (line numbers are not correct):

local cjson = require "cjson"
local producer = require "resty.kafka.producer"

local broker_list = {
  { host = "xxxx", port = xxx },
}

local key = "key"
local message = "halo world"

local bp = producer:new(broker_list, { producer_type = "async" })

local size, err = bp:send("domain-test", key, message)
if not size then
  ngx.say("send err:", err)
  return
end

send err:tmp.cn could not be resolved

Server 1: IP is 100.100.100.101 (something like this).
It runs nginx, lua-resty-kafka, and an interface that is called over HTTP. The interface calls lua-resty-kafka as a producer.
Server 2: IP is 200.100.100.101 (something like this).
Kafka runs on server 2 in single mode.
Server 2's /etc/hosts content is:
127.0.0.1 localhost
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
200.100.100.101 tmp.cn

When I run the interface on server 1, it gives the error send err:tmp.cn could not be resolved.
I already used issue #5 as a reference to set the parameters:
on server 2, I set host.name=127.0.0.1 or host.name=200.100.100.101,
and on server 1, I set resolver 200.100.100.101 in the nginx configuration,
then restarted nginx on server 1.
These methods still do not solve my problem; it still gives this error.

I wonder why it needs to access the domain name of the Kafka server; aren't the IP address and port enough to communicate with the Kafka server?
Thank you
