
membus's Issues

Request / response option

Hi,

I have been looking at MemBus and it seems like it could really help me with some software projects. But I was wondering whether some sort of request / response messaging is also possible, where I can publish a message and certain subscribers might answer it.

Most of my programs are modular, meaning that certain modules are plain DLLs which are loaded dynamically. These contain user controls etc. We now have an event system where a module can request settings, and modules that are subscribed to this event determine whether the requested settings belong to them and answer with the wanted information. So a module / plugin might need the connection string to the SQL server. It will then make a request for the connection string, and the main module will see this and respond with the correct answer. The current request contains a list of wanted settings, so each module that gets the event goes through the list and fills in its own items.

I have been looking at RabbitMQ (which is totally different from yours, of course) and it also has such a system. But I only need it inside the application, so RabbitMQ gives me a lot of overhead I don't want.
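To make the request above concrete, here is a minimal sketch of the "request carries a list, responders fill it in" pattern. It assumes a synchronous in-process publish; `SettingsRequest`, `TinyBus`, and `RequestResponseDemo` are hypothetical names for illustration and are not MemBus types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical request message: lists the wanted setting names,
// and responders write their answers into it.
public sealed class SettingsRequest
{
    private readonly Dictionary<string, string> _answers = new Dictionary<string, string>();

    public SettingsRequest(params string[] wanted) { Wanted = wanted; }

    public IReadOnlyList<string> Wanted { get; }
    public IReadOnlyDictionary<string, string> Answers => _answers;

    public void Answer(string name, string value) { _answers[name] = value; }
}

// Stand-in for a synchronous in-process bus (an assumption, not MemBus).
public sealed class TinyBus
{
    private readonly List<Action<SettingsRequest>> _handlers = new List<Action<SettingsRequest>>();

    public void Subscribe(Action<SettingsRequest> handler) => _handlers.Add(handler);

    public void Publish(SettingsRequest request)
    {
        // Every subscriber sees the same request instance and may fill in answers.
        foreach (var handler in _handlers) handler(request);
    }
}

public static class RequestResponseDemo
{
    public static SettingsRequest Run()
    {
        var bus = new TinyBus();

        // The "main module" only answers the settings it owns.
        bus.Subscribe(request =>
        {
            foreach (string name in request.Wanted)
            {
                if (name == "ConnectionString")
                    request.Answer(name, "Server=example;Database=app");
            }
        });

        var wanted = new SettingsRequest("ConnectionString", "Theme");
        bus.Publish(wanted);
        return wanted; // answers are available once Publish returns
    }
}
```

This only works because publishing is synchronous here; with an asynchronous bus the requester would need some way to wait for the answers.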

WinRT: Provide async Publish implementation

This method would work if the Bus is set up with some sort of ConservativeWinRT setup. It would be awaitable which means that the execution returns once the synchronous pipeline has been executed until the end.

Another possibility would be that using this method overrides the standard setup that drives execution conventionally.

Allow FlexibleSubscriberAdapter to pick-up non-void methods from interfaces

The name-based (RegisterMethods()) method subscription allows a non-void method to be picked up so that the method can return an object to be published.

It would be useful for the interface-based version (ByInterface()) to allow the same so that implementers do not have to take a dependency on IBus.

Unless there is an issue blocking this feature?

IoCSupport when Handles generic type is different from Message type

I have a case when my Handles interface is defined as follows:

public interface IHandleCommand<T>
    where T : ICommand
{
    void Handle(IEnvelope<T> command);
}

Notice that the interface's generic type argument is not the same as the type of the message.

Currently IoCBasedResolver fails in this method:

    private Type constructHandlesType(Type messageType)
    {
        return typeCache.GetOrAdd(messageType, msgT => _handlerType.MakeGenericType(messageType));
    }

while trying to construct the non-generic version of handlerType.

I would like to be able to extend IoCSupport to allow me to provide a callback that will be used to obtain the handler type based on message type.

I'm more than happy to provide a pull request.
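A rough sketch of what such a callback could look like, under the assumption that the resolver accepts a `Func<Type, Type>` (that constructor is hypothetical and not the current IoCBasedResolver API). `Envelope<T>`, `CreateUser`, `HandlerTypeResolver`, and `EnvelopeMapping` are made-up names for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Stand-ins for the types in the report.
public interface ICommand { }
public interface IEnvelope<T> where T : ICommand { }
public interface IHandleCommand<T> where T : ICommand
{
    void Handle(IEnvelope<T> command);
}

// Hypothetical concrete types for the example.
public sealed class CreateUser : ICommand { }
public sealed class Envelope<T> : IEnvelope<T> where T : ICommand { }

// Caches the handler type per message type; the mapping itself is supplied by the caller.
public sealed class HandlerTypeResolver
{
    private readonly ConcurrentDictionary<Type, Type> _typeCache = new ConcurrentDictionary<Type, Type>();
    private readonly Func<Type, Type> _handlerTypeFor;

    public HandlerTypeResolver(Func<Type, Type> handlerTypeFor)
    {
        _handlerTypeFor = handlerTypeFor ?? throw new ArgumentNullException(nameof(handlerTypeFor));
    }

    public Type ConstructHandlesType(Type messageType)
    {
        return _typeCache.GetOrAdd(messageType, _handlerTypeFor);
    }
}

public static class EnvelopeMapping
{
    // Maps IEnvelope<T> (or a type implementing it) to IHandleCommand<T>.
    public static Type HandlerTypeFor(Type messageType)
    {
        Type envelope = FindEnvelopeInterface(messageType)
            ?? throw new ArgumentException("Not an IEnvelope<T>: " + messageType, nameof(messageType));
        Type commandType = envelope.GetGenericArguments()[0];
        return typeof(IHandleCommand<>).MakeGenericType(commandType);
    }

    private static Type FindEnvelopeInterface(Type type)
    {
        if (IsEnvelope(type)) return type;
        foreach (Type candidate in type.GetInterfaces())
        {
            if (IsEnvelope(candidate)) return candidate;
        }
        return null;
    }

    private static bool IsEnvelope(Type type)
    {
        return type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnvelope<>);
    }
}
```

With this, `new HandlerTypeResolver(EnvelopeMapping.HandlerTypeFor)` resolves `Envelope<CreateUser>` to `IHandleCommand<CreateUser>` instead of trying to close the handler interface over the envelope type.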

Documentation

Brilliant project. Could do with more docs please.

Add a real async publishing pipeline to MemBus

This issue takes over from #16 and plans out the necessary steps (more or less) to get real async publishing into MemBus.

Aim

  • Ability to await the handling of a published message
  • Ability to implement subscriptions that can be truly async.

TODO

  • Introduce an IAsyncSubscription interface that returns Task instead of void.
  • IDisposableAsyncSubscription: this one needs some thinking with regard to the cancellation token
  • There are a number of ISubscription implementations which would need async counterparts, last but not least FilteredSubscription
  • CompositeSubscription - what makes sense here? If you have several async subscriptions, would you want to await all? Or none? Make this configurable?
  • IAsyncSubscriptionShaper for IAsyncSubscription
  • Configuration Entry points for the new pieces.

Then there are the things around Flexible subscriptions. New candidates would now be

  • Non-void methods
  • Methods that return a message -> Task should be possible

Progress will be merged into the branch async-pipeline until it's ready.
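As a rough illustration of the first TODO items, the sketch below shows one possible shape for a Task-returning subscription and a composite that awaits all of its children. The member names (`Handles`, `PushAsync`) and the classes `AsyncMethodInvocation<T>` and `AwaitAllCompositeSubscription` are assumptions made for this example; the issue does not fix them, and "await all" is only one of the options raised above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical shape: like a subscription, but Push returns a Task.
public interface IAsyncSubscription
{
    bool Handles(Type messageType);
    Task PushAsync(object message);
}

// Wraps an async handler for messages of type T.
public sealed class AsyncMethodInvocation<T> : IAsyncSubscription
{
    private readonly Func<T, Task> _handler;

    public AsyncMethodInvocation(Func<T, Task> handler)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    public bool Handles(Type messageType) => typeof(T).IsAssignableFrom(messageType);

    public Task PushAsync(object message) => _handler((T)message);
}

// One possible answer to "await all or none": start every matching child, await them together.
// Not thread-safe; a real implementation would have to guard the list.
public sealed class AwaitAllCompositeSubscription : IAsyncSubscription
{
    private readonly List<IAsyncSubscription> _subscriptions = new List<IAsyncSubscription>();

    public void Add(IAsyncSubscription subscription) => _subscriptions.Add(subscription);

    public bool Handles(Type messageType) => _subscriptions.Any(s => s.Handles(messageType));

    public Task PushAsync(object message)
    {
        Type messageType = message.GetType();
        Task[] pending = _subscriptions
            .Where(s => s.Handles(messageType))
            .Select(s => s.PushAsync(message))
            .ToArray();
        return Task.WhenAll(pending);
    }
}
```

A caller could then write `await composite.PushAsync(message);` and continue only after every matching handler has finished. Making the await-all behaviour configurable would mean swapping this composite for one that returns a completed task immediately.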

Allow Methods to be picked up that return IObservable<T>

Once Feature #4 is in place, it should be possible to add this feature much like the FlexibleSubscriptionAdapter only the other way round.

The end result is classes that can accept messages as well as send messages without needing a direct dependency on MemBus. It should also be considered that methods accepting an IObservable<T> get just what they want.

ArgumentException when adding large number of subscriptions

The following code will more often than not result in an ArgumentException being raised with the message "An item with the same key has already been added.":

var bus = BusSetup.StartWith<AsyncConfiguration>().Construct();

for (uint i = 0; i < 100000; i++)
{
    bus.Subscribe((int x) => Console.WriteLine(x));
}

Relevant part of the exception stack trace is

   at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
   at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
   at MemBus.CompositeSubscription.JustAdd(ISubscription subscription)
   at MemBus.CompositeSubscription.Add(ISubscription subscription)
   at MemBus.CompositeResolver.Add(ISubscription subscription)
   at MemBus.Subscriber.Subscribe[M](Action`1 subscription, ISubscriptionShaper customization)
   at MemBus.Subscriber.Subscribe[M](Action`1 subscription)
   at MemBus.MessageObservable`1.Subscribe(IObserver`1 observer)
   at System.ObservableExtensions.SubscribeSafe[T](IObservable`1 source, IObserver`1 observer)

My guess is that two DisposableSubscription instances have the same hash code, resulting in a key collision when attempting to add the second one to the subscription storage:

_subscriptions.Add(disposableSub.GetHashCode(), disposableSub);

Changing the subscriptions storage to HashSet would be a possible fix as the default IEqualityComparer will check for equality (in this case reference equality) when getting multiple elements with the same hash code.
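The difference can be reproduced in isolation. The sketch below uses a hypothetical `CollidingSubscription` class that always returns the same hash code, so the collision is forced deterministically instead of depending on chance as in the loop above; it is not MemBus code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscription type whose instances all share one hash code.
// Equals is not overridden, so equality is still reference equality.
public sealed class CollidingSubscription
{
    public override int GetHashCode() => 42;
}

public static class HashCollisionDemo
{
    // Keying a dictionary by hash code: the second instance is rejected as a duplicate key.
    public static bool DictionaryKeyedByHashThrows()
    {
        var storage = new Dictionary<int, CollidingSubscription>();
        var first = new CollidingSubscription();
        var second = new CollidingSubscription();

        storage.Add(first.GetHashCode(), first);
        try
        {
            storage.Add(second.GetHashCode(), second);
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }

    // A HashSet falls back to Equals on a hash collision, so both instances are kept.
    public static int HashSetCount()
    {
        var storage = new HashSet<CollidingSubscription>();
        storage.Add(new CollidingSubscription());
        storage.Add(new CollidingSubscription());
        return storage.Count;
    }
}
```

Hash codes are not guaranteed to be unique per object, which is why using them directly as dictionary keys can fail once enough instances exist.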

When does it really make sense to use a project like MemBus vs Hangfire?

Our immediate need is background processing.
Both provide this. Management is more full-featured (built-in UI) and deployment is simpler with Hangfire, but a service bus does seem like it could be more robust.

Aside from sending emails and post-processing data, what are some common examples where a service bus is the right choice over http://hangfire.io/ when dealing with a large web application?

Self disposing subscription using weak reference

Is there a way to make the disposable subscriptions dispose themselves using a weak reference?

Currently the ConstructSubscriptionExtension.ConstructSubscription and ConstructSubscriptionExtension.ConstructPublishingSubscription methods build delegates like:

Action<TMessage> action = message => target.Handle(message);
Func<TMessage, TResult> func = message => target.Handle(message);

which causes the closures to hold strong references to their targets.

I think it might work if there were 2 other methods, let's say ConstructSubscriptionExtension.ConstructWeakSubscription and ConstructSubscriptionExtension.ConstructWeakPublishingSubscription, which built delegates that take the target as a parameter instead of capturing it:

Action<TTarget, TMessage> action = (target, message) => target.Handle(message);
Func<TTarget, TMessage, TResult> func = (target, message) => target.Handle(message);

and then wrapped these delegates inside WeakMethodInvocation<TTarget, TMsg> and WeakPublishingInvocation<TTarget, TMsg> implemented like:

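public class WeakMethodInvocation<TTarget, TMsg> : ISubscription, IKnowsSubscribedInstance
    where TTarget : class {

  private readonly Action<TTarget, TMsg> action;

  // Weak, so the subscription does not keep the target alive.
  private readonly WeakReference instance;

  object IKnowsSubscribedInstance.Instance {
    get {
        return this.instance.Target;
    }
  }

  public void Push(object message) {
    // Target is null once the subscriber has been collected.
    var target = this.instance.Target as TTarget;
    if (target != null) {
      this.action(target, (TMsg)message);
    }
  }
}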

And then, somewhere, a WeakSubscription would check whether the weakly referenced target is still alive, and dispose the subscription if the target has been collected.

MemBus 2.0.2 and 3.0.1

Hi.
We have an issue running MemBus 3.0.1 and had to roll back to version 2.0.2. The cause is not really clear; it gives us the following exception:

External component has thrown an exception.
Description: An unhandled exception occurred during the execution of the current web request. Please review the stack trace for more information about the error and where it originated in the code.

Exception Details: System.Runtime.InteropServices.SEHException: External component has thrown an exception.

Source Error:
An unhandled exception was generated during the execution of the current web request. Information regarding the origin and location of the exception can be identified using the exception stack trace below.

Stack Trace:

[SEHException (0x80004005): External component has thrown an exception.]
System.Reflection.RuntimeAssembly._nLoad(AssemblyName fileName, String codeBase, Evidence assemblySecurity, RuntimeAssembly locationHint, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks) +0
System.Reflection.RuntimeAssembly.InternalLoadAssemblyName(AssemblyName assemblyRef, Evidence assemblySecurity, RuntimeAssembly reqAssembly, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks) +484
System.Reflection.RuntimeAssembly.InternalLoad(String assemblyString, Evidence assemblySecurity, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean forIntrospection) +233
System.Reflection.RuntimeAssembly.InternalLoad(String assemblyString, Evidence assemblySecurity, StackCrawlMark& stackMark, Boolean forIntrospection) +17
System.Reflection.Assembly.Load(String assemblyString) +35
System.Web.Configuration.CompilationSection.LoadAssemblyHelper(String assemblyName, Boolean starDirective) +130

[ConfigurationErrorsException: External component has thrown an exception.]
System.Web.Configuration.CompilationSection.LoadAssemblyHelper(String assemblyName, Boolean starDirective) +1220
System.Web.Configuration.CompilationSection.LoadAllAssembliesFromAppDomainBinDirectory() +323
System.Web.Configuration.AssemblyInfo.get_AssemblyInternal() +76
System.Web.Compilation.BuildManager.GetReferencedAssemblies(CompilationSection compConfig) +342
System.Web.Compilation.BuildManager.GetPreStartInitMethodsFromReferencedAssemblies() +125
System.Web.Compilation.BuildManager.CallPreStartInitMethods(String preStartInitListPath) +154
System.Web.Compilation.BuildManager.ExecutePreAppStart() +158
System.Web.Hosting.HostingEnvironment.Initialize(ApplicationManager appManager, IApplicationHost appHost, IConfigMapPathFactory configMapPathFactory, HostingEnvironmentParameters hostingParameters, PolicyLevel policyLevel, Exception appDomainCreationException) +1037

[HttpException (0x80004005): External component has thrown an exception.]
System.Web.HttpRuntime.FirstRequestInit(HttpContext context) +674
System.Web.HttpRuntime.EnsureFirstRequestInit(HttpContext context) +159
System.Web.HttpRuntime.ProcessRequestNotificationPrivate(IIS7WorkerRequest wr, HttpContext context) +739

AsyncBlocking Configurator missing

I'd like to configure the Bus in an Async Blocking way. I see that there is already an AsyncConfigurator, but it is NonBlocking.

My first thought was to write a new ISetup module but I'm blocked at
setup.AddResolver(new CompositeSubscription());

CompositeSubscription() is marked as 'internal', so I can't use it in my code.

Is there a way I can build an AsyncBlocking configuration without changing the library code?

Minor additions to improve membus setup

  • CompositeSubscription should be public to allow it to be instantiated in other setups
  • A call to the default publish pipeline setup should erase any previous default publish pipeline - verify and rectify if necessary

PublishAsync is not truly async

Hello flq,

Firstly, I'd like to say that I think your implementation is quite remarkable and very extensible, which is what today's development world needs.

I've been trying out MemBus and I think that the PublishAsync method lacks the full power of async/await. When used with an async action it does not really wait for the task result, so when MemBus is used on a web server it actually consumes worker threads.

The issue is in SequentialPipeline: LookAtAsync calls the push method without waiting for it (and uses Task.Run(), which schedules tasks).

WDYT?
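To illustrate the difference being described, here is a simplified sketch that does not reproduce the actual SequentialPipeline code. `PublishSketch`, `PublishFireAndForget`, and `PublishAwaited` are hypothetical names; the first mirrors the behaviour as reported (work scheduled with Task.Run and not awaited), the second hands the handler's own task back to the caller.

```csharp
using System;
using System.Threading.Tasks;

public static class PublishSketch
{
    // Schedules the handler on the thread pool and returns without waiting for it.
    // Discarding the task is deliberate here: it models the reported behaviour.
    public static Task PublishFireAndForget<T>(Func<T, Task> handler, T message)
    {
        _ = Task.Run(() => handler(message));
        return Task.CompletedTask;
    }

    // Returns the handler's task, so awaiting the publish awaits the handler.
    // No extra thread-pool work item is scheduled for the call itself.
    public static Task PublishAwaited<T>(Func<T, Task> handler, T message)
    {
        return handler(message);
    }
}
```

With the first variant, `await PublishFireAndForget(...)` completes immediately and any exception in the handler goes unobserved by the caller; with the second, the await completes only when the handler does and its exceptions propagate to the caller.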
