cysharp / messagepipe Goto Github PK
View Code? Open in Web Editor NEWHigh performance in-memory/distributed messaging pipeline for .NET and Unity.
License: MIT License
High performance in-memory/distributed messaging pipeline for .NET and Unity.
License: MIT License
AsTransient
, IPublisher
does not connect to ISubscriber
because the instances (including MessageBrokerCore
) are all different.AsSingle
is used to guarantee that the ResultType instance (e.g. MessageBroker
) is single. However the ResultType classes are hided from MessagePipe users, so AsSingle
need not to be used.AsSingle
, "Zenject Identifier" feature can't be used because AsSignle
are applied even if the ResultTypes are bound with different identifiers.So, I think that Zenject scopes should be fixed as "AsCached".
I also make diff to fix the scope.
I would appreciate if you could give me your opinion.
Hello
after I installed the packages via the Git links provided in the Unity Section Unity throws some errors about not beeing able to resolve types. Following Errors occur:
Library\PackageCache\com.cysharp.messagepipe.vcontainer@c395f76\Runtime\ContainerBuilderExtensions.cs(161,16): error CS0246: The type or namespace name 'Lifetime' could not be found (are you missing a using directive or an assembly reference?)
Library\PackageCache\com.cysharp.messagepipe.vcontainer@c395f76\Runtime\TypeProxy.cs(9,18): error CS0246: The type or namespace name 'IContainerBuilder' could not be found (are you missing a using directive or an assembly reference?)
Library\PackageCache\com.cysharp.messagepipe.vcontainer@c395f76\Runtime\TypeProxy.cs(64,10): error CS0246: The type or namespace name 'PreserveAttribute' could not be found (are you missing a using directive or an assembly reference?)
Library\PackageCache\com.cysharp.messagepipe.vcontainer@c395f76\Runtime\TypeProxy.cs(64,10): error CS0246: The type or namespace name 'Preserve' could not be found (are you missing a using directive or an assembly reference?)
Library\PackageCache\com.cysharp.messagepipe.vcontainer@c395f76\Runtime\TypeProxy.cs(62,9): error CS0246: The type or namespace name 'IObjectResolver' could not be found (are you missing a using directive or an assembly reference?)
Library\PackageCache\com.cysharp.messagepipe.vcontainer@c395f76\Runtime\TypeProxy.cs(3,7): error CS0246: The type or namespace name 'VContainer' could not be found (are you missing a using directive or an assembly reference?)
Is there a way to resolve this issues without changing the packages code?
I have tried to regenerate the sln and .csproj but that did not work out. There are in total 33 issues with types.
I am using Unity 2021.3.10f1
Maybe(/probably) I'm missing something but I'm not quite sure why IDistributedPub/Sub
forces me to specify a key?
I'm using this with named pipes for direct IPC between a windowless backgound service application and a desktop agent, both running on the same machine. Currently, I'm just passing the pipe name as the key but I really don't need it for anything...
Could a keyless IDistributedPub/Sub<T>
be added or are there any arguments against it?
Hi, we have tried to use the distributed publisher and subscriber in Unity for sending and receiving events between different instances of an application through the network. We tried to follow the example that uses MessagePack.Redis and StackExchange.Redis, but we had several issues when we imported all the dll dependencies in Unity.
We have tried to start from an empty project. We imported the Unity versions of MessagePipe and MessagePack, and also imported the scripts from MessagePipe.Redis raw into the project (changing ValueTask to UniTask) and it compiles. But we cannot figure out how to call AddMessagePipeRedis extension method from the Zenject installer.
I'd want to ask if there is somewhere a Unity package version for the MessagePack.Redis that we missed out, or some example or documentation that we can follow in order to make it work.
Hello, I'm planning to use MessagePipe in order to Communicate between MagicOnion Hub & MagicOnion Service.
But I wonder that what would be different between redis pub&sub and MessagePipe pub&sub?
I assume pipes load would be on backend servers instead of redis.. Isnt it?
Is there else?
Thank you
Hello Team,
This is a wonderful project. I would like to know, does this library work with Godot? If yes could you please give share with me the instructions to set up?
thanks in advance for the support.
I am unable to load the debug symbols for MessagePipe
using the default NuGet and Microsoft symbol server
* Searching for 'MessagePipe.pdb' on the configured symbol servers........
Loaded '/home/<omitted>/Workspace/Playground/MMPack/MMPack.Client/bin/Debug/net7.0/MessagePipe.dll'. Cannot find or open the PDB file.
* Searching for 'MessagePipe.Interprocess.pdb' on the configured symbol servers...
Loaded '/home/<omitted>/Workspace/Playground/MMPack/MMPack.Client/bin/Debug/net7.0/MessagePipe.Interprocess.dll'. Cannot find or open the PDB file.
Loaded '/home/<omitted>/Workspace/Playground/MMPack/MMPack.Client/bin/Debug/net7.0/MessagePack.dll'. Symbols loaded.
Loaded '/home/<omitted>/Workspace/Playground/MMPack/MMPack.Client/bin/Debug/net7.0/MessagePack.Annotations.dll'. Symbols loaded.
If we have Microsoft.Extensions.DependencyInjection
libs in Unity3D then Microsoft.Extensions.DependencyInjection.IServiceCollection
conflicts with MessagePipe.IServiceCollection
cause the latter replaces the first one.
Thus it's not possible to register MessagePipe within the original Microsoft.Extensions.DependencyInjection
container.
The only workaround is to build MessagePipe with BuiltinContainerBuilder
but nothing would be injected from the main container then.
This hinders usage of the Microsoft's implementation:
#if !UNITY_2018_3_OR_NEWER
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
#endif
Could that be optional?
Is it possible to publish an event with a long time delay without using task.delay() from DelayRequestFilter ?
Hi,
after updating to 1.5.1 AsObservable() extension for ISubscriber is missing.
I'm evaluating some of my technology choices because I might rewrite my game's UI system, especially read-only UI like on-screen score displays or timers (as opposed to menu elements like buttons). At the same time I saw that you released this library recently.
Do you believe MessagePipe is well-suited for UI? Or am I misunderstanding its intended use case?
autowire term is typically used automatically field/property mapping on instantiate.
rename to auto-registration is more better.
to allow state per handler.
Hi, does any of the messagePipes for InterprocessPubSub supports multiple subscribers (or multiple pubs) to the same port/uds path? Whenever I try adding multiple processes subscribing to the same port, or uds path, it throws the following exception. I've tried AddMessagePipeUdpInterprocess
, AddMessagePipeTcpInterprocess
and AddMessagePipeTcpInterprocessUds
.
Unhandled exception. System.Net.Sockets.SocketException (10048): Only one usage of each socket address (protocol/network address/port) is normally permitted.
at System.Net.Sockets.Socket.UpdateStatusAfterSocketErrorAndThrowException(SocketError error, String callerName)
at System.Net.Sockets.Socket.DoBind(EndPoint endPointSnapshot, SocketAddress socketAddress)
at System.Net.Sockets.Socket.Bind(EndPoint localEP)
at MessagePipe.Interprocess.Workers.SocketTcpServer.ListenUds(String domainSocketPath, Nullable`1 sendBufferSize, Nullable`1 recvBufferSize)
at MessagePipe.Interprocess.Workers.TcpWorker.<>c__DisplayClass12_0.<.ctor>b__0()
at System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
at System.Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
at System.Lazy`1.CreateValue()
at System.Lazy`1.get_Value()
at MessagePipe.Interprocess.Workers.TcpWorker.StartReceiver()
at MessagePipe.Interprocess.TcpDistributedSubscriber`2..ctor(TcpWorker worker, MessagePipeInterprocessTcpUdsOptions options, IAsyncSubscriber`2 subscriberCore, FilterAttachedMessageHandlerFactory syncHandlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory)
at System.RuntimeMethodHandle.InvokeMethod(Object target, Object[] arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitScopeCache(ServiceCallSite callSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
at Microsoft.Extensions.DependencyInjection.ServiceProvider.CreateServiceAccessor(Type serviceType)
at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at CysharpMessagePipe.ServiceCollectionExtensions.UpdateServiceRegistry(IServiceProvider services) in C:\source\4_helix\helix-core\samples\CysharpMessagePipe\CysharpMessagePipe\ServiceCollectionExtensions.cs:line 63
at CysharpMessagePipe.Program.Main(String[] args) in C:\source\4_helix\helix-core\samples\CysharpMessagePipe\CysharpMessagePipe\Program.cs:line 29
When trying to bind MessagePipe as per the "zenject sample" instructions and validate it through Zenject's scene validator (Asset/Zenject/Validate [any]),
GlobalMessagePipe.SetProvider throws an error at line 21 when it's trying to access an EventFactory binding.
Null cast error by default, ("EventFactory has no binding" when the null check's replaced with an is not T
,) even though it works properly in a build/play mode.
(or any suggested names)
Hello!
Firstly thanks so much for the effort you put into all your work - I've used UniTask extensively and am just starting to try MessagePipe.
To explain my situation, I want to be able to publish/ subscribe in specific contexts, for example I have a bunch of players and I only want to publish/ subscribe to a specific player. Currently I just use GlobalMessagePipe.
What would be the 'expected' way to handle that? It seems like I can create a BuiltinContainerBuilder for each of these contexts, however this surely has quite a lot of overhead If I'm handling 1000s of contexts?
Alternatively filters seem like they would also work, but I'm unsure if this is correct usage of them since they'd be iterating over many Messages.
Any suggestion/ advice is appreciated :)
Hi,
just tried the support for UDP Interprocess in regural C# console app and all worked well but there is a problem when trying to make it work in Unity <-> Unity 2020.3 using Zenject.
There is no way to actually call "AddMessagePipeUdpInterprocess" for the DiContainer as the overload only extends IServiceCollection and only proxy for DiContainer - DiContainerProxy is marked "internal"
Looks that "AddMessagePipeUdpInterprocess" does not actually register "IDistributedPublisher<> and IDistributedSubscriber<>" when Unity version is above 2018:
#if !UNITY_2018_3_OR_NEWER
services.Add(typeof(IDistributedPublisher<,>), typeof(UdpDistributedPublisher<,>), options.InstanceLifetime);
services.Add(typeof(IDistributedSubscriber<,>), typeof(UdpDistributedSubscriber<,>), options.InstanceLifetime);
return services;
#else
MessagePipe is just wonderful. His performance is unmatched by competitors today. I used it in an experiment project of mine and the difference in load tests is noticeable.
The only thing that bothered me when I used it was the following: In a scenario of an application using CQRS, the code using MessagePipe is more coupled and repetitive than with MediatR.
I'll give an example: This is a gRPC service using MagicOnion + MessagePipe that I used in my studies:
namespace Cpnucleo.GRPC.Services;
[Authorize]
public class SistemaGrpcService : ServiceBase<ISistemaGrpcService>, ISistemaGrpcService
{
private readonly IAsyncRequestHandler<CreateSistemaCommand, CreateSistemaResponse> _createSistemaCommand;
private readonly IAsyncRequestHandler<ListSistemaQuery, ListSistemaResponse> _listSistemaQuery;
private readonly IAsyncRequestHandler<GetSistemaQuery, GetSistemaResponse> _getSistemaQuery;
private readonly IAsyncRequestHandler<RemoveSistemaCommand, RemoveSistemaResponse> _removeSistemaCommand;
private readonly IAsyncRequestHandler<UpdateSistemaCommand, UpdateSistemaResponse> _updateSistemaCommand;
public SistemaGrpcService(IAsyncRequestHandler<CreateSistemaCommand, CreateSistemaResponse> createSistemaCommand,
IAsyncRequestHandler<ListSistemaQuery, ListSistemaResponse> listSistemaQuery,
IAsyncRequestHandler<GetSistemaQuery, GetSistemaResponse> getSistemaQuery,
IAsyncRequestHandler<RemoveSistemaCommand, RemoveSistemaResponse> removeSistemaCommand,
IAsyncRequestHandler<UpdateSistemaCommand, UpdateSistemaResponse> updateSistemaCommand)
{
_createSistemaCommand = createSistemaCommand;
_listSistemaQuery = listSistemaQuery;
_getSistemaQuery = getSistemaQuery;
_removeSistemaCommand = removeSistemaCommand;
_updateSistemaCommand = updateSistemaCommand;
}
public async UnaryResult<OperationResult> AddAsync(CreateSistemaCommand command)
{
return await _createSistemaCommand.InvokeAsync(command);
}
public async UnaryResult<ListSistemaViewModel> AllAsync(ListSistemaQuery query)
{
return await _listSistemaQuery.InvokeAsync(query);
}
public async UnaryResult<GetSistemaViewModel> GetAsync(GetSistemaQuery query)
{
return await _getSistemaQuery.InvokeAsync(query);
}
public async UnaryResult<OperationResult> RemoveAsync(RemoveSistemaCommand command)
{
return await _removeSistemaCommand.InvokeAsync(command);
}
public async UnaryResult<OperationResult> UpdateAsync(UpdateSistemaCommand command)
{
return await _updateSistemaCommand.InvokeAsync(command);
}
}
It works very well, as expected. The only issue in this implementation focused on CQRS is that the modules need to be unique for each request (Commands/Queries and their responses) end up making the implementation very repetitive and verbose.
This is the same gRPC service using MagicOnion + MediatR:
namespace Cpnucleo.GRPC.Services;
[Authorize]
public class SistemaGrpcService : ServiceBase<ISistemaGrpcService>, ISistemaGrpcService
{
private readonly IMediator _mediator;
public SistemaGrpcService(IMediator mediator)
{
_mediator = mediator;
}
public async UnaryResult<OperationResult> AddAsync(CreateSistemaCommand command)
{
return await _mediator.Send(command);
}
public async UnaryResult<ListSistemaViewModel> AllAsync(ListSistemaQuery query)
{
return await _mediator.Send(query);
}
public async UnaryResult<GetSistemaViewModel> GetAsync(GetSistemaQuery query)
{
return await _mediator.Send(query);
}
public async UnaryResult<OperationResult> RemoveAsync(RemoveSistemaCommand command)
{
return await _mediator.Send(command);
}
public async UnaryResult<OperationResult> UpdateAsync(UpdateSistemaCommand command)
{
return await _mediator.Send(command);
}
}
Wouldn't it be possible to have a RequestHandler in MessagePipe that works similarly to how MediatR works for cases like the one I mentioned above?
First off - fantastic library! Already using it in a production project.
Quick question - I have an IAsyncPublisher and am calling .Publish. If I set a breakpoint in the subscriber the Publish call is in the call stack and blocks until the subscriber completes.
Is there a configuration option necessary to make these fire-and-forget operations? The docs page indicates .Publish should be fire-and-forget.
Thank you again.
TcpTest.cs
what's the problem for the code.
im add providerClient
Subscribe by Key "hogemogeman"
add the reulst to result2
but its give me a error
[Fact]
public async Task MoreHugeSizeUdsTest()
{
var filePath = System.IO.Path.GetTempFileName();
if (System.IO.File.Exists(filePath))
{
System.IO.File.Delete(filePath);
}
try
{
var provider = TestHelper.BuildServiceProviderTcpWithUds(filePath, helper);
var providerClient = TestHelper.BuildServiceProviderTcpWithUds(filePath, helper, asServer:false);
using (provider as IDisposable)
{
var p1 = provider.GetRequiredService<IDistributedPublisher<string, string>>();
var s1 = provider.GetRequiredService<IDistributedSubscriber<string, string>>();
var s2= providerClient.GetRequiredService<IDistributedSubscriber<string, string>>();
var result = new List<string>();
var result2 = new List<string>();
await s1.SubscribeAsync("hogemogeman", x =>
{
result.Add(x);
});
await s2.SubscribeAsync("hogemogeman", x =>
{
result2.Add(x);
});
var ldata1 = new string('a', 99999);
var ldata2 = new string('b', 99999);
var ldata3 = new string('c', 99999);
var ldata = string.Concat(ldata1, ldata2, ldata3);
await Task.Delay(TimeSpan.FromSeconds(1)); // wait for receive data...
await p1.PublishAsync("hogemogeman", ldata);
await Task.Delay(TimeSpan.FromSeconds(1)); // wait for receive data...
result.Should().Equal(ldata);
result2.Should().Equal(ldata);
}
}
finally
{
if (System.IO.File.Exists(filePath))
{
System.IO.File.Delete(filePath);
}
}
}
MessagePipe.Interprocess.Tests.TcpTest.MoreHugeSizeUdsTest
源: TcpTest.cs 行 234
持续时间: 75 毫秒
消息:
System.Net.Sockets.SocketException : Only one usage of each socket address (protocol/network address/port) is normally permitted
堆栈跟踪:
Socket.UpdateStatusAfterSocketErrorAndThrowException(SocketError error, String callerName)
Socket.DoBind(EndPoint endPointSnapshot, SocketAddress socketAddress)
Socket.Bind(EndPoint localEP)
SocketTcpServer.ListenUds(String domainSocketPath, Nullable`1 sendBufferSize, Nullable`1 recvBufferSize) 行 52
<.ctor>b__0() 行 83
Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
Lazy`1.ExecutionAndPublication(LazyHelper executionAndPublication, Boolean useDefaultConstructor)
Lazy`1.CreateValue()
Lazy`1.get_Value()
TcpWorker.StartReceiver() 行 170
TcpDistributedSubscriber`2.ctor(TcpWorker worker, MessagePipeInterprocessTcpUdsOptions options, IAsyncSubscriber`2 subscriberCore, FilterAttachedMessageHandlerFactory syncHandlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory) 行 55
RuntimeMethodHandle.InvokeMethod(Object target, Span`1& arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
CallSiteRuntimeResolver.VisitScopeCache(ServiceCallSite callSite, RuntimeResolverContext context)
CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
<>c__DisplayClass2_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
ServiceProvider.GetService(Type serviceType)
ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
TcpTest.MoreHugeSizeUdsTest() 行 249
--- End of stack trace from previous location ---
Hello!
First of all, thank you for the amazing libraries, they are very fast and easy to work with. I plan to use your libraries as much as possible. I will also submit pull requests when I create new features.
I have a use case where I have one class, IAbsoluteFilePath
that can call many services such as:
IAbsoluteFileDeleteService
IAbsoluteFileCopyService
IAbsoluteFileMoveService
For the above scenerio, using the library it will look something like this:
public interface IAbsoluteFilePath { }
// Services
public interface IAbsoluteFileDeleteService { }
public class AbsoluteFileDeleteRequest{}
public class AbsoluteFileDeleteResponse { }
public interface IAbsoluteFileCopyService { }
public class AbsoluteFileCopyRequest { }
public class AbsoluteFileCopyResponse { }
public interface IAbsoluteFileMoveService { }
public class AbsoluteFileMoveRequest { }
public class AbsoluteFileMoveResponse { }
// What the library instructs me to do
public class AbsoluteFilePath : IAbsoluteFilePath
{
private readonly IRequestHandler<AbsoluteFileDeleteRequest, AbsoluteFileDeleteResponse> _deleteHandler;
private readonly IRequestHandler<AbsoluteFileCopyRequest, AbsoluteFileCopyResponse> _copyHandler;
private readonly IRequestHandler<AbsoluteFileMoveRequest, AbsoluteFileMoveResponse> _moveHandler;
public AbsoluteFilePath(
IRequestHandler<AbsoluteFileDeleteRequest, AbsoluteFileDeleteResponse> deleteHandler,
IRequestHandler<AbsoluteFileCopyRequest, AbsoluteFileCopyResponse> copyHandler,
IRequestHandler<AbsoluteFileMoveRequest, AbsoluteFileMoveResponse> moveHandler)
{
this._deleteHandler = deleteHandler;
this._copyHandler = copyHandler;
this._moveHandler = moveHandler;
}
}
Above can become messy very quickly, in addition, if I have 15 services, I will need to inject all 15 services each time, and this can hurt performance.
I would like to do something like this:
public interface IMediator {}
// What I want to do
public class AbsoluteFilePathV2 : IAbsoluteFilePath
{
private readonly IMediator _mediator;
public AbsoluteFilePathV2(IMediator mediator)
{
this._mediator = mediator;
}
}
The documentation on the MessagePipe
library claims this is possible and requests that I review the Microsoft documentation
From MessagePipe
Docs:
The issue is, the MediaR
and Microsoft documentation has these classes below, while MessagePipe
is missing these similar classes:
IMediator
IRequest
How can I accomplish this below using MessagePipe
?
public interface IMediator {}
// What I want to do
public class AbsoluteFilePathV2 : IAbsoluteFilePath
{
private readonly IMediator _mediator;
public AbsoluteFilePathV2(IMediator mediator)
{
this._mediator = mediator;
}
}
Thanks
Am I realized correctly that IRequestHandler<, >
class is stateless? I would like to have IRequestHandler singleton with its own state. But IRequestHandler seams to creating inside of BuildServiceProvider. Can I make it outside and just somehow register?
Example:
public class ToDoServiceHandlers : MonoBehaviour,
IRequestHandler<InitialStateRequest, InitialStateResponse>,
IRequestHandler<ClearItemsRequest, ClearItemsResponse>,
IRequestHandler<AddItemRequest, AddItemResponse>
{
private ToDoState _model;
private void Awake()
{
_model = ToDoState.InitialState;;
}
public InitialStateResponse Invoke(InitialStateRequest request)
{
var response = GlobalMessagePipe.GetRequestHandler<LoadToDoItemsRequest, LoadToDoItemsResponse>().Invoke(default);
_model.Items = response.Value;
return new InitialStateResponse {Value = _model.Items};
}
}
Add Id, Timestamp.
MessagePipe DiagnosticsInfo -> shows timestamp when subscribe
public IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters)
{
if (IsValueType || lastMessage != null)
{
handler.Handle(lastMessage!);
}
return core.Subscribe(handler);
}
Hello!
I'm trying to use the MessagePie on my project but i'm only able to sucefully use it if i declare by myself the DI linking the AsyncRequestHandlers to their respective Handlers.
I'm asking this because i don't have saw on the docs that declare the DI by myself is a required step to make MessagePie works.
So i've did:
services
.AddSingleton<IAsyncRequestHandler<CreateSistemaCommand, CreateSistemaResponse>, SistemaHandler>()
.AddSingleton<IAsyncRequestHandler<ListSistemaQuery, ListSistemaResponse>, SistemaHandler>()
.AddSingleton<IAsyncRequestHandler<GetSistemaQuery, GetSistemaResponse>, SistemaHandler>()
.AddSingleton<IAsyncRequestHandler<RemoveSistemaCommand, RemoveSistemaResponse>, SistemaHandler>()
.AddSingleton<IAsyncRequestHandler<UpdateSistemaCommand, UpdateSistemaResponse>, SistemaHandler>();
If i don't declare by myself, the code throws an exception when i try to use any of the non declared DIs:
System.InvalidOperationException: Unable to resolve service for type 'MessagePipe.IAsyncRequestHandlerCore`2[Cpnucleo.Infra.CrossCutting.Util.Queries.Sistema.ListSistema.ListSistemaQuery,Cpnucleo.Infra.CrossCutting.Util.Queries.Sistema.ListSistema.ListSistemaResponse]' while attempting to activate 'MessagePipe.AsyncRequestHandler`2[Cpnucleo.Infra.CrossCutting.Util.Queries.Sistema.ListSistema.ListSistemaQuery,Cpnucleo.Infra.CrossCutting.Util.Queries.Sistema.ListSistema.ListSistemaResponse]'.
The question is: The MessagePipe should be handle by ifself the DIs (like Mediatr does) or it's necessary declare on the client side?
Here's my solution: https://github.com/jonathanperis/cpnucleo/tree/feat-message-pipe-hellyeah
The projects are the Cpnucleo.GRPC that contains the mediator like classes to invoke their handlers contained on the Cpnucleo.Application project
Classes used by their handlers are shared on the Cpnucleo.Infra.CrossCutting.Util project
no need for extra DI library.
Would you please add a license text?
+ var types = AutowireEngine.CollectFromCurrentDomain();
Calling GlobalMessagePipe.SetProvider (provider)
during IL2CPP Build results in an InvalidOperationException
.
using MessagePipe;
using UnityEngine;
public class Installer
{
[RuntimeInitializeOnLoadMethod]
private static void Initialize()
{
var builder = new BuiltinContainerBuilder();
builder.AddMessagePipe();
builder.AddMessageBroker<int>();
var provider = builder.BuildServiceProvider();
GlobalMessagePipe.SetProvider(provider);
}
}
InvalidOperationException: Sequence contains no elements
at System.Linq.Enumerable.First[TSource] (System.Collections.Generic.IEnumerable`1[T] source) [0x00000] in <00000000000000000000000000000000>:0
at MessagePipe.ServiceProviderType..ctor (System.Type type) [0x00000] in <00000000000000000000000000000000>:0
at MessagePipe.BuiltinContainerBuilderServiceProvider+<>c__DisplayClass2_1.<.ctor>b__1 () [0x00000] in <00000000000000000000000000000000>:0
at System.Func`1[TResult].Invoke () [0x00000] in <00000000000000000000000000000000>:0
at System.Lazy`1[T].CreateValue () [0x00000] in <00000000000000000000000000000000>:0
at System.Lazy`1[T].LazyInitValue () [0x00000] in <00000000000000000000000000000000>:0
at MessagePipe.BuiltinContainerBuilderServiceProvider.GetService (System.Type serviceType, System.Int32 depth) [0x00000] in <00000000000000000000000000000000>:0
at MessagePipe.DependencyInjectionShims.GetRequiredService[T] (System.IServiceProvider provider) [0x00000] in <00000000000000000000000000000000>:0
at MessagePipe.GlobalMessagePipe.SetProvider (System.IServiceProvider provider) [0x00000] in <00000000000000000000000000000000>:0
Hello,
I've recently upgraded from Unity 2020 LTS to Unity 2021 LTS (2021.3.8f1 as of submitting this issue). This broke VContainer code generation and now I have to remove VContainer.Enable.CodeGen from each assembly that references MessagePipe for the game to build. I'm not sure why this is the case but it seems the MessagePipe assembly loads as null during code generation. This only happened to me with MessagePipe so far, so I'm opening an issue here as well in case this is related to MessagePipe more than VContainer.
Thanks in advance
I'm trying to use IRemoteRequestHandler<TRequest,TResponse>
with named pipes between two processes on the same machine but I'm having no luck. As far as I can tell from debugging, the problem is that the NamedPipeWorker
is never actually instantiated in the server process and thus the ReceiveLoop is never initiated and thus requests from the client are sent into nothingness and are awaited forever (the cancellation token passed to InvokeAsync()
does not appear to be used - but that's a separate issue)...
It seems that all the samples for IRemoteRequestHandler
that I've looked at probably only work because they have the client and the server classes both inside the same executable.. In that case the NamedPipeRemoteRequestHandler
that gets called into life by injecting an IRemoteRequestHandler
into a client class takes care of instantiating the worker - this doesn't have any effect however when the request handler class is in a different process. It seems that for remote request handlers, launching the receiver in the server process should probably be triggered by (auto-)registering an IAsyncRequestHandler
or something like that, right?
P.S.: I haven't done any actual tests, but just from glancing at the code I suspect that the exact same problem would also apply to the TCP transport...
void IUniTaskSource.GetResult(short token)
and public IUniTaskStatus UnsafeGetStatus()
In
dotnet build MessagePipe.sln
This may be caused by executing tools/PostBuildUnity/PostBuildUnity.csproj in PostBuildEvent.
This is no problem if executing only once, but executed twice, public UniTaskStatus GetStatus(short token)
will be hit and generating same lines again.
MessagePipe/tools/PostBuildUtility/Program.cs
Lines 49 to 51 in 7a6a2bf
Hello.
I don't want to report an issue, but give a suggestion instead.
The documentation on how to initialize a global message pipe in Unity with DI plugins seems not ideal. I would think that you'd want to initalize such a global service inside the composition root, rather than in some random class. For VContainer, I believe this could be a much cleaner way to do it, than what is suggested in the documentation.
protected override void Configure(IContainerBuilder builder)
{
var options = builder.RegisterMessagePipe();
builder.RegisterBuildCallback(c => GlobalMessagePipe.SetProvider(c.AsServiceProvider()));
}
This way, you don't have to inject IObjectResolver
anywhere in order to initialize the global message pipe. I think this should work no matter what.
Hi!
I've migrated my project to use MessagePipe but i have a situation that breaks me to deploy this change:
I have my Infrastructure buit with Scoped contexts and when i have to consume any IAsyncRequestHandlerCore that on the other side have a context injected that have been declared as Scoped it happens an exception:
Is this a expected behavior? The actual source code is hosted here:
https://github.com/jonathanperis/cpnucleo/tree/feat-message-pipe-hellyeah
The project Cpnucleo.Infra.Data have the classes injected as Scoped on the Configuration folder (in case you adventure on investigate). The implementation of the MessagePipe is happening on the Cpnucleo.GRPC (consume) and Cpnucleo.Application (implementation)
If I declare a non-mono :
TestClass{
[Preserve]
public TestClass(ISubscriber<Signal> subscriber)
{
_subscriber = subscriber;
_subscriber.Subscribe(_ => OnSignal());
private void OnSignal()
{
Debug.Log("Sucess");
}
}
}
then the injection never takes place and my method is never called.
Workaroud: If I make it derive from IStartable and leave the Start() empty, it works...
This is reproducible from 1.11 and 1.12 (just released today and still has this).
It is not always, but I had this problem on my project and now I made a prototype to reproduce this, let me know if you want it.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.