Comments (1)
(Copying comments from #267 for reference here)
PR #266 fixes this issue as a new implementation. Existing implementation is preserved and is deprecated (Issue #209)
Comparison of object allocation between the new and old implementation is:
Old Implementation
New Implementation
The above benchmark was done for a ServerSentEvent
data size of 10KB. The code for the test is below:
Server
package io.reactivex.netty.examples.http.sse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.functions.Func1;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public final class TestSSEServerStart {
private static final ByteBuf data;
static {
final byte[] dataBytes = new byte[10 * 1024];
Arrays.fill(dataBytes, (byte) 'c');
data = Unpooled.buffer().writeBytes(dataBytes).retain();
}
public static final byte[] DATA_PREFIX = "data: ".getBytes();
public static final byte[] EOL = "\n\n".getBytes();
public static void main(String[] args) {
RxNetty.createHttpServer(8091, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
final HttpServerResponse<ByteBuf> response) {
return Observable.interval(1, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<Void>>() {
@Override
public Observable<Void> call(Long interval) {
for (int i = 0; i < 5000; i++) {
response.writeBytes(DATA_PREFIX);
response.writeBytes(data.retain());
response.writeBytes(EOL);
}
return response.flush();
}
});
}
}).startAndWait();
}
}
Client (Old)
package io.reactivex.netty.examples.http.sse;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.concurrent.atomic.AtomicLong;
public final class TestSSEDecoderMemoryOld {
public static void main(String[] args) {
testOldSSEDecoder(8091);
}
private static void testOldSSEDecoder(int serverPort) {
System.out.println("Testing old SSE decoder. Server port: " + serverPort);
final AtomicLong counter = new AtomicLong();
RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder("localhost", serverPort)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf>sseClientConfigurator())
.build()
.submit(HttpClientRequest.createGet("/"))
.flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
@Override
public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> clientResponse) {
return clientResponse.getContent()
.doOnNext(new Action1<ServerSentEvent>() {
@Override
public void call(ServerSentEvent event) {
if (counter.incrementAndGet() % 1000 == 0) {
System.out.println("Received events count: " + counter.get());
}
}
});
}
}).toBlocking().last();
}
}
Client (New)
package io.reactivex.netty.examples.http.sse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.concurrent.atomic.AtomicLong;
public final class TestSSEDecoderMemoryNew {
public static void main(String[] args) {
testNewSSEDecoder(8091);
}
private static void testNewSSEDecoder(final int serverPort) {
System.out.println("Testing new SSE decoder. Server port: " + serverPort);
final AtomicLong counter = new AtomicLong();
RxNetty.<ByteBuf, io.reactivex.netty.protocol.http.sse.ServerSentEvent>newHttpClientBuilder("localhost",
serverPort)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
.build()
.submit(HttpClientRequest.createGet("/"))
.flatMap(
new Func1<HttpClientResponse<io.reactivex.netty.protocol.http.sse.ServerSentEvent>, Observable<io.reactivex.netty.protocol.http.sse.ServerSentEvent>>() {
@Override
public Observable<io.reactivex.netty.protocol.http.sse.ServerSentEvent> call(
HttpClientResponse<io.reactivex.netty.protocol.http.sse.ServerSentEvent> response) {
return response.getContent()
.doOnNext(
new Action1<io.reactivex.netty.protocol.http.sse.ServerSentEvent>() {
@Override
public void call(
io.reactivex.netty.protocol.http.sse.ServerSentEvent serverSentEvent) {
if (counter.incrementAndGet() % 1000 == 0) {
System.out.println(
"Received events count: " + counter.get());
}
}
});
}
})
.toBlocking().last();
}
}
from rxnetty.
Related Issues (20)
- Adding PipelineConfigurator in RxNetty 0.5.x HOT 1
- My async client keeps getting “Content stream is already disposed” error HOT 2
- Any plan to build RxNetty on top of RxJava 2?
- ConnectionHandler on 0.5.x-java2 branch imports rx.Observable HOT 1
- For large POSTs RxNetty seems to need to write everything before reading anything HOT 6
- ClosedChannelException while TcpClient reads from TcpServer HOT 3
- 0.5 Intuitive bytebuf handling HOT 2
- RxNetty 0.5.2 stable? HOT 4
- Create Http Client without binding it to an endpoint HOT 3
- [0.4.20] NullPointException when closing socket HOT 2
- Writes out of order when using multiple threads
- Does RxNetty support http2 multiplexing? HOT 1
- why before response.close() must response.getChannel().deregister()? HOT 2
- Unnecessary synchronised lock
- Connection Leak 0.5.2 HOT 6
- require for documentation around how backpressure works HOT 4
- HTTP POST example (REST) HOT 4
- [SECURITY] unsafeSecure() should not be used in samples HOT 1
- ResourceLeakDetector,LEAK: ByteBuf.release()
- HTTPS Server with RxNetty and existing certificate
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rxnetty.