mediator.net's Introduction

Mediator.Net

A mediator project for .NET

Get Packages

You can get Mediator.Net by grabbing the latest NuGet packages.

Get Started

Install the NuGet package Mediator.Net

Install-Package Mediator.Net

Simple usage

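// Set up a mediator builder and scan an assembly for handlers
var mediaBuilder = new MediatorBuilder();
// Pass any type that lives in the assembly containing your handlers
var mediator = mediaBuilder.RegisterHandlers(typeof(TestBaseCommandHandler).Assembly).Build();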

Sending a command with no response

await _mediator.SendAsync(new TestBaseCommand(Guid.NewGuid()));

Sending a command with a response

var pong = await _mediator.SendAsync<Ping, Pong>(new Ping());

Sending a request with a response

var result = await _mediator.RequestAsync<GetGuidRequest, GetGuidResponse>(new GetGuidRequest(_guid));

Publishing an event

await _mediator.Publish(new OrderPlacedEvent());

Publishing an event as the result of a command

Inside a command handler's Handle method, the IReceiveContext exposes a Publish method

public async Task Handle(IReceiveContext<DerivedTestBaseCommand> context, CancellationToken cancellationToken)
{
    // Do your work
    await context.Publish(new OrderPlacedEvent());
}

Create stream of responses

Sometimes you might want to get multiple responses from one request or command. You can do that by using the CreateStream method.

// Define a StreamHandler by implementing the IStreamRequestHandler or IStreamCommandHandler interfaces for IRequest and ICommand
public class GetMultipleGuidStreamRequestHandler : IStreamRequestHandler<GetGuidRequest, GetGuidResponse>
{
    public async IAsyncEnumerable<GetGuidResponse> Handle(IReceiveContext<GetGuidRequest> context, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        for (var i = 0; i < 5; i++)
        {
            await Task.Delay(100, cancellationToken);
            yield return new GetGuidResponse(Guid.NewGuid()) { Index = i };
        }
    }
}

// You can now get multiple responses back by using this
IAsyncEnumerable<GetGuidResponse> result = mediator.CreateStream<GetGuidRequest, GetGuidResponse>(new GetGuidRequest(_guid));

await foreach (var r in result)
{
  Console.WriteLine(r.Id.ToString());
}

What about event handlers? There is no clear use case for a stream of events, so streaming is currently not supported for IEvent.

What about middleware? You can use middleware as normal. Keep in mind that middleware is invoked only once for each IRequest or ICommand, even though multiple responses might be returned.
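
The "once per request, many responses" behaviour can be pictured with a small toy model. This is only a sketch and does not use Mediator.Net: StreamSketch, MiddlewareInvocations, CreateStream and CollectAsync are hypothetical names made up for illustration, and the counter merely stands in for a middleware step.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy model only: StreamSketch is hypothetical and is NOT part of Mediator.Net.
public sealed class StreamSketch
{
    // Counts how many times the stand-in "middleware" step has run.
    public int MiddlewareInvocations { get; private set; }

    // Stand-in for a streamed request: one middleware pass, then several responses.
    public async IAsyncEnumerable<int> CreateStream(int count)
    {
        MiddlewareInvocations++; // runs once, when enumeration starts

        for (var i = 0; i < count; i++)
        {
            await Task.Yield(); // simulate asynchronous work per response
            yield return i;
        }
    }

    // Helper that drains the stream into a list.
    public async Task<List<int>> CollectAsync(int count)
    {
        var items = new List<int>();
        await foreach (var item in CreateStream(count))
        {
            items.Add(item);
        }
        return items;
    }
}
```

Draining the stream with CollectAsync(5) produces five items while MiddlewareInvocations stays at 1, which is the shape of behaviour described above.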

Handling messages in a handler

Once a message is sent, it reaches its handlers. You can have only one handler for each ICommand or IRequest, but multiple handlers for an IEvent. A ReceiveContext is delivered to the handler.

class TestBaseCommandHandler : ICommandHandler<TestBaseCommand>
{
    public Task Handle(IReceiveContext<TestBaseCommand> context, CancellationToken cancellationToken)
    {
        Console.WriteLine(context.Message.Id);
        return Task.CompletedTask;
    }
}

// Or in async
class AsyncTestBaseCommandHandler : ICommandHandler<TestBaseCommand>
{
    public async Task Handle(IReceiveContext<TestBaseCommand> context, CancellationToken cancellationToken)
    {
        Console.WriteLine(context.Message.Id);
        await Task.CompletedTask;
    }
}

Handler Registration

Handlers explicit registration

var mediator = builder.RegisterHandlers(() =>
{
    var binding = new List<MessageBinding>
    {
        new MessageBinding(typeof(TestBaseCommand), typeof(TestBaseCommandHandler)),
        new MessageBinding(typeof(DerivedTestBaseCommand), typeof(DerivedTestBaseCommandHandler))
    };
    return binding;
}).Build();

Scan registration

var mediaBuilder = new MediatorBuilder();
// Pass any type that lives in the assembly containing your handlers
var mediator = mediaBuilder.RegisterHandlers(typeof(TestBaseCommandHandler).Assembly).Build();

Using pipelines

There are 5 different types of pipelines you can use.

GlobalReceivePipeline

This pipeline is triggered whenever a message is sent, published or requested, before the message reaches the next pipeline and its handler.

CommandReceivePipeline

This pipeline is triggered just after the GlobalReceivePipeline and before the message reaches its command handler. It is only used for ICommand.

EventReceivePipeline

This pipeline is triggered just after the GlobalReceivePipeline and before the message reaches its event handler or handlers. It is only used for IEvent.

RequestReceivePipeline

This pipeline is triggered just after the GlobalReceivePipeline and before the message reaches its request handler. It is only used for IRequest.

PublishPipeline

This pipeline is triggered when an IEvent is published from inside your handler. It is only used for IEvent and usually acts as an outgoing interceptor.
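
The receive-side ordering described above can be sketched with a toy model. This is not Mediator.Net code: MessageKind, PipelineOrderSketch and TraceAsync are hypothetical names used only for illustration, and the PublishPipeline (which only runs when a handler publishes an event) is left out to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy model only: these types are hypothetical and are NOT part of Mediator.Net.
public enum MessageKind { Command, Event, Request }

public static class PipelineOrderSketch
{
    // Returns the receive-side stages a message of the given kind passes through, in order.
    public static async Task<List<string>> TraceAsync(MessageKind kind)
    {
        var trace = new List<string>();

        // Stage 1: the global receive pipeline sees every message.
        await RunStage("GlobalReceivePipeline", trace);

        // Stage 2: exactly one kind-specific receive pipeline runs next.
        var specific = kind switch
        {
            MessageKind.Command => "CommandReceivePipeline",
            MessageKind.Event => "EventReceivePipeline",
            MessageKind.Request => "RequestReceivePipeline",
            _ => throw new ArgumentOutOfRangeException(nameof(kind))
        };
        await RunStage(specific, trace);

        // Stage 3: the handler (or handlers, for events) runs last.
        await RunStage("Handler", trace);
        return trace;
    }

    // Records the stage name; a real pipeline would run its middlewares here.
    private static Task RunStage(string name, List<string> trace)
    {
        trace.Add(name);
        return Task.CompletedTask;
    }
}
```

For MessageKind.Command the trace is GlobalReceivePipeline, then CommandReceivePipeline, then Handler; the other two kinds differ only in the middle stage.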

Setting up middlewares

The most powerful feature of the pipelines above is that you can add as many middlewares as you want. Follow these steps to set up a middleware:

  • Add a static class for your middleware
  • Add a public static extension method to that class, usually following the UseXxxx naming convention
  • Add another class for your middleware's specification; this class is the implementation of your middleware

You might need some dependencies in your middleware. There are two ways to provide them:

  • Pass them in explicitly
  • Let the IoC container resolve them for you (if you are using IoC)

Here is a sample middleware

Middleware class

public static class SerilogMiddleware
{
    public static void UseSerilog<TContext>(this IPipeConfigurator<TContext> configurator, LogEventLevel logAsLevel, ILogger logger = null)
        where TContext : IContext<IMessage>
    {
        if (logger == null && configurator.DependencyScope == null)
        {
            throw new DependencyScopeNotConfiguredException($"{nameof(ILogger)} is not provided and IDependencyScope is not configured, Please ensure {nameof(ILogger)} is registered properly if you are using IoC container, otherwise please pass {nameof(ILogger)} as parameter");
        }
        logger = logger ?? configurator.DependencyScope.Resolve<ILogger>();

        configurator.AddPipeSpecification(new SerilogMiddlewareSpecification<TContext>(logger, logAsLevel));
    }
}

Specification class

class SerilogMiddlewareSpecification<TContext> : IPipeSpecification<TContext> where TContext : IContext<IMessage>
    {
        private readonly ILogger _logger;
        private readonly Func<bool> _shouldExecute;
        private readonly LogEventLevel _level;

        public SerilogMiddlewareSpecification(ILogger logger, LogEventLevel level, Func<bool> shouldExecute = null)
        {
            _logger = logger;
            _level = level;
            _shouldExecute = shouldExecute;
        }
        public bool ShouldExecute(TContext context, CancellationToken cancellationToken)
        {
            if (_shouldExecute == null)
            {
                return true;
            }
            return _shouldExecute.Invoke();
        }

        public Task BeforeExecute(TContext context, CancellationToken cancellationToken)
        {
            return Task.FromResult(0);
        }

        public Task Execute(TContext context, CancellationToken cancellationToken)
        {
            if (ShouldExecute(context, cancellationToken))
            {
                switch (_level)
                {
                    case LogEventLevel.Error:
                        _logger.Error("Receive message {@Message}", context.Message);
                        break;
                    case LogEventLevel.Debug:
                        _logger.Debug("Receive message {@Message}", context.Message);
                        break;
                    case LogEventLevel.Fatal:
                        _logger.Fatal("Receive message {@Message}", context.Message);
                        break;
                    case LogEventLevel.Information:
                        _logger.Information("Receive message {@Message}", context.Message);
                        break;
                    case LogEventLevel.Verbose:
                        _logger.Verbose("Receive message {@Message}", context.Message);
                        break;
                    case LogEventLevel.Warning:
                        _logger.Warning("Receive message {@Message}", context.Message);
                        break;
                    default:
                        throw new ArgumentOutOfRangeException();
                }
            }
            return Task.FromResult(0);
        }

        public Task AfterExecute(TContext context, CancellationToken cancellationToken)
        {
            return Task.FromResult(0);
        }

        public void OnException(Exception ex, TContext context)
        {
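            // Rethrow while preserving the original stack trace (needs using System.Runtime.ExceptionServices)
            ExceptionDispatchInfo.Capture(ex).Throw();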
        }
    }

To hook up middlewares into pipelines

var builder = new MediatorBuilder();
_mediator = builder.RegisterHandlers(() =>
    {
        return new List<MessageBinding>()
        {
            new MessageBinding(typeof(TestBaseCommand), typeof(TestBaseCommandHandlerRaiseEvent)),
            new MessageBinding(typeof(TestEvent), typeof(TestEventHandler)),
            new MessageBinding(typeof(GetGuidRequest), typeof(GetGuidRequestHandler))
        };
    })
    .ConfigureGlobalReceivePipe(x =>
    {
        x.UseDummySave();
    })
    .ConfigureCommandReceivePipe(x =>
    {
        x.UseConsoleLogger1();
    })
    .ConfigureEventReceivePipe(x =>
    {
        x.UseConsoleLogger2();
    })
    .ConfigureRequestPipe(x =>
    {
        x.UseConsoleLogger3();
    })
    .ConfigurePublishPipe(x =>
    {
        x.UseConsoleLogger4();
    })
.Build();

ReceiveContext in Handlers

As you might have already noticed, the mediator delivers a ReceiveContext to the handler, and its Message property holds the original message that was sent. In some cases one event is handled by multiple handlers and you want to share something between them; ReceiveContext is a good place to register such a service or instance. For example, you can write a middleware and register the service from there.

Register DummyTransaction from middleware

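public class SimpleMiddlewareSpecification<TContext> : IPipeSpecification<TContext>
    where TContext : IContext<IMessage>
{
    public bool ShouldExecute(TContext context, CancellationToken cancellationToken)
    {
        return true;
    }

    public Task BeforeExecute(TContext context, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task Execute(TContext context, CancellationToken cancellationToken)
    {
        if (ShouldExecute(context, cancellationToken))
        {
            // Make the transaction available to every handler that receives this context
            context.RegisterService(new DummyTransaction());
        }
        return Task.CompletedTask;
    }

    public Task AfterExecute(TContext context, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public void OnException(Exception ex, TContext context)
    {
        // Rethrow while preserving the original stack trace (needs using System.Runtime.ExceptionServices)
        ExceptionDispatchInfo.Capture(ex).Throw();
    }
}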

Get the DummyTransaction registered in the middleware from the handler

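public Task Handle(IReceiveContext<SimpleCommand> context, CancellationToken cancellationToken)
{
    _simpleService.DoWork();
    // The transaction is only present if the middleware registered it
    if (context.TryGetService(out DummyTransaction transaction))
    {
        transaction.Commit();
    }
    return Task.CompletedTask;
}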

Using dependency injection (IoC) frameworks

Autofac

Install the NuGet package Mediator.Net.Autofac

Install-Package Mediator.Net.Autofac

An extension method RegisterMediator for ContainerBuilder from Autofac is used to register the builder

The super simple use case

var mediaBuilder = new MediatorBuilder();
mediaBuilder.RegisterHandlers(typeof(TestContainer).Assembly);
var containerBuilder = new ContainerBuilder();
containerBuilder.RegisterMediator(mediaBuilder);
_container = containerBuilder.Build();

You can also set up middlewares for each pipe before registering the builder

var mediaBuilder = new MediatorBuilder();
mediaBuilder.RegisterHandlers(typeof(TestContainer).Assembly)
    .ConfigureCommandReceivePipe(x =>
    {
        x.UseSimpleMiddleware();
    });
var containerBuilder = new ContainerBuilder();
containerBuilder.RegisterMediator(mediaBuilder);
_container = containerBuilder.Build();

StructureMap

Install-Package Mediator.Net.StructureMap

Set up an IContainer and do your normal registration, then pass it along with the MediatorBuilder to the StructureMapExtensions class to register Mediator.Net

var mediaBuilder = new MediatorBuilder();
mediaBuilder.RegisterHandlers(TestUtilAssembly.Assembly)
    .ConfigureCommandReceivePipe(x =>
    {
        x.UseSimpleMiddleware();
    });
_container = new Container();
_container.Configure(x =>
{
    // Do your thing
});
StructureMapExtensions.Configure(mediaBuilder, _container);

Unity

Install-Package Mediator.Net.Unity

Set up an IUnityContainer and do your normal registration, then pass it along with the MediatorBuilder to the UnityExtensions class to register Mediator.Net

var mediaBuilder = new MediatorBuilder();
mediaBuilder.RegisterHandlers(TestUtilAssembly.Assembly)
    .ConfigureCommandReceivePipe(x =>
    {
        x.UseSimpleMiddleware();
    });
_container = new UnityContainer();
_container.RegisterType<SimpleService>();
_container.RegisterType<AnotherSimpleService>();

UnityExtensions.Configure(mediaBuilder, _container);

SimpleInjector

Install-Package Mediator.Net.SimpleInjector

We have created a helper class InjectHelper to register all necessary components for Mediator.Net

var mediaBuilder = new MediatorBuilder();
mediaBuilder.RegisterHandlers(TestUtilAssembly.Assembly)
    .ConfigureCommandReceivePipe(x =>
    {
        x.UseSimpleMiddleware();
    });
_container = new Container();
_container.Options.DefaultScopedLifestyle = new LifetimeScopeLifestyle();
_container.Register<SimpleService>();
_container.Register<AnotherSimpleService>();

InjectHelper.RegisterMediator(_container, mediaBuilder);

Although you can register IMediator as transient, we recommend using a lifetime scope. You can use constructor injection, or resolve the mediator from a scope as follows:

using (var scope = _container.BeginLifetimeScope())
{
    _mediator = scope.GetInstance<IMediator>();
    // Await inside the scope so it is not disposed while the request is still running
    var response = await _mediator.RequestAsync<SimpleRequest, SimpleResponse>(new SimpleRequest());
}

Middlewares

One of the key features of Mediator.Net is that you can plug in as many middlewares as you like. We have implemented some common ones, listed below.

Mediator.Net.Middlewares.UnitOfWork

Install-Package Mediator.Net.Middlewares.UnitOfWork

This middleware provides a CommittableTransaction inside the context. Handlers can enlist in the transaction if they require a unit of work.

Mediator.Net.Middlewares.Serilog

Install-Package Mediator.Net.Middlewares.Serilog

This middleware logs every message by using Serilog

Mediator.Net.Middlewares.EventStore

Install-Package Mediator.Net.Middlewares.EventStore

This middleware writes events to GetEventStore. It plugs into the publish pipeline and persists published events to EventStore.

mediator.net's People

Contributors

mayuanyang, nikitasavinov, osoftware, ppxd, simoncropp, stoyandimov, timothymakkison

mediator.net's Issues

Few questions about Mediator.NET

Hi Again,

I'm using your mediator in two projects which are live, and so far it has performed well. Both projects are web based. So kudos to you on a very clean and smooth implementation of CQRS.

I'm planning to use it in a much larger system but have a few questions which are not clear to me.

  1. What is the limit of handlers if any? In the system I'm estimating it to cross well over 800. In the projects that I have used it the number has reached to 270 max and so far no issues.

  2. Can we have a dedicated thread pool for the handlers? Of course, that requires a custom scheduler. I'm thinking of assigning priorities to the commands and queries, with dedicated threads based on the priorities of the handlers.

  3. When delving into the code for SimpleInjector.Mediator I noticed that mediator is registered as scoped. As dotnet core apps are mostly long running apps. I'm not clear if it makes sense to register the mediator instance as scoped, isn't it better to have it as singleton?

thanks in advance

Registering handlers that handles multiple messages

Hi,

I tried to register a command handler that implements multiple ICommandHandlers, only to discover that only the first ICommand was bound to the handler. I saw your implementation in MediatorBuilder.RegisterHandlers(...) and it explicitly binds just the first interface.

Is that by design? I mean, is there any problem having Handlers with multiple ICommandHandler implementations? And finally, if there's not, do you consider allowing it?

Thanks!

calling an async method in Request handler crashes the bus

Consider the following code:

 public async Task<TResponse> Handle(ReceiveContext<TRequest> context, CancellationToken token)
        {
            var request = context.Message as TRequest;
            var response = Activator.CreateInstance<TResponse>();

            Raise<NullReferenceException>.If(response == null, $"Could not create an instance of {typeof(TResponse).Name}");

            if (request.IsQueryValid(token, out var errors))
            {
                if (_cnn.State != ConnectionState.Open)
                    _cnn.Open();

                Builder.ApplyLimitingAndSorting(request);
                await Inquire(request, response, token);
                
                if (_cnn?.State == ConnectionState.Open)
                {
                    _cnn.Close();
                    _cnn.Dispose();
                }
            }
            else
                Raise<ApplicationException>.If(true, string.Join(", ", errors.Select(x => x.ErrorMessage)));

            return response;
        }

It crashes the mediator with the following error:
14-Mar-2019 16:21:23.35||ERROR|System.NullReferenceException: Object reference not set to an instance of an object.
at Mediator.Net.Pipeline.EmptyPipeSpecification`1.OnException(Exception ex, TContext context)
at Mediator.Net.Pipeline.GlobalReceivePipe`1.Connect(TContext context, CancellationToken cancellationToken)
at Mediator.Net.Mediator.SendMessage[TMessage](TMessage msg, CancellationToken cancellationToken)
at Mediator.Net.Mediator.RequestAsync[TRequest,TResponse](TRequest request, CancellationToken cancellationToken)

However when I make changes like this:

public Task<TResponse> Handle(ReceiveContext<TRequest> context, CancellationToken token)
        {
            var request = context.Message as TRequest;
            var response = Activator.CreateInstance<TResponse>();

            Raise<NullReferenceException>.If(response == null, $"Could not create an instance of {typeof(TResponse).Name}");

            if (request.IsQueryValid(token, out var errors))
            {
                if (_cnn.State != ConnectionState.Open)
                    _cnn.Open();

                Builder.ApplyLimitingAndSorting(request);
                Inquire(request, response, token).GetAwaiter();
                
                if (_cnn?.State == ConnectionState.Open)
                {
                    _cnn.Close();
                    _cnn.Dispose();
                }
            }
            else
                Raise<ApplicationException>.If(true, string.Join(", ", errors.Select(x => x.ErrorMessage)));

            return Task.FromResult(response);
        }

it runs perfectly fine.
Aren't we supposed to call async methods?

NuGet DLL is not signed

I would like to be able to use the DLL, but since my project is signed, compiling gives an error about publickeytoken = null.

Would it be possible?

Thanks!!

async/await broken in ASP.NET projects

I'm using Mediator.Net with Autofac in an ASP.NET WebApi2 project. I've found out the IRequestHandler is not working properly when it's an async method with awaits.

I was able to reproduce the issue on a blank new WebApi project with the following code:

    public class DefaultController : ApiController
    {
        private readonly IMediator m;
        public DefaultController(IMediator m) => this.m = m;

        [Route("ping"), HttpGet]
        public async Task<IHttpActionResult> Ping() 
            => Ok(await m.RequestAsync<Ping, Pong>(new Ping()));

        [Route("hang"), HttpGet]
        public async Task<IHttpActionResult> Ping2()
            => Ok(await m.RequestAsync<Ping2, Pong2>(new Ping2()));
    }

    public class Ping : IRequest { }
    public class Ping2 : IRequest { }
    public class Pong : IResponse { }
    public class Pong2 : IResponse { }

    public class WorkingHandler : IRequestHandler<Ping, Pong>
    {
        public Task<Pong> Handle(ReceiveContext<Ping> context)
        {
            return Task.FromResult(new Pong());
        }
    }

    public class BrokenHandler : IRequestHandler<Ping2, Pong2>
    {
        public async Task<Pong2> Handle(ReceiveContext<Ping2> context)
        {
            await Task.Yield(); // Or whatever awaitable
            return new Pong2(); // Never reached
        }
    }

The configuration is as follows:

    public static class WebApiConfig
    {
        public static void Register(HttpConfiguration config)
        {
            config.MapHttpAttributeRoutes();

            var b = new ContainerBuilder();
            b.RegisterApiControllers(Assembly.GetExecutingAssembly());
            b.RegisterMediator(new MediatorBuilder().RegisterHandlers(Assembly.GetExecutingAssembly()));
            var c = b.Build();

            config.DependencyResolver = new AutofacWebApiDependencyResolver(c);
        }
    }

The /ping endpoint works as expected. The /hang endpoint never terminates.

I was not able to reproduce the issue in unit tests. It only happens in ASP.NET.

My first guess is that it's a deadlock caused by a blocking call of Result in RequestPipe.cs which interfered with the way IIS is handling tasks.

Can IPipeSpecification AfterExecute support modifying the Result entity of TContext?

xxxcontroller.cs:

await mediator.RequestAsync<ShouldUnifyResponseCommand, UnifyResponse>(new ShouldUnifyResponseCommand());

xxxSpecification.cs:

public class xxxSpecification : IPipeSpecification
{
    public Task AfterExecute(TContext context, CancellationToken cancellationToken)
    {
        var data = context.Result;
        var unifiedTypeInstance = (Messages.Global.UnifyResponse)Activator.CreateInstance(typeof(Messages.Global.UnifyResponse));
        unifiedTypeInstance.Data = data;
    }
}

ShouldUnifyResponseCommandHandler.cs:

public class ShouldUnifyResponseCommandHandler : ICommandHandler<ShouldUnifyResponseCommand, ShouldUnifyResponse>
{
    private readonly ITestService _service;

    public ShouldUnifyResponseCommandHandler(ITestService service)
    {
        _service = service;
    }

    public async Task<ShouldUnifyResponse> Handle(IReceiveContext<ShouldUnifyResponseCommand> context,
        CancellationToken cancellationToken)
    {
        return await _service.ShouldUnifyResponse(context.Message);
    }
}

As above, the handler uses ShouldUnifyResponse, and the pipe finally returns UnifyResponse.

Request response result can be handled by middleware

We added the ability for middleware to handle the result of SendAsync<MyCommand, MyResponse>. A common use is to unify the response before we send it back to the client.

We need the same ability in RequestAsync<MyRequest, MyResponse>.
