
jkcp's Introduction

jkcp

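KCP for Java, based on Netty.

KCP is a retransmission algorithm that is independent of the underlying transport protocol. jkcp is built directly on UDP and provides a convenient programming interface: you only need to extend the relevant classes. Without having to deal with the details of UDP and KCP, you can build applications that need fast transport, such as video streaming and MOBA games.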

Maven coordinates

  <dependency>
      <groupId>org.beykery</groupId>
      <artifactId>jkcp</artifactId>
      <version>1.3.1</version>
  </dependency>

Usage

See TestServer and TestClient under src/test/java/test.

For details of the KCP algorithm, see the kcp project.

jkcp's People

Contributors

beykery, dependabot[bot], linkerlin, lonng, tidus5, wangmanjiayou, williamoony


jkcp's Issues

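A message pushed to two connections may be released, causing the send to fail

The log is below. When the server receives the clients' "ready" message, it sends "battle start" to both clients.
Sometimes, however, a message's data cannot be read and the buffer is reported as already freed.
The "add send bb" lines in the log are printed in the send method of KcpOnUdp: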

public synchronized void send(ByteBuf bb)
{
    if (!closed)
    {
        this.sendList.add(bb);
        System.err.println("add send bb:"+ Thread.currentThread().getName()+" hash:"+bb.hashCode()+" bb:"+bb+" ref:"+bb.refCnt());
        this.needUpdate = true;
    }
}

The "send fail" line is printed in the send method of Kcp:

public int send(ByteBuf buffer) {
    System.err.println(" sending buffer :" +Thread.currentThread().getName()+" hash:"+ buffer.hashCode() + " buffer:" + buffer+" ref:"+buffer.refCnt());
    if (buffer.readableBytes() == 0) {
        System.err.println(" send fail, buffer :" + Thread.currentThread().getName()+" hash:" + buffer.hashCode() + " buffer:" + buffer+ " ref:"+buffer.refCnt());
        return -1;
    }
    // ... (remainder of the method not shown in the report)
}

Error log:

2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.room.Room.clientReady(Room.java:60) -[ kcp ready:local: /0.0.0.0:2222 remote: /192.168.1.105:60305]
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.room.Room.clientReady(Room.java:74) -[ all ready ]
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.room.Room.clientReady(Room.java:90) -[ send player enter to local: /0.0.0.0:2222 remote: /192.168.1.103:60354 msg: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 30, cap: 30) readableBytes :30]
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.room.Room.clientReady(Room.java:90) -[ send player enter to local: /0.0.0.0:2222 remote: /192.168.1.105:60305 msg: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 30, cap: 30) readableBytes :30]
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.room.Room.clientReady(Room.java:118) -[ send battle start to local: /0.0.0.0:2222 remote: /192.168.1.103:60354 msg: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 12) readableBytes :12]
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.room.Room.clientReady(Room.java:118) -[ send battle start to local: /0.0.0.0:2222 remote: /192.168.1.105:60305 msg: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 12) readableBytes :12]
add send bb:kcp thread 0 hash:1304880682 bb:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 12) ref:1
sending buffer :kcp thread 0 hash:1128752854 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 30, cap: 30) ref:1
sending buffer :kcp thread 0 hash:1304880682 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 12) ref:1
sending buffer :kcp thread 0 hash:1128752854 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 30, cap: 30) ref:1
sending buffer :kcp thread 0 hash:1304880682 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 12) ref:1
add send bb:pool-3-thread-1 hash:-168963106 bb:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 16, cap: 16) ref:1
add send bb:pool-3-thread-1 hash:-168963106 bb:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 16, cap: 16) ref:1
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.server.UdpServer.handleReceive(UdpServer.java:48) -[ client msg:3012 kcp:local: /0.0.0.0:2222 remote: /192.168.1.103:60354]
sending buffer :kcp thread 0 hash:-168963106 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 16, cap: 16) ref:1
sending buffer :kcp thread 0 hash:1 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(freed) ref:0
send fail, buffer :kcp thread 0 hash:1 buffer:UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(freed) ref:0
2018-06-27 14:45:38][kcp thread 0] ERROR - at com.test.fightserver.server.UdpServer.handleException(UdpServer.java:62) -[ ]
java.lang.IllegalStateException: send error : -1
at org.beykery.jkcp.KcpOnUdp.update(KcpOnUdp.java:198) [classes/:?]
at org.beykery.jkcp.KcpThread.run(KcpThread.java:154) [classes/:?]
2018-06-27 14:45:38][kcp thread 0] INFO - at com.test.fightserver.server.UdpServer.handleClose(UdpServer.java:68) -[ client close................local: /0.0.0.0:2222 remote: /192.168.1.105:60305]
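
The log shows one buffer instance (hash -168963106) queued twice by pool-3-thread-1 and a freed buffer reaching Kcp.send shortly afterwards. That would be consistent with a single ByteBuf being handed to two connections while each send path releases it once, although the report does not confirm this. The sketch below models that reading with a hypothetical RefCountedBuffer class (a toy stand-in, not Netty's ByteBuf and not jkcp code): each send consumes one reference, so a buffer shared by two peers needs one extra retain() before it is sent. With real Netty buffers, the corresponding calls would be ByteBuf.retain() or ByteBuf.retainedDuplicate() for each additional recipient.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for a reference-counted buffer (hypothetical, not Netty's ByteBuf). */
final class RefCountedBuffer {
    private final byte[] data;
    private int refCnt = 1; // a new buffer starts with one reference

    RefCountedBuffer(byte[] data) { this.data = data.clone(); }

    int refCnt() { return refCnt; }

    RefCountedBuffer retain() {
        if (refCnt == 0) throw new IllegalStateException("already freed");
        refCnt++;
        return this;
    }

    void release() {
        if (refCnt == 0) throw new IllegalStateException("already freed");
        refCnt--;
    }

    /** A freed buffer reports no readable bytes in this model. */
    int readableBytes() { return refCnt == 0 ? 0 : data.length; }
}

/** Toy connection whose send path consumes one reference (an assumption about jkcp). */
final class ToyConnection {
    /** Returns the number of bytes sent, or -1 if the buffer is empty or freed. */
    int send(RefCountedBuffer buf) {
        int n = buf.readableBytes();
        if (n == 0) return -1;
        buf.release();
        return n;
    }
}

final class BroadcastDemo {
    /** Sends one buffer to every peer, optionally retaining once per extra peer. */
    static List<Integer> broadcast(List<ToyConnection> peers, RefCountedBuffer buf, boolean retainPerPeer) {
        if (retainPerPeer) {
            for (int i = 1; i < peers.size(); i++) buf.retain();
        }
        List<Integer> results = new ArrayList<>();
        for (ToyConnection peer : peers) results.add(peer.send(buf));
        return results;
    }

    public static void main(String[] args) {
        List<ToyConnection> peers = Arrays.asList(new ToyConnection(), new ToyConnection());
        System.out.println(broadcast(peers, new RefCountedBuffer(new byte[16]), false)); // [16, -1]
        System.out.println(broadcast(peers, new RefCountedBuffer(new byte[16]), true));  // [16, 16]
    }
}
```

In this model the second peer fails with -1 unless the buffer was retained once for it, which matches the shape of the "send error : -1" in the log.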

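Default value of the lastTime field in KcpOnUdp

I ran some tests: after connecting a large number of clients and then shutting down the client program, the KcpOnUdp objects on the server were not removed from the map. Reading the code, I found the following problem:
If the first packet a client sends is incomplete and the client sends nothing afterwards, lastTime stays 0.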

    if (this.timeout > 0 && lastTime > 0 && System.currentTimeMillis() - this.lastTime > this.timeout)
    {
      this.closed = true;
      this.release();
      this.listerner.handleClose(this);
    }

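In this case closed is never set to true, so the KcpOnUdp object stays in memory forever.

Should lastTime be set as soon as any data is received, rather than only after a complete packet has been received?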
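
The idea raised in this report can be sketched as follows. IdleTracker is a hypothetical class used only for illustration, not jkcp's KcpOnUdp: lastTime starts at the creation time and is refreshed on every inbound datagram, so the timeout check no longer depends on a lastTime > 0 guard.

```java
/** Hypothetical idle tracker; the names are illustrative, not jkcp's API. */
final class IdleTracker {
    private final long timeoutMillis;
    private long lastTime;

    IdleTracker(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastTime = nowMillis; // start the clock at creation instead of leaving it at 0
    }

    /** Call for every inbound datagram, whether or not it completes a message. */
    void touch(long nowMillis) {
        lastTime = nowMillis;
    }

    /** True when a positive timeout is configured and has elapsed since lastTime. */
    boolean isTimedOut(long nowMillis) {
        return timeoutMillis > 0 && nowMillis - lastTime > timeoutMillis;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        IdleTracker tracker = new IdleTracker(1000, start);
        System.out.println(tracker.isTimedOut(start + 500));  // false
        System.out.println(tracker.isTimedOut(start + 1500)); // true, even though no data ever arrived
    }
}
```

Passing the current time in as a parameter keeps the check easy to test; a real implementation could read System.currentTimeMillis() directly, as the quoted code does.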

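Why does the test example send fewer messages per second than sending directly over Netty UDP or TCP?

I changed handleReceive in both TestServer and TestClient to something like:

@Override
public void handleReceive(ByteBuf bb, KcpOnUdp kcp) {
    String content = bb.toString(Charset.forName("utf-8"));
    System.out.println("KCP "+ content +System.currentTimeMillis());
    kcp.send(bb);// echo
}

The two sides echo each other. The client's main method sleeps for 1 s and then calls close(). In this test the server prints about 100 lines.
With plain Netty UDP, the same 1 s test prints nearly 2000 lines, and TCP prints nearly 1800 lines in 1 s.

Is this difference caused by the scheduling of the KCP algorithm itself and its added traffic overhead, or by something else?

UDP test case: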
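import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server {
    private static Logger logger = LoggerFactory.getLogger(Server.class);

    /**
     * UDP server: receives broadcasts sent by clients.
     */
    public static void initServer() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                    .handler(new UdpServerHandler());
            Channel channel = bootstrap.bind(AppConstants.SEARCH_PORT).sync().channel();
            channel.closeFuture().await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    private static class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            // Decode the request text; the original snippet used req without declaring it.
            String req = msg.content().toString(CharsetUtil.UTF_8);
            System.out.println("UDP "+ req +System.currentTimeMillis());
            // Echo the text back to the sender.
            ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(req, CharsetUtil.UTF_8), msg.sender()));
        }
    }

    public static void main(String args[]) {
        Server.initServer();
    }

}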

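import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Client {
    private static Logger logger = LoggerFactory.getLogger(Client.class);
    private int scanPort;

    public Client(int scanPort) {
        this.scanPort = scanPort;
    }

    private static class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            String response = msg.content().toString(CharsetUtil.UTF_8);
            System.out.println("UDP "+ response +System.currentTimeMillis());
            // Echo the text back to the sender.
            ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(response, CharsetUtil.UTF_8), msg.sender()));
        }
    }

    public void sendPackage() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ClientHandler());

            // Bind to an ephemeral local port.
            Channel ch = b.bind(0).sync().channel();

            // Broadcast the first message to start the echo exchange.
            ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("hello!!!", CharsetUtil.UTF_8),
                    new InetSocketAddress("255.255.255.255", scanPort))).sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String args[]) {
        Client client = new Client(AppConstants.SEARCH_PORT);
        client.sendPackage();
    }

}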

public class AppConstants {

    public static final int SEARCH_PORT = 22023;

}

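Custom codec

Thank you very much for providing this open-source code; it has been very helpful.

If I add custom encoding and decoding, where should my codec go? At the moment it seems I have to encode and decode the received ByteBuf inside TestServer's handleReceive.
That way there is no way to use pipeline() for message-flow processing.
/* ch.pipeline().addLast(new UdpMessageDecoder()); // message encryption/decryption: message ID (4 bytes) */

I'm not sure whether I have described the problem clearly. Thanks!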
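
One way to keep codec logic out of the handler body is to put it in a small helper that handleReceive calls. The sketch below assumes a frame layout of a 4-byte big-endian message ID followed by the payload, which is only a guess based on the comment in the report. It uses java.nio.ByteBuffer so it runs without Netty, and Message and MessageCodec are hypothetical names, not part of jkcp.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoded message: a 4-byte ID plus an opaque payload. */
final class Message {
    final int id;
    final byte[] payload;

    Message(int id, byte[] payload) {
        this.id = id;
        this.payload = payload;
    }
}

/** Hypothetical codec for the assumed frame layout: [int id][payload bytes]. */
final class MessageCodec {
    static final int HEADER_BYTES = 4;

    /** Writes the ID (big-endian, ByteBuffer's default order) followed by the payload. */
    static byte[] encode(Message m) {
        ByteBuffer out = ByteBuffer.allocate(HEADER_BYTES + m.payload.length);
        out.putInt(m.id);
        out.put(m.payload);
        return out.array();
    }

    /** Reads the ID and treats the remaining bytes as the payload. */
    static Message decode(byte[] frame) {
        if (frame.length < HEADER_BYTES) {
            throw new IllegalArgumentException("frame shorter than the 4-byte header");
        }
        ByteBuffer in = ByteBuffer.wrap(frame);
        int id = in.getInt();
        byte[] payload = new byte[in.remaining()];
        in.get(payload);
        return new Message(id, payload);
    }

    public static void main(String[] args) {
        byte[] frame = encode(new Message(42, "hi".getBytes(StandardCharsets.UTF_8)));
        Message decoded = decode(frame);
        System.out.println(decoded.id + " " + new String(decoded.payload, StandardCharsets.UTF_8));
    }
}
```

Inside handleReceive, the readable bytes of the ByteBuf could be copied into a byte array (for example with ByteBuf.readBytes(byte[])) and passed to decode; any encryption step would wrap the payload in the same helper.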

Does KcpClient's exceptionCaught call the wrong method?

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
    KcpClient.this.handleException(cause, null);
    KcpClient.this.close();
}

Would it be more reasonable to change KcpClient.this.close(); to KcpClient.this.handleClose()?

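Packet loss?

In the echo test, when a message larger than 1376 bytes is sent and therefore fragmented, something resembling packet loss occasionally occurs: a packet of a size such as 124 bytes is received.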

Receiving fails when a 9000-byte message is sent

After the client establishes a connection, it sends:

ByteBuf bb = Unpooled.buffer(128);
int N = 9000;
for (int i = N; i > 0; i-=4) {
    bb.writeInt(i);
}
tc.send(bb);

Debugging shows that peekSize in the server's Kcp.java always reaches:

if (rcv_queue.size() < seq.frg + 1)
{
    return -1;
}
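
For context, this check is how KCP decides whether the message at the head of the receive queue is fully assembled: frg counts down to 0 across the fragments of one message, so the first fragment's frg + 1 is the number of fragments the message needs. The sketch below mirrors that logic with a hypothetical Segment class and a java.util.ArrayDeque. It is a simplified model, not jkcp's Kcp.java, and it assumes a maximum segment size of 1376 bytes (the fragmentation threshold mentioned in the packet-loss report), under which 9000 bytes would split into 7 fragments.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical fragment: frg is the number of fragments that still follow it. */
final class Segment {
    final int frg;
    final int len;

    Segment(int frg, int len) {
        this.frg = frg;
        this.len = len;
    }
}

final class ReceiveQueueModel {
    /** Number of fragments needed for a message, rounding up. */
    static int fragmentCount(int messageBytes, int mss) {
        return (messageBytes + mss - 1) / mss;
    }

    /** Size of the next complete message, or -1 if it is not fully queued yet. */
    static int peekSize(Deque<Segment> rcvQueue) {
        Segment first = rcvQueue.peekFirst();
        if (first == null) return -1;
        if (first.frg == 0) return first.len;             // unfragmented message
        if (rcvQueue.size() < first.frg + 1) return -1;    // some fragments still missing
        int length = 0;
        for (Segment s : rcvQueue) {
            length += s.len;
            if (s.frg == 0) break;                         // last fragment of this message
        }
        return length;
    }

    /** Builds a queue holding the first 'delivered' fragments of one message. */
    static Deque<Segment> fragments(int messageBytes, int mss, int delivered) {
        int count = fragmentCount(messageBytes, mss);
        Deque<Segment> queue = new ArrayDeque<>();
        for (int i = 0; i < delivered && i < count; i++) {
            int len = (i == count - 1) ? messageBytes - mss * (count - 1) : mss;
            queue.addLast(new Segment(count - 1 - i, len));
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(fragmentCount(9000, 1376));        // 7
        System.out.println(peekSize(fragments(9000, 1376, 6))); // -1
        System.out.println(peekSize(fragments(9000, 1376, 7))); // 9000
    }
}
```

In this model, peekSize keeps returning -1 for as long as even one of the 7 fragments has not reached the receive queue, which is the symptom described in the report; the sketch does not explain why a fragment would go missing.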

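About the structure

I think that, since KCP is pure UDP, it could be implemented entirely with Java's native APIs on a single thread.
At most, add one thread for sending and one for receiving; three threads in total would be enough.
There is no need to start so many threads.

Receive and send buffers

Wouldn't a circular array be better for the receive and send buffers? Java provides ArrayDeque, and arrays should be considerably faster than linked lists.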
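
As a rough illustration of this suggestion, ArrayDeque can serve as a FIFO queue through the same Deque interface that LinkedList implements, so a queue that only appends at the tail and removes at the head can be swapped by changing the constructor call. Whether jkcp's buffers are used only in that way is an assumption here: ArrayDeque has no positional insert, so a buffer that inserts in the middle would need a different structure. QueueSwapDemo and drainInOrder are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

final class QueueSwapDemo {
    /** Appends 0..count-1 at the tail, then drains from the head in FIFO order. */
    static List<Integer> drainInOrder(Deque<Integer> queue, int count) {
        for (int i = 0; i < count; i++) {
            queue.addLast(i);
        }
        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.pollFirst());
        }
        return out;
    }

    public static void main(String[] args) {
        // The same code runs against both implementations.
        System.out.println(drainInOrder(new LinkedList<>(), 3)); // [0, 1, 2]
        System.out.println(drainInOrder(new ArrayDeque<>(), 3)); // [0, 1, 2]
    }
}
```

Any performance difference would have to be measured in the actual send and receive paths; this sketch only shows that the swap is mechanical for pure FIFO use.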
