
rxnetty's Introduction

Project Status

2018-02-14

  • 1.0.x will be the RxNetty-2 update. It is currently pending an RFC for API changes.
  • 0.5.x is the current release branch. This is no longer under active development but will have major patches applied and will accept pull requests.
  • 0.4.x is now considered legacy and will only have critical patches applied.

Branch Status

This is the current branch for RxNetty and is now API stable.

Motivations

Motivations and the detailed status of the breaking changes in 0.5.x can be found here.


Reactive Extension (Rx) Adaptor for Netty

Getting Started

The best place to start exploring this library is to look at the examples for some common use cases addressed by RxNetty.

A very simple HTTP server example can be found here and the corresponding HTTP client is here.
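
For orientation, here is a minimal "Hello World" server in the style of the 0.5.x examples; this is a sketch, so the exact class names and method signatures should be verified against the linked examples.

import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

public final class HelloWorldServer {

    public static void main(String[] args) {
        // Start an HTTP server on port 8080 that answers every request with "Hello World!".
        HttpServer.newServer(8080)
                  .start((request, response) ->
                          response.writeString(Observable.just("Hello World!")))
                  .awaitShutdown();
    }
}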

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Maven:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxnetty-http</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex" name="rxnetty-http" rev="x.y.z" />

and for Gradle:

implementation 'io.reactivex:rxnetty-http:x.y.z'
Unintentional release artifacts

There are two artifacts in Maven Central, 0.5.0 and 0.5.1, which were unintentionally released from the 0.4.x branch. Do not use them. More details can be found here.

Build

To build:

$ git clone https://github.com/ReactiveX/RxNetty.git -b 0.5.x
$ cd RxNetty/
$ ./gradlew build

Bugs and Feedback

For bugs, questions, and discussions please use the GitHub Issues.

LICENSE

Copyright 2014 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

rxnetty's People

Contributors

andrewreitz, ardacebi, benjchristensen, brharrington, dangets, daschl, david-at-aws, davidmoten, diptanu, dmuino, elandau, fnxrassmate, forresthopkinsa, g9yuayon, groodt, jamesgorman2, jonashallfnx, jstnbckr, karishmag9, manuelp, niteshkant, quidryan, randgalt, raphaelbrugier, rdegnan, rhart, rspieldenner, rstoyanchev, stevegury


rxnetty's Issues

UriInfo does not handle certain URIs

Tried to execute this code:

        URI uri = new URI("http://jenkins_slave-cf8d2895:50913/");
        System.out.println(UriInfo.fromUri(uri).getHost());

The output is "localhost"
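
The root cause is likely java.net.URI itself rather than RxNetty: a hostname containing an underscore does not pass the host syntax check, so URI.getHost() returns null, and UriInfo presumably falls back to a default such as "localhost" (that fallback is an assumption about UriInfo). A JDK-only sketch of the underlying behavior:

import java.net.URI;

public class UnderscoreHostCheck {

    public static void main(String[] args) throws Exception {
        URI uri = new URI("http://jenkins_slave-cf8d2895:50913/");
        // The underscore makes the authority fail hostname parsing, so getHost() is null...
        System.out.println(uri.getHost());      // prints: null
        // ...even though the raw authority is still available.
        System.out.println(uri.getAuthority()); // prints: jenkins_slave-cf8d2895:50913
    }
}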

Server onError Handling

The server does not have default error handling when Observable<Void> calls onError.

Right now I have to manually do it or errors are swallowed:

                    return testEndpoint(request, response).onErrorFlatMap(error -> {
                        return writeError(request, response, "Unknown error: " + error.getMessage());
                    });

Add logging framework

We don't have any logging in the rx-netty module now. I think we need some logging when the code reaches production to help debugging. For example, we have debugged and found issues in the Apache HttpClient connection pool when DEBUG logging is turned on.
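
As a sketch of the kind of logging being proposed, using SLF4J as one possible facade (the class and messages below are illustrative only, not existing RxNetty code):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPoolLoggingExample {

    private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolLoggingExample.class);

    void onConnectionAcquired(String host, int port, int poolSize) {
        // DEBUG-level logging that can be switched on in production to diagnose pool
        // behavior, similar to the Apache HttpClient example mentioned above.
        logger.debug("Acquired connection to {}:{}; pool size is now {}", host, port, poolSize);
    }
}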

chunked encoding header is added prematurely

In HttpRequest.withContent(T content), a Transfer-Encoding: chunked header is added if the content length is not set. However, the content length can be set later with the withHeader() method, since we allow an arbitrary sequence of with...() calls in the builder.

Content-length is set for GET request

According to HTTP specification:

The presence of a message-body in a request is signaled by the inclusion of a Content-Length or Transfer-Encoding header field in the request's message-headers. A message-body MUST NOT be included in a request if the specification of the request method (section 5.1.1) does not allow sending an entity-body in requests.

But it looks like, in the last update, "Content-length: 0" is automatically added for all requests without content. This leads to a Ribbon test failure when it talks to MockHttpServer, which throws an exception when it sees a "Content-length" header on a GET request.

Public vs Private APIs

Right now we don't have the public APIs correctly exposed. For example, we have some classes hiding in rx.netty.impl that are actually part of the public API, so they should not be inside an impl package. The ObservableHttpResponse object is deep inside a protocol package that is definitely full of private objects.

The RxNetty class is definitely the entry point, and anything it exposes should also become part of the rx.netty package structure, I think, or some other public package under rx.netty. The rx.netty.impl and rx.netty.protocol packages should be filtered from the public Javadoc and treated as internal implementation details.

Reusable HTTPRequests?

HttpRequest provided by HttpClient contains a ContentSource which is a collection of Objects that would be sent as payload of the HTTP request sent by HttpClient.

In order for Ribbon or any other higher level framework to provide retries, it is imperative that it be able to send the same request to multiple clients. However, with the current model, the same HttpRequest cannot be reused, as the ContentSource contains the state of the iteration.

Assuming we want to support use cases like this, without forcing users to clone the object before every use, we have to do the following:

  • The framework overwrites any state changes it makes to headers while sending the request, eg: The host header.
  • The content source set in the HttpRequest should provide a way to reset state. This can be achieved in two possible ways:
    • Provide a reset method on the ContentSource. This is too intrusive for content sources that do not need reuse, and providing a different interface adds to confusion.
    • Return a new instance of ContentSource every time a request is to be sent, i.e. instead of returning the same contentSource object here, return a new instance every time. To achieve this, we would optionally provide an override of withContentSource() that takes a ContentSourceFactory. If this factory is set, we would use it to create a new content source instance; otherwise we would use the content source object as is (a rough sketch of this follows the list).
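
A rough sketch of the factory idea (all names below are hypothetical, not the current RxNetty API):

import java.util.Iterator;

// Hypothetical stand-in for RxNetty's ContentSource, which iterates the request payload.
interface ContentSource<T> extends Iterator<T> {
}

// Proposed factory: a fresh ContentSource is created for every (re)send of the request,
// so a retry never observes a partially consumed source.
interface ContentSourceFactory<T> {
    ContentSource<T> newContentSource();
}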

@benjchristensen @g9yuayon @allenxwang Thoughts?

RxClient should provide a way to shutdown

This is specifically required since we have also implemented a connection pool, and hence there should be a way to clean up those connections when they are no longer required.

This should in turn shutdown the underlying pool.

HttpObjectAggregator is not in the correct position in the client pipeline

PipelineConfigurator.httpClientConfigurator() returns a PipelineConfigurator that includes HttpObjectAggregator. However, it seems that this is never used as ClientRequestResponseConverter does not pass the HttpContent object to the next handler in the pipeline.

It is not clear what the intention is. Are we still going to support out-of-the-box pipeline configuration that supports aggregated http response?

Implement Cookie & URI parsing for HTTP

HttpRequest & HttpResponse, both for client and server, do not implement Cookies.
Also, the different methods in the server-side HttpRequest for parsing the URI are not yet implemented.

AbstractClientBuilder.SHARED_IDLE_CLEANUP_SCHEDULER prevents application from closing

    HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>('localhost', 8080)
            //.withNoIdleConnectionCleanup()
            .withMaxConnections(DEFAULT_MAX_CONNECTIONS)
            .build()

    println client.submit(HttpClientRequest.createPost('/api/profile/login')
            /* [...] */)
            .toBlockingObservable()
            .last()

    client.shutdown()

Without withNoIdleConnectionCleanup, a static one-thread scheduler is created and prevents the application from closing.

=> Why is it shared? Could it just be instantiated per client, since each client also schedules its own cleanup task and delay? And since each client can handle a lot of connections, there shouldn't be so many client instances within an app?

Thank you ;-)

Required Client side metrics

As there was no insight initially on which metrics should be provided, this comment has been updated post-implementation to describe the available metrics.
The following metrics will be available for the out-of-the-box Servo metrics plugin.

TCP
  • Live Connections: The number of open connections from this client to a server. This is a gauge.
  • Connection count: The total number of connections ever created by this client. This is a monotonically increasing counter.
  • Pending Connections: The number of connections that are pending. This is a gauge.
  • Failed connects: Total number of connect failures.
  • Connection times: Time taken to establish a connection.
  • Pending connection close: Number of connections which are requested to be closed but are not yet closed. This is a gauge.
  • Failed connection close: Number of times when the connection close failed. This is a monotonically increasing counter.
  • Pending pool acquires: For clients with a connection pool, the number of acquires that are pending. This is a gauge.
  • Failed pool acquires: For clients with a connection pool, the number of acquires that failed. This is a monotonically increasing counter.
  • Pool acquire times: For clients with a connection pool, time taken to acquire a connection from the pool.
  • Pending pool releases: For clients with a connection pool, the number of releases that are pending. This is a gauge.
  • Failed pool releases: For clients with a connection pool, the number of releases that failed. This is a monotonically increasing counter.
  • Pool releases times: For clients with a connection pool, time taken to release a connection to the pool.
  • Pool acquires: For clients with a connection pool, the total number of acquires from the pool.
  • Pool evictions: For clients with a connection pool, the total number of evictions from the pool.
  • Pool reuse: For clients with a connection pool, the total number of times a connection from the pool was reused.
  • Pool releases: For clients with a connection pool, the total number of releases to the pool.
  • Pending Writes: Writes that are pending to be written over the socket. This includes writes which are not flushed.
    This is a gauge.
  • Pending Flushes: Flushes that are issued but are not over yet. This is a gauge.
  • Bytes Written: Total number of bytes written. This is a monotonically increasing counter.
  • Write Times: The time taken to finish a write.
  • Bytes Read: The total number of bytes read. This is a monotonically increasing counter.
  • Failed Writes: The total number of writes that failed. This is a monotonically increasing counter.
  • Failed Flushes: The total number of flushes that failed. This is a monotonically increasing counter.
  • Flush times: The time taken to finish a flush.
HTTP

HTTP contains all the metrics that are available from TCP. The following metrics are specific to HTTP:

  • Request backlog: The number of requests that have been submitted but not started processing. This is a gauge.
  • Inflight requests: The number of requests that have been started processing but not yet finished processing. This is a gauge.
  • Processed Requests: Total number of requests processed. This is a monotonically increasing counter.
  • Request Write Times: Time taken to write requests, including headers and content.
  • Response Read Times: Time taken to read a response.
  • Failed Responses: Total number of responses that failed, i.e. for which the request was sent but the response was an error.
  • Failed request writes: Total number of requests for which the writes failed.
UDP

UDP contains all the metrics that are available from TCP.

Use of www.google.com in tests

A lot of tests in HttpClientTest try to connect to "www.google.com".
This can easily be replaced by starting a local RxServer on a port. The dependency on an external website can cause test flakiness and also adds an unnecessary requirement of internet connectivity for running the tests (Yes, I want to run these tests on a flight :) )

Simple URI mapping

We'd still need a simple URI mapping that dispatches different URI patterns to different handlers. Mantis already has a few use cases for this. I'd imagine that URI mapping is the first thing most users will need, so it would make sense to put it in RxNetty.
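
To make the request concrete, here is a rough, framework-agnostic sketch of prefix-based dispatch (the names are hypothetical, not a proposed RxNetty API):

import java.util.LinkedHashMap;
import java.util.Map;

public class SimpleUriRouter<H> {

    // Insertion order matters: the first registered prefix that matches wins.
    private final Map<String, H> handlers = new LinkedHashMap<String, H>();

    public SimpleUriRouter<H> addRoute(String pathPrefix, H handler) {
        handlers.put(pathPrefix, handler);
        return this;
    }

    public H route(String path) {
        for (Map.Entry<String, H> entry : handlers.entrySet()) {
            if (path.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null; // no match; the caller can respond with 404
    }
}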

HttpClient should cancel request write & response processing on unsubscription

Currently HttpClient cancels the connection subscription when the caller of HttpClient.submit() unsubscribes. This in turn closes the connection.
The relevant code is here:

https://github.com/Netflix/RxNetty/blob/master/src/main/java/io/reactivex/netty/protocol/http/HttpClientImpl.java#L71

Ideally, it should do the following:

  1. If write is not completed, cancel write, which if successful should close connection.
  2. If the response is not completed, we wait for response to complete and then close the connection.

In the above statements, closing the connection in the presence of connection pooling (#20) would signify returning the connection to the pool. In such a case, the connection should be in a sane state to reuse, i.e. no read/write should be pending.

AbstractQueueBasedChannelPool issues duplicate channel lifecycle events

The lifecycle of a channel in netty is defined as

channelRegistered -> channelActive -> channelInactive -> channelUnregistered

However, going by the code here:

https://github.com/Netflix/RxNetty/blob/master/rx-netty/src/main/java/io/reactivex/netty/client/pool/AbstractQueueBasedChannelPool.java#L150

The lifecycle will be:

channelRegistered -> channelActive -> channelRegistered -> channelActive -> channelInactive -> channelUnregistered

for the channel that gets reused once.

This will violate the contract of channel lifecycle and hence create confusion for people adding lifecycle handlers to the pipeline.

On a related note, why do we have to re-initialize the pipeline here? As discussed here: #20 I thought we would not worry about resetting the pipeline and channel states on connection reuse?

Metrics

In order to use RxNetty effectively in production, we need metrics providing insights into the clients and servers created by RxNetty. Currently, no metrics are published by RxNetty.

Pluggability

RxNetty strives not to be opinionated about anything beyond using Netty for the network stack and RxJava for async interfaces, so it would not be prudent for us to bind to a particular metrics framework like Servo, Yammer Metrics, etc.
Instead we should provide hooks into the system so that anyone can provide a plugin for their metrics framework of choice.

Hooks

There are a few ways of providing hooks for the metrics system.

Event based

This essentially means that we publish events for any interesting state change inside RxNetty via Observables, from which people can infer these metrics.
The current RxClient connection pool implementation provides such a hook here.
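
A minimal sketch of this style using an RxJava subject; the event type and method names are illustrative only, not an existing RxNetty API:

import rx.Observable;
import rx.subjects.PublishSubject;

public class ClientEventPublisher {

    // Hypothetical event type; the real set of events would mirror RxNetty's state changes.
    public enum EventType { CONNECT_START, CONNECT_SUCCESS, CONNECT_FAILED, CONNECTION_REUSED }

    private final PublishSubject<EventType> events = PublishSubject.create();

    // Called internally by the client whenever an interesting state change happens.
    void publish(EventType event) {
        events.onNext(event);
    }

    // Metrics plugins subscribe and derive whatever metrics they need, e.g. counting
    // CONNECT_FAILED events to build a "failed connects" counter.
    public Observable<EventType> events() {
        return events;
    }
}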

Advantages

The advantage of this approach is that we do not have to maintain any state inside RxNetty, and we would not face problems around holding these metrics (e.g. counter overflow), which, without a metrics framework, would create more work for us.
One subtle advantage is that we do not have to decide upfront, for everyone, which metrics are useful; people can infer newer metrics from the available events.

Disadvantages

The disadvantage is that every plugin provider has to re-invent the wheel in terms of understanding these events and correlating them to infer the metrics. This gets complex when the metrics are to be inferred from a set of events instead of one.

Data based

We can also provide a data-based plugin mechanism, wherein we store all the metrics of interest and provide a hook to transform and send these metrics to the metrics framework of choice.

Advantages

  • Lesser things to do for every plugin provider.
  • No one has to understand the infrastructure to create the metrics.

Disadvantages

  • We would have to create basic metrics constructs like rolling counters, percentiles, etc. which comes for free with
    any metrics framework.

Required metrics

Client

All required client side metrics are defined in the issue #96

Server

All required server side metrics are defined in the issue #97

Race condition of pool idle connection cleanup task and acquiring connection

In the code of IdleConnectionsCleanupTask:

    PooledConnection<I, O> idleConnection = iterator.next(); // 1
    if (!idleConnection.isUsable()) { // 2
           iterator.remove(); 
           discardConnection(idleConnection); 
    }

Follow the sequence below:

  1. clean up thread gets an idle connection from iterator.next() at mark 1
  2. clean up thread gets suspended
  3. another thread acquires the same connection via idleConnections.poll()
  4. clean up thread resumes, and at mark 2 idleConnection.isUsable() may return false since some time has elapsed
  5. clean up thread closes the connection, which is already in use in another thread

Would it possibly make sense to use an AtomicBoolean to indicate that a PooledConnection is owned by a thread?
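
A minimal sketch of that idea (names are hypothetical): both the cleanup task and acquiring threads must win a compare-and-set before touching the connection, so only one of them can proceed.

import java.util.concurrent.atomic.AtomicBoolean;

class PooledConnectionOwnership {

    private final AtomicBoolean owned = new AtomicBoolean(false);

    // The acquiring thread and the cleanup task both call tryClaim(); only the one that
    // flips the flag from false to true may use or discard the connection.
    boolean tryClaim() {
        return owned.compareAndSet(false, true);
    }

    // Called when the connection goes back into the idle queue.
    void release() {
        owned.set(false);
    }
}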

ObservableConnection vs ObservableHttpResponse

Does it continue to make sense giving HTTP its own response type instead of the generic ObservableConnection?

Thinking about it more since yesterday, I think it probably still does, because HTTP is such a common protocol and different from the others in that it is unidirectional once established.

I do think though we should make sure ObservableConnection and ObservableHttpResponse feel similar.

Redirect loops & infinite redirects

HttpClient follow redirect implementation does not detect redirect loops and does not limit the number of redirects followed per request.

Handling read timeout

Netty's handling of read timeouts is not very intuitive: you need to add a ReadTimeoutHandler to the pipeline. However, channel reads can still happen even after the ReadTimeoutHandler has thrown a ReadTimeoutException down the pipeline.

With a blocking socket, setting a read timeout ensures that socket resources will be released after the timeout has passed. What role does the read timeout play with a non-blocking socket?

Do we still care about read timeouts? Should they be handled in RxNetty, in Ribbon, or through Observable APIs by the caller?
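
For reference, the Netty mechanism under discussion looks roughly like this; whether and where RxNetty should install the handler is exactly the open question.

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;

public class ReadTimeoutInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        // Fires a ReadTimeoutException down the pipeline if no read happens within 30 seconds.
        ch.pipeline().addLast("readTimeout", new ReadTimeoutHandler(30, TimeUnit.SECONDS));
        // ... codec and application handlers would follow here.
    }
}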

Support writing to HTTP connection

HttpClient needs an API to support writing to an HTTP connection. This is useful when the full content is not available at the time the POST request is sent.

Add wire debugging functionality

RxNetty should be able to add wire debugging if it is configured to do so as part of ClientConfig.

The implementation can probably use Netty's LoggingHandler.
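
A sketch of that approach, assuming a hypothetical boolean option on ClientConfig that enables wire logging:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class WireLoggingInitializer extends ChannelInitializer<SocketChannel> {

    private final boolean wireLoggingEnabled; // would come from ClientConfig (hypothetical option)

    public WireLoggingInitializer(boolean wireLoggingEnabled) {
        this.wireLoggingEnabled = wireLoggingEnabled;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        if (wireLoggingEnabled) {
            // LoggingHandler logs all inbound/outbound bytes and channel events,
            // so placing it first captures the raw wire traffic.
            ch.pipeline().addFirst("wireLogger", new LoggingHandler(LogLevel.DEBUG));
        }
    }
}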

Rename HttpRequest/HttpResponse with server/client perspective

When doing server side programming with RxNetty, which also uses its client to talk to other services, the server side HttpRequest/HttpResponse clash with the client's corresponding objects of the same name, which ends up requiring fully qualified names.

Adding Headers Before Flush Still Doesn't Add Them

If I write anything to the channel and then add headers (even without flushing) the headers don't show up.

The use case is placing headers after we're finished writing, to report the server-side duration (we do this all over Netflix).

If we flush manually, then this is impossible of course, but if we don't flush and add a header before closing then the headers should show up.

RxClient.DEFAULT_CONFIG may be mutated

If people use this code

    ClientConfig config = new Builder(DEFAULT_CONFIG).build();
    config.readTimeout(...);

It ends up that the DEFAULT_CONFIG is mutated for the read timeout.

We probably should just copy the attributes instead of holding a reference to the passed-in config in this constructor.

HttpClientConfig has the same problem.
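
A sketch of the defensive-copy idea; the field and method names below are hypothetical, not RxNetty's actual ClientConfig:

public class ClientConfig {

    private long readTimeoutMillis;

    public ClientConfig() {
    }

    // Copy constructor: the builder snapshots the attributes of the passed-in config instead
    // of holding a reference to it, so later mutations do not leak back into DEFAULT_CONFIG.
    public ClientConfig(ClientConfig other) {
        this.readTimeoutMillis = other.readTimeoutMillis;
    }

    public void readTimeout(long millis) {
        this.readTimeoutMillis = millis;
    }
}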

ServerSentEventDecoder optimization

The ServerSentEventDecoder can be optimized as follows:

  • It currently does a data copy as it converts data to a string. Instead you can read only the attribute name & the rest can be a view of the underlying ByteBuf. This can help in cases where people want to handle raw ByteBuf. You can have a utility method on Message to get data/value as String.
  • It will be more optimal not to add the created message to the out list as it gets accumulated by the ByteToMessageDecoder. Instead you can just fire a channel read event which will send the created message upstream instantly.

Connection pool shutdown

Currently, the ChannelPool implementation does not provide a way to shut down the pool. After we resolve issue #80 of providing a shutdown for the client, we should also be able to shut down the associated pool.
If we do not do this, we will end up with unused open connections if a client is shut down while the process is still alive.

This should possibly also close the "in-use" connections and not just the idle ones.

Review comments

The following are my comments on doing the first pass of the netty related code.

Bugs

  • ObservableHttpClient, in the unsubscribe action of the returned subscription, calls connectionPromise.channel().close().sync(); but the channel returned by the promise can be null.
  • The decoder does not handle chunked + SSE combination as such.
  • ServerSentEventDecoder does a checkpoint() after readFullLine() which does not eliminate re-reading on buffer underflow, which the comment suggests.

Optimizations

The ServerSentEventDecoder can be optimized as follows:

  • It currently does a data copy as it converts data to a string. Instead you can read only the attribute name & the rest can be a view of the underlying ByteBuf. This can help in cases where people want to handle raw ByteBuf. You can have a utility method on Message to get data/value as String.
  • It will be more optimal not to add the created message to the out list as it gets accumulated by the ByteToMessageDecoder. Instead you can just fire a channel read event which will send the created message upstream instantly.

General code improvement

  • ObservableHttpClient.ChannelSetting: I do not see a need for this if you have the bootstrap initialized by the builder or passed to the ObservableHttpClient constructor.
  • ConnectionPromise may be better off as package private & be completed with a Channel as opposed to a ResponseWriter.
  • You are explicitly using the pooled bytebuf allocator which isn't yet blessed by netty which is why the default is unpooled. You may want to stick to the default which will get upgraded to pooled when it is ripe.
  • For content-type & other headers parsing you may want to look at this library: simpleframework: http://search.maven.org/#artifactdetails%7Corg.simpleframework%7Csimple%7C5.1.6%7Cjar It is a very lightweight library & provides useful utilities.
  • It looks to me that the RequestCompletionPromise is not required in ObservableHttpClient, as you are always dealing with endless streams. The current code does not do much with it except cancel, which the user does not have a handle on.
  • RequestWriter creates a new promise from the channel & then return another RequestWrittenPromise. You can just create one RequestWrittenPromise and use it for both.

I haven't yet read the entire code, I will update this issue list if I see anything else. If you want me to see a specific piece of code, let me know.

Move host and port to RxClient method signature

Currently, host and port are required constructor arguments and instance variables for RxClientImpl. I am proposing moving them to method signatures like connect() or submit() (a sketch follows the list below). This will have the following benefits:

  • Makes RxClient reusable when it needs to connect to a different server but uses the same client and pipeline configuration
  • Makes it possible to have APIs that support full HTTP URI that includes host and port
  • Provides an easier way to integrate with Ribbon load balancer which provides host and port.
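
Roughly, the proposed shape would look like this (hypothetical, not the current RxNetty API):

import rx.Observable;

// Hypothetical stand-in for the existing connection type.
interface ObservableConnection<I, O> {
}

// Host and port move from the client constructor to the call site, so a single client
// (and its pipeline configuration) can be reused against many servers, and callers such
// as Ribbon can supply host and port per request.
interface RxClient<I, O> {
    Observable<ObservableConnection<I, O>> connect(String host, int port);
}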

Make sure server for HttpClientTest runs with multiple threads

Test testTimeout() in HttpClientTest sends a request to the RxNetty server and causes it to wait for 10 seconds. If the server runs with only one thread, this will cause subsequent tests to time out if they send a request to the server and install the ReadTimeoutHandler, which is the case for test testNoReadTimeout().

Synchronized methods in AbstractQueueBasedChannelPool

This could lead to hot locks, especially when executed within a completely async application on an event loop.

For the methods in question,

  1. getTotalChannelsInPool() can just be best effort and hence can drop synchronization.
  2. setMaxTotal() uses a semaphore underneath, so it does not need to be synchronized, right?

Support GZip for FullHttpResponse

We need to add another configuration option in ClientConfig to indicate that GZip support is needed, and add the appropriate handler to the pipeline.
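
A sketch of what the pipeline addition could look like using stock Netty handlers; the ClientConfig flag itself is hypothetical:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;

public class GzipClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new HttpClientCodec());
        // Transparently decompresses gzip/deflate response bodies before they are
        // aggregated into a FullHttpResponse further up the pipeline.
        ch.pipeline().addLast(new HttpContentDecompressor());
    }
}

For the server to actually compress responses, the client would also have to send an Accept-Encoding: gzip request header.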

AbstractQueueBasedChannelPool close idle connection from another route on pool exhaustion

Maybe I am missing context here, so correct me if I am mistaken.

AbstractQueueBasedChannelPool has this code:
https://github.com/Netflix/RxNetty/blob/master/rx-netty/src/main/java/io/reactivex/netty/client/pool/AbstractQueueBasedChannelPool.java#L172

which seems to suggest that if a connection is not available for a server, it will take one from another server.
On the face of it, this seems misleading to say the least, as the caller of requestChannel() expects to get a channel to the server it is requesting the connection for.

Infrastructure Contexts

Internal to Netflix, we have a concept of Contexts tied to a request that gets passed across service calls. These contexts are mainly targeted at "infrastructure concerns" like tracing information, which are hidden from the "developer next door"!

Since propagation of these contexts requires participation from both client and server, it is imperative that it lives in a place which can be used by both ends of the IPC stack. Hence this proposal of adding the functionality to RxNetty.

Features

  • Opt-in: This feature will be completely opt-in from RxNetty's standpoint and will live in a different module. Karyon & Ribbon, which are built on top of RxNetty, will however support it by default.
  • Non-intrusive: This will be completely driven by Netty's ChannelHandler, i.e. even if the binary is in the classpath, a user has to explicitly choose to add this handler to the pipeline in order to use this feature.
  • Multi-protocol: This feature should be pluggable into any wire protocol. The pluggability is around "how to read/write this context from the wire". Apart from the wire-level protocol handling, everything else is identical for all protocols.
  • Serialization: How to serialize/de-serialize these contexts over the wire is extensible, even for the same protocol, e.g. the context could be serialized as JSON, a custom string, binary data, etc.

Out of scope

  • After these contexts get into the application layer, i.e. in most cases after the completion of Netty's pipeline, it is up to the application to manage these contexts across threads. So RxNetty will not provide ways to copy this data from one thread to another during the execution of the request.

Practical usage

  • Although one can do enough work to use this facility directly from RxNetty, it will be much easier to use via Karyon and Ribbon.

Problems with synchronized channel connection and callbacks on ConnectException

In RxClientImpl, channels are connected synchronously:

final ChannelFuture f = clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort()).sync();

This causes a Ribbon JUnit test failure on retries if the retry happens on the same thread. The exception received is BlockingOperationException.

If the sync() call is removed, then this JUnit test fails because Observer.onCompleted() is called unexpectedly:

@Test
public void testConnectException() throws Exception {
    HttpClientBuilder<FullHttpRequest, FullHttpResponse> clientBuilder =
            new HttpClientBuilder<FullHttpRequest, FullHttpResponse>("localhost", 81);
    HttpClient<FullHttpRequest, FullHttpResponse> client = clientBuilder.channelOption(
            ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000).build();
    FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
    Observable<ObservableHttpResponse<FullHttpResponse>> response = client.submit(request);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Throwable> ex = new AtomicReference<Throwable>();
    response.subscribe(new Action1<ObservableHttpResponse<FullHttpResponse>>() {
        @Override
        public void call(ObservableHttpResponse<FullHttpResponse> t1) {
        }

    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable t1) {
            ex.set(t1);
            latch.countDown();
        }
    }, new Action0() {

        @Override
        public void call() {
            System.err.println("OnCompleted called");
            latch.countDown();
        }

    });
    latch.await();
    System.err.println(ex.get());
    assertNotNull(ex.get());
}

The output shows that onCompleted is called instead of onError.

Channel pool should listen for channel close events

Currently the channel pool implementation does not have a way to react to the close of the underlying socket. It depends on the next reuse of the connection, or a background thread, to detect such socket closures.
A scenario that highlights this limitation is a ReadTimeoutHandler closing the connection after firing the timeout exception. In this case, the underlying connection stays in the idle pool until it is picked for reuse and the close is detected.
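
A sketch of the suggestion using the channel's close future; removeFromPool() is hypothetical:

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

public class PoolCloseListener {

    // Registered when the pooled connection is created, so the pool reacts to the socket
    // closing instead of waiting for the next reuse attempt to discover a dead connection.
    void watch(final Channel channel) {
        channel.closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                removeFromPool(channel);
            }
        });
    }

    void removeFromPool(Channel channel) {
        // Hypothetical: remove the corresponding PooledConnection from the idle queue.
    }
}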

Required server side metrics

As there was no insight initially on which metrics should be provided, this comment has been updated post-implementation to describe the available metrics.

The following metrics will be available for the out-of-the-box Servo metrics plugin.

TCP
  • Live Connections: The number of open connections from all clients to this server. This is a gauge.
  • Inflight Connections: The number of connections that are currently being processed by this server. This is a gauge.
  • Failed Connections: The number of times that connection handling failed. This is a monotonically increasing counter.
  • Connection processing times: Time taken for processing a connection.
  • Pending connection close: Number of connections which are requested to be closed but are not yet closed. This is a gauge.
  • Failed connection close: Number of times when the connection close failed. This is a monotonically increasing counter.
  • Connection close times: Time taken for closing a connection.
  • Pending Writes: Writes that are pending to be written over the socket. This includes writes which are not flushed.
    This is a gauge.
  • Pending Flushes: Flushes that are issued but are not over yet. This is a gauge.
  • Bytes Written: Total number of bytes written. This is a monotonically increasing counter.
  • Write Times: The time taken to finish a write.
  • Bytes Read: The total number of bytes read. This is a monotonically increasing counter.
  • Failed Writes: The total number of writes that failed. This is a monotonically increasing counter.
  • Failed Flushes: The total number of flushes that failed. This is a monotonically increasing counter.
  • Flush times: The time taken to finish a flush.
HTTP

HTTP contains all the metrics that are available from TCP. The following metrics are specific to HTTP:

  • Request backlog: The number of requests that have been received but not started processing. This is a gauge.
  • Inflight requests: The number of requests that have been started processing but not yet finished processing. This is a gauge.
  • Response Write Times: Time taken to write responses, including headers and content.
  • Request Read Times: Time taken to read a request.
  • Processed Requests: Total number of requests processed. This is a monotonically increasing counter.
  • Failed Requests: Total number of requests for which the request handling failed.
  • Failed response writes: Total number of responses for which the writes failed.
UDP

UDP contains all the metrics that are available from TCP.
