flq / membus Goto Github PK
View Code? Open in Web Editor NEWPure In-Memory Publish/Subscribe .NET Bus
Pure In-Memory Publish/Subscribe .NET Bus
Hi,
I have been looking at membus and it seems like it could really help me for Some software project. But i was wondering if there also is some sort of request / response message possible? So where i van publish a message and certain subscribers might answer it.
most of my programs are modular, meaning that certain modules are plain dll's which are loaded dynamically. These contain user controls etc. We now have an event system where a module can request settings and modules that are subscribed to this event will determine if the requested settings is from them and answer by giving the wanted information. So a module / plugin might need the connection string to the sql server. It will then make a request for the connection string and the main module will see this and respond with the correct answer. The current request contains a list of wanted settings, so each module that gets the event will go through the list and fill in his appropriate items.
I have been looking at RabbitMQ (which is totally different from yours of course) and they also have such a system. But i only need it in the application, so RabbitMQ give me a lot of overhead i don't want.
This method would work if the Bus is set up with some sort of ConservativeWinRT setup. It would be awaitable which means that the execution returns once the synchronous pipeline has been executed until the end.
Another possiblity would be that using this method overrides the standard setup that drives execution conventionally.
The name-based (RegisterMethods()
) method subscription allows a non-void method to be picked up so that the method can return an object to be published.
It would be useful for the interface-based version (ByInterface()
) to allow the same so that implementers do not have to take a dependency on IBus.
Unless there is an issue blocking this feature ?
I have a case when my Handles interface is defined as follows:
public interface IHandleCommand<T>
where T : ICommand
{
void Handle(IEnvelope<T> command);
}
Notice, that interface generic type is not the same as the type of message.
Currently IoCBasedResolver fails in this method:
private Type constructHandlesType(Type messageType)
{
return typeCache.GetOrAdd(messageType, msgT => _handlerType.MakeGenericType(messageType));
}
while trying to construct the non-generic version of handlerType.
I would like to be able to extend IoCSupport to allow me to provide a callback that will be used to obtain the handler type based on message type.
I'm more than happy to provide a pull request.
Brilliant project. Could do with more docs please.
This issue takes over from #16 and plans out the necessary steps (more or less) to get real async publishing into MemBus.
IAsyncSubscription
interface that returns Task
instead of void
.IDisposableAsyncSubscription
- this one needs some thinking with regard to cancellation tokenFilteredSubscription
CompositeSubscription
- what makes sense here? If you have several async subscriptions, would you want to await all? Or none? Make this configurable?IAsyncSubscriptionShaper
for IAsyncSubscription
Then there are the things around Flexible subscriptions. New candidates would now be
Progress will be merged into the branch async-pipeline until it's ready.
Once Feature #4 is in place, it should be possible to add this feature much like the FlexibleSubscriptionAdapter only the other way round.
The end result are classes that can accept messages as well as send messages without needing a direct dependency on MemBus. It should also be considered that methods accepting an IObservable<T>
get just what they want.
This article (http://realfiction.net/go/180), although a bit old, seems to suggest there's an overload of Subscribe that accepts a lambda that can configure certain subscription options (such as UI thread dispatch). IBus doesn't actually seem to have any such overload, and Bus, which does, is marked internal.
For a particular handler, how would I configure UI thread dispatch?
The following code will more often than not result in an ArgumentException being raised with the message "An item with the same key has already been added.":
var bus = BusSetup.StartWith<AsyncConfiguration>().Construct();
for (uint i = 0; i < 100000; i++)
{
bus.Subscribe((int x) => Console.WriteLine(x));
}
Relevant part of the exception stack trace is
at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
at MemBus.CompositeSubscription.JustAdd(ISubscription subscription)
at MemBus.CompositeSubscription.Add(ISubscription subscription)
at MemBus.CompositeResolver.Add(ISubscription subscription)
at MemBus.Subscriber.Subscribe[M](Action`1 subscription, ISubscriptionShaper customization)
at MemBus.Subscriber.Subscribe[M](Action`1 subscription)
at MemBus.MessageObservable`1.Subscribe(IObserver`1 observer)
at System.ObservableExtensions.SubscribeSafe[T](IObservable`1 source, IObserver`1 observer)
My guess is that two DisposableSubscription instances have the same hashcode, resulting in a collision when attempting to add it the subscription storage:
MemBus/MemBus/CompositeSubscription.cs
Line 99 in 8613a7e
Changing the subscriptions storage to HashSet would be a possible fix as the default IEqualityComparer will check for equality (in this case reference equality) when getting multiple elements with the same hash code.
Could you please add windows phone 8.1 support in PCL profile
Our immediate need is background processing.
Both provide this, however management is full featured(built in UI) and deployment is simpler with Hangfire, but a service bus does seem like it could be more robust.
Aside from sending emails and post processing data. What are some pretty common examples that make using a Service Bus the right choice over http://hangfire.io/ when dealing with large web application?
Is there a way to make the disposable subscriptions dispose themselves using a weak reference?
currently ConstructSubscriptionExtension.ConstructSubscription
and ConstructSubscriptionExtension.ConstructPublishingSubscription
methods build delegates like:
Action<TMessage> action => message => target.Handle(message);
Func<TMessage, TResult> func => message => return target.Handle(message);
and cause the closures to hold reference to targets
I think it might work if there were 2 other methods, lets say ConstructSubscriptionExtension.ConstructWeakSubscription
and ConstructSubscriptionExtension.ConstructWeakPublishingSubscription
, which built delegates:
Action<TTarget, TMessage> action => (target, message) => target.Handle(message);
Func<TTarget, TMessage, TResult> func => (target, message) => return target.Handle(message);
and then wrapped these delegates inside WeakMethodInvocation<TTarget, TMsg>
and WeakPublishingInvocation<TTarget, TMsg>
implemented like:
public class WeakMethodInvocation<TTarget, TMsg> : ISubscription, IKnowsSubscribedInstance {
private readonly Action<TTarget, TMsg> action;
private readonly WeakReference instance;
object IKnowsSubscribedInstance.Instance {
get {
return this.instance.Target;
}
}
public void Push(object message) {
this.action((TTarget)this.Instance, (TMsg)message);
}
}
And then, somewhere, a WeakSubscription
would check whether the WeakReference'd target is still alive or not, and dispose the subscription if the weak reference is void.
Hi.
We have the issue running MemBus 3.0.1 and have to rollback to version 2.0.2. The issue really not clear and give us following exception:
External component has thrown an exception.
Description: An unhandled exception occurred during the execution of the current web request. Please review the stack trace for more information about the error and where it originated in the code.
Exception Details: System.Runtime.InteropServices.SEHException: External component has thrown an exception.
Source Error:
An unhandled exception was generated during the execution of the current web request. Information regarding the origin and location of the exception can be identified using the exception stack trace below.
Stack Trace:
[SEHException (0x80004005): External component has thrown an exception.]
System.Reflection.RuntimeAssembly._nLoad(AssemblyName fileName, String codeBase, Evidence assemblySecurity, RuntimeAssembly locationHint, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks) +0
System.Reflection.RuntimeAssembly.InternalLoadAssemblyName(AssemblyName assemblyRef, Evidence assemblySecurity, RuntimeAssembly reqAssembly, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean throwOnFileNotFound, Boolean forIntrospection, Boolean suppressSecurityChecks) +484
System.Reflection.RuntimeAssembly.InternalLoad(String assemblyString, Evidence assemblySecurity, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean forIntrospection) +233
System.Reflection.RuntimeAssembly.InternalLoad(String assemblyString, Evidence assemblySecurity, StackCrawlMark& stackMark, Boolean forIntrospection) +17
System.Reflection.Assembly.Load(String assemblyString) +35
System.Web.Configuration.CompilationSection.LoadAssemblyHelper(String assemblyName, Boolean starDirective) +130
[ConfigurationErrorsException: External component has thrown an exception.]
System.Web.Configuration.CompilationSection.LoadAssemblyHelper(String assemblyName, Boolean starDirective) +1220
System.Web.Configuration.CompilationSection.LoadAllAssembliesFromAppDomainBinDirectory() +323
System.Web.Configuration.AssemblyInfo.get_AssemblyInternal() +76
System.Web.Compilation.BuildManager.GetReferencedAssemblies(CompilationSection compConfig) +342
System.Web.Compilation.BuildManager.GetPreStartInitMethodsFromReferencedAssemblies() +125
System.Web.Compilation.BuildManager.CallPreStartInitMethods(String preStartInitListPath) +154
System.Web.Compilation.BuildManager.ExecutePreAppStart() +158
System.Web.Hosting.HostingEnvironment.Initialize(ApplicationManager appManager, IApplicationHost appHost, IConfigMapPathFactory configMapPathFactory, HostingEnvironmentParameters hostingParameters, PolicyLevel policyLevel, Exception appDomainCreationException) +1037
[HttpException (0x80004005): External component has thrown an exception.]
System.Web.HttpRuntime.FirstRequestInit(HttpContext context) +674
System.Web.HttpRuntime.EnsureFirstRequestInit(HttpContext context) +159
System.Web.HttpRuntime.ProcessRequestNotificationPrivate(IIS7WorkerRequest wr, HttpContext context) +739
This could be potential breaking changes, as some methods and props will go away, but probably nothing on the major interfaces
This would mean providing a Publish method that accepts an IObservable<T>
. This means that whenever that Observable generates messages, they will be fed into the MemBus infrastructure.
I'd like to configure the Bus in a Async Blocking way. I see that there is already a AsyncConfigurator but is NonBlocking.
My first thought was to write a new ISetup module but I'm blocked at
setup.AddResolver(new CompositeSubscription());
CompositeSubscription() is marked as 'internal' therefore I can't use it in my code.
Is there a way I can build a AsyncBlocking configuration without changing the library code?
Hello flq,
Firstly I like to say that I think that your implementation is quite remarkable and very extendable, things that needed in today development world.
I’ve been playing with Membus trying it out and i think that the PublishAync method lack the full power of async await. By using it with an async action it not really "waiting" for the task result so by using Membus on a web server Membus actually consume worker threads.
The issue is in the SequentialPipeline at the LookAtAsync it calls the push method without waiting for it (and using Task.Run() which schedule tasks).
WDYT?
above is the line where the async pipe becomes sync currently. It would instead need to call async subscribers and await.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.