Giter VIP home page Giter VIP logo

emqx_plugin_kafka's Introduction

emqx_plugin_kafka

Kafka plugin for EMQX >= V5.4.0

Usage

Release

> git clone https://github.com/jostar-y/emqx_plugin_kafka.git
> cd emqx_plugin_kafka
> make rel
_build/default/emqx_plugrel/emqx_plugin_kafka-<vsn>.tar.gz

Config

Explain

> cat priv/emqx_plugin_kafka.hocon
plugin_kafka {
  // required
  connection {
    // Kafka client id: "emqx_plugin:kafka_client:${client_id}"
    // optional   default:"emqx_plugin:kafka_client:emqx_plugin_kafka_connection"
    client_id = "kafka_client"
    // Kafka address.
    // required
    bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"]

    // Reference type: kpro_connection:config().
    // https://github.com/kafka4beam/kafka_protocol/blob/master/src/kpro_connection.erl
    // optional   default:5s
    connect_timeout = 5s
    // enum: per_partition | per_broker
    // optional   default:per_partition
    connection_strategy = per_partition
    // optional   default:5s
    min_metadata_refresh_interval = 5s
    // optional   default:true
    query_api_versions = true
    // optional   default:3s
    request_timeout = 3s
    sasl {
      // enum:  plain | scram_sha_256 | scram_sha_512
      mechanism = plain
      username = "username"
      password = "password"
    }
    ssl {
      enable = false
    }

    //Emqx resource opts.
    // optional   default:32s
    health_check_interval = 32s
  }

  // optional
  producer {
    // Most number of bytes to collect into a produce request.
    // optional   default:896KB
    max_batch_bytes = 896KB
    // enum:  no_compression | snappy | gzip
    // optional   default:no_compression
    compression = no_compression
    // enum:  random | roundrobin | first_key_dispatch
    // optional   default:random
    partition_strategy = random

    // Encode kafka value.
    // enum:  plain | base64
    // optional   default:plain
    encode_payload_type = plain
  }

  // required
  hooks = [
    {
      // Hook point.
      // required
      endpoint = message.publish
      // Emqx topic pattern.
      // 1. Cannot match the system message;
      // 2. Cannot use filters that start with '+' or '#'.
      // message required
      filter = "test/#"
      // Kafka topic, must be created in advance in Kafka.
      // required
      kafka_topic = emqx_test
      // Matching template, value = ${.} indicates that all keys match
      // optional default:{timestamp = "${.timestamp}", value = "${.}",key = "${.clientid}"}
      kafka_message = {
        timestamp = "${.timestamp}"
        value = "${.}"
        key = "${.clientid}"
      }
    }
  ]
}

Some examples in the directory priv/example/.

Hook Point

endpoint filter
client.connect /
client.connack /
client.connected /
client.disconnected /
client.authenticate /
client.authorize /
client.authenticate /
client.check_authz_complete /
session.created /
session.subscribed /
session.unsubscribed /
session.resumed /
session.discarded /
session.takenover /
session.terminated /
message.publish required
message.delivered required
message.acked required
message.dropped required

Path

  • Default path: emqx/etc/emqx_plugin_kafka.hocon
  • Attach to path: set system environment variables export EMQX_PLUGIN_KAFKA_CONF="absolute_path"

emqx_plugin_kafka's People

Contributors

jostar-y avatar

Stargazers

Carlos Enriquez Lopez avatar  avatar igit-cn avatar

Watchers

 avatar

emqx_plugin_kafka's Issues

msg: bad_hocon_file, mfa: emqx_plugin_kafka:read_config/0(44)

你好, 通过使用emqx 5.4.0源码打包方式(erlang 25.2.2 + rebar3 3.20.0 ), 再使用你提供 kafka插件 (v1.0.0), 在emqx界面上进行加载并启动。
emqx界面显示 启动成功, 但后台日志报错,并且不能转发kafka。

如下, 请帮忙给看一下,怎么解决,谢谢
2024-02-01T20:58:43.198576+08:00 [warning] msg: plugin_app_already_running, mfa: emqx_plugins:load_plugin_app/4(663), loading_vsn: 0.3.4, name: replayq, running_vsn: 0.3.7
2024-02-01T20:58:43.202306+08:00 [error] msg: bad_hocon_file, mfa: emqx_plugin_kafka:read_config/0(44), file: etc/emqx_plugin_kafka.hocon, reason: {enoent,"/opt/emqx/emqx-v5.4.0/_build/emqx/rel/emqx/etc/emqx_plugin_kafka.hocon"}

启动插件报错,弄了很久找不到原因,请指导一下,感谢

kafka里emqx_test主题已经提前建好
出错日志如下:

2024-04-25T15:12:06.826883+08:00 [warning] msg: plugin_app_already_running, mfa: emqx_plugins:load_plugin_app/4(663), loading_vsn: 0.3.4, name: replayq, running_vsn: 0.3.7
2024-04-25T15:12:06.836304+08:00 [error] input-config:, #{<<"bootstrap_hosts">> => [<<"192.168.119.143:59092">>], <<"client_id">> => <<"kafka_client">>,<<"connect_timeout">> => <<"5s">>, <<"connection_strategy">> => <<"per_partition">>, <<"health_check_interval">> => <<"32s">>, <<"min_metadata_refresh_interval">> => <<"5s">>, <<"query_api_versions">> => true,<<"request_timeout">> => <<"3s">>, <<"sasl">> =>, #{<<"mechanism">> => <<"plain">>,<<"password">> => <<"admin">>, <<"username">> => <<"admin">>}, <<"ssl">> => #{<<"enable">> => false}}, #{exception =>, {case_clause,{[<<"client_id">>,<<"client_id">>], [<<"connect_timeout">>,<<"connection_strategy">>, <<"min_metadata_refresh_interval">>, <<"query_api_versions">>,<<"request_timeout">>, <<"sasl">>,<<"ssl">>,<<"health_check_interval">>]}}, field => <<"connection">>,path => "plugin_kafka", reason => failed_to_check_field}, mfa: undefined
2024-04-25T15:12:06.836466+08:00 [error] crasher: initial call: application_master:init/4, pid: <0.3219.0>, registered_name: [], exit: {{bad_return,{{emqx_plugin_kafka_app,start,[normal,[]]},{'EXIT',{#{exception => {case_clause,{[<<"client_id">>,<<"client_id">>],[<<"connect_timeout">>,<<"connection_strategy">>,<<"min_metadata_refresh_interval">>,<<"query_api_versions">>,<<"request_timeout">>,<<"sasl">>,<<"ssl">>,<<"health_check_interval">>]}},field => <<"connection">>,path => "plugin_kafka",reason => failed_to_check_field},[{hocon_tconf,match_field_names,3,[{file,"hocon_tconf.erl"},{line,717}]},{hocon_tconf,check_unknown_fields,3,[{file,"hocon_tconf.erl"},{line,695}]},{hocon_tconf,do_map2,3,[{file,"hocon_tconf.erl"},{line,408}]},{hocon_tconf,map_one_field_non_hidden,4,[{file,"hocon_tconf.erl"},{line,492}]},{hocon_tconf,map_fields_cont,4,[{file,"hocon_tconf.erl"},{line,431}]},{hocon_tconf,map_one_field_non_hidden,4,[{file,"hocon_tconf.erl"},{line,492}]},{hocon_tconf,map_fields_cont,4,[{file,"hocon_tconf.erl"},{line,431}]},{hocon_tconf,map,4,[{file,"hocon_tconf.erl"},{line,301}]},{hocon_tconf,map_translate,3,[{file,"hocon_tconf.erl"},{line,99}]},{emqx_config,do_check_config,3,[{file,"emqx_config.erl"},{line,476}]},{emqx_config,check_config,3,[{file,"emqx_config.erl"},{line,462}]},{emqx_plugin_kafka,read_config,0,[{file,"/home/cqct/emqx_plugin_kafka/src/emqx_plugin_kafka.erl"},{line,24}]},{emqx_plugin_kafka,load,0,[{file,"/home/cqct/emqx_plugin_kafka/src/emqx_plugin_kafka.erl"},{line,12}]},{emqx_plugin_kafka_app,start,2,[{file,"/home/cqct/emqx_plugin_kafka/src/emqx_plugin_kafka_app.erl"},{line,14}]},{application_master,start_it_old,4,[{file,"application_master.erl"},{line,293}]}]}}}},[{application_master,init,4,[{file,"application_master.erl"},{line,142}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}, ancestors: [<0.3218.0>], message_queue_len: 1, messages: [{'EXIT',<0.3220.0>,normal}], links: [<0.3218.0>,<0.1997.0>], dictionary: [], trap_exit: true, status: running, heap_size: 610, stack_size: 28, reductions: 241; neighbours:
2024-04-25T15:12:06.840398+08:00 [alert] msg: failed_to_start_plugin, mfa: emqx_plugins:ensure_started/1(368), reason: #{app => emqx_plugin_kafka,err_app => emqx_plugin_kafka,error => "failed_to_start_plugin_app",reason => {bad_return,{{emqx_plugin_kafka_app,start,[normal,[]]},{'EXIT',{#{exception => {case_clause,{[<<"client_id">>,<<"client_id">>],[<<"connect_timeout">>,<<"connection_strategy">>,<<"min_metadata_refresh_interval">>,<<"query_api_versions">>,<<"request_timeout">>,<<"sasl">>,<<"ssl">>,<<"health_check_interval">>]}},field => <<"connection">>,path => "plugin_kafka",reason => failed_to_check_field},[{hocon_tconf,match_field_names,3,[{file,"hocon_tconf.erl"},{line,717}]},{hocon_tconf,check_unknown_fields,3,[{file,"hocon_tconf.erl"},{line,695}]},{hocon_tconf,do_map2,3,[{file,"hocon_tconf.erl"},{line,408}]},{hocon_tconf,map_one_field_non_hidden,4,[{file,"hocon_tconf.erl"},{line,492}]},{hocon_tconf,map_fields_cont,4,[{file,"hocon_tconf.erl"},{line,431}]},{hocon_tconf,map_one_field_non_hidden,4,[{file,"hocon_tconf.erl"},{line,492}]},{hocon_tconf,map_fields_cont,4,[{file,"hocon_tconf.erl"},{line,431}]},{hocon_tconf,map,4,[{file,"hocon_tconf.erl"},{line,301}]},{hocon_tconf,map_translate,3,[{file,"hocon_tconf.erl"},{line,99}]},{emqx_config,do_check_config,3,[{file,"emqx_config.erl"},{line,476}]},{emqx_config,check_config,3,[{file,"emqx_config.erl"},{line,462}]},{emqx_plugin_kafka,read_config,0,[{file,"/home/cqct/emqx_plugin_kafka/src/emqx_plugin_kafka.erl"},{line,24}]},{emqx_plugin_kafka,load,0,[{file,"/home/cqct/emqx_plugin_kafka/src/emqx_plugin_kafka.erl"},{line,12}]},{emqx_plugin_kafka_app,start,2,[{file,"/home/cqct/emqx_plugin_kafka/src/emqx_plugin_kafka_app.erl"},{line,14}]},{application_master,start_it_old,4,[{file,"application_master.erl"},{line,293}]}]}}}}}
2024-04-25T15:12:06.875796+08:00 [warning] msg: configured_plugin_not_installed, mfa: emqx_plugins:list/1(426), name_vsn: emqx_plugin_kafka-0.1

emqx_plugin_kafka.hocon配置文件如下:
plugin_kafka {
connection {
client_id = "kafka_client"
bootstrap_hosts = ["192.168.119.143:59092"]
connect_timeout = 5s
connection_strategy = per_partition
min_metadata_refresh_interval = 5s
query_api_versions = true
request_timeout = 3s
sasl {
mechanism = plain
username = "admin"
password = "admin"
}
ssl {
enable = false
}

health_check_interval = 32s

}

hooks = [
{
endpoint = message.publish
filter = "test/#"
kafka_topic = emqx_test
kafka_message = {
timestamp = "${.timestamp}"
value = "${.}"
key = "${.clientid}"
}
}
]
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.