
flow-pipeline's Introduction

flow-pipeline

This repository contains a set of tools and examples for GoFlow, a NetFlow/IPFIX/sFlow collector by Cloudflare.

Start a flow pipeline

The compose directory contains startup files for an example pipeline including:

  • GoFlow: an sFlow/NetFlow collector
  • A mock collector
  • Kafka/Zookeeper
  • A database (Postgres/Clickhouse)
  • An inserter: inserts the flows into the database (used with Postgres; Clickhouse consumes from Kafka natively)

It will listen on port 6343/UDP for sFlow and 2055/UDP for NetFlow.

The protobuf provided in this repository is a light version of the original GoFlow one: only a handful of fields will be inserted.

A basic pipeline looks like this:




                   +------+         +-----+
     sFlow/NetFlow |goflow+--------->Kafka|
                   +------+         +-----+
                                       |
                                       +--------------+
                      Topic: flows     |              |
                                       |              |
                                 +-----v----+       +-v---------+
                                 | inserter |       |new service|
                                 +----------+       +-----------+
                                      |
                                      |
                                   +--v--+
                                   |  DB |
                                   +-----+

You can add a processor that would enrich the data by consuming from Kafka and re-injecting the data into Kafka or directly into the database.

For instance, IP addresses can be mapped to countries, ASN or customer information.

One suggestion is to extend the GoFlow protobuf with new fields.
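As an illustration of query-time enrichment (a lighter alternative to a separate Kafka consumer/producer), the sketch below maps source addresses to countries with a ClickHouse ip_trie dictionary. Everything here is hypothetical: the geo dictionary, its Country attribute, and its source CSV are illustrative assumptions, not part of this repository.

-- Hypothetical ip_trie dictionary: maps prefixes (from a CSV) to country codes.
CREATE DICTIONARY geo (
    Prefix String,
    Country String
)
PRIMARY KEY Prefix
SOURCE(FILE(path '/var/lib/clickhouse/user_files/geo.csv' format 'CSVWithNames'))
LAYOUT(IP_TRIE())
LIFETIME(3600);

-- Enrich at query time: look up the country of each source address.
SELECT
    IPv6NumToString(SrcAddr) AS SrcIP,
    dictGetString('geo', 'Country', tuple(SrcAddr)) AS SrcCountry,
    sum(Bytes) AS Bytes
FROM flows_raw
GROUP BY SrcIP, SrcCountry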

Run a mock insertion

A mock insertion replaces the GoFlow decoding part. A mocker generates protobuf messages and sends them to Kafka.

Clone the repository, then run the following (for Postgres):

$ cd compose
$ docker-compose -f docker-compose-postgres-mock.yaml up

Wait a minute for all the components to start.

You can connect to the local Grafana at http://localhost:3000 (admin/admin) to look at the flows being collected.

Run a GoFlow insertion

If you want to send sFlow/NetFlow/IPFIX to GoFlow, run one of the following:

Using Postgres:

$ cd compose
$ docker-compose -f docker-compose-postgres-collect.yml up

Using Clickhouse (see next section):

$ cd compose
$ docker-compose -f docker-compose-clickhouse-collect.yml up

Keep in mind this is a development/prototype setup. Some components will likely not be able to process more than a few thousand rows per second. You will likely have to tweak configuration settings and the number of workers.

In a production setup, GoFlow was able to process more than 100k flows per second and insert them into a Clickhouse database.

About the Clickhouse setup

If you choose to visualize in Grafana, you will need the Clickhouse data source plugin. You can connect to the compose Grafana, which has the plugin installed.

The insertion is handled natively by Clickhouse: it connects to Kafka periodically and fetches the content of the topic. Materialized views then store the data persistently and allow aggregation over fields.

Note: the protobuf messages have to be written length-delimited (each message prefixed with its length).
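A minimal sketch of that pattern, with most columns omitted for brevity; the broker address, consumer group, and schema file name are assumptions, and the actual DDL ships with the compose files:

-- Kafka engine table: consumes length-delimited protobuf from the 'flows' topic.
CREATE TABLE flows
(
    TimeReceived UInt64,
    SrcAddr FixedString(16),
    DstAddr FixedString(16),
    Bytes UInt64,
    Packets UInt64
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'flows',
    kafka_group_name = 'clickhouse',
    kafka_format = 'Protobuf',
    kafka_schema = 'flow.proto:FlowMessage';

-- Materialized view: copies every consumed batch into the persistent table.
CREATE MATERIALIZED VIEW flows_raw_view TO flows_raw AS
SELECT
    toDate(toDateTime(TimeReceived)) AS Date,
    toDateTime(TimeReceived) AS TimeReceived,
    SrcAddr,
    DstAddr,
    Bytes,
    Packets
FROM flows;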

To connect to the database, you have to run the following:

$ docker exec -ti compose_db_1 clickhouse-client

Once in the client CLI, a handful of tables are available:

  • flows is directly connected to Kafka and fetches from the current offset
  • flows_raw contains the materialized view of flows
  • flows_5m contains 5-minute aggregates per ASN

Example commands:

:) DESCRIBE flows_raw

DESCRIBE TABLE flows_raw

┌─name───────────┬─type────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ Date           │ Date            │              │                    │         │                  │                │
│ TimeReceived   │ DateTime        │              │                    │         │                  │                │
│ TimeFlowStart  │ DateTime        │              │                    │         │                  │                │
│ SequenceNum    │ UInt32          │              │                    │         │                  │                │
│ SamplingRate   │ UInt64          │              │                    │         │                  │                │
│ SamplerAddress │ FixedString(16) │              │                    │         │                  │                │
│ SrcAddr        │ FixedString(16) │              │                    │         │                  │                │
│ DstAddr        │ FixedString(16) │              │                    │         │                  │                │
│ SrcAS          │ UInt32          │              │                    │         │                  │                │
│ DstAS          │ UInt32          │              │                    │         │                  │                │
│ EType          │ UInt32          │              │                    │         │                  │                │
│ Proto          │ UInt32          │              │                    │         │                  │                │
│ SrcPort        │ UInt32          │              │                    │         │                  │                │
│ DstPort        │ UInt32          │              │                    │         │                  │                │
│ Bytes          │ UInt64          │              │                    │         │                  │                │
│ Packets        │ UInt64          │              │                    │         │                  │                │
└────────────────┴─────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

:) SELECT Date,TimeReceived,IPv6NumToString(SrcAddr), IPv6NumToString(DstAddr), Bytes, Packets FROM flows_raw;

SELECT
    Date,
    TimeReceived,
    IPv6NumToString(SrcAddr),
    IPv6NumToString(DstAddr),
    Bytes,
    Packets
FROM flows_raw

┌───────Date─┬────────TimeReceived─┬─IPv6NumToString(SrcAddr)─┬─IPv6NumToString(DstAddr)─┬─Bytes─┬─Packets─┐
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::80         │ 2001:db8:0:1::20         │   105 │      63 │
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::c2         │ 2001:db8:0:1::           │   386 │      43 │
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::6b         │ 2001:db8:0:1::9c         │   697 │      29 │
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::81         │ 2001:db8:0:1::           │  1371 │      54 │
│ 2020-03-22 │ 2020-03-22 21:26:39 │ 2001:db8:0:1::87         │ 2001:db8:0:1::32         │   123 │      23 │
└────────────┴─────────────────────┴──────────────────────────┴──────────────────────────┴───────┴─────────┘

To look at the aggregates, optimize the table first (optimizing runs the summing operation). The Nested structure keeps a sum per sub-key (in our case, per Ethernet type).
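For reference, a hypothetical sketch of what such an aggregate table can look like; the real schema ships with the repository, and the exact column list and ordering key here are assumptions:

-- SummingMergeTree sums the numeric columns of rows sharing the same ORDER BY
-- key when parts are merged (or when OPTIMIZE is run). Because the Nested
-- structure's name ends in "Map", its columns are summed per EType key.
CREATE TABLE flows_5m
(
    Date Date,
    Timeslot DateTime,
    SrcAS UInt32,
    DstAS UInt32,
    ETypeMap Nested (
        EType UInt32,
        Bytes UInt64,
        Packets UInt64,
        Count UInt64
    ),
    Bytes UInt64,
    Packets UInt64,
    Count UInt64
) ENGINE = SummingMergeTree()
PARTITION BY Date
ORDER BY (Date, Timeslot, SrcAS, DstAS, `ETypeMap.EType`);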

:) OPTIMIZE TABLE flows_5m;

OPTIMIZE TABLE flows_5m

Ok.

:) SELECT * FROM flows_5m WHERE SrcAS = 65001;

SELECT *
FROM flows_5m
WHERE SrcAS = 65001

┌───────Date─┬────────────Timeslot─┬─SrcAS─┬─DstAS─┬─ETypeMap.EType─┬─ETypeMap.Bytes─┬─ETypeMap.Packets─┬─ETypeMap.Count─┬─Bytes─┬─Packets─┬─Count─┐
│ 2020-03-22 │ 2020-03-22 21:25:00 │ 65001 │ 65000 │ [34525]        │ [2930]         │ [152]            │ [4]            │  2930 │     152 │     4 │
│ 2020-03-22 │ 2020-03-22 21:25:00 │ 65001 │ 65001 │ [34525]        │ [1935]         │ [190]            │ [3]            │  1935 │     190 │     3 │
│ 2020-03-22 │ 2020-03-22 21:25:00 │ 65001 │ 65002 │ [34525]        │ [4820]         │ [288]            │ [6]            │  4820 │     288 │     6 │
└────────────┴─────────────────────┴───────┴───────┴────────────────┴────────────────┴──────────────────┴────────────────┴───────┴─────────┴───────┘

Regarding the storage of IP addresses: at the moment, the Clickhouse table does not perform any transformation of the addresses before insertion. The bytes are inserted into a FixedString(16) regardless of the family (IPv4 or IPv6). In the dashboards, the function IPv6NumToString(SrcAddr) is used.

For example, 192.168.1.1 will end up as 101:a8c0:: (the little-endian bytes of the IPv4 integer become the first bytes of the IPv6 address):

WITH toFixedString(reinterpretAsString(ipv4), 16) AS ipv4c
SELECT
    '192.168.1.1' AS ip,
    IPv4StringToNum(ip) AS ipv4,
    IPv6NumToString(ipv4c) AS ipv6

┌─ip──────────┬───────ipv4─┬─ipv6───────┐
│ 192.168.1.1 │ 3232235777 │ 101:a8c0:: │
└─────────────┴────────────┴────────────┘

In order to convert it back:

WITH IPv6StringToNum(ip) AS ipv6
SELECT
    '101:a8c0::' AS ip,
    reinterpretAsUInt32(ipv6) AS ipv6c,
    IPv4NumToString(ipv6c) AS ipv4

┌─ip─────────┬──────ipv6c─┬─ipv4────────┐
│ 101:a8c0:: │ 3232235777 │ 192.168.1.1 │
└────────────┴────────────┴─────────────┘

This allows, for instance, displaying either IPv4 or IPv6 in a single query:

SELECT
  if(EType = 0x800, IPv4NumToString(reinterpretAsUInt32(SrcAddr)), IPv6NumToString(SrcAddr)) AS SrcIP

This will be fixed in a future dashboard/DB schema version.

Information and roadmap

This repository is an example and does not offer any warranties. I try to update it whenever I can. Contributions are welcome.

The main purpose is to let users get started quickly with a basic system. This should not be used in production.

I received requests to publish the Flink aggregator source code, as you may have seen it used in GoFlow presentations. Unfortunately, we moved entirely to Clickhouse and the old code has not been updated in a while. It may get published at some point but this is currently low priority.

Issue troubleshooting

The compose files don't pin specific versions of the containers. You will likely need to down in order to clean the setup (volumes, network), pull to resynchronize images like GoFlow, and build to rebuild components like the inserter.

$ docker-compose -f some-yaml-listed-above.yml down
$ docker-compose -f some-yaml-listed-above.yml pull
$ docker-compose -f some-yaml-listed-above.yml build
$ docker-compose -f some-yaml-listed-above.yml up


flow-pipeline's Issues

Project Status, and License missing

Hello, @lspgn @jbampton

I find this project quite interesting, but it seems to be abandoned. As per GitHub rules, the lack of a license limits what forks can do. What's the status of this project, and if it is abandoned, would it be possible to add a license so that other folks can fork/modify/publish this project?

Thanks!

Leading capital letter in branch dictionary-clickhouse

In the branch dictionary-clickhouse, when you create the DB you use lowercase letters for dictionaries.protocols, but in the CSV you use a leading capital letter. This causes an error in the Grafana dashboard for ports. Please change the DB to use a leading capital letter.

CREATE DATABASE dictionaries;

CREATE DICTIONARY dictionaries.protocols (
    Proto UInt8,
    Name String,
    Description String
)
PRIMARY KEY Proto
LAYOUT(FLAT())
SOURCE(FILE(path '/var/lib/clickhouse/user_files/protocols.csv' format 'CSVWithNames'))
LIFETIME(3600);

Bugs in docker-compose-collect.yml?

Hi,

I tried to run the pipeline specified by docker-compose-collect.yml, but it raises an error saying that the flag "kafka.broker.out" is not found. Is that flag deprecated?

Thank you so much!

Clickhouse engine Kafka consumer cannot decode flow type normally

Using the Go client, it decodes normally:
{SFLOW_5 1612172554 50360667 15000 0 [10 16 2 1] 1612172554 1612172554 115 1 [10 1 1 1] [110 43 83 35] 2048 6 62306 443 56 1745 102106146323752 132052928835024 257 1712 4 0 0 64 0 127 24 8 0 681602 21969 16384 0
0 0 [] 0 0 0 false [] [] 0 0 0 0 0 0 0 false 0 0 0 0 0 0 0 0 0 false 0 {} [] 0}
Using the Clickhouse Kafka engine, the Type field is decoded to 0 (unknown flow).

Hardware spec used for handling 100k flows per second

Reading the documentation (README), I can see that you have mentioned:

"Using a production setup, GoFlow was able to process more than +100k flows per second and insert them in a Clickhouse database."

Can you please tell us the hardware specs of the different components of the GoFlow pipeline that you used for that?

inserter dies out in collect configuration

Thanks for the GoFlow software. I am studying how to implement it in my small network.

I started with flow-pipeline in a docker environment to better understand all the parts.

docker-compose -f ./docker-compose-mock.yml up

worked fine! The problem is when I try

docker-compose -f docker-compose-collect.yml up

Apparently the inserter dies at kafka.broker creation. The error is the following:

time="2019-08-09T14:52:02Z" level=fatal msg="kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

The container restarts (as per the policy) and dies again. The export to PostgreSQL does not occur.

What is surprising is that the same inserter container works fine with the mock generation.
Maybe my knowledge of Kafka is still too limited to understand what is going on.
Regards
Enrico

Docker engine

Server: Docker Engine - Community
 Engine:
  Version:          19.03.1
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.5

Docker-compose version

docker-compose version 1.24.1, build 4667896b
docker-py version: 3.7.3
CPython version: 3.6.8
OpenSSL version: OpenSSL 1.1.0j  20 Nov 2018

OS system
Fedora 30 workstation
