terry-mao / goim Goto Github PK
View Code? Open in Web Editor NEWgoim
Home Page: https://goim.io/
License: MIT License
goim
Home Page: https://goim.io/
License: MIT License
首先运行kafka.sh,如下:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaPushsTopic
然后运行goim.sh,如下:
cd $GOPATH/bin
nohup $GOPATH/bin/router -c $GOPATH/bin/router.conf 2>&1 > /home/dev/golang2/logs/goim/panic-router.log &
nohup $GOPATH/bin/logic -c $GOPATH/bin/logic.conf 2>&1 > /home/dev/golang2/logs/goim/panic-logic.log &
nohup $GOPATH/bin/comet -c $GOPATH/bin/comet.conf 2>&1 > /home/dev/golang2/logs/goim/panic-comet.log &
nohup $GOPATH/bin/job -c $GOPATH/bin/job.conf 2>&1 > /home/dev/golang2/logs/goim/panic-job.log &
接下来到src/goim/examples/javascript,运行
go run main.go
之后打开浏览器
firefox http://localhost:1999
然后看到两个大字 “client demo”
之后该怎么测试?如下面那样发消息?
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic KafkaPushsTopic
消息内容1
消息内容2
是不是消息格式不对?浏览器上没有任何反应,websoket也没报错
panic-comet.log有一些打印:
[06/10/16 20:38:12] [DEBG] ring wp: 8, idx: 0
[06/10/16 20:38:12] [DEBG] ring rp: 8, idx: 0
[06/10/16 20:38:22] [DEBG] ring wp: 9, idx: 1
[06/10/16 20:38:22] [DEBG] ring rp: 9, idx: 1
[06/10/16 20:38:32] [DEBG] ring wp: 10, idx: 2
[06/10/16 20:38:32] [DEBG] ring rp: 10, idx: 2
[06/10/16 20:38:42] [DEBG] ring wp: 11, idx: 3
[06/10/16 20:38:42] [DEBG] ring rp: 11, idx: 3
[06/10/16 20:38:52] [DEBG] ring wp: 12, idx: 4
[06/10/16 20:38:52] [DEBG] ring rp: 12, idx: 4
最近在做kafka相关的东西
看了一下本项目用的是github.com/wvanbergen/kafka/consumergroup
这个client,但是貌似这个库在kafka的partition节点发生变动的时候不会自动感知吧orz
Hi, 我在自己的项目中借用了goim的timer实现 :), 使用过程中发现了一个问题:
当前的timer实现是检测不到某个TimerData
已经被Del
过,在以下场景中会导致死循环。
tmr := NewTimer(100)
t1 := tmr.Add(time.Second, "test_1", func() {
fmt.Println("test_1")
})
println(t1)
println(tmr.free)
println(tmr.free.next)
tmr.Del(t1)
println(tmr.free)
println(tmr.free.next)
tmr.Del(t1)
println(tmr.free) // 由于Del中会将 td 设置为free,在这时,timer中的free 和 free.next 发生了循环引用
println(tmr.free.next)
t2 := tmr.Add(time.Second, "test_2", func() {
fmt.Println("test_2")
})
t3 := tmr.Add(3*time.Second, "test_3", func() {
fmt.Println("test_3")
})
println(t2)
println(t3) // 由于t2过期后会将index置为-1, t3过期删除自己时,会因为检测到已过期而跳过从`timers`中删除的步骤导致 t3 被反复执行
time.Sleep(1 * time.Hour)
具体是在
https://github.com/Terry-Mao/goim/blob/master/libs/time/timer.go#L94
func (t *Timer) put(td *TimerData) {
td.next = t.free // 如果 td == t.free , 这里会导致 `timers` 中出现重复的`TimerData`指针
t.free = td
}
刚刚定位到问题,还没写fix 😆
日志里错误打印:
dispatch websocket error(json: error calling MarshalJSON for type json.RawMessage: invalid character '\x00' looking for beginning of value)
case就是doc里的 curl -d "{"test": 1}" http://127.0.0.1:7172/1/push/room?rid=1
roomId我是在client.js里做auth的时候随意个userId在body里, 让logic里走到 roomId=1的逻辑返回里生成的roomId
我打日志跟了一下, job模块里,在做proto.Body入buf之前都还是正常的,bytes.Writer之后RPC call到comet就不对了, 不知道我看的对不对。
看上去是在做proto.WriteTo的地方构建协议包的地方offset截断了json中的一个字符导致, proto的Operation也由5变成11,12之类的了,很像是offset问题
不知道是否是这样
websocket下有一处内存泄露,跟了一下pprof应该是在做ws close的时候:
trd = tr.Add(server.Options.HandshakeTimeout, func() {
conn.Close()
})
我觉得此处的闭包会导致conn指针被store, gc的时候会漏掉这里, 随着连接不断建立,断开。 pprof里显示ws使用的内存只增不减, 而且都集中在websocket.newConn,
哦对,我这边使用gorilla/websocket替代了golang.org/x/net/websocket
对比测试针对trd做一次fn reset后gc能正确的回收了。
待我线上确定。我放个pr, 只有简单几行
看tcp的代码, 应该一样会有这个问题,不知道是否有人遇到过.
好像没看到集群的功能啊
从功能上看,把gopush-cluster的client上行协议扩展一下,然后comet通过rpc到web,就可以实现用户A和用户B双向通信了。
群组这些功能在管道上面做在业务逻辑层里面就可以了。
还可以避免在goim里面再定义一套私有协议。
请问从头开始写goim的原因是什么呢?
github.com/Terry-Mao/goim/comet/operation.go
func (operator *DefaultOperator) Disconnect(key string, rid int32) (err error) {
var has bool
if has, err = disconnect(key, rid); err != nil {
return
}
if has {
log.Warn("disconnect key: \"%s\" not exists", key)
}
return
}
if has {
log.Warn("disconnect key: \"%s\" not exists", key)
}
是当key不存在时才打印?
当前的router集群把数据都存储在了内存中,如果服务发布或者其他不可抗拒原因,router服务挂掉(或重启掉) 了,userId
与subkey
和serverId
的对应关系就丢失了,推送服务等于不可用了。 @Terry-Mao 看了代码没有找到router数据落地与恢复
逻辑,当前的玩法是怎么样子的?
有几点问题:
golang有很先进的密码学算法库,几点建议:
1.对称部分,建议直接使用最先进的aead算法,可选的有2种:aes-gcm,chacha20-poly1305 ,
2.非对称部分,建议先改成 rsa-oaep 2048比特做认证+密钥交换,后续完全可以改成 ecdsa+ecdh
在bucket的broadcast中,读数组长度时,个人觉得用RLock()替换Lock()会更好:)
eric@go:~/go/bin$ ./push 0 20000 localhost:7172 60
[06/16/16 18:55:58] [INFO] req: "/1/push", post: "{"m":{"test":1},"u":10000}", res:"{"ret":65535}", ip:"127.0.0.1:56348", time:"0.000036s"
[06/16/16 18:55:58] [EROR] strconv.Atoi("") error(strconv.ParseInt: parsing "": invalid syntax)
[06/16/16 18:55:58] [INFO] req: "/1/push", post: "{"m":{"test":1},"u":10000}", res:"{"ret":65535}", ip:"127.0.0.1:56348", time:"0.000018s"
[06/16/16 18:55:58] [EROR] strconv.Atoi("") error(strconv.ParseInt: parsing "": invalid syntax)
[06/16/16 18:55:58] [INFO] req: "/1/push", post: "{"m":{"test":1},"u":10000}", res:"{"ret":65535}", ip:"127.0.0.1:56348", time:"0.006457s"
使用java sdk和benchmark/client进行测试是comet TCP出错:
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x40a0ee]
goroutine 33205 [running]:
main.LogSlow(0x898900, 0x6, 0xc82033d9b0, 0x4, 0x0)
/home/eric/go/src/goim/comet/slow.go:50 +0x55e
main.(_Server).dispatchTCP(0xc82575d8f0, 0xc82033d9b0, 0x4, 0xc82002c2f0, 0xc82005bbe8, 0xc848f38258, 0xc84cfba000, 0xc82005bba0)
/home/eric/go/src/goim/comet/tcp.go:221 +0xa00
created by main.(_Server).serveTCP
/home/eric/go/src/goim/comet/tcp.go:132 +0x8ec
goroutine 1 [chan receive]:
main.InitSignal()
/home/eric/go/src/goim/comet/signal.go:16 +0x38c
main.main()
/home/eric/go/src/goim/comet/main.go:91 +0xb39
goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1721 +0x1
panic: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
根据文档配置完成也跑了起来,不过用客户端demo跑了一下再收到第一个心跳之后服务器端就会崩掉,错误信息如下:
[EROR] key: handshake failed error(default server codec pack length error)
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x40a11f]
goroutine 33209 [running]:
panic(0x8823a0, 0xc8200100f0)
/usr/local/go/src/runtime/panic.go:464 +0x3e6
main.LogSlow(0x917c40, 0x6, 0xc820133908, 0x3, 0x0)
/root/workspace/golang/src/goim/comet/slow.go:50 +0x55f
main.(_Server).dispatchTCP(0xc8255cb8f0, 0xc820133908, 0x3, 0xc820026280, 0xc820056ad8, 0xc848dec000, 0xc8491f6000, 0xc820056a90)
/root/workspace/golang/src/goim/comet/tcp.go:221 +0xa22
created by main.(_Server).serveTCP
/root/workspace/golang/src/goim/comet/tcp.go:132 +0x8e6
不知道是不是我配置的有问题
1.router服务存在是为了存储userId和serverId的对应关系,为什么不把router换成memcache或者redis?logic服务就没有必要在写一致性hash的相关算法了
2.就算是按照目前这种框架,在长连接的时候,comet->logic->router,而不直接comet->router,当然这样会存在一致性的问题
https://github.com/Terry-Mao/goim/blob/master/doc/proto.md
tcp的请求&返回参数定义
seq true int32 bigendian jsonp callback
不应当是
seq true int32 bigendian 序号
kafka安装还是有点小麻烦,要安装jvm,如果能把队列这块写成通用的接口,可以接入redis、MQ等的队列服务进去,让使用者选择哪个队列服务就再好不过了。谢谢!
如题 感觉文档有限 代码看起来很吃力
最近也在做推送和IM相关的优化和重构,考虑将推送作为基础模块支撑IM的运作。将上行下行消息逻辑分开处理,上行消息使用短连接发至IM逻辑处理API,下行消息通过推送服务直达客户端。GoIM有相关的重构需求么?
我在机器上布署了这个环境,但是发现收不到 push 的消息,查日志,发现在 panic-job.log 下面有下面的错误:
[Sarama] 2016/06/01 09:48:45 [kafka_topic_push_group/43efe05adb1c] KafkaPushsTopic :: FAILED to get list of partitions: zk: node does not exist
[06/01/16 09:48:45] [EROR] consumer error(kafka: error while consuming KafkaPushsTopic/-1: zk: node does not exist)
这个问题我应该怎么处理呢?
运行./push 0 20000 localhost:7172 60 出错:
[06/15/16 11:38:21] [INFO] req: "/1/push", post: "{"m":{"test":1},"u":10000}", res:"{"ret":65535}", ip:"127.0.0.1:49390", time:"0.000018s"
[06/15/16 11:38:21] [EROR] strconv.Atoi("") error(strconv.ParseInt: parsing "": invalid syntax)
最近发现在做直播的时候 使用了ijkplayer 发现跟libphone 冲突了 所以想想看这个im 是否会集成voip的通话?
请问,测试中有客户端推送消息,但客户端如何获取消息?
有没有比较完整的文档,谢谢?
可发我邮箱 [email protected]
ws测试例子,为什么在send里面发送消息body带着,onmessage查看body却是空的
function heartbeat() {
ws.send(JSON.stringify({
'ver': 1,
'op': 2,
'seq': 2,
'body': {'ok':'123456'}
}));
}
所以,请问一下,如何测试接受消息?
抱歉把github的Issuse的当提问地了,但该项目网上能查的文档真的太少了
我现在用python模仿了web客户端,建起了连接;
当我要给该客户端推送消息时,首先需要知道该连接的uid
请问我该如何获取此连接的uid呢?或者允许我在建立连接时的请求时指定?
烦请作者解惑
protorpc 参数和返回都是小对象,容易对GC产生压力,因为connect 和 disconnect 参数比较固定,使用tcp + package 方式发送包,使用值拷贝方式替换对象
comet 消息推送也是类似问题,comet 因为连接密集+内存密集,GC需要很严谨,
对于 logic 和router 以及 job 会相对还好
使用curl -d "{"test":1}" http://127.0.0.1:7172/1/push?uid=0,只能使用uid=0,如果换成其他的数字,java sdk无法收到消息
为什么?
在dispatchWebsocket时,显示的 p.Body = nil,看注释avoid memory leak
不过proto在Channel里是用指针传递的,
这里的p.Body = nil, 在一个房间大量channel并发的时候,会导致其他连接得到的是一个空的Body
目前我看到空的消息产生是这样的, 注释这里之后恢复正常。
做comet自动发现
aes解密复用cipher.BlockMode多次解密数据出错的问题,仅第一次正确,后面iv发生变化
README中是GPLv3,LICENSE文件是MIT
请明确,谢谢!
你好,在执行到第四步的第二小步,编译logic模块的时候,提示../../../Shopify/sarama/broker.go:78:28: error: reference to undefined identifier ‘tls.DialWithDialer’
b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)........
我怀疑是版本不兼容。能否上传一个depend,方便使用,谢谢
下面代码的delete map操作是否重复了?还是出于什么考虑?
https://github.com/Terry-Mao/goim/blob/master/router/cleaner.go#L82
func (c *Cleaner) remove(key int64) {
if e, ok := c.maps[key]; ok {
delete(c.maps, key)
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil // avoid memory leaks
e.prev = nil // avoid memory leaks
c.size--
}
}
https://github.com/Terry-Mao/goim/blob/master/router/cleaner.go#L102
for i = 0; i < maxCleanNum; i++ {
if e = c.back(); e != nil {
if e.expire() {
c.remove(e.Key)
delete(c.maps, e.Key)
keys = append(keys, e.Key)
continue
}
}
break
}
client 发送消息
1 client->comet->logic->router
2 caller->logic->kafa->job->client
这两个消息链没有直接联系起来,所以在场景 clientA 发送 消息到 clientB 时,要再次通过 caller 来 push消息。 这样来看,goim 主要是使用场景应该是 push 消息到多个客户端。但是这样有一点,我还没有理解这里的 router 这个角色发挥了什么样的作用呢?
直接去benchmark/client目录下,go install,然后去执行,出错:
eric@go:~/go/bin$ ./client 1 5 localhost:8080
2016-06-16 18:53:39 down:0 down/s:0
[06/16/16 18:53:39] [EROR] net.Dial("localhost:8080") error(dial tcp 127.0.0.1:8080: getsockopt: connection refused)
[06/16/16 18:53:39] [EROR] net.Dial("localhost:8080") error(dial tcp 127.0.0.1:8080: getsockopt: connection refused)
但若使用 go get ./... 产生的client却不出错,难道两个源代码不一样?
round_test.go好像不是最新的
func (server *Server) writeTCPResponse(wr *bufio.Writer, proto *Proto) (err error) {
if Conf.Debug {
log.Debug("write proto: %v", proto)
}
毛总,建议用godep把所有的依赖包都保存到代码里,电脑经常不能google里的代码
在client.conf中修改type 为 2,修改cert.file 为 ./cert.pem,拷贝自带的cert.pem到bin目录下,然后运行,无法运行:
eric@go:/go/bin$ ./client 1 5000 localhost:8090/go/bin$ ./client 1 5000 localhost:8080
[06/20/16 17:28:48] [INFO] ===================WEBSOCKETS!
eric@go:
[06/20/16 17:28:55] [INFO] ===================WEBSOCKETS!
上面的提示信息是我在入口的地方添加的,除此之外没有任何提示信息
1,新闻介绍纯Golang实现,我在readme,看到要依赖废品java环境??
2,英文文档连接,点开是404。
3,中文文档连接,标题写的是《gopush-cluster》,是不是标题应该改一下?
我看了一个礼拜还是不明白模块之间数据是怎么交互的, 远程调用倒是看明白了, 但tcp长连接建立后, 用户信息和server是怎么保存到router的
docker启动比较方便
logic在做push kafka的时候错误
有遇到过吗, 跟了下,代码很深,到github.com/eapache/go-resiliency 库了
问一下,ring.go作用是什么
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.