使用两台 PC 测试:一台运行 KcpRttExampleServer,启动代码如下:
// Listener instance handed to the KCP server below.
KcpRttExampleServer listener = new KcpRttExampleServer();

// Channel tuning: nodelay parameters, send/receive windows and MTU.
ChannelConfig config = new ChannelConfig();
config.nodelay(true, 30, 2, true);
config.setSndwnd(1024);
config.setRcvwnd(1024);
config.setMtu(1400);

// FEC with 10 data shards and 3 parity shards.
config.setFecDataShardCount(10);
config.setFecParityShardCount(3);

// Remaining switches: immediate ACKs, 5000 ms timeout, conv-based channels, CRC32 check off.
config.setAckNoDelay(true);
config.setTimeoutMillis(5000);
config.setUseConvChannel(true);
config.setCrc32Check(false);

// Start the server with the listener and configuration; 20003 is the port argument.
KcpServer server = new KcpServer();
server.init(listener, config, 20003);
另一台运行 KcpRttExampleClient,代码如下:
public KcpRttExampleClient() {
data = Unpooled.buffer(220000);
for (int i = 0; i < data.capacity(); i++) {
data.writeByte((byte) i);
}
rtts = new int[30000];
for (int i = 0; i < rtts.length; i++) {
rtts[i] = -1;
}
startTime = System.currentTimeMillis();
scheduleSrv = new ScheduledThreadPoolExecutor(1);
}
/**
 * Configures a KCP client and connects it to the RTT example server.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    ChannelConfig config = new ChannelConfig();
    config.nodelay(true, 30, 2, true);
    config.setSndwnd(1024);
    config.setRcvwnd(1024);
    config.setMtu(1400);
    config.setAckNoDelay(true);
    config.setConv(55);
    // FEC with 10 data shards and 3 parity shards.
    config.setFecDataShardCount(10);
    config.setFecParityShardCount(3);
    config.setCrc32Check(false);

    KcpClient client = new KcpClient();
    client.init(config);

    // The same configuration object is passed to both init and connect.
    KcpRttExampleClient listener = new KcpRttExampleClient();
    InetSocketAddress remote = new InetSocketAddress("192.168.2.180", 20003);
    client.connect(remote, config, listener);
}
/**
 * Starts the periodic sender once the connection is established.
 *
 * <p>Every 20 ms (fixed delay, first run after 20 ms) one RTT message with the next
 * sequence number is written. When the sequence number reaches {@code rtts.length}
 * the task cancels itself and sends a final message with count {@code -1} as the
 * end marker.
 *
 * @param ukcp the connection to write the RTT messages to
 */
@Override
public void onConnected(Ukcp ukcp) {
    future = scheduleSrv.scheduleWithFixedDelay(() -> {
        writeAndRelease(ukcp, rttMsg(++count));
        if (count >= rtts.length) {
            // finish
            future.cancel(true);
            writeAndRelease(ukcp, rttMsg(-1));
        }
    }, 20, 20, TimeUnit.MILLISECONDS);
}

/**
 * Writes {@code msg} to {@code ukcp} and always drops this method's reference to it.
 *
 * <p>The release sits in {@code finally} so the buffer is not leaked when the write
 * throws; on the normal path the sequence is write-then-release, as before.
 */
private void writeAndRelease(Ukcp ukcp, ByteBuf msg) {
    try {
        ukcp.write(msg);
    } finally {
        msg.release();
    }
}
/**
 * Handles one reply from the peer.
 *
 * <p>A regular reply carries a 16-bit count followed by a 32-bit relative send time;
 * the round-trip time is stored in slot {@code count - 1}. A count of {@code -1} is
 * the end marker: three seconds later the average is printed, the connection is
 * closed and the JVM exits.
 *
 * @param byteBuf the received message
 * @param ukcp    the connection the message arrived on
 */
@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
    final int seq = byteBuf.readShort();
    if (seq != -1) {
        final int slot = seq - 1;
        final long sentAt = byteBuf.readInt();
        // A slot that is no longer -1 has already been answered once.
        if (rtts[slot] != -1) {
            System.out.println("???");
        }
        rtts[slot] = (int) (System.currentTimeMillis() - startTime - sentAt);
        System.out.println("rtt : " + seq + " " + rtts[slot]);
        return;
    }

    // End marker: report after a 3 second delay, then shut down.
    scheduleSrv.schedule(() -> {
        int total = java.util.Arrays.stream(rtts).sum();
        System.out.println("average: " + (total / rtts.length));
        System.out.println(Snmp.snmp.toString());
        ukcp.close();
        System.exit(0);
    }, 3, TimeUnit.SECONDS);
}
/**
 * Reports an exception raised for the connection by printing its stack trace.
 *
 * @param ex  the exception that occurred
 * @param kcp the connection it belongs to (not used)
 */
@Override
public void handleException(Throwable ex, Ukcp kcp) {
    ex.printStackTrace();
}
/**
 * Stops the sender executor and prints the RTT summary when the connection closes.
 *
 * <p>Slots that still hold the {@code -1} sentinel never received a reply, so they
 * are excluded from the average instead of being summed in as negative RTTs. The
 * sum is kept in a {@code long} so that large RTT values cannot overflow it.
 *
 * @param kcp the connection that was closed (not used)
 */
@Override
public void handleClose(Ukcp kcp) {
    scheduleSrv.shutdown();
    try {
        scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so the calling thread can still observe it.
        Thread.currentThread().interrupt();
    }

    long sum = 0;
    int max = 0;
    int received = 0;
    for (int rtt : rtts) {
        if (rtt == -1) {
            // No reply was recorded for this slot.
            continue;
        }
        received++;
        if (rtt > max) {
            max = rtt;
        }
        sum += rtt;
    }
    // Avoid dividing by zero when the connection closed before any reply arrived.
    long average = received == 0 ? 0 : sum / received;

    System.out.println("average: " + average + " max:" + max);
    System.out.println(Snmp.snmp.toString());
    System.out.println("lost percent: " + (Snmp.snmp.RetransSegs.doubleValue() / Snmp.snmp.OutPkts.doubleValue()));
}
/**
 * Builds one RTT probe message laid out as count + timestamp + dataLen + data.
 *
 * <p>The timestamp is the number of milliseconds elapsed since {@code startTime},
 * narrowed to an {@code int}. The data part is the readable content of the shared
 * {@code data} buffer, copied without moving its reader index.
 *
 * @param count sequence number of the message, written as a 16-bit value
 * @return a newly allocated buffer holding the message; the caller must release it
 */
public ByteBuf rttMsg(int count) {
    final ByteBuf msg = Unpooled.buffer(220000);
    msg.writeShort(count);

    final int elapsedMillis = (int) (System.currentTimeMillis() - startTime);
    msg.writeInt(elapsedMillis);

    final int payloadLen = data.readableBytes();
    // NOTE(review): writeShort stores only the low 16 bits, so a payload longer than
    // 65535 bytes cannot be represented in this field - confirm whether any reader
    // relies on dataLen before widening it.
    msg.writeShort(payloadLen);
    msg.writeBytes(data, data.readerIndex(), payloadLen);
    return msg;
}
KcpRttExampleServer 报错:
bytebuf长度: 1394 读出长度26846
[104] [-34] [56] [0] [0] [0] [-127] [75] [0] [4] [20]。。。。。
Snmp{BytesSent=0, BytesReceived=0, MaxConn=0, ActiveOpens=0, PassiveOpens=0, CurrEstab=0, InErrs=0, InCsumErrors=0, KCPInErrors=0, 收到包=12071, 发送包=15022, InSegs=9058, OutSegs=11557, 收到字节=7091032, 发送字节=14605712, 总共重发数=6269, 快速重发数=0, 空闲快速重发数=0, 超时重发数=6269, 收到重复包数量=1687, fec恢复数=16, fec恢复错误数=1, 收到fecData数=9050, 收到fecParity数=3021, fec缓存冗余淘汰data包数=2085, fec收到重复的数据包=0}
java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf(ridx: 2, widx: 1394, cap: 1394).slice(2, 26846)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.checkSliceOutOfBounds(AbstractUnpooledSlicedByteBuf.java:474)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.<init>(AbstractUnpooledSlicedByteBuf.java:38)
at io.netty.buffer.UnpooledSlicedByteBuf.<init>(UnpooledSlicedByteBuf.java:24)
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1223)
at com.backblaze.erasure.fec.FecDecode.decode(FecDecode.java:211)
at kcp.Ukcp.input(Ukcp.java:135)
at kcp.ReadTask.execute(ReadTask.java:60)
at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)