membus's Introduction

MemBus Readme

In-memory publish-subscribe bus with pluggable subscription and publishing pipeline, first-class support of Observables, contravariant messaging and resolution of handlers through pluggable IOC. Will cover your event aggregation needs and then more.

The concepts of configuration, publishing, subscribing and modifying subscriptions are all separated from each other and pluggable.

The provided way to start a MemBus instance is through BusSetup.

var _bus = BusSetup.StartWith<Conservative>().Construct();

Conservative is one of the already available Configurators contained in the "Configurators" namespace.

As you can see from such a setup, what MemBus needs as a minimum is a default publish pipeline, a default subscription resolver, and a default strategy for handling new subscriptions.

The bus that is now set up is of type IBus. IBus derives from IPublisher and ISubscriber.

Basic Usage

// Now you can do something like this:
using (_bus.Subscribe((Foo x) => Console.WriteLine(x.Text))) {
  _bus.Publish(new Foo("hello world"));
}

Any subscription operation you undertake returns something of type IDisposable.

Publish/Subscribe message matching is by default done on the type of the message. Please note that message subscription is contravariant, i.e. a method accepting a message of type object will receive all messages published on MemBus.
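To make the contravariance rule concrete, here is a minimal sketch of type-based dispatch. TinyBus and everything inside it are hypothetical stand-ins written for this illustration; this is not MemBus code and says nothing about how MemBus implements matching internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration only; this is not MemBus code.
public sealed class TinyBus
{
    private sealed class Entry
    {
        public Type MessageType;
        public Action<object> Handler;
    }

    private readonly List<Entry> _entries = new List<Entry>();

    // Registers a handler and returns a token that removes it again when disposed.
    public IDisposable Subscribe<T>(Action<T> handler)
    {
        var entry = new Entry { MessageType = typeof(T), Handler = m => handler((T)m) };
        _entries.Add(entry);
        return new Token(() => _entries.Remove(entry));
    }

    // Delivers the message to every handler whose declared type the message is an instance of.
    public void Publish(object message)
    {
        foreach (var entry in _entries.ToArray())
        {
            if (entry.MessageType.IsInstanceOfType(message))
                entry.Handler(message);
        }
    }

    private sealed class Token : IDisposable
    {
        private readonly Action _onDispose;
        public Token(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}
```

With this sketch, a handler subscribed as Action<object> sees every published message, while one subscribed as Action<string> only sees strings.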

There are no other rules apart from the following:

  • Subscriptions match the Action<T> delegate
  • There are no other rules apart from the previous

Subscription overloads

There are three ways to subscribe handlers in MemBus:

  • Through an Action - this is the straightforward way as seen in the intro
  • By observing the bus via Observe<M> - returns an observable of type M. If you take a reference on Rx-* packages, you can set up all sorts of goodness on top of it, like filtering, merging, joining, handling messages on a synchronization context, etc, etc.
  • Passing any object instance - this functionality is based on the FlexibleSubscribeAdapter and is described next.

Subscribing based on Convention.

You may notice that one Subscribe overload accepts any object. This works when you set up MemBus with the FlexibleSubscribeAdapter:

_bus = BusSetup.StartWith<Conservative>()
       .Apply<FlexibleSubscribeAdapter>(a => a.RegisterMethods("Handle"))
       .Construct();

class Subscriber {
  public void Handle(Foo msg) {}
}
var disposable = _bus.Subscribe(new Subscriber());

The FlexibleSubscribeAdapter allows you to set up the convention by which subscribing methods are picked up. The configuration allows wiring up through "RegisterMethods" or "ByInterface(Type)". The interface may be generic, in that case you specify the open generic. The one rule of subscribing also applies in this scenario: Your subscriptions must match the Action<T> signature.
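The idea of picking up handlers by convention can be sketched with plain reflection. ConventionScanner, FooMessage and ConventionSubscriber are hypothetical names used only here; MemBus's own adapter may well work differently, and this sketch deliberately keeps only methods that fit the Action<T> shape (one parameter, void return).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical helper for illustration; MemBus's own adapter may work differently.
public static class ConventionScanner
{
    // Finds public instance methods with the given name that fit Action<T>:
    // exactly one parameter and a void return type.
    public static Dictionary<Type, Action<object>> FindHandlers(object subscriber, string methodName)
    {
        var handlers = new Dictionary<Type, Action<object>>();
        var candidates = subscriber.GetType()
            .GetMethods(BindingFlags.Public | BindingFlags.Instance)
            .Where(m => m.Name == methodName
                        && m.ReturnType == typeof(void)
                        && m.GetParameters().Length == 1);

        foreach (var method in candidates)
        {
            var messageType = method.GetParameters()[0].ParameterType;
            handlers[messageType] = message => method.Invoke(subscriber, new[] { message });
        }
        return handlers;
    }
}

public sealed class FooMessage { }

public sealed class ConventionSubscriber
{
    public int Handled;
    public void Handle(FooMessage msg) { Handled++; }   // picked up
    public void Handle(string a, string b) { }          // ignored: two parameters
    public void Other(FooMessage msg) { }               // ignored: wrong name
}
```

Calling ConventionScanner.FindHandlers(new ConventionSubscriber(), "Handle") yields a single entry keyed by typeof(FooMessage).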

The IDisposable returned by Subscribe(object) disposes of all subscriptions that were found on said object.

If your object implements IAcceptsDisposeToken, the disposable that is returned by the subscribe call will be passed into the object being subscribed. That way objects have a way to take themselves out of the messaging, e.g. when they handle a couple of messages that are only relevant for a limited period of your app's lifetime.

Your subscribing objects can also work with IObservable. This means they can either accept an IObservable instance, return one, or both. These methods will also be picked up and hooked into MemBus. That way you can write instances that do not have any dependency on MemBus but can

  • Receive messages
  • Send messages upon reception of one (Also multiple ones by returning something enumerable).
  • Receive Observables so you can use Rx tastiness on it
  • Publish Observables
  • The two above together for Rx-transformation tastiness.

Publishing

There isn't a lot one can say about Publishing. You may pass

  • any object instance into the "Publish" method.
  • An Observable - in this case MemBus will dispatch any observed messages to the MemBus infrastructure. If the Observable raises an exception, the ExceptionOccurred message will be sent. Once the observable completes, the MessageStreamCompleted message will be dispatched.
  • Publish in an awaitable fashion - in this case, any configuration with regard to handling the message will be short-circuited and an awaitable version of a SequentialPublisher will be used.

Publishing to a DI Container

One use case for MemBus is to dispatch the handling of a message to an IoC container. Given a message, the implementation of some type is looked up, instantiated by the container and the message is delivered to the handling method.

In order to do that you need to configure MemBus with the IoCSupport option:

_bus = BusSetup
    .StartWith<Conservative>()
    .Apply<IoCSupport>(s => s.SetAdapter(_testAdapter).SetHandlerInterface(typeof(GimmeMsg<>)))
    .Construct();

The adapter is an instance that implements IocAdapter. Implement this interface to bridge the request to your container of choice. The interface is very straightforward:

public interface IocAdapter
{
    IEnumerable<object> GetAllInstances(Type desiredType);
}

Secondly, you declare the interface that will be requested from the IoC container. You need to apply the following rules with regard to the chosen interface:

  • It needs to be generic with one type argument
  • It provides a single void method with one argument. The argument type typically corresponds with the generic type argument.
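As a rough sketch of both pieces, the following shows an adapter backed by a plain dictionary and a handler interface that follows the two rules above. IocAdapter is re-declared from the README so the sketch compiles on its own; IHandles<T>, DictionaryIocAdapter and GreetingHandler are hypothetical names, and a real adapter would forward the call to your container instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Re-declared from the README so the sketch compiles on its own;
// in a real project the interface comes from MemBus.
public interface IocAdapter
{
    IEnumerable<object> GetAllInstances(Type desiredType);
}

// Hypothetical handler interface: generic with one type argument,
// one void method with one argument.
public interface IHandles<T>
{
    void Handle(T message);
}

// Hypothetical adapter backed by a dictionary instead of a real container.
public sealed class DictionaryIocAdapter : IocAdapter
{
    private readonly Dictionary<Type, List<Func<object>>> _factories =
        new Dictionary<Type, List<Func<object>>>();

    public void Register<TService>(Func<TService> factory) where TService : class
    {
        List<Func<object>> list;
        if (!_factories.TryGetValue(typeof(TService), out list))
        {
            list = new List<Func<object>>();
            _factories[typeof(TService)] = list;
        }
        list.Add(() => factory());
    }

    // Creates one instance per registered factory, or nothing if the type is unknown.
    public IEnumerable<object> GetAllInstances(Type desiredType)
    {
        List<Func<object>> list;
        if (!_factories.TryGetValue(desiredType, out list))
            return Enumerable.Empty<object>();
        return list.Select(create => create()).ToList();
    }
}

public sealed class GreetingHandler : IHandles<string>
{
    public void Handle(string message) { Console.WriteLine(message); }
}
```

An adapter like this could then be passed to SetAdapter, with typeof(IHandles<>) passed to SetHandlerInterface, mirroring the setup shown earlier.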

Licensing

Copyright 2014 Frank-Leonardo Quednau (realfiction.net) Licensed under the Apache License, Version 2.0 (the "License"); you may not use this solution except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

membus's People

Contributors

bryant1410, domasm, flq, jafin, jkonecki

membus's Issues

IoCSupport when Handles generic type is different from Message type

I have a case when my Handles interface is defined as follows:

public interface IHandleCommand<T>
    where T : ICommand
{
    void Handle(IEnvelope<T> command);
}

Notice that the interface's generic type argument is not the same as the type of the message.

Currently IoCBasedResolver fails in this method:

    private Type constructHandlesType(Type messageType)
    {
        return typeCache.GetOrAdd(messageType, msgT => _handlerType.MakeGenericType(messageType));
    }

while trying to construct the non-generic version of handlerType.

I would like to be able to extend IoCSupport to allow me to provide a callback that will be used to obtain the handler type based on message type.
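The requested extension point might look roughly like the sketch below: a resolver that takes a Func<Type, Type> callback and caches its result per message type. HandlerTypeResolver, UnwrapEnvelope, Envelope<T>, CreateUser and the Body member of IEnvelope<T> are all assumptions made for this illustration; none of them are part of MemBus.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public interface ICommand { }

// The Body member is an assumption; the issue does not show IEnvelope<T>.
public interface IEnvelope<T> where T : ICommand { T Body { get; } }

public interface IHandleCommand<T> where T : ICommand
{
    void Handle(IEnvelope<T> command);
}

// Hypothetical resolver: only the shape of the requested extension point.
public sealed class HandlerTypeResolver
{
    private readonly ConcurrentDictionary<Type, Type> _typeCache =
        new ConcurrentDictionary<Type, Type>();
    private readonly Func<Type, Type> _handlerTypeFor;

    public HandlerTypeResolver(Func<Type, Type> handlerTypeFor)
    {
        _handlerTypeFor = handlerTypeFor;
    }

    // Same caching idea as in the quoted method, but the mapping is supplied by the caller.
    public Type ConstructHandlesType(Type messageType)
    {
        return _typeCache.GetOrAdd(messageType, _handlerTypeFor);
    }

    // Callback for the case in this issue: find IEnvelope<T> on the message type
    // and close IHandleCommand<> over T. Throws if the type is not an envelope.
    public static Type UnwrapEnvelope(Type messageType)
    {
        var envelope = new[] { messageType }
            .Concat(messageType.GetInterfaces())
            .First(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnvelope<>));
        return typeof(IHandleCommand<>).MakeGenericType(envelope.GetGenericArguments()[0]);
    }
}

public sealed class CreateUser : ICommand { }

public sealed class Envelope<T> : IEnvelope<T> where T : ICommand
{
    public T Body { get; set; }
}
```

With new HandlerTypeResolver(HandlerTypeResolver.UnwrapEnvelope), a message of type Envelope<CreateUser> resolves to IHandleCommand<CreateUser>.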

I'm more than happy to provide a pull request.

Allow FlexibleSubscribeAdapter to pick up non-void methods from interfaces

The name-based (RegisterMethods()) method subscription allows a non-void method to be picked up so that the method can return an object to be published.

It would be useful for the interface-based version (ByInterface()) to allow the same so that implementers do not have to take a dependency on IBus.

Unless there is an issue blocking this feature?

WinRT: Provide async Publish implementation

This method would work if the Bus is set up with some sort of ConservativeWinRT setup. It would be awaitable which means that the execution returns once the synchronous pipeline has been executed until the end.

Another possibility would be that using this method overrides the standard setup that drives execution conventionally.

When does it really make sense to use a project like MemBus vs Hangfire?

Our immediate need is background processing.
Both provide this. Hangfire has full-featured management (a built-in UI) and simpler deployment, but a service bus does seem like it could be more robust.

Aside from sending emails and post-processing data, what are some common examples that make a service bus the right choice over http://hangfire.io/ when dealing with a large web application?

Request / response option

Hi,

I have been looking at MemBus and it seems like it could really help me with a software project. But I was wondering whether some sort of request / response messaging is also possible, where I can publish a message and certain subscribers might answer it.

Most of my programs are modular, meaning that certain modules are plain DLLs which are loaded dynamically. These contain user controls etc. We now have an event system where a module can request settings, and modules that are subscribed to this event determine whether the requested settings belong to them and answer with the wanted information. So a module / plugin might need the connection string to the SQL server. It will then make a request for the connection string, and the main module will see this and respond with the correct answer. The current request contains a list of wanted settings, so each module that gets the event goes through the list and fills in its own items.

I have been looking at RabbitMQ (which is totally different from yours, of course) and it also has such a system. But I only need it within the application, so RabbitMQ gives me a lot of overhead I don't want.
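The pattern described here can be sketched without any bus-specific API: the request is a mutable message that subscribers fill in. SettingsRequest and MainModule are hypothetical names, the connection string is a placeholder, and the approach only works if handlers run synchronously, so that all answers are in place by the time publishing returns.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical request message: subscribers fill in the settings they own.
public sealed class SettingsRequest
{
    private readonly Dictionary<string, string> _answers = new Dictionary<string, string>();

    public SettingsRequest(params string[] wanted) { Wanted = wanted; }

    public IReadOnlyCollection<string> Wanted { get; private set; }

    public void Answer(string key, string value) { _answers[key] = value; }

    public bool TryGet(string key, out string value)
    {
        return _answers.TryGetValue(key, out value);
    }
}

// Hypothetical module that owns exactly one setting.
public static class MainModule
{
    public static void Handle(SettingsRequest request)
    {
        // Only answer for settings this module is responsible for.
        if (request.Wanted.Contains("ConnectionString"))
            request.Answer("ConnectionString", "Server=example;Database=demo");
    }
}
```

A requester would publish a SettingsRequest, let the subscribed handlers run, and then read the answers with TryGet; settings nobody answered simply stay missing.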

ArgumentException when adding large number of subscriptions

The following code will more often than not result in an ArgumentException being raised with the message "An item with the same key has already been added.":

var bus = BusSetup.StartWith<AsyncConfiguration>().Construct();

for (uint i = 0; i < 100000; i++)
{
    bus.Subscribe((int x) => Console.WriteLine(x));
}

Relevant part of the exception stack trace is

   at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
   at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
   at MemBus.CompositeSubscription.JustAdd(ISubscription subscription)
   at MemBus.CompositeSubscription.Add(ISubscription subscription)
   at MemBus.CompositeResolver.Add(ISubscription subscription)
   at MemBus.Subscriber.Subscribe[M](Action`1 subscription, ISubscriptionShaper customization)
   at MemBus.Subscriber.Subscribe[M](Action`1 subscription)
   at MemBus.MessageObservable`1.Subscribe(IObserver`1 observer)
   at System.ObservableExtensions.SubscribeSafe[T](IObservable`1 source, IObserver`1 observer)

My guess is that two DisposableSubscription instances have the same hash code, resulting in a collision when attempting to add the second one to the subscription storage:

_subscriptions.Add(disposableSub.GetHashCode(), disposableSub);

Changing the subscriptions storage to HashSet would be a possible fix as the default IEqualityComparer will check for equality (in this case reference equality) when getting multiple elements with the same hash code.
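The difference between the two storage choices can be reproduced in isolation. CollidingSubscription is a hypothetical class that forces identical hash codes so the effect is deterministic; in the reported case the collision happens by chance between default object hash codes.

```csharp
using System;
using System.Collections.Generic;

// Forces a hash collision so the effect is reproducible.
// Equals is not overridden, so equality stays reference equality.
public sealed class CollidingSubscription
{
    public override int GetHashCode() { return 42; }
}

public static class CollisionDemo
{
    // Keying a dictionary by hash code: the second Add throws ArgumentException.
    public static bool DictionaryKeyedByHashThrows()
    {
        var storage = new Dictionary<int, CollidingSubscription>();
        var first = new CollidingSubscription();
        var second = new CollidingSubscription();
        storage.Add(first.GetHashCode(), first);
        try
        {
            storage.Add(second.GetHashCode(), second);
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }

    // A HashSet falls back to Equals when hash codes match, so both instances are kept.
    public static int HashSetCount()
    {
        var storage = new HashSet<CollidingSubscription>();
        storage.Add(new CollidingSubscription());
        storage.Add(new CollidingSubscription());
        return storage.Count;
    }
}
```

Here DictionaryKeyedByHashThrows() returns true and HashSetCount() returns 2, which is the behaviour the proposed fix relies on.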

PublishAsync is not truly async

Hello flq,

Firstly, I'd like to say that I think your implementation is quite remarkable and very extensible, qualities that are needed in today's development world.

I've been trying out MemBus and I think the PublishAsync method lacks the full power of async/await. When used with an async action, it does not really wait for the task result, so on a web server MemBus actually consumes worker threads.

The issue is in SequentialPipeline: LookAtAsync calls the Push method without waiting for it (and uses Task.Run(), which schedules tasks).
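The distinction being reported can be illustrated in isolation. AwaitDemo is a hypothetical class that only mimics the described behaviour; it is not the MemBus pipeline code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration of the reported behaviour; not MemBus code.
public static class AwaitDemo
{
    // Schedules the handler on the thread pool but does not wait for it:
    // the returned task is already complete when the method returns.
    public static Task PublishWithoutWaiting(Func<Task> handler)
    {
        Task.Run(handler);
        return Task.CompletedTask;
    }

    // Awaits the handler: the returned task completes only after the handler has finished.
    public static async Task PublishAndAwait(Func<Task> handler)
    {
        await handler();
    }
}
```

A caller awaiting the first method learns nothing about when, or whether, the handler finished; with the second method the await reflects the handler's completion and its exceptions.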

WDYT?

Add a real async publishing pipeline to MemBus

This issue takes over from #16 and plans out the necessary steps (more or less) to get real async publishing into MemBus.

Aim

  • Ability to await the handling of a published message
  • Ability to implement subscriptions that can be truly async.

TODO

  • Introduce an IAsyncSubscription interface that returns Task instead of void.
  • IDisposableAsyncSubscription- this one needs some thinking with regard to cancellation token
  • There are a number of ISubscription implementations which would need async counterparts, last but not least FilteredSubscription
  • CompositeSubscription - what makes sense here? If you have several async subscriptions, would you want to await all? Or none? Make this configurable?
  • IAsyncSubscriptionShaper for IAsyncSubscription
  • Configuration Entry points for the new pieces.
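As a starting point for discussion, the first and fourth items could look like the sketch below. The method name PushAsync, DelegateAsyncSubscription and the awaitAll switch are assumptions; the plan above only states that the interface returns Task instead of void and leaves the composite behaviour open.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical shape; the plan only says the method returns Task instead of void.
public interface IAsyncSubscription
{
    Task PushAsync(object message);
}

public sealed class DelegateAsyncSubscription : IAsyncSubscription
{
    private readonly Func<object, Task> _handler;
    public DelegateAsyncSubscription(Func<object, Task> handler) { _handler = handler; }
    public Task PushAsync(object message) { return _handler(message); }
}

// One possible answer to "await all or none?": make it a constructor switch.
public sealed class CompositeAsyncSubscription : IAsyncSubscription
{
    private readonly List<IAsyncSubscription> _children;
    private readonly bool _awaitAll;

    public CompositeAsyncSubscription(IEnumerable<IAsyncSubscription> children, bool awaitAll)
    {
        _children = children.ToList();
        _awaitAll = awaitAll;
    }

    public Task PushAsync(object message)
    {
        // Start every child first so they can run concurrently.
        var running = _children.Select(child => child.PushAsync(message)).ToList();
        // When not awaiting, failures in the children go unobserved by the caller.
        return _awaitAll ? Task.WhenAll(running) : Task.CompletedTask;
    }
}
```

Cancellation, ordering guarantees and error aggregation are left out here on purpose; they are exactly the open questions listed above.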

Then there are the things around Flexible subscriptions. New candidates would now be

  • Non-void methods
  • Methods that return a message -> Task should be possible

Progress will be merged into the branch async-pipeline until it's ready.

Documentation

Brilliant project. Could do with more docs please.

MemBus 2.0.2 and 3.0.1

Hi.
We ran into an issue running MemBus 3.0.1 and had to roll back to version 2.0.2. The cause is not clear; we get the following exception:

External component has thrown an exception.
Description: An unhandled exception occurred during the execution of the current web request. Please review the stack trace for more information about the error and where it originated in the code.

Exception Details: System.Runtime.InteropServices.SEHException: External component has thrown an exception.

Source Error:
An unhandled exception was generated during the execution of the current web request. Information regarding the origin and location of the exception can be identified using the exception stack trace below.

Stack Trace:

[SEHException (0x80004005): External component has thrown an exception.]
System.Reflection.RuntimeAssembly._nLoad(AssemblyName fileName, String codeBase, Evidence assemblySecurity, RuntimeAssembly locationHint, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks) +0
System.Reflection.RuntimeAssembly.InternalLoadAssemblyName(AssemblyName assemblyRef, Evidence assemblySecurity, RuntimeAssembly reqAssembly, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks) +484
System.Reflection.RuntimeAssembly.InternalLoad(String assemblyString, Evidence assemblySecurity, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean forIntrospection) +233
System.Reflection.RuntimeAssembly.InternalLoad(String assemblyString, Evidence assemblySecurity, StackCrawlMark& stackMark, Boolean forIntrospection) +17
System.Reflection.Assembly.Load(String assemblyString) +35
System.Web.Configuration.CompilationSection.LoadAssemblyHelper(String assemblyName, Boolean starDirective) +130

[ConfigurationErrorsException: External component has thrown an exception.]
System.Web.Configuration.CompilationSection.LoadAssemblyHelper(String assemblyName, Boolean starDirective) +1220
System.Web.Configuration.CompilationSection.LoadAllAssembliesFromAppDomainBinDirectory() +323
System.Web.Configuration.AssemblyInfo.get_AssemblyInternal() +76
System.Web.Compilation.BuildManager.GetReferencedAssemblies(CompilationSection compConfig) +342
System.Web.Compilation.BuildManager.GetPreStartInitMethodsFromReferencedAssemblies() +125
System.Web.Compilation.BuildManager.CallPreStartInitMethods(String preStartInitListPath) +154
System.Web.Compilation.BuildManager.ExecutePreAppStart() +158
System.Web.Hosting.HostingEnvironment.Initialize(ApplicationManager appManager, IApplicationHost appHost, IConfigMapPathFactory configMapPathFactory, HostingEnvironmentParameters hostingParameters, PolicyLevel policyLevel, Exception appDomainCreationException) +1037

[HttpException (0x80004005): External component has thrown an exception.]
System.Web.HttpRuntime.FirstRequestInit(HttpContext context) +674
System.Web.HttpRuntime.EnsureFirstRequestInit(HttpContext context) +159
System.Web.HttpRuntime.ProcessRequestNotificationPrivate(IIS7WorkerRequest wr, HttpContext context) +739

Minor additions to improve membus setup

  • CompositeSubscription should be public to allow it to be instantiated in other setups
  • A call to Default Publishpipeline should erase any previous default publish pipeline - verify and rectify if necessary

Allow Methods to be picked up that return IObservable<T>

Once Feature #4 is in place, it should be possible to add this feature much like the FlexibleSubscribeAdapter, only the other way round.

The end result is classes that can accept messages as well as send messages without needing a direct dependency on MemBus. It should also be considered that methods accepting an IObservable<T> get just what they want.

AsyncBlocking Configurator missing

I'd like to configure the bus in an async blocking way. I see that there is already an AsyncConfigurator, but it is non-blocking.

My first thought was to write a new ISetup module but I'm blocked at
setup.AddResolver(new CompositeSubscription());

CompositeSubscription() is marked as 'internal' therefore I can't use it in my code.

Is there a way I can build an AsyncBlocking configuration without changing the library code?

Self disposing subscription using weak reference

Is there a way to make the disposable subscriptions dispose themselves using a weak reference?

Currently the ConstructSubscriptionExtension.ConstructSubscription and ConstructSubscriptionExtension.ConstructPublishingSubscription methods build delegates like:

Action<TMessage> action = message => target.Handle(message);
Func<TMessage, TResult> func = message => target.Handle(message);

which causes the closures to hold a reference to their targets.

I think it might work if there were 2 other methods, lets say ConstructSubscriptionExtension.ConstructWeakSubscription and ConstructSubscriptionExtension.ConstructWeakPublishingSubscription, which built delegates:

Action<TTarget, TMessage> action = (target, message) => target.Handle(message);
Func<TTarget, TMessage, TResult> func = (target, message) => target.Handle(message);

and then wrapped these delegates inside WeakMethodInvocation<TTarget, TMsg> and WeakPublishingInvocation<TTarget, TMsg> implemented like:

public class WeakMethodInvocation<TTarget, TMsg> : ISubscription, IKnowsSubscribedInstance {

  private readonly Action<TTarget, TMsg> action;

  private readonly WeakReference instance;

  object IKnowsSubscribedInstance.Instance {
    get {
        return this.instance.Target;
    }
  }

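  public void Push(object message) {
    // Instance is implemented explicitly and is not reachable through `this`, so read the weak reference directly.
    this.action((TTarget)this.instance.Target, (TMsg)message);
  }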
}

And then, somewhere, a WeakSubscription would check whether the weakly referenced target is still alive, and dispose the subscription if it is not.
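The alive check could be folded into the push itself, as in the following sketch. WeakHandler, TryPush and Counter are hypothetical names; the class does not implement MemBus's ISubscription and only shows the weak-reference mechanics.

```csharp
using System;

// Hypothetical sketch; it does not implement MemBus's ISubscription.
public sealed class WeakHandler<TTarget, TMsg> where TTarget : class
{
    private readonly WeakReference<TTarget> _target;
    private readonly Action<TTarget, TMsg> _action;

    public WeakHandler(TTarget target, Action<TTarget, TMsg> action)
    {
        _target = new WeakReference<TTarget>(target);
        // The delegate must not capture the target itself,
        // otherwise the weak reference is pointless.
        _action = action;
    }

    // Returns false once the target has been collected,
    // so the caller can drop the subscription.
    public bool TryPush(TMsg message)
    {
        TTarget target;
        if (!_target.TryGetTarget(out target))
            return false;
        _action(target, message);
        return true;
    }
}

public sealed class Counter
{
    public int Seen;
    public void Handle(string message) { Seen++; }
}
```

A bus-side wrapper could call TryPush and dispose the subscription on the first false result. When exactly the target is collected is up to the garbage collector, so the removal is eventual rather than immediate.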
