
snap-plugin-publisher-kafka's Introduction

DISCONTINUATION OF PROJECT

This project will no longer be maintained by Intel. This project has been identified as having known security escapes. Intel has ceased development and contributions including, but not limited to, maintenance, bug fixes, new releases, or updates. Intel will not provide or guarantee development of or support for this project, and no longer accepts patches to it. If you have an ongoing need to use this project, are interested in independently developing it, or would like to maintain patches for the community, please create your own fork of the project.


Snap publisher plugin - Kafka

Allows publishing of data to Apache Kafka.

It is used in the Snap telemetry framework.

  1. Getting Started
  2. Documentation
  3. Community Support
  4. Contributing
  5. License
  6. Acknowledgements

Getting Started

System Requirements

  • Uses the sarama Go client for Kafka by Shopify

Installation

Download Kafka plugin binary:

You can get pre-built binaries for your OS and architecture at the plugin's GitHub Releases page.

To build the plugin binary:

Fork https://github.com/intelsdi-x/snap-plugin-publisher-kafka

Clone repo into $GOPATH/src/github.com/intelsdi-x/:

$ git clone https://github.com/<yourGithubID>/snap-plugin-publisher-kafka.git

Build the plugin by running make within the cloned repo:

$ make

This builds the plugin in ./build

Configuration and Usage

Task Manifest Config

In the task manifest, the following settings can be declared in the config section of the Kafka publisher:

Key       Type     Default value      Description
topic     string   "snap"             The topic to which messages are sent
brokers   string   "localhost:9092"   Semicolon-delimited list of "server:port" broker addresses (Kafka's standard broker port is 9092)

Notice: To publish to a remote Kafka endpoint, please ensure that all listed brokers are reachable from the host where snap-plugin-publisher-kafka is running. You can check this by running telnet with the address and port of the advertised Kafka broker, for example telnet 123.45.67.89 9092.

Documentation

Kafka Quickstart

This is the minimal configuration needed to run the Kafka broker service in Docker.

Run ZooKeeper server on docker

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one.

 $ docker run -d --name zookeeper jplock/zookeeper:3.4.6

Check that the ZooKeeper container is running:

$ docker ps  

	CONTAINER ID        IMAGE                      COMMAND                CREATED             STATUS              PORTS              			NAMES
	9b0ddbdd75cd        jplock/zookeeper:3.4.6     "/opt/zookeeper/bin/   38 seconds ago      Up 38 seconds       2181/tcp, 2888/tcp, 3888/tcp	zookeeper

Run Kafka server on docker

 $ docker run -d --name kafka --link zookeeper:zookeeper ches/kafka

Verify the running docker containers:

$ docker ps

	CONTAINER ID        IMAGE                        COMMAND                CREATED             STATUS              PORTS							NAMES
	dfb3cdfb3f87        ches/kafka:latest            "/start.sh"            7 seconds ago       Up 6 seconds        7203/tcp, 9092/tcp				kafka
	9b0ddbdd75cd        jplock/zookeeper:3.4.6       "/opt/zookeeper/bin/   3 minutes ago       Up 3 minutes        2181/tcp, 2888/tcp, 3888/tcp	zookeeper

Get Kafka advertised hostname:

$ docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka

  172.17.0.14

Read more about Kafka on http://kafka.apache.org

Published data

The plugin publishes all collected metrics serialized as JSON to Kafka. An example of published data is below:

[
  {
    "timestamp": "2016-07-25T11:27:59.795548989+02:00",
    "namespace": "/intel/mock/bar",
    "data": 82,
    "unit": "",
    "tags": {
      "plugin_running_on": "my-machine"
    },
    "version": 0,
    "last_advertised_time": "2016-07-25T11:27:21.852064032+02:00"
  },
  {
    "timestamp": "2016-07-25T11:27:59.795549268+02:00",
    "namespace": "/intel/mock/foo",
    "data": 72,
    "unit": "",
    "tags": {
      "plugin_running_on": "my-machine"
    },
    "version": 0,
    "last_advertised_time": "2016-07-25T11:27:21.852063228+02:00"
  }
]

Examples

An example of running the psutil collector plugin and publishing data to Kafka.

Set up the Snap framework

Ensure Snap daemon is running:

  • initd: service snap-telemetry start
  • systemd: systemctl start snap-telemetry
  • command line: sudo snapteld -l 1 -t 0 &

Download and load Snap plugins (paths to binary files for Linux/amd64):

$ wget http://snap.ci.snap-telemetry.io/plugins/snap-plugin-publisher-kafka/latest/linux/x86_64/snap-plugin-publisher-kafka
$ wget http://snap.ci.snap-telemetry.io/plugins/snap-plugin-collector-psutil/latest/linux/x86_64/snap-plugin-collector-psutil
$ snaptel plugin load snap-plugin-publisher-kafka
$ snaptel plugin load snap-plugin-collector-psutil

Create a task manifest (see exemplary tasks), for example psutil-kafka.json with the following content:

{
  "version": 1,
  "schedule": {
    "type": "simple",
    "interval": "1s"
  },
  "workflow": {
    "collect": {
      "metrics": {
        "/intel/psutil/load/load1": {},
        "/intel/psutil/load/load15": {}
      },
      "publish": [
        {
          "plugin_name": "kafka",
          "config": {
            "topic": "test",
            "brokers": "172.17.0.14:9092"
          }
        }
      ]
    }
  }
}

Create a task:

$ snaptel task create -t psutil-kafka.json

Watch created task:

$ snaptel task watch <task_id>

To stop previously created task:

$ snaptel task stop <task_id>

Roadmap

There isn't a current roadmap for this plugin, but it is in active development. As we launch this plugin, we do not have any outstanding requirements for the next release.

If you have a feature request, please add it as an issue and/or submit a pull request.

Community Support

This repository is one of many plugins in Snap, a powerful telemetry framework. See the full project at http://github.com/intelsdi-x/snap. To reach out to other users, head to the main framework repository.

Contributing

We love contributions!

There's more than one way to give back, from examples to blogs to code updates. See our recommended process in CONTRIBUTING.md.

License

Snap, along with this plugin, is an Open Source software released under the Apache 2.0 License.

Acknowledgements

And thank you! Your contribution, through code and participation, is incredibly important to us.

snap-plugin-publisher-kafka's People

Contributors

andrzej-k, candysmurf, cr2025x1, geauxvirtual, ircody, izabellaraulin, jcooklin, katarzyna-z, marcin-krolik, nanliu, nguyenddn, rdower, sandlbn, smartx-jshan, taotod, zhengqingyuan


snap-plugin-publisher-kafka's Issues

Few failures

This happens with the df plugin -> Kafka publisher plugin (0.17.0 and 0.16.1-beta tested):

time="2016-11-09T20:35:05Z" level=error msg="error with publisher job" _module=scheduler-job block=run error="Publish call error: Cannot marshal metrics to JSON format, err=json: unsupported value: NaN" job-type=publisher plugin-config=map[] plugin-name=kafka plugin-version=-1 

This happens with 0.18.0:

time="2016-11-09T20:32:56Z" level=error msg="error with publisher job" _module=scheduler-job block=run error="Publish call error: Cannot initialize a new Sarama SyncProducer using the given broker addresses ([localhost:9092]), err=kafka: client has run out of available brokers to talk to (Is your cluster reachable?)" job-type=publisher plugin-config=map[brokers:{localhost:9092} topic:{snap}] plugin-name=kafka plugin-version=-1 

Same configuration, only snapd binary changed.

Kafka messages in topic are unstable

Hello,
I tested the Kafka publisher using Snap v0.11 with the Kafka plugin.
I used psutil (collector), passthrough (processor) and kafka (publisher).
I defined a task like this:
(task screenshot)

And when I view the messages using this command:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test5 --from-beginning

I see this message:
(screenshot of garbled message output)

How can I see the messages correctly? (A formatting issue?)

Updates in README needed

There are some TODOs in README ("Documentation" and "Examples") that need to be resolved - at the very least an example of global config and sample task are needed.

Missing partitioning key

The plugin should allow setting a key when sending data to Kafka.

Currently all data is sent to a random partition, which means there is no guarantee of proper ordering, nor can multiple readers be used properly (as data is distributed randomly). That makes any kind of "window"/derivative calculation impossible.

Add two options: no key, and key=plugin_running_on; maybe some other keying options as well. I would say most people would prefer partitioning by hostname/IP.
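For reference, keyed partitioning typically hashes the key and takes it modulo the partition count, so all messages with the same key (e.g. the same hostname) land on the same partition. A stdlib-only sketch of that scheme (partitionFor is illustrative, not sarama's API):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key (e.g. the plugin_running_on hostname) to a
// partition using an FNV-1a hash modulo the partition count.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// All metrics tagged with the same host hash to the same partition,
	// preserving per-host ordering for downstream consumers.
	fmt.Println(partitionFor("my-machine", 8))
	fmt.Println(partitionFor("my-machine", 8)) // same partition: deterministic
}
```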

not serializing published output as json

I am using the plugin binary for v0.15.0-beta. When data arrives in Kafka it does not appear to be serialized as JSON. I have been using Trifecta and kafka-console-consumer.sh to inspect the data. Data for the metrics is definitely arriving in Kafka, but it is not in JSON format AFAICT, and is possibly corrupted, e.g.:

disk_totalfloat6�`�Asource10.1.0.90:5051plugin_running_onjenkins-1-node-0�m�F#2��intelmesosagentslavevalid_framework_messagesfloat6source10.1.0.90:5051plugin_running_onjenkins-1-node-0�m�F#2��
"��[]plugin.MetricType��������
MetricType��
Namespace_��LastAdvertisedTime_�Version_Config_��Data_Tags_��Unit_

Plugin Load Error when loading snap-plugin-publisher-kafka

Issue reported by smartx-jshan on main snap repo - intelsdi-x/snap#1057

  • OS version: Ubuntu 14.04.4
  • Snap version: Snap v0.14-beta
  • Environment details: Virtual Machine based on KVM

When loading snap-plugin-publisher-kafka, I see the error message as below.

$ snapd -t 0 -l 1
$ snapctl plugin load snap-plugin-publisher-kafka

The load fails with the following error:

gob: name not registred for interface: "*cpolicy.ConfigPolicyNode"

However, when I use Snap v0.11-beta with kafka plugin, it works correctly.

Better output format

The current data export format is quite big, and having so much duplicated information does not help at all.
So, for Kafka, I would suggest adding a new data format. Rather than exporting { [ {},{} ] }, do a little magic: drop most of the fields, leaving only the value, and create a tree structure on the output.

Here is the Java code:

List<Metrics> metrics = mapper.readValue(record.value(),
        new TypeReference<List<Metrics>>() {});
Map<String, Object> output = new HashMap<>();
for (Metrics m : metrics) {
    String[] split = m.namespace.split("/");
    List<String> path = new ArrayList<>(Arrays.asList(split));
    path.remove(path.size() - 1);
    path.remove(0);
    ensurePath(output, path).put(split[split.length - 1], m.data);
}

private Map<String, Object> ensurePath(Map<String, Object> map, List<String> path) {
    if (!map.containsKey(path.get(0)))
        map.put(path.get(0), new HashMap<String, Object>());
    if (path.size() > 1) {
        String remove = path.remove(0);
        return ensurePath((Map<String, Object>) map.get(remove), path);
    } else
        return (Map<String, Object>) map.get(path.get(0));
}

Here is a partial JSON example of the output:

{
  "intel":{
    "procfs":{
      "iface":{
        "lo":{
          "packets_recv":29227674,
          "compressed_recv":0,
          "multicast_sent":0,
          "frame_recv":0,
          "fifo_recv":0,
          "drop_sent":0,
          "bytes_sent":27885493282,
          "errs_recv":0,
          "compressed_sent":0,
          "errs_sent":0,
          "packets_sent":29227674,
          "drop_recv":0,
          "frame_sent":0,
          "multicast_recv":0,
          "bytes_recv":27885493282,
          "fifo_sent":0
        },
        "eth0":{
          "packets_recv":9660182,
          "compressed_recv":0,
          "multicast_sent":0,
          "frame_recv":0,
          "fifo_recv":0,
          "drop_sent":0,
          "bytes_sent":14853181981,
          "errs_recv":0,
          "compressed_sent":0,
          "errs_sent":0,
          "packets_sent":11323280,
          "drop_recv":0,
          "frame_sent":0,
          "multicast_recv":0,
          "bytes_recv":7701517374,
          "fifo_sent":0
        }
      },
      "processes":{
        "running":1,
        "stopped":0,
        "tracing":0,
        "wakekill":0,
        "parked":0,
        "waiting":0,
        "zombie":0,
        "waking":0,
          "pid":{
            "18399":{
            "snap-plugin-collector-interface":{
            "ps_disk_octets_wchar":174263636,

We only need to add _timestamp and _tags from the first Metrics[] element (or from the task) to make it complete:

{
  "_timestamp": "123123123",
  "_tags": { .... },
  "intel": {
....
  }
}

Explanation:
All plugins run when the task is scheduled: every 10 seconds, every minute, etc. The only important thing is the task's scheduled/running time. If we run every N seconds, we do not care about the exact time (nanoseconds :) ); whether it ran at eta+0, eta+1, or eta+whatever, it will run at roughly the scheduled time.
Tags are the same for every metric reported, at least for all the plugins I've worked with.
Adding this as an optional "compressed tree" output format would save a lot of space and also allow better ways of processing; data size compared to the original is ~10%.
I do not use Logstash, but this format would also allow an easy processing chain: snap -> kafka -> logstash -> ES. A tree is much simpler to consume, and as you can see from the example, you can effectively insert the data directly into ES.
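The tree-building transformation sketched in the Java snippet above can also be expressed in Go (buildTree and the Metric type are illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

// Metric is a minimal stand-in for a published metric.
type Metric struct {
	Namespace string
	Data      interface{}
}

// buildTree nests each metric's value under its namespace path,
// e.g. "/intel/mock/bar" -> tree["intel"]["mock"]["bar"].
func buildTree(metrics []Metric) map[string]interface{} {
	root := map[string]interface{}{}
	for _, m := range metrics {
		parts := strings.Split(strings.Trim(m.Namespace, "/"), "/")
		node := root
		for _, p := range parts[:len(parts)-1] {
			child, ok := node[p].(map[string]interface{})
			if !ok {
				child = map[string]interface{}{}
				node[p] = child
			}
			node = child
		}
		node[parts[len(parts)-1]] = m.Data
	}
	return root
}

func main() {
	tree := buildTree([]Metric{
		{Namespace: "/intel/mock/bar", Data: 82},
		{Namespace: "/intel/mock/foo", Data: 72},
	})
	fmt.Println(tree) // map[intel:map[mock:map[bar:82 foo:72]]]
}
```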

Connection is not closed after each publishing

The plugin opens a new connection to Kafka every time it produces a message. However, it does not close the connection when it is done. These unclosed connections eventually flood the plugin, and the Snap task using the plugin stops.

(screenshots showing the growing number of open connections)
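A common fix for this pattern is to create the producer once, reuse it across publishes, and close it only on shutdown. A stdlib-only sketch of the idea (the Producer interface is hypothetical, standing in for a real Kafka producer such as sarama's SyncProducer):

```go
package main

import (
	"fmt"
	"sync"
)

// Producer stands in for a real Kafka producer.
type Producer interface {
	Send(topic string, payload []byte) error
	Close() error
}

// fakeProducer lets us observe how many times a connection is made.
type fakeProducer struct{ sends int }

func (f *fakeProducer) Send(topic string, payload []byte) error { f.sends++; return nil }
func (f *fakeProducer) Close() error                            { return nil }

var (
	once     sync.Once
	producer Producer
	dials    int
)

// getProducer lazily creates a single shared producer instead of
// dialing Kafka on every publish call.
func getProducer() Producer {
	once.Do(func() {
		dials++ // in real code: connect to the brokers here
		producer = &fakeProducer{}
	})
	return producer
}

func main() {
	for i := 0; i < 3; i++ {
		getProducer().Send("snap", []byte("metrics"))
	}
	defer producer.Close() // close once, on shutdown
	fmt.Println("dials:", dials) // dials: 1
}
```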
