
export's Introduction

Export

The Mainflux Export service sends messages from one Mainflux cloud to another over MQTT, or from an edge gateway to the Mainflux cloud. The Export service subscribes to the local message bus and connects to an MQTT broker in the cloud.
Messages collected on the local message bus are forwarded to the cloud. When the connection is lost, messages from the local bus are stored in a Redis stream. Once the connection is reestablished, the Export service consumes the messages from the Redis stream and sends them to the Mainflux cloud.
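For orientation only, here is a minimal sketch of that store-and-forward loop, assuming the github.com/nats-io/nats.go, github.com/eclipse/paho.mqtt.golang and github.com/go-redis/redis/v8 clients; the subject, topic and stream names below are illustrative placeholders, not the service's actual internals:

package main

import (
	"context"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/go-redis/redis/v8"
	"github.com/nats-io/nats.go"
)

func main() {
	ctx := context.Background()

	// Placeholder addresses for the local Redis, the cloud MQTT broker and local NATS.
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	mc := mqtt.NewClient(mqtt.NewClientOptions().AddBroker("tcp://mainflux.com:1883"))
	mc.Connect().Wait()

	nc, _ := nats.Connect("nats://localhost:4222")

	// Forward every local message to the cloud; buffer it in a Redis stream when offline.
	nc.Subscribe("export.>", func(m *nats.Msg) {
		if mc.IsConnected() {
			mc.Publish("channels/<channel_id>/messages", 1, false, m.Data)
			return
		}
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: "export-buffer",
			Values: map[string]interface{}{"payload": m.Data},
		})
	})

	select {} // keep the process alive
}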

Install

Get the code:

go get github.com/mainflux/export
cd $GOPATH/src/github.com/mainflux/export

Make:

make

Usage

cd build
./mainflux-export

Configuration

By default the Export service looks for a config file at ../configs/config.toml when no env vars are specified.

[exp]
  cache_pass = ""
  cache_url = "localhost:6379"
  cache_db = "0"
  log_level = "debug"
  nats = "localhost:4222"
  port = "8170"

[mqtt]
  username = "<thing_id>"
  password = "<thing_password>"
  ca = "ca.crt"
  cert = "thing.crt"
  mtls = "false"
  priv_key = "thing.key"
  retain = "false"
  skip_tls_ver = "false"
  url = "tcp://mainflux.com:1883"

[[routes]]
  mqtt_topic = "channels/<channel_id>/messages"
  subtopic = "subtopic"
  nats_topic = "export"
  type = "plain"
  workers = 10

HTTP port

  • port - HTTP port on which the status of the Export service can be fetched:

curl -X GET http://localhost:8170/version
{"service":"export","version":"0.0.1"}

Redis connection

The Redis connection is configured with the cache_url, cache_pass and cache_db settings in config.toml.

MQTT connection

To establish a connection to the MQTT broker, the following settings are needed:

  • username - Mainflux <thing_id>
  • password - Mainflux <thing_key>
  • url - URL of the MQTT broker

Additionally, you will need MQTT client certificates if you enable mTLS. To obtain the certificates ca.crt, thing.crt and the key thing.key, follow the instructions here.
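As an illustration, a minimal sketch of how such an mTLS connection could be built with the eclipse/paho.mqtt.golang client; the file names follow the config above, while the tcps:// URL and port 8883 are assumptions, not values taken from the service:

package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// Load the CA and client certificate named in the config above.
	ca, _ := ioutil.ReadFile("ca.crt")
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(ca)

	cert, _ := tls.LoadX509KeyPair("thing.crt", "thing.key")

	opts := mqtt.NewClientOptions().
		AddBroker("tcps://mainflux.com:8883"). // TLS URL and port are assumptions
		SetUsername("<thing_id>").
		SetPassword("<thing_key>").
		SetTLSConfig(&tls.Config{
			RootCAs:      pool,
			Certificates: []tls.Certificate{cert},
		})

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
}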

Routes

Routes specify which subscriber topic (subject) is forwarded to which publishing topic. Currently only MQTT is supported for publishing. To match Mainflux requirements, mqtt_topic must contain channels/<channel_id>/messages; additional subtopics can be appended.

  • mqtt_topic - channels/<channel_id>/messages/<custom_subtopic>
  • nats_topic - the Export service subscribes to the NATS subject <nats_topic>.>
  • subtopic - messages are published to the MQTT topic <mqtt_topic>/<subtopic>/<nats_subject>, where dots in nats_subject are replaced with '/' (see the sketch after this list)
  • workers - controls the number of workers used for message forwarding
  • type - specifies the message transformation; currently only plain is supported, meaning no transformation
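For clarity, a small sketch of how the final MQTT topic could be assembled from these settings; mqttTopic is a hypothetical helper, not the service's actual function:

package main

import (
	"fmt"
	"strings"
)

// mqttTopic joins the configured MQTT topic, the optional subtopic and the
// NATS subject, replacing dots in the subject with '/'.
func mqttTopic(base, subtopic, natsSubject string) string {
	parts := []string{base}
	if subtopic != "" {
		parts = append(parts, subtopic)
	}
	parts = append(parts, strings.ReplaceAll(natsSubject, ".", "/"))
	return strings.Join(parts, "/")
}

func main() {
	// export.devices.temp -> channels/<channel_id>/messages/subtopic/export/devices/temp
	fmt.Println(mqttTopic("channels/<channel_id>/messages", "subtopic", "export.devices.temp"))
}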

Before running the Export service, edit configs/config.toml and provide username, password and url:

  • username - matches thing_id in the Mainflux cloud instance
  • password - matches thing_key
  • channel - the MQTT part of the topic to publish to (channels/<channel_id>/messages is the format of a Mainflux MQTT topic); it also plays a part in authorization

For the Export service to listen on Mainflux NATS deployed on the same machine, the NATS port must be exposed. Run Mainflux using make run.

Environment variables

The service will look for config.toml first; if it is not found, it will be configured from environment variables, and a new config file at the path given by MF_EXPORT_CONFIG_FILE will be saved with values populated from the env vars.
The service is configured using the environment variables presented in the following table. Note that any unset variables will fall back to their default values.

Variable                     Description                                               Default
MF_NATS_URL                  NATS URL                                                  localhost:4222
MF_EXPORT_MQTT_HOST          MQTT URL to export to                                     tcp://localhost:1883
MF_EXPORT_MQTT_USERNAME      MQTT username (thing ID in case of Mainflux)
MF_EXPORT_MQTT_PASSWORD      MQTT password (thing key in case of Mainflux)
MF_EXPORT_MQTT_CHANNEL       MQTT channel to publish to
MF_EXPORT_MQTT_SKIP_TLS      Skip TLS verification                                     true
MF_EXPORT_MQTT_MTLS          Use mTLS for authentication                               false
MF_EXPORT_MQTT_CA            CA for TLS                                                ca.crt
MF_EXPORT_MQTT_CLIENT_CERT   Client certificate for authentication when mTLS = true    thing.crt
MF_EXPORT_MQTT_CLIENT_PK     Client key for authentication when mTLS = true            thing.key
MF_EXPORT_MQTT_QOS           MQTT QoS                                                  0
MF_EXPORT_MQTT_RETAIN        MQTT retain flag                                          false
MF_EXPORT_CONFIG_FILE        Configuration file                                        config.toml

For values from environment variables to take effect, make sure that there is no existing config file at the path given by MF_EXPORT_CONFIG_FILE.

If you run with environment variables, you can create a config file:

MF_EXPORT_PORT=8178 \
MF_EXPORT_LOG_LEVEL=debug \
MF_EXPORT_MQTT_HOST=tcp://localhost:1883 \
MF_EXPORT_MQTT_USERNAME=<thing_id> \
MF_EXPORT_MQTT_PASSWORD=<thing_key> \
MF_EXPORT_MQTT_CHANNEL=<channel_id> \
MF_EXPORT_MQTT_SKIP_TLS=true \
MF_EXPORT_MQTT_MTLS=false \
MF_EXPORT_MQTT_CA=ca.crt \
MF_EXPORT_MQTT_CLIENT_CERT=thing.crt \
MF_EXPORT_MQTT_CLIENT_PK=thing.key \
MF_EXPORT_CONFIG_FILE=export.toml \
../build/mainflux-export&

The service will be subscribed to the NATS subject <nats_topic>.> and will send messages to channels/<MF_EXPORT_MQTT_CHANNEL>/messages/<NatsSubject>. For example, if you are running Mainflux on a gateway and set nats_topic="channels", the Export service will forward messages to other Mainflux instances, i.e. to the Mainflux cloud. When a message is published to the local Mainflux instance it ends up on NATS as channels.<local_channel_id>.messages.subtopic; the Export service picks it up and forwards it to <mqtt_topic>, so it ends up on <mqtt_topic>/channels/<local_channel_id>/messages/subtopic. You can edit the created export.toml to add different routes and use it in the next run.

How to save config via agent

The configuration file for the Export service can be sent over MQTT using the Agent service, as a save, export control message:

mosquitto_pub -u <thing_id> -P <thing_key> -t channels/<control_ch_id>/messages/req -h localhost -p 18831  -m  "[{\"bn\":\"1:\", \"n\":\"config\", \"vs\":\"save, export, <config_file_path>, <file_content_base64>\"}]"

vs="config_file_path, file_content_base64" - vs determines where to save file and contains file content in base64 encoding payload:

b, _ := toml.Marshal(export.Config)              // serialize the export config to TOML
payload := base64.StdEncoding.EncodeToString(b)  // base64-encode it for the "vs" field
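Putting it together, a minimal sketch of building the full request payload, assuming the github.com/pelletier/go-toml package; the Config struct, port value and file path below are placeholders, not the service's actual types:

package main

import (
	"encoding/base64"
	"fmt"

	toml "github.com/pelletier/go-toml"
)

// Config is a stand-in for the Export service configuration.
type Config struct {
	Port string `toml:"port"`
}

func main() {
	cfg := Config{Port: "8170"}

	b, err := toml.Marshal(cfg) // serialize the config to TOML
	if err != nil {
		panic(err)
	}
	encoded := base64.StdEncoding.EncodeToString(b)

	// SenML record understood by the Agent service: "save, export, <path>, <content_base64>"
	msg := fmt.Sprintf(`[{"bn":"1:", "n":"config", "vs":"save, export, /configs/export/config.toml, %s"}]`, encoded)
	fmt.Println(msg)
}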

export's People

Contributors

blokovi, drasko, mteodor, pricelessrabbit, sammyoina


export's Issues

Workers get stuck when waiting for Paho Token

STR

  • set up an export with QoS 2
  • set the number of workers to a low value
  • start sending messages to Mainflux
  • simulate a network partition
  • reconnect the export

expected

  • all the messages sent during the disconnection are delivered to mainflux

actual

  • only one message per worker is delivered. For example, with 3 workers only 3 messages are persisted and sent when the exporter reconnects. All other messages are lost.

The issue seems to be in the Token.Wait() call used here.

When a worker publishes to MQTT with QoS = 2, Paho waits for reconnection to send the message, so the worker gets stuck in the Wait() call and can't handle further messages.

I managed to fix it using Token.WaitTimeout

token := e.mqtt.Publish(topic, byte(e.cfg.MQTT.QoS), e.cfg.MQTT.Retain, payload)
sentInTime := token.WaitTimeout(3 * time.Second)
if !sentInTime {
	e.logger.Warn(fmt.Sprintf("MQTT message in topic %s sending timeout exceed: persisted if QOS >= 1", topic))
	return nil
}
if sentInTime && token.Error() != nil {
	e.logger.Error(fmt.Sprintf("Failed to publish to topic %s", topic))
	return token.Error()
}
return nil

This way, when the wait exceeds 3 seconds the worker becomes available again.

Refactor export, add Subscriber and Publisher

Think about creating a Subscriber interface implemented with the nats package, and a Publisher interface implemented with the mqtt package. The service can then use these interfaces to read and forward messages. This way the MQTT client and the logger could perhaps also be separated from Route.
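A rough sketch of what such interfaces might look like; the names and signatures are only illustrative of the proposal, not an agreed design:

package export

// Subscriber reads messages from the local bus (e.g. a NATS-backed implementation).
type Subscriber interface {
	Subscribe(subject string, handler func(payload []byte)) error
	Close()
}

// Publisher forwards messages to the remote broker (e.g. an MQTT-backed implementation).
type Publisher interface {
	Publish(topic string, payload []byte) error
	Close()
}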

Simplify folder structure

This followed Go project layout recommendations, but it turned out to be much more complicated than the Mainflux flat repo structure.

Use either the Mainflux flat repo structure or possibly just a pkg dir.

Issues on storing message locally when connection to the cloud is lost

As an edge service operating in an industrial context, I expect the data storage and retention implementation to be solid and to protect against data loss during common system failure events.

As of now, after the Redis stream removal, the current implementation, which delegates message persistence to Paho, fails in common use cases. I opened a PR that fixes some of them, but there is still a major flaw that can cause a large amount of data loss. It is related to a limitation of the Paho client during the first MQTT connection and, AFAIK, it cannot be easily solved without a custom storage mechanism like the one based on Redis streams.

TEST ENVIRONMENT:

  • local NATS publisher producing messages
  • local Export service running
  • Export service set up to publish QoS 2 messages to the cloud
  • cloud instance of Mainflux with the export channel configured
  • external MQTT client (mosquitto_sub) connected to the Mainflux channel to check for data loss

TESTED USE CASES:


  • connection lost after successful connection
  1. export service starts and connects to broker
  2. export disconnects from the broker (simulated network issue)
  3. export service reconnects to broker

results:

  • all messages are delivered (correctly managed by paho)

  • restart of export service
  1. export service starts and connects to broker
  2. force restart of export service

result:

  • sometimes some messages are lost

fix:

  • fixed with PR to persist in file

  • connection lost after successful connection and restart
  1. export service starts and connects to broker
  2. export disconnects from the broker (simulated network issue)
  3. force restart of export service
  4. export service reconnects to broker

results:

  • all pending messages (produced during the disconnection timeframe) are lost

fix:

  • fixed with PR to persist in file

  • broker unreachable when service starts
  1. export service starts but cannot connect to broker

OR

  1. export service starts and connects to broker
  2. export service force restarts
  3. export service starts but the broker now is unreachable

results:
the Export service shuts down, all messages are lost

fix:

  • NO FIX. I tried to fix it by disabling the initial Paho token wait that causes the shutdown of the service and letting Paho manage the auto-reconnect, but there is a known limitation there (eclipse/paho.mqtt.golang#77): it seems that Paho stores messages locally and manages reconnects only if the first connection was successful.

IMHO this is a major flaw of the Export service, which becomes essentially useless in contexts that require reliability even when the service is started without an internet connection.

Imagine the situation in which the edge hardware is restarted but that day there is no internet connection. All the messages of that day are lost.


Note about #19

How can the Export service guarantee message reliability for a future HTTP publisher implementation, if persistence is delegated to the MQTT library?

export service stuck when internet connection lost

Hi, there is a possible issue when the internet connection is lost and the Export service cannot connect to the MQTT uplink. In my case this results in data loss. In particular, it seems that the exporter stops processing the incoming NATS messages.

environment

I used a dockerized environment with the whole edge stack:

version: "3.7"

services:

  nats:
    image: nats:1.3.0
    command: "-c /etc/nats/nats.conf"
    ports: 
      - 4222:4222
    volumes:
      - ./nats/nats.conf:/etc/nats/nats.conf
      - ./nats/data/store:/etc/nats/store
    restart: on-failure

  redis:
    image: redis:6-alpine
    ports: 
      - 6379:6379
    volumes: 
      - ./redis/data:/data
      - ./redis/conf/redis.conf:/usr/local/etc/redis/redis.conf
    restart: on-failure

  agent:
    image: mainflux/agent:latest
    environment: 
        - MF_AGENT_NATS_URL=nats://nats:4222
        - MF_AGENT_LOG_LEVEL=debug  
        - MF_AGENT_CONFIG_FILE=/configs/config.toml
    env_file:
        ./.env
    volumes: 
        - ./agent/configs:/configs
    restart: on-failure
    depends_on: 
        - nats

  export:
    image: mainflux/export
    ports: 
      - 8170:8170
    env_file:
      ./.env
    volumes: 
      - ./agent/configs/export:/configs
    restart: on-failure
    depends_on: 
      - nats
      - redis

exporter config

File = "/configs/export/config.toml"

[exp]
  log_level = "debug"
  nats = "nats://nats:4222"
  port = "8172"
  cache_url = "redis:6379"
  cache_pass = ""
  cache_db = "0"

[mqtt]
  ca_path = ""
  cert_path = ""
  channel = "<channel>"
  host = "tcp:/<host>:1883"
  mtls = false
  password = "<pass>"
  priv_key_path = ""
  qos = 2
  retain = false
  skip_tls_ver = false
  username = "<user>"

[[routes]]
  mqtt_topic = "channels/<channel>/messages"
  nats_topic = ">"
  subtopic = ""
  type = "plain"
  workers = 10

Steps to reproduce

I started a Go test executable that publishes a message to the export.message NATS subject. The exporter relays the messages to the Mainflux cloud.
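A minimal sketch of such a test publisher, assuming github.com/nats-io/nats.go; the subject and two-second interval follow the logs below, everything else is a placeholder:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Publish a numbered message every two seconds, matching the log output below.
	for i := 0; ; i++ {
		if err := nc.Publish("export.message", []byte(fmt.Sprintf("message %d", i))); err != nil {
			log.Println(err)
		}
		time.Sleep(2 * time.Second)
	}
}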

2020/07/30 13:11:35 Configuration loaded from file ../configs/config.toml
export_1  | {"level":"debug","message":"Client export-d6d128d8-841a-4906-81d0-cc7ab1a6c682 connected","ts":"2020-07-30T13:11:35.409503589Z"}
export_1  | {"level":"error","message":"Failed to create stream group: BUSYGROUP Consumer Group name already exists","ts":"2020-07-30T13:11:35.410850563Z"}
export_1  | {"level":"info","message":"Stream group \u003e created ","ts":"2020-07-30T13:11:35.410924929Z"}
export_1  | {"level":"info","message":"Export service started, exposed port :8172","ts":"2020-07-30T13:11:35.411072197Z"}
export_1  | {"level":"info","message":"Republish, waiting for stream data","ts":"2020-07-30T13:11:35.411050072Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 148","ts":"2020-07-30T13:11:37.341565285Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 149","ts":"2020-07-30T13:11:39.346738911Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 150","ts":"2020-07-30T13:11:41.343323877Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 151","ts":"2020-07-30T13:11:43.344359133Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 152","ts":"2020-07-30T13:11:45.347391765Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 153","ts":"2020-07-30T13:11:47.348156214Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 154","ts":"2020-07-30T13:11:49.34798676Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 155","ts":"2020-07-30T13:11:51.350006471Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 156","ts":"2020-07-30T13:11:53.348543792Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 157","ts":"2020-07-30T13:11:55.349915616Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 158","ts":"2020-07-30T13:11:57.354041142Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 159","ts":"2020-07-30T13:11:59.357020898Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 160","ts":"2020-07-30T13:12:01.359262115Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 161","ts":"2020-07-30T13:12:03.35459824Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 162","ts":"2020-07-30T13:12:05.357387038Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 163","ts":"2020-07-30T13:12:07.35790137Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 164","ts":"2020-07-30T13:12:09.358460938Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 165","ts":"2020-07-30T13:12:11.357249305Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 166","ts":"2020-07-30T13:12:13.360555103Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 167","ts":"2020-07-30T13:12:15.359051578Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 168","ts":"2020-07-30T13:12:17.359637009Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 169","ts":"2020-07-30T13:12:19.360792346Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 170","ts":"2020-07-30T13:12:21.361622165Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 171","ts":"2020-07-30T13:12:23.361572003Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 172","ts":"2020-07-30T13:12:25.365206421Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 173","ts":"2020-07-30T13:12:27.365519358Z"}
export_1  | {"level":"debug","message":"Client export-d6d128d8-841a-4906-81d0-cc7ab1a6c682 disconnected","ts":"2020-07-30T13:13:10.41096313Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 175","ts":"2020-07-30T13:13:17.577789392Z"}
export_1  | {"level":"error","message":"not connected to mqtt broker","ts":"2020-07-30T13:13:17.577826033Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 178","ts":"2020-07-30T13:13:17.577821667Z"}
export_1  | {"level":"error","message":"not connected to mqtt broker","ts":"2020-07-30T13:13:17.577859377Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 181","ts":"2020-07-30T13:13:17.57794355Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 182","ts":"2020-07-30T13:13:17.577973627Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 174","ts":"2020-07-30T13:13:17.577929897Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 179","ts":"2020-07-30T13:13:17.578131105Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 180","ts":"2020-07-30T13:13:17.578151371Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 183","ts":"2020-07-30T13:13:17.57815695Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 177","ts":"2020-07-30T13:13:17.578172894Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 176","ts":"2020-07-30T13:13:17.578184137Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 185","ts":"2020-07-30T13:13:17.580156408Z"}
export_1  | {"level":"debug","message":"Published to: export.message, payload: message 184","ts":"2020-07-30T13:13:17.580139305Z"}

I switched off the internet connection to simulate a network issue. After that (2020-07-30T13:13:17.577859377Z) the exporter sends some out-of-order messages (I guess the ones that were stored in the Redis stream), but then it gets stuck. After message 184 it doesn't produce any logs, while in the meantime the Go test service continues to publish messages to the NATS subject. If I restart the exporter, it starts sending the new messages, but all the data between the "stuck" state and the restart seems to be lost.

Add Support For Multi-broker Messaging From Mainflux

FEATURE REQUEST

  1. Is there an open issue addressing this request? No

  2. Describe the feature you are requesting, as well as the possible use case(s) for it.
    Since mainflux now supports multi-broker messaging, it would be nice to have a way to specify the broker to use when publishing messages. This would allow for a more flexible deployment of mainflux.

  3. Indicate the importance of this feature to you (must-have, should-have, nice-to-have).
    Should-have

Filter messages sent from the edge

I'm trying to filter messages that are sent from the edge to the cloud. I want to achieve what is described here, but it's not working.
export config file

File = "./export-config.toml"

[exp]
cache_db = "0"
cache_pass = ""
cache_url = "localhost:6379"
log_level = "debug"
nats = "nats://127.0.0.1:4222"
port = "8170"

[mqtt]
ca_path = "ca.crt"
client_cert = ""
client_cert_key = ""
client_cert_path = "thing.crt"
client_priv_key_path = "thing.key"
host = "tcp://192.168.0.100:1883"
mtls = false
password = "dcc10567-b1d8-424d-a7b3-c5e70cffe5bc"
qos = 0
retain = false
skip_tls_ver = true
username = "333b6ee8-c3e5-4e61-a090-19cef0030146"

[[routes]]
mqtt_topic = "channels/dd5b8d8b-086b-4d7b-9c9e-dc5c5835d492/messages"
nats_topic = "export"
subtopic = ""
type = "default"
workers = 10

[[routes]]
mqtt_topic = "channels/dd5b8d8b-086b-4d7b-9c9e-dc5c5835d492/messages"
nats_topic = "channels.f26f6357-1a58-4f63-9198-c68628df173e"
subtopic = "my_subj"
type = "mfx"
workers = 10

publishing message
mosquitto_pub -d -u f6b7e529-5adb-41ff-8666-d4b2fad6d769 -P e687d1fe-a779-414b-9d5d-1b61d87d9f5d -t channels/f26f6357-1a58-4f63-9198-c68628df173e/messages -h localhost -p 1883 -m '[{"bn":"1:", "n":"exec", "vs":"ls, -l"}]'

When I remove .f26f6357-1a58-4f63-9198-c68628df173e from nats_topic, the export sends all messages regularly:
{"level":"debug","message":"Client export-333b6ee8-c3e5-4e61-a090-19cef0030146 connected","ts":"2022-02-09T10:12:58.113449092Z"} {"level":"debug","message":"Published to: channels.f26f6357-1a58-4f63-9198-c68628df173e, payload: [{\"bn\":\"1:\", \"n\":\"exec\", \"vs\":\"ls, -l\"}]","ts":"2022-02-09T10:13:03.420431845Z"}

Do I need to configure something else or add anything more? Is there another way to filter messages that will be sent to cloud?

Error connecting to the broker

I'm trying to deploy Mainflux on a Raspberry Pi 4; the Mainflux core and the Agent are working properly. When I try to run the Export service, I get an error with the message Client export- had error connecting to the broker:
2021/11/16 22:37:05 Configuration loaded from file ./export-config.toml {"level":"error","message":"Client export- had error connecting to the broker: Network Error : %!s(<nil>)","ts":"2021-11-17T08:37:05.766113751Z"} {"level":"error","message":"Failed to create service :Network Error : %!s(<nil>)","ts":"2021-11-17T08:37:05.766327877Z"}

I'm following the article on Medium about using Mainflux on the edge, but I didn't use this script; I used the export-start script (I assumed this was a typo).

Also, I'm not following the article step by step: I built the agent and export services directly on the Raspberry Pi. I also don't use DigitalOcean; Mainflux is just deployed on my laptop (the basic composition, not the UI one, to which I later added the services missing compared to the UI docker-compose). Still, the Agent service on the Raspberry Pi connected to Bootstrap and I received the configuration for the Raspberry Pi.

Consume goroutine in some cases never process channel messages

Hi all. I found a very strange behaviour in the Export service when adding multiple routes. It seems that sometimes the worker goroutines remain waiting even when there is data in the channel. This issue appears randomly, but I managed to reproduce it with workers = 1 and GOMAXPROCS=1 (although it can also appear with default settings).
This is my setup

[[routes]]
mqtt_topic = "channels/8f99dca5-7e83-4d57-bc4f-c3356d4e73d7/messages"
nats_topic = "measures"
subtopic = ""
type = "default"
workers = 1

[[routes]]
mqtt_topic = "channels/8f99dca5-7e83-4d57-bc4f-c3356d4e73d7/messages"
nats_topic = "measures2"
subtopic = ""
type = "default"
workers = 1

I also have an external process that publishes on the NATS "measures" topic.

  • if I remove the second route, it works

  • if I switch the two routes, it works

  • if the first route defined is the "measures" one, then the r.Consume() worker goroutine remains blocked on for msg := range r.Messages and never processes messages, even when the channel is full or partially full

It seems like a strange concurrency issue, but I cannot understand it. I tried some workarounds to better understand the issue, and these work:

  • change the for := range to a select with a default case containing a sleep (see the sketch after the snippet below); in that case it works
  • subscribe to NATS with a callback instead of a channel, and manually insert values into the channel:
for _, r := range e.consumers {
	//_, err := nc.ChanQueueSubscribe(r.NatsTopic, exportGroup, r.Messages)
	_, err := nc.QueueSubscribe(r.NatsTopic, exportGroup, func(msg *nats.Msg) {
		r.Messages <- msg
	})
	if err != nil {
		e.logger.Error(fmt.Sprintf("Failed to subscribe to NATS %s: %s", r.NatsTopic, err))
	}
	for i := 0; i < r.Workers; i++ {
		go r.Consume()
	}
}
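
For reference, a sketch of the first workaround (select with a default case); the Route type, its Messages field and the forward callback here mirror the snippet above and are otherwise assumptions, not the service's actual code:

package export

import (
	"time"

	"github.com/nats-io/nats.go"
)

// Route mirrors the fields used in the snippet above (other fields omitted).
type Route struct {
	Messages chan *nats.Msg
}

// Consume uses select with a default case instead of `for msg := range r.Messages`,
// so the worker never blocks indefinitely on the channel receive.
func (r *Route) Consume(forward func(*nats.Msg)) {
	for {
		select {
		case msg := <-r.Messages:
			forward(msg)
		default:
			time.Sleep(10 * time.Millisecond) // yield before polling the channel again
		}
	}
}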
