obvs's Introduction

Obvs: an observable microservice bus

observable services, obviously

Join the chat at https://gitter.im/inter8ection/Obvs

Features

  • Obvs is just a library, not a framework - use as much or as little as you need.
  • Leverage messaging and Reactive Extensions to quickly compose a system of decoupled microservices.
  • Add new services and message contracts with a minimal amount of code and effort.
  • Don't tie yourself to any one transport; migrate between transports with minimal code changes.
  • Use a mix of transports and serialization formats, allowing you to pick what works for your services.
  • Declare a new Obvs ServiceBus easily using the fluent code-based configuration.
  • Don't want to use Obvs message contract interfaces? Use the generic ServiceBus and supply your own.
  • Standardize on messaging semantics throughout by wrapping integrations with external APIs as custom endpoints.
  • Don't distribute if you don't need to, Obvs ServiceBus includes a local in-memory bus.
  • Use one of the many available serialization extensions, or even write your own.
  • Easily debug and monitor your application using logging and performance counter extensions.

Versions/Roadmap

  • V6 - System.Reactive 5.0, supports netstandard2.0, net472 and net5.0. Mono-repo
  • V5 - System.Reactive 4.1, supports netstandard2.0 and net472
  • V4 - System.Reactive 3.1.1, supports netstandard1.6 and net452

More Details

  • Convention-based messaging over topics/queues/streams per service.
  • Multiplexing of multiple message types over single topics/queues/streams.
  • Dynamic creation of deserializers per type, auto-discovery of message contracts.
  • Exceptions are caught and raised on an asynchronous error channel.

Extensions

  • Transports: ActiveMQ / RabbitMQ / NetMQ / AzureServiceBus / Kafka / EventStore
  • Serialization: XML / JSON.Net / NetJson / ProtoBuf / MsgPack
  • Logging: NLog / log4net
  • Monitoring: Performance Counters / ElasticSearch
  • Integrations: Slack

Example

Define a root message type to identify messages as belonging to your service:

public interface IMyServiceMessage : IMessage { }

Create command/event/request/response message types:

public class MyCommand : IMyServiceMessage, ICommand { }

public class MyEvent : IMyServiceMessage, IEvent { }

public class MyRequest : IMyServiceMessage, IRequest { }

public class MyResponse : IMyServiceMessage, IResponse { }

Create your service bus:

IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .Create();

Send commands:

serviceBus.Commands.Subscribe(c => Console.WriteLine("Received a command!"));
await serviceBus.SendAsync(new MyCommand());

Publish events:

serviceBus.Events.Subscribe(e => Console.WriteLine("Received an event!"));
await serviceBus.PublishAsync(new MyEvent());

Request/response:

serviceBus.Requests
    .OfType<MyRequest>()
    .Subscribe(request => serviceBus.ReplyAsync(request, new MyResponse()));

serviceBus.GetResponses(new MyRequest())
    .OfType<MyResponse>()
    .Take(1)
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(r => Console.WriteLine("Received a response!"), err => Console.WriteLine("Oh no!"));

Define custom endpoints that can wrap API calls or integrations with other systems:

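public class MyCustomEndpoint : IServiceEndpointClient
{
    // Root message interface of the external service this endpoint wraps
    private readonly Type _serviceType = typeof(IMyCustomServiceMessage);

    public IObservable<IEvent> Events
    {
        get
        {
            // subscribe to external MQ broker and expose its messages as events
            throw new NotImplementedException();
        }
    }

    public Task SendAsync(ICommand command)
    {
        // call external API
        throw new NotImplementedException();
    }

    public IObservable<IResponse> GetResponses(IRequest request)
    {
        // call external API and wrap response in observable
        throw new NotImplementedException();
    }

    public bool CanHandle(IMessage message)
    {
        // only accept messages that belong to the wrapped service
        return _serviceType.IsInstanceOfType(message);
    }
}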
	
...

IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .WithEndpoints(new MyCustomEndpoint())
    .Create();

Run Examples in Docker

cd examples
docker-compose up

cd client
dotnet run -f netcoreapp3.1 

obvs's People

Contributors

christopherread, drub0y, fernando-almeida, gitter-badger, lscpike, megakid, michaelandrepearce, petroemil, sharpe5, spocweb, ssttgg, tommaybe

obvs's Issues

Should IServiceBus, IServiceBusClient, IServiceEndpoint et al. be IDisposable?

Would it be smart to make this family of types subtype IDisposable? They all may have underlying resources that need to be cleaned up in some way. Right now the typical use case is these objects are going to live for the lifetime of the application and when the app goes away it's going to terminate all those resources anyway so "who cares?", but it might be wise to give the option to be able to clean up such resources more gracefully on-demand.

There is more than one way to tackle this, not only subtyping IDisposable and forcing it on all implementations, but I just wanted to start a discussion to see what anyone else has to say about it.

Separating interfaces into a separate library

Currently I am running into loading errors while the library is doing assembly scanning and loading.
Moving messages to a separate library and referencing Obvs solved the issue.

Separating Obvs interfaces to its own library would reduce scanning related errors.

Add support for Azure Service Bus Queues

It would be great to add support for Azure Service Bus Queues as a message transport.

  • Azure ServiceBus Queue based IMessagePublisher implementation
  • Azure ServiceBus Queue based IMessageSource implementation
  • Azure ServiceBus Queue fluent configuration extension methods

Add Support for ØMQ (ZeroMQ)

ØMQ is a messaging library which is used in high-performance environments. It would be nice to support it.

However, since I don't use it currently, the priority should be on Azure Service Bus (#2) for now.

[Question] In-memory bus usage example

Don't distribute if you don't need to, Obvs ServiceBus includes a local in-memory bus.

Are there any usage examples or unit tests for the in-memory bus?
I could not find anything like .WithInMemoryEndpoints<T>().

Issue with message acknowledgement/completion/ownership in current design?

Right now Obvs provides an abstraction over the transport and provides you streams of data types via IObservable<IMessage> and all the other IMessage subtypes. This is nice because the framework allows the apps to just work with its data types and there is little "pollution" of those types from Obvs, but, because all of the messaging semantics have been stripped away, the application loses any kind of responsibility/control over message acknowledgement/completion which is a big problem in the world of competing consumers and fault tolerance.

Suppose a scenario where there is a subscriber for processing CreateUser commands which fails half way through because of an intermittent connectivity issue with its backend data store, so it just throws an exception. That command shouldn't be lost, it should show back up on the queue after a certain amount of time so it can be retried again later.

Also consider a scenario where you have long running processes as the result of a command. You don't want to use message receipts that are too long lived in case the instance that was handling the command crashes, so maybe you have a 5min receipt that you renew each step along the way to retain ownership over the message so it's not relisted on the queue. How can you do this when all you're working with is IMessage though?

Or, even just consider the very simple scenario of a message being received, but the power going out on the node half way through processing the message. Obviously you don't want that message lost forever just because Joe from IT tripped over a plug in your data center. :)

In the Obvs.ActiveMQ transport implementation, for example, I notice you auto-acknowledge the message right after you deserialize it. That seems super early as you haven't even delivered it to subscriber(s) yet. In my implementation of Obvs.AzureServiceBus right now I am calling OnNext to deliver the message to any subscribers first and then, only if that doesn't result in an exception, do I auto-complete the message.

Have you given this any thought previously??? I ran into the very same design issue in my own SB that I had been working on before I abandoned it in favor of Obvs. I'm not sure what the best solution is. Spitballing...

  • You wouldn't want to put these control methods on IMessage because then now you put behavior onto your data types and you're going to end up forcing everyone to inherit from a base class.
  • One approach I had thought of was introducing another interface called IMessageControl like:
public interface IMessageControl<out TMessage>
        where TMessage : IMessage
{
    TMessage Message
    {
        get;
    }
    void Abandon();
    void Complete();
    void Renew(TimeSpan renewTime);
    void Reject(string reason);        
}

Then all the observable streams become IObservable<IMessageControl<TMessage>> based instead. The problem with this is that it forces this upon everyone using the framework even if they wanted to use a non-safe/enterprisey approach to messaging that didn't have message control. So I sort of reject this based on that.

  • The last approach would be to have the message control methods on some other class (the IServiceBus/IServiceBusClient maybe?) themselves so people would still receive just MyCommand instances, but could acknowledge them something like:
serviceBus.Commands.Subscribe(c =>
{
    // whatever logic

    serviceBus.Complete(c);
});

Maybe directly off the bus interfaces isn't a great idea either though, maybe you get a message controller off of the bus somehow instead?

Any thoughts? Am I missing something obvious that would make this way easier to solve?

Serialization design discussion

So the current design of IMessageSerializer::Serialize enforces nothing more than that a particular implementation can transform any object into any other object. The implementations that exist already in Obvs.Serialization seem to adhere to an unspoken contract of returning either a string or a byte[]. Then, the various MessagePublisher implementations have to then test the return type and make a further decision about exactly how those types are going to be passed through the message.

This feels rather "loose". It's also suboptimal because the data types force a fixed size realization of what otherwise lies in buffers underneath Streams which is also going to result in a lot of copying and creation of a lot more garbage that has to be GC'd.

Have you thought at all about changing this interface? My proposal would be to change the signature to void Serialize(Stream destinationStream, object message) instead. Then it's up to the MessagePublisher to provide the most optimal Stream implementation for themselves and all of the serializer implementations would just write to the Stream. JSON.NET supports this (you just layer a TextWriter over the Stream), the Protobuf implementation is already writing to a temp MemoryStream and then having to do that copying I was talking about, and of course the XmlSerializer supports this as well.

On the MessagePublisher side, in the Azure Service Bus implementation I would just supply a MemoryStream, and looking at the ActiveMQ implementation, you could very easily write a simple Stream implementation that wraps their IStreamMessage, for example.

So what's the downside to this? Well, everything would be bytes data and any "special" treatment that a particular implementation might provide when it knows it's a string would be lost, but I don't personally see any value in that as it's all just bytes at the end of the day and I'd rather see maximum performance possible here.

Now, the flip side to this is obviously IMessageDeserializer::Deserialize which currently takes object and returns TMessage and obviously that too should then change to TMessage Deserialize(Stream sourceStream).
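
To make the proposed shape concrete, here is a minimal sketch of a stream-based serializer pair. The interface names IStreamMessageSerializer and IStreamMessageDeserializer<TMessage> are hypothetical and not part of Obvs, the Greeting type exists only for the demo, and System.Text.Json on .NET 6 or later is assumed purely for illustration; any serializer that can write to a Stream would fit the same contract.

```csharp
using System;
using System.IO;
using System.Text.Json;

// Hypothetical interfaces illustrating the proposal; not part of Obvs.
public interface IStreamMessageSerializer
{
    void Serialize(Stream destinationStream, object message);
}

public interface IStreamMessageDeserializer<out TMessage>
{
    TMessage Deserialize(Stream sourceStream);
}

public sealed class JsonStreamSerializer : IStreamMessageSerializer
{
    public void Serialize(Stream destinationStream, object message)
    {
        // Write UTF-8 JSON straight into the caller-supplied stream,
        // without building an intermediate string or byte[].
        using var writer = new Utf8JsonWriter(destinationStream);
        JsonSerializer.Serialize(writer, message, message.GetType());
    }
}

public sealed class JsonStreamDeserializer<TMessage> : IStreamMessageDeserializer<TMessage>
{
    // The synchronous Stream overload assumes .NET 6 or later.
    public TMessage Deserialize(Stream sourceStream) =>
        JsonSerializer.Deserialize<TMessage>(sourceStream);
}

// Demo-only message type.
public sealed class Greeting
{
    public string Text { get; set; }
}

public static class Demo
{
    public static void Main()
    {
        // The publisher decides which Stream to supply; a MemoryStream is used here.
        using var buffer = new MemoryStream();
        new JsonStreamSerializer().Serialize(buffer, new Greeting { Text = "hello" });

        buffer.Position = 0;
        var copy = new JsonStreamDeserializer<Greeting>().Deserialize(buffer);
        Console.WriteLine(copy.Text); // prints "hello"
    }
}
```

In this shape the publisher owns the buffer, so a transport could hand in a stream that wraps its native message type instead of a MemoryStream.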

Dependency Injection Support?

Has any thought been given to DI support in Obvs?

It tends to be very hard to go back and bake in support for DI into frameworks later. Is it worth introducing low level support for dependency resolution now while it's early goings to avoid the pain later? You could either introduce a proprietary IDependencyResolver interface as part of Obvs core or have Obvs take a dependency on the Microsoft.Framework.DependencyInjection package which would come with the added benefit of everyone implementing direct support for that for all the popular containers (e.g. AutoFac, Ninject, Unity, etc.) since it's a building block of all the new frameworks in .NET vNext. The downside of taking the dependency on that is that it's still beta and that would mean the version of Obvs that takes that dependency would need to be a beta version itself until that dependency RTMd.

Support for configuring IMessagePropertyProvider instances?

I don't see any support for configuring IMessagePropertyProvider<TMessage> instances during configuration. All existing implementations appear to be just doing new DefaultPropertyProvider<TMessage>() in a hardcoded fashion (which I've also copied the behavior of for now).

Should this be something provided by the base fluent configuration APIs? Or perhaps I am misunderstanding the goal of property providers? Was it ever the intention that higher level consumers of the API could inject additional property providers?

The scenario I was thinking of is that, in Azure Service Bus Subscriptions, you can filter on properties for a subscription, but not on the details serialized in the body. Therefore if you wanted to broadcast events to a topic that subscribers could theoretically filter out based on properties, you would need to be able to supply your own IMessagePropertyProvider<TMessage> implementation that knew exactly how to extract those properties for a given TMessage.

For example, if I have an IEvent called OrderSubmitted that maybe has a PartnerId property on it and I'm broadcasting those events to a topic, but want to make sure that subscribers can filter on that PartnerId I would expect to write a custom OrderSubmittedEventPropertyProvider that provides explicit support for pulling that out into a property and then I'd need a way to configure that. Of course you could imagine more advanced, generic implementations based on C# expressions, but this is just a very simple example.

The other thing this raises is that, if you can configure property providers, then likely the API should allow for configuring multiple providers. One way to do that would be some kind of CompositePropertyProvider<TMessage> that allows for adding more than one for a TMessage and executes them all so that IMessagePublisher<TMessage> implementations can always work with just IMessagePropertyProvider<TMessage> so that they are agnostic of one or many.
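
A rough sketch of what such a composite could look like follows. The IMessagePropertyProvider<TMessage> declaration here is an assumed shape written out so the example is self-contained (the actual Obvs interface may differ), OrderSubmitted is reduced to a plain class rather than an IEvent, and the Add method is a hypothetical convenience.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shape of the provider contract; the real Obvs interface may differ.
public interface IMessagePropertyProvider<in TMessage>
{
    IEnumerable<KeyValuePair<string, object>> GetProperties(TMessage message);
}

// Runs every registered provider and concatenates their properties, so a
// publisher only ever sees a single IMessagePropertyProvider<TMessage>.
public sealed class CompositePropertyProvider<TMessage> : IMessagePropertyProvider<TMessage>
{
    private readonly List<IMessagePropertyProvider<TMessage>> _providers =
        new List<IMessagePropertyProvider<TMessage>>();

    public CompositePropertyProvider<TMessage> Add(IMessagePropertyProvider<TMessage> provider)
    {
        _providers.Add(provider ?? throw new ArgumentNullException(nameof(provider)));
        return this;
    }

    public IEnumerable<KeyValuePair<string, object>> GetProperties(TMessage message) =>
        _providers.SelectMany(p => p.GetProperties(message));
}

// Stand-in for the OrderSubmitted event from the example above.
public sealed class OrderSubmitted
{
    public string PartnerId { get; set; }
}

// Pulls PartnerId out of the body so subscribers can filter on it.
public sealed class OrderSubmittedEventPropertyProvider : IMessagePropertyProvider<OrderSubmitted>
{
    public IEnumerable<KeyValuePair<string, object>> GetProperties(OrderSubmitted message)
    {
        yield return new KeyValuePair<string, object>("PartnerId", message.PartnerId);
    }
}

public static class Demo
{
    public static void Main()
    {
        var provider = new CompositePropertyProvider<OrderSubmitted>()
            .Add(new OrderSubmittedEventPropertyProvider());

        foreach (var property in provider.GetProperties(new OrderSubmitted { PartnerId = "acme" }))
        {
            Console.WriteLine($"{property.Key}={property.Value}"); // prints "PartnerId=acme"
        }
    }
}
```

This sketch does not decide what happens when two providers emit the same key; a real implementation would need a rule for that (first wins, last wins, or throw).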

Deprecated package reference on Nuget (Rx-Linq)

Hi,

seems Obvs 3.0.1.58 (current version) on Nuget references Rx-Linq, which as far as I know is deprecated/unlisted and has been replaced by System.Reactive.Linq.

Any plans to update this here & on Nuget?

As for me, I'm getting exceptions when using both (e.g. reference the latest System.Reactive) as Obvs seems to fail finding/loading the old Rx DLLs, in spite of having assembly redirects in place. Haven't spent much time on the issue yet as I figured an Obvs upgrade to the latest Rx libraries probably makes more sense anyway?

Is there support for Azure Service Bus?

Hey there
So I was wondering, does this package support Azure Service Bus? I looked up another repository and it was several years old and its NuGet package was missing, probably due to being deprecated.

Add async version of IMessagePublisher::Publish

Right now IMessagePublisher only offers a synchronous version of the Publish method. Since many implementations are likely to be making a network request as a result of this method being called it would be beneficial to offer an async version of this method.

Suggested method signature:

Task PublishAsync(TMessage message);

I could see it going two ways:

  1. Simply add an additional method to the interface and allow each implementation to provide the best possible implementation for both the synchronous version and the asynchronous version. This has a slight downside of requiring all IMessagePublisher implementations to provide an implementation of both versions of the method. That could theoretically allow them to optimize for both scenarios, but I suspect you'd see most implementations implementing the sync version by simply calling the async version and blocking on the returned Task. Vice versa, and worse for most cases, they could implement the async version by calling the sync version and returning an already completed Task (e.g. via Task::FromResult).
  2. Take a "hard" stance and keep the API surface lean by replacing the existing synchronous signature with the asynchronous one.

My personal vote would be for option number 2.
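
As an illustration of option 2, here is a minimal sketch of an async-only publisher contract. IAsyncMessagePublisher<TMessage> and InMemoryPublisher<TMessage> are hypothetical names used only for this example, and Task.Yield stands in for whatever network call a real transport would make.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical async-only variant of the publisher contract (option 2).
public interface IAsyncMessagePublisher<in TMessage>
{
    Task PublishAsync(TMessage message);
}

// Toy implementation that records messages in memory.
public sealed class InMemoryPublisher<TMessage> : IAsyncMessagePublisher<TMessage>
{
    public ConcurrentQueue<TMessage> Sent { get; } = new ConcurrentQueue<TMessage>();

    public async Task PublishAsync(TMessage message)
    {
        // Placeholder for an asynchronous network round trip.
        await Task.Yield();
        Sent.Enqueue(message);
    }
}

public static class Demo
{
    public static async Task Main()
    {
        var publisher = new InMemoryPublisher<string>();

        // Callers await the publish rather than blocking a thread on it.
        await publisher.PublishAsync("hello");

        Console.WriteLine(publisher.Sent.Count); // prints "1"
    }
}
```

A caller that genuinely needs synchronous behavior can still block on the returned Task at the call site, which keeps that decision out of every publisher implementation.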

Request/Response Correlation - Introduce a Provider Model?

So Obvs has a baked in approach to setting RequesterId (and RequestId if not specified) when ServiceBusClient::GetResponses is called. There's no way to override this behavior at present. Two things:

  1. Right now the logic only sets RequestId if it's not already set. However, RequesterId is always set and overrides any value that the application might have already specified. I would argue that RequesterId should use the same rule of only overwriting if not already set. At a bare minimum, I would love to see this changed as soon as possible with the goal of allowing the application itself to decide this on a per-request basis if it wants to.
  2. How do you feel about introducing a provider model for this so that people can plug in their own implementations? I imagine something like:
public interface IRequestCorrelationProvider
{
    void ProvideRequestCorrelationIds(IRequest request);
}

Then a companion configuration API:

public interface ICanSpecifyRequestCorrelationProvider : IWhateverMakesSenseHere
{
    IWhateverMakesSenseHere WithRequestCorrelationProvider(IRequestCorrelationProvider provider);
}

I can send a PR around this if it's something you're in agreement with.
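
For discussion, a default provider following the "only set if not already set" rule from point 1 might look like the sketch below. The IRequest declared here is a reduced stand-in for the Obvs contract (assumed to expose settable RequestId and RequesterId strings), and DefaultRequestCorrelationProvider and MyRequest are hypothetical names.

```csharp
using System;

// Reduced stand-in for the Obvs IRequest contract; assumed shape.
public interface IRequest
{
    string RequestId { get; set; }
    string RequesterId { get; set; }
}

public interface IRequestCorrelationProvider
{
    void ProvideRequestCorrelationIds(IRequest request);
}

// Fills in correlation ids only where the application has not chosen its own.
public sealed class DefaultRequestCorrelationProvider : IRequestCorrelationProvider
{
    private readonly string _requesterId;

    public DefaultRequestCorrelationProvider(string requesterId)
    {
        _requesterId = requesterId ?? throw new ArgumentNullException(nameof(requesterId));
    }

    public void ProvideRequestCorrelationIds(IRequest request)
    {
        if (string.IsNullOrEmpty(request.RequestId))
        {
            request.RequestId = Guid.NewGuid().ToString();
        }

        if (string.IsNullOrEmpty(request.RequesterId))
        {
            request.RequesterId = _requesterId;
        }
    }
}

// Demo-only request type.
public sealed class MyRequest : IRequest
{
    public string RequestId { get; set; }
    public string RequesterId { get; set; }
}

public static class Demo
{
    public static void Main()
    {
        var provider = new DefaultRequestCorrelationProvider("bus-default");

        // The application picked its own RequesterId, so it must survive.
        var request = new MyRequest { RequesterId = "app-chosen" };
        provider.ProvideRequestCorrelationIds(request);

        Console.WriteLine(request.RequesterId);                       // prints "app-chosen"
        Console.WriteLine(!string.IsNullOrEmpty(request.RequestId));  // prints "True"
    }
}
```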

Transport/serializer source?

So you mention ActiveMQ/NetMQ support and you have separate binary packages for those up on NuGet which is awesome, but are you going to publish the repos containing the source for those as well?

I would seriously like to consider implementing Azure Service Bus support for this, but I would like to be able to look at your ActiveMQ implementation for example as a reference.

[Question] Is `IObservable` filtering applied on the client or server?

I'm interested in the library but was wondering what happens when you filter a given IObservable<T> before subscribing: is filtering (and all RX operators) applied on the client side (that is, after the message was already received), or is it somehow performed on the sender, before it is sent?

Regardless of the answer here, I'd suggest updating the documentation with this for future reference.

Question: .NET Core?

Hi.

Does this library support .NET Core? We're starting a new project on .NET Core and would like to use this library for IPC.

Cheers.
