eventbus's Introduction

EventBus

Package EventBus is a small, lightweight event bus with async support for Go.

Installation

Make sure that Go is installed on your computer. Type the following command in your terminal:

go get github.com/asaskevich/EventBus

After that, the package is ready to use.

Import package in your project

Add the following line to your *.go file:

import "github.com/asaskevich/EventBus"

If you find the name EventBus too long, you can import it under an alias:

import (
	evbus "github.com/asaskevich/EventBus"
)

Example

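package main

import (
	"fmt"

	"github.com/asaskevich/EventBus"
)

// calculator prints the sum of its two arguments.
func calculator(a int, b int) {
	fmt.Printf("%d\n", a+b)
}

func main() {
	bus := EventBus.New()
	bus.Subscribe("main:calculator", calculator)
	bus.Publish("main:calculator", 20, 40) // prints 60
	bus.Unsubscribe("main:calculator", calculator)
}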

Implemented methods

  • New()
  • Subscribe()
  • SubscribeOnce()
  • HasCallback()
  • Unsubscribe()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()

New()

New returns a new EventBus with no handlers.

bus := EventBus.New()

Subscribe(topic string, fn interface{}) error

Subscribe to a topic. Returns error if fn is not a function.

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)

SubscribeOnce(topic string, fn interface{}) error

Subscribe to a topic once. Handler will be removed after executing. Returns error if fn is not a function.

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)

Unsubscribe(topic string, fn interface{}) error

Removes the callback defined for a topic. Returns an error if there are no callbacks subscribed to the topic.

bus.Unsubscribe("topic:handler", HelloWorld)

HasCallback(topic string) bool

Returns true if any callback is subscribed to the topic.

Publish(topic string, args ...interface{})

Publish executes the callbacks defined for a topic. Any additional arguments are passed to the callback.

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!");

SubscribeAsync(topic string, fn interface{}, transactional bool) error

Subscribe to a topic with an asynchronous callback. Returns error if fn is not a function.

func slowCalculator(a, b int) {
	time.Sleep(3 * time.Second)
	fmt.Printf("%d\n", a + b)
}

bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)

bus.Publish("main:slow_calculator", 20, 60)

fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")

bus.WaitAsync() // wait for all async callbacks to complete

fmt.Println("do some stuff after waiting for result")

The transactional flag determines whether subsequent callbacks for a topic run serially (true) or concurrently (false).

SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync works like SubscribeOnce, except that the callback is executed asynchronously.

WaitAsync()

WaitAsync waits for all async callbacks to complete.

Cross Process Events

Cross-process events work through two RPC services:

  • a client service to listen to remotely published events from a server
  • a server service to listen to client subscriptions

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}

Notes

Documentation is available here: godoc.org. Full information about code coverage is also available here: EventBus on gocover.io.

Support

If you have a contribution for the package, feel free to put up a Pull Request or open an Issue.

Special thanks to contributors

eventbus's People

Contributors

asaskevich, bennah, briandowns, crijonsi, dnathe4th, dominikschulz, erfanio, erickskrauch, hagii, lookfirst, pgermishuys, tetratorus, xmxiaoq

eventbus's Issues

`Lock` acquired when publishing an event from a subscriber.

Current behaviour

The main function publishes event1, and the event1 handler publishes event2. The event2 handler is never called, and the program does not exit.

bus.Subscribe("event2", func() {
	fmt.Println("event2")
})
bus.Subscribe("event1", func(bus EventBus.Bus) {
	fmt.Println("event1")
	time.Sleep(time.Second * 5)
	bus.Publish("event2")
})
bus.Publish("event1", bus)

Further debugging

  • I saw that we lock the bus in Subscribe, so whenever we publish it is already locked, and in SubscribeAsync we can't lock the bus.
    Is that the default behaviour?

Use of reflect

There are multiple places where reflect is being used. Wouldn't that cause performance issues?

Feature request: subscription by mask

Something like

bus.SubscribeAsync("content:*", func() {}, true)

And it should react on "content:created" and "content:deleted", etc.

Also, for this case it would be nice if the subscriber received an "event" parameter with the name of the event.
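One way such mask matching could work is sketched below. This is not part of EventBus: maskBus, SubscribeMask and the handler signature are hypothetical names, and the sketch assumes glob patterns in the syntax of the standard library's path.Match, where * matches any run of characters other than "/". The matched topic is passed to the handler as the "event" parameter.

```go
package main

import (
	"fmt"
	"path"
	"sync"
)

// maskSub pairs a glob pattern with its handler (hypothetical type).
type maskSub struct {
	pattern string
	fn      func(event string, args ...interface{})
}

// maskBus is a hypothetical minimal bus that matches topics against patterns.
type maskBus struct {
	mu   sync.RWMutex
	subs []maskSub
}

// SubscribeMask registers fn for every topic that matches pattern.
func (b *maskBus) SubscribeMask(pattern string, fn func(event string, args ...interface{})) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = append(b.subs, maskSub{pattern: pattern, fn: fn})
}

// Publish calls every handler whose pattern matches topic and
// returns the number of handlers that were called.
func (b *maskBus) Publish(topic string, args ...interface{}) int {
	// Copy the subscriptions so handlers run without the lock held.
	b.mu.RLock()
	subs := make([]maskSub, len(b.subs))
	copy(subs, b.subs)
	b.mu.RUnlock()

	called := 0
	for _, s := range subs {
		// A malformed pattern yields an error and is treated as "no match".
		if ok, err := path.Match(s.pattern, topic); err == nil && ok {
			s.fn(topic, args...)
			called++
		}
	}
	return called
}

func main() {
	bus := &maskBus{}
	bus.SubscribeMask("content:*", func(event string, args ...interface{}) {
		fmt.Println("got", event, args)
	})
	bus.Publish("content:created", 1)
	bus.Publish("content:deleted", 2)
	bus.Publish("user:created", 3) // no match, so the handler is not called
}
```

Matching every pattern on each publish is linear in the number of subscriptions, which is fine for a sketch but may matter on a busy bus.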

Unsubscribe middle closure function

import (
	"github.com/asaskevich/EventBus"
	"github.com/stretchr/testify/require"
	"testing"
)

func TestAsaskevichBus_UnsubscribeClosureMiddle(t *testing.T) {

	callCount := 0
	sum := 0
	makeFn := func(a int) func() {
		return func() {
			callCount++
			sum += a
		}
	}
	const evName = "ev1"
	evBus := EventBus.New()
	f1 := makeFn(11)
	f2 := makeFn(22)
	f3 := makeFn(33)
	require.NoError(t, evBus.Subscribe(evName, f1))
	require.NoError(t, evBus.Subscribe(evName, f2))
	require.NoError(t, evBus.Subscribe(evName, f3))
	//
	require.NoError(t, evBus.Unsubscribe(evName, f2))
	//
	evBus.Publish(evName)
	require.Equal(t, callCount, 2)
	require.Equal(t, sum, 11+33)

}

like #47

Indexing error due to multiple removeHandler

In line 141 func Publish

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
	defer bus.lock.Unlock()
	if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
		// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
		// so make a copy and iterate the copied slice.
		copyHandlers := make([]*eventHandler, len(handlers))
		copy(copyHandlers, handlers)
		for i, handler := range copyHandlers {
			if handler.flagOnce {
				bus.removeHandler(topic, i) // multiple operation causes indexing error
			}
			if !handler.async {
				bus.doPublish(handler, topic, args...)
			} else {
				bus.wg.Add(1)
				if handler.transactional {
					bus.lock.Unlock()
					handler.Lock()
					bus.lock.Lock()
				}
				go bus.doPublishAsync(handler, topic, args...)
			}
		}
	}
}

i and handler range over copyHandlers, while the remove operation acts on bus.handlers, so after the first removal the index i no longer points at the same handler. 🤨
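The index drift can be reproduced without the library. The sketch below is a stand-alone model, not the EventBus source: handler, removeByIndex and removeByIdentity are hypothetical names. It compares removal by snapshot index with removal by looking the handler up in the live slice.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for the bus's internal handler record.
type handler struct {
	name string
	once bool
}

// removeByIndex drops element i from the live slice, trusting the index.
func removeByIndex(live []*handler, i int) []*handler {
	if i < 0 || i >= len(live) {
		return live
	}
	return append(live[:i], live[i+1:]...)
}

// removeByIdentity finds h in the live slice first, so earlier removals
// cannot shift a different handler into its position.
func removeByIdentity(live []*handler, h *handler) []*handler {
	for i, cur := range live {
		if cur == h {
			return append(live[:i], live[i+1:]...)
		}
	}
	return live
}

// drain simulates one publish over handlers a (once), b (once), c (permanent)
// and returns the names still subscribed afterwards.
func drain(byIdentity bool) []string {
	live := []*handler{
		{name: "a", once: true},
		{name: "b", once: true},
		{name: "c", once: false},
	}
	snapshot := make([]*handler, len(live))
	copy(snapshot, live)

	for i, h := range snapshot {
		if !h.once {
			continue
		}
		if byIdentity {
			live = removeByIdentity(live, h)
		} else {
			live = removeByIndex(live, i) // i is a snapshot index, not a live index
		}
	}

	names := make([]string, 0, len(live))
	for _, h := range live {
		names = append(names, h.name)
	}
	return names
}

func main() {
	fmt.Println(drain(false)) // [b]: the permanent handler c was removed by mistake
	fmt.Println(drain(true))  // [c]: only the once-handlers were removed
}
```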

Basic Server Client Example not Working.

Server main.go

package main

import (
	evbus "github.com/asaskevich/EventBus"
)

func main() {
	server := evbus.NewServer(":8090", "/server_bus", evbus.New())
	server.Start()
	// ...
	server.EventBus().Publish("main:calculator", 4, 6)
}

Client main.go

package main

import (
	"fmt"

	evbus "github.com/asaskevich/EventBus"
)

func calculator(a int, b int) {
	fmt.Printf("%d\n", a+b)
}

func main() {
	client := evbus.NewClient(":8089", "/_server_bus", evbus.New())
	client.Start()
	client.Subscribe("main:calculator", calculator, ":8090", "/server_bus")
}

After running the server code, I try to run the client and get this in the console:
Server not found - runtime error: invalid memory address or nil pointer dereference

Can someone please tell me what I am doing wrong?

Stuck when trying to send response back to message Bus

Hi,
I have modified the example and wanted to send the result of the calculator back on the bus, on a different topic. I would like to chain different functions so that they react to events down a chain. When I run it with "go run", the process gets stuck and nothing happens.

package main

import (
	"fmt"
	"time"

	"github.com/asaskevich/EventBus"
)

var (
	bus EventBus.Bus
)

func calculator1(a int, b int) {
	fmt.Printf("calc1: %d\n", a+b)
	bus.Publish("print", "calc1 calculated : %d\n", a+b)
}

func printer(s string) {
	fmt.Println(s)
}

func main() {
	bus = EventBus.New()
	bus.Subscribe("calc", calculator1)
	bus.Subscribe("print", printer)
	sum := 1
	for sum < 10 {
		fmt.Println(sum)
		bus.Publish("calc", sum, sum)
		time.Sleep(1000 * time.Millisecond)
		sum += 1
	}
	bus.Unsubscribe("calc", calculator1)
	bus.Unsubscribe("print", printer)
}

Output

> go run poc3.go
1
calc1: 2
^Csignal: interrupt

I am on linux 64 bit.

Two subscribers?

As I see in the sources:

func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
    ...
    bus.handlers[topic] = &eventHandler{
        v, false, false, false, sync.Mutex{},
    }
    ...
}

Does the bus support only one subscriber per topic?
If I add a second subscriber, it will overwrite the first.

Problems/limitations with network busses

I want to bridge two busses over the network, giving the illusion of a single bus. This isn't currently possible as it deadlocks (and also you'll end up with an infinite loop of duplicate events).

The simple program below will show the deadlock:

package main

import (
	"fmt"
	"time"

	"github.com/asaskevich/EventBus"
)

func main() {

	networkBusA := EventBus.NewNetworkBus(":2035", "/_net_bus_A")
	networkBusA.Start()

	networkBusB := EventBus.NewNetworkBus(":2030", "/_net_bus_B")
	networkBusB.Start()

	networkBusA.Subscribe("topic-A", func(a int) { fmt.Println("A handler:", a) }, ":2030", "/_net_bus_B")

	networkBusB.Subscribe("topic-A", func(a int) { fmt.Println("B handler:", a) }, ":2035", "/_net_bus_A")

	fmt.Println("Publishing on A...")
	networkBusA.EventBus().Publish("topic-A", 10)
	fmt.Println("Done.")

	time.Sleep(2 * time.Second)

	networkBusA.Stop()
	networkBusB.Stop()
}

This is similar to #25, with a slightly different cause.

  • networkBusA.EventBus().Publish("topic-A", 10) causes the networkBusA lock to be taken
  • The rpc callback for networkBusB is called to publish the event there.
  • This goes through networkBusB's normal Publish() path
    • Which will call the rpc callback for networkBusA, and now we're back trying to take the networkBusA lock again.

Avoiding the deadlock by spawning the Publish() in a goroutine shows the infinite loop (run the same program as above):

diff --git a/client.go b/client.go
index a9e9e69..831431a 100644
--- a/client.go
+++ b/client.go
@@ -116,7 +116,7 @@ type ClientService struct {
 
 // PushEvent - exported service to listening to remote events
 func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error {
-	service.client.eventBus.Publish(arg.Topic, arg.Args...)
+	go service.client.eventBus.Publish(arg.Topic, arg.Args...)
 	*reply = true
 	return nil
 }

I'm looking for any suggestions on how to fix this, or perhaps just a definition of how network busses should work (e.g. if there are multiple network clients subscribed to a bus, should events received over the network be propagated to them?)

Just avoiding sending an event back to the client that published it seems like OK behaviour, but I'm not entirely sure how to plumb that in - Publish() probably needs to gain an understanding of where a publish request came from, and handlers need to know where they are going to
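The "don't send it back where it came from" idea can be sketched in memory, without RPC. Everything here is hypothetical (the node type, the from parameter); it is not how EventBus's network bus is implemented. Each delivery carries the ID of the peer it arrived from, and forwarding skips that peer.

```go
package main

import "fmt"

// node is a hypothetical in-memory stand-in for one network bus.
type node struct {
	id       string
	peers    []*node
	received []int
}

// publish delivers v locally, then forwards it to every peer except the
// one it arrived from. Use from == "" for an event that originates here.
func (n *node) publish(from string, v int) {
	n.received = append(n.received, v)
	for _, p := range n.peers {
		if p.id == from {
			continue // do not echo the event back to its sender
		}
		p.publish(n.id, v)
	}
}

func main() {
	a := &node{id: "A"}
	b := &node{id: "B"}
	a.peers = []*node{b}
	b.peers = []*node{a}

	a.publish("", 10)
	fmt.Println(a.received, b.received) // [10] [10]
}
```

This only stops the direct echo between two bridged buses. With three or more buses connected in a cycle the event would still circulate, so a real design would probably also need a per-event ID and a "seen" set.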

unsubscribe and closures

Unsubscribe is broken with closures. This code shows the problem:

makeHandler := func(tag string) func(msg string) {
	return func(msg string) {
		fmt.Printf("%s %s\n", tag, msg)
	}
}

var bus EventBus.Bus = EventBus.New()

handler1 := makeHandler("handler1")
fmt.Printf("handler1 pointer %x\n", reflect.ValueOf(handler1).Pointer())
bus.Subscribe("foo", handler1)

handler2 := makeHandler("handler2")
fmt.Printf("handler2 pointer %x\n", reflect.ValueOf(handler2).Pointer())
bus.Subscribe("foo", handler2)

bus.Publish("foo", "A")
bus.Unsubscribe("foo", handler2)
bus.Publish("foo", "B")

Here's the output:

handler1 pointer 11ac100
handler2 pointer 11ac100
handler1 A
handler2 A
handler2 B

Note that even though we removed handler2, it still got the B event.

What's happening is that EventBus uses reflect.ValueOf().Pointer() for Unsubscribe. However, the pointer to a function isn't enough to distinguish one closure from another. You can see the problem in the above output where it shows that the pointer values for handler1 and handler2 are the same.
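A common way around this is to identify subscriptions by a token handed out at subscribe time instead of by function pointer. The sketch below is a hypothetical stand-alone bus (idBus, entry and the int token are assumptions, not the EventBus API) showing that closures from the same factory can then be removed individually.

```go
package main

import (
	"fmt"
	"sync"
)

// entry ties a handler to the ID returned by Subscribe (hypothetical type).
type entry struct {
	id int
	fn func(string)
}

// idBus is a hypothetical bus that unsubscribes by ID, not by function pointer.
type idBus struct {
	mu     sync.Mutex
	nextID int
	subs   map[string][]entry
}

func newIDBus() *idBus {
	return &idBus{subs: make(map[string][]entry)}
}

// Subscribe registers fn and returns an ID that identifies this subscription.
func (b *idBus) Subscribe(topic string, fn func(string)) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	b.subs[topic] = append(b.subs[topic], entry{id: b.nextID, fn: fn})
	return b.nextID
}

// Unsubscribe removes the subscription with the given ID, if present.
func (b *idBus) Unsubscribe(topic string, id int) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	list := b.subs[topic]
	for i, e := range list {
		if e.id == id {
			b.subs[topic] = append(list[:i], list[i+1:]...)
			return true
		}
	}
	return false
}

// Publish calls the handlers for topic in subscription order.
func (b *idBus) Publish(topic, msg string) {
	b.mu.Lock()
	list := make([]entry, len(b.subs[topic]))
	copy(list, b.subs[topic])
	b.mu.Unlock()
	for _, e := range list {
		e.fn(msg)
	}
}

func main() {
	makeHandler := func(tag string) func(string) {
		return func(msg string) { fmt.Printf("%s %s\n", tag, msg) }
	}
	bus := newIDBus()
	bus.Subscribe("foo", makeHandler("handler1"))
	id2 := bus.Subscribe("foo", makeHandler("handler2"))

	bus.Publish("foo", "A")
	bus.Unsubscribe("foo", id2)
	bus.Publish("foo", "B") // only handler1 receives B
}
```

The cost is an API change: callers have to keep the returned ID around in order to unsubscribe.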

SubscribeAsync is not stable currently

var (
	bus = EventBus.New()
)

// Pub publishes the given interface to any listeners for that interface.
func Pub(topic string, data ...interface{}) {
	bus.Publish(topic, data...)
}

// Sub subscribes to specific interfaces with the specified callback
// function.
func Sub(topic string, fn interface{}) error {
	//return bus.SubscribeAsync(topic, fn, true)
	return bus.Subscribe(topic, fn)
}

func Sub2(topic string, fn interface{}) error {
	return bus.SubscribeAsync(topic, fn, true)
	//return bus.Subscribe(topic, fn)
}

Subscribing to events with Sub2 works, but sometimes an event is missed with an error, and after that it fails forever.

There is no release version available

Hello, Thank you for this beautifully simple and functional library. Can you please release a version so that we can use it instead of master? This helps in shielding our projects from changes in master branch.

PublishAsync

Hi,

Considering the callback func slowCalculator, the way PublishAsync works using Publish means that if multiple events are published asynchronously to the same topic, each subsequent goroutine has to wait for the earlier goroutine callback to finish before executing.

This may be ideal in some scenarios where there may be contention if the callbacks are run in two separate go routines (transactional behaviour).

However, this may not be ideal where there is no contention if a given callback runs more than once concurrently.

The idea tested for this scenario is to

  • lock
  • do all the necessary map operations
  • unlock
  • execute the handler

This will allow each subsequent goroutine callback to gain access to the lock almost instantly and allow the same callback to run multiple times concurrently.

As part of a slightly bigger change, I was thinking of a new struct to represent a handler that contains

  • the callback as reflect.Value
  • flagOnce
  • async bool, to determine whether the callback should run asynchronously
  • transactional bool, to determine whether to use the default behaviour or allow the new behaviour

Additionally:

  • PublishAsync will be unexported and Publish will be the general purpose method for publishing events.
  • SubscribeAsync will be implemented to set the behaviour for async callbacks

The downside to this approach is having 3 extra bools for each handler. The advantage is only one map will be needed.

I'm hoping for some feedback, what you think works, what doesn't or any suggestions related to this change.

Introduce bus interface instead of struct

Hello,

Could we replace structs with interfaces to facilitate testing of bus-dependent services? This would also provide an elegant way to implement the network bus: it would simply be another implementation of the interface. I'm working on a corresponding PR.
It would be useful to tag releases as well as more and more people use dependency management tools.

Cheers,
Michal

Race conditions in async transactional handlers

I detected race conditions in SubscribeAsync with transactional flag. It is due to the fact that we lock the handler after and not before spawning the async handler goroutine. On a fast machine it causes unpredictable behavior. Preparing a PR.
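The ordering point can be illustrated with a stand-alone sketch. It is not the EventBus source, and txHandler and publishSerial are hypothetical names. The handler's mutex is taken in the publishing goroutine before the callback goroutine is started, so a later publish cannot overtake an earlier one.

```go
package main

import (
	"fmt"
	"sync"
)

// txHandler is a hypothetical handler whose callbacks must run one at a time.
type txHandler struct {
	mu sync.Mutex // held for the whole duration of one callback
	fn func(int)
}

// publishSerial locks the handler BEFORE spawning the goroutine. Locking
// inside the goroutine instead would let the scheduler pick the order.
func publishSerial(h *txHandler, wg *sync.WaitGroup, v int) {
	wg.Add(1)
	h.mu.Lock()
	go func() {
		defer wg.Done()
		defer h.mu.Unlock() // a sync.Mutex may be unlocked by another goroutine
		h.fn(v)
	}()
}

// runInOrder publishes 0..n-1 and returns the order the callbacks observed.
func runInOrder(n int) []int {
	var got []int
	h := &txHandler{fn: func(v int) { got = append(got, v) }}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		publishSerial(h, &wg, i)
	}
	wg.Wait()
	return got
}

func main() {
	fmt.Println(runInOrder(5)) // [0 1 2 3 4]
}
```

The trade-off is that the publisher blocks until the previous callback for that handler has finished, which is what serial ("transactional") delivery implies.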

Code crash when publishing in a case of server/client eventBus

Hi!
My issue is the following:
I'm currently implementing a connection between two different programs on the same computer, so I have a server and a client.
I run the server and then the client. The server then publishes events, which are read and executed by the client.
But when I stop the client's code, the server code crashes.

After testing and investigating, I found that the server's subscribers map (map[string][]*SubscribeArg) is not updated when the client disconnects. Even if the client is no longer running, the subscribers map stays the same.

So server.HasClientSubscribed(&theArgs) always returns true, even after the client has disconnected.

My workaround is to use:
rpcClient, _ := rpc.DialHTTPPath("tcp", ":2010", "/_client_bus_")
and check that it is not nil before publishing.

Is there a way to update this map? Or is there another way to avoid the crash?

Best regards !

Potential deadlock due to calling callbacks while holding a lock

See golang mutexes aren't recursive / re-entrant

The Bus.Publish(...) method locks a mutex and calls the callbacks while holding it, so any access to the bus from inside a callback will cause a deadlock.

The following example reproduces the issue:

package main

import (
	"fmt"

	"github.com/asaskevich/EventBus"
)

var bus EventBus.Bus

func showbug(a int, b int) {
	fmt.Printf("%d\n", a+b)

	if a == 20 {
		bus.Publish("main:calculator", a+1, b)
	}
}

func main() {
	bus = EventBus.New()
	bus.Subscribe("main:calculator", showbug)
	bus.Publish("main:calculator", 20, 40)
	bus.Unsubscribe("main:calculator", showbug)
}

We use another implementation of eventbus (inspired by yours) where this issue was fixed: github.com/ispringteam/goeventbus (see copySubscriptions method and nextID field)

Feel free to adapt our solution or introduce another one ;)
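For reference, the general shape of such a fix is to copy the handler list while holding the lock and call the handlers only after releasing it. The sketch below is a stand-alone illustration under that assumption. snapBus is a hypothetical type; this is neither the EventBus code nor the goeventbus implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// snapBus is a hypothetical bus that never calls handlers with its lock held.
type snapBus struct {
	mu       sync.RWMutex
	handlers map[string][]func(int)
}

func newSnapBus() *snapBus {
	return &snapBus{handlers: make(map[string][]func(int))}
}

// Subscribe appends fn to the handlers for topic.
func (b *snapBus) Subscribe(topic string, fn func(int)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], fn)
}

// Publish snapshots the handlers under the lock, releases the lock, and
// only then runs them, so a handler may call back into the bus.
func (b *snapBus) Publish(topic string, v int) {
	b.mu.RLock()
	snapshot := make([]func(int), len(b.handlers[topic]))
	copy(snapshot, b.handlers[topic])
	b.mu.RUnlock()

	for _, fn := range snapshot {
		fn(v)
	}
}

func main() {
	bus := newSnapBus()
	bus.Subscribe("main:calculator", func(a int) {
		fmt.Println(a)
		if a == 20 {
			bus.Publish("main:calculator", a+1) // re-entrant publish does not block
		}
	})
	bus.Publish("main:calculator", 20) // prints 20, then 21
}
```

One consequence of the snapshot is that a handler unsubscribed during a publish may still be called once more for that publish.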

Improve testing

Most of the tests have several fail conditions that are unrecognizable (tests do not log which of the conditions failed). I suggest we use some kind of assertion lib (testify is a good pick) to improve readability.
