go-zeromq / zmq4 Goto Github PK
View Code? Open in Web Editor NEW[WIP] Pure-Go implementation of ZeroMQ-4
License: BSD 3-Clause "New" or "Revised" License
[WIP] Pure-Go implementation of ZeroMQ-4
License: BSD 3-Clause "New" or "Revised" License
I want to be able to PUSH only a limited number of packets to avoid losing them if the PULL side crashes. Is this possible or planned? Looking at the code I see that go's io.ReadWriteCloser
is used underneath so it seems no message level limiting is possible.
I wrote the following small test server, which creates XPUB and XSUB sockets, and a proxy to send all messages received on the XSUB port to the XPUB port:
package main
import (
"context"
"log"
"time"
zmq "github.com/go-zeromq/zmq4"
)
func main() {
pub_conn_str := "tcp://*:9002"
sub_conn_str := "tcp://*:9003"
pub_sock := zmq.NewXPub(context.Background())
sub_sock := zmq.NewXSub(context.Background())
if err := pub_sock.Listen(pub_conn_str); err != nil {
log.Fatalf("Error binding pub to %s: %v\n", pub_conn_str, err)
}
if err := sub_sock.Listen(sub_conn_str); err != nil {
log.Fatalf("Error binding sub to %s: %v\n", sub_conn_str, err)
}
log.Println("Sleeping 5s...")
time.Sleep(5*time.Second)
log.Println("Starting proxy")
p := zmq.NewProxy(context.Background(), sub_sock, pub_sock, nil) // send anything received on sub_sock to pub_sock
p.Run()
}
And a test client which connects to both ports, sends a series of 10 messages to the pub port (which is the server's sub port), and listens for them on the sub port (the server's pub port):
package main
import (
"context"
"log"
"strconv"
"sync"
"time"
zmq "github.com/go-zeromq/zmq4"
)
func main() {
var wg sync.WaitGroup
sub_conn_str := "tcp://localhost:9002" // server's pub socket is our sub socket
pub_conn_str := "tcp://localhost:9003" // ...and vice versa
pub_sock := zmq.NewPub(context.Background())
sub_sock := zmq.NewSub(context.Background())
if err := pub_sock.Dial(pub_conn_str); err != nil {
log.Fatalf("Error connecting pub to %s: %v\n", pub_conn_str, err)
}
if err := sub_sock.Dial(sub_conn_str); err != nil {
log.Fatalf("Error connecting sub to %s: %v\n", sub_conn_str, err)
}
if err := sub_sock.SetOption(zmq.OptionSubscribe, ""); err != nil { // subscribe to all messages
log.Fatalf("Subscribe error: %v\n", err)
return
}
log.Printf("Sleeping 5s...")
time.Sleep(5*time.Second)
wg.Add(1)
go func() { // receiver goroutine - wait for incoming messages
defer wg.Done()
log.Println("Receiver starting")
for {
if msg, err := sub_sock.Recv(); err != nil {
log.Printf("Recv error: %v\n", err)
} else {
log.Printf("Received msg [%s] %s\n", msg.Frames[0], msg.Frames[1])
}
}
}()
// Just send numbers 0-9, 1s apart
for i := 0; i < 10; i++ {
msg := zmq.NewMsgFrom([]byte("TEST"), []byte(strconv.FormatInt(int64(i), 16)))
log.Printf("Sending message %d\n", i)
if err := pub_sock.Send(msg); err != nil {
log.Printf("Send error: %v\n", err)
}
time.Sleep(1*time.Second)
}
wg.Wait()
}
When I run the server in one window and the client in another, the server binds the ports properly and the client seems to be connecting and sending the messages, but the receiver in the client never gets them and never prints anything. I'm expecting to see a series of "Sending message [0-9]" and "Received msg [TEST] [0-9]". I see the "Sending message" prints, but not the "Received" ones.
I'm using Go 1.13 on Ubuntu 18.04.
One more note: I have almost exactly this same code using https://github.com/pebbe/zmq4 (with adjustments for API differences like Bind() vs. Listen(), etc.) and it works fine.
In rep/req mode, the connection used by zmq4 not properly closed,
they are all stay at CLOSE_WAIT,
the closed connection should be TIME_WAIT
netstat -nat | grep 1234
tcp 0 0 127.0.0.1:49232 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49230 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49224 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49236 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49228 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49226 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49240 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49238 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49222 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49234 127.0.0.1:1234 FIN_WAIT2
tcp6 0 0 :::1234 :::* LISTEN
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49222 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49236 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49238 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49226 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49224 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49232 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49230 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49234 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49228 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49240 CLOSE_WAIT
the following code can reproduce the issue
test code for server, golang
package main
import (
"context"
"github.com/go-zeromq/zmq4"
"log"
)
func main() {
sock := zmq4.NewRep(context.Background())
if err := sock.Listen("tcp://*:1234"); err != nil {
log.Fatal(err)
}
for {
msg, err := sock.Recv()
if err != nil {
log.Println("recv", err)
continue
}
log.Printf("recv: %s", msg.Frames[0])
if err := sock.Send(zmq4.NewMsg(msg.Frames[0])); err != nil {
log.Println("send", err)
}
}
}
test code for client, python
import zmq
import threading
import time
def f():
ctx = zmq.Context()
c = ctx.socket(zmq.REQ)
c.connect("tcp://127.0.0.1:1234")
c.send("1234")
print(c.recv())
time.sleep(1)
c.close()
threads = []
for i in range(10):
t = threading.Thread(target=f)
threads.append(t)
t.start()
for t in threads:
t.join()
optional context for recv
? Or use WithTimeout in
Line 121 in 91805d6
If I use NetMQ (https://github.com/zeromq/netmq) to connect to a socket hosted by zmq4, the zmq4 host crashes with "zmq4: invalid greeting received" (
Line 115 in 78ce94b
It seems that NetMQ does follow the negotiation protocol correctly (https://rfc.zeromq.org/spec:23/ZMTP/#version-negotiation), but zmq4 does not...
Unfortunately I lack Go skills to fix this bug.
Hello - I'm new to zmq, but when inspecting packets using Wireshark over a loopback connection, I see that the multipart message is being sent out in separate TCP frames. On the same machine, libzmq sends a multipart message within a single frame with the correct delimiters & length octets between each part of the multipart message. But in zmq4, it appears that you're sending the flags and length within a single packet and then the message part followed by the next flag and length in a TCP packet followed by the next part and so on.
Is this expected? I haven't dug far enough into the implementation yet, but was wondering if 1) this is known and 2) this is expected?
Thx!
$> go doc net.Buffers
type Buffers [][]byte
Buffers contains zero or more runs of bytes to write.
On certain machines, for certain types of connections, this is optimized
into an OS-specific batch write operation (such as "writev").
func (v *Buffers) Read(p []byte) (n int, err error)
func (v *Buffers) WriteTo(w io.Writer) (n int64, err error)
push/pull
req/rep
dealer/router
pub/sub
xpub
xsub
pair
bus
multi-part messages
Send
if there're no subscribers. I think it should never block on PUB socket.pub := zmq4.NewPub(context.Background())
err := pub.Listen("tcp://*:3000")
err = pub.Send(zmq4.NewMsg([]byte("MSG")))
I'm trying to recreate this system (from the ZMQ Guide), but I can't get it to work:
I've got 3 programs: a publisher in pub/main.go
, a proxy in proxy/main.go
, and a subscriber in sub/main.go
. Below are full codes of these programs:
package main
import (
"flag"
"fmt"
"log"
"time"
"github.com/docker/distribution/context"
zmq "github.com/go-zeromq/zmq4"
)
var defaultMsg = `{` +
`"action": 2,` +
`"container_id": "0123456789AB"` +
`}`
var (
port *int
msg *string
)
func init() {
port = flag.Int("port", 8080, "port to dial to")
msg = flag.String("msg", defaultMsg, "default message to send")
flag.Parse()
}
func main() {
addr := fmt.Sprintf("tcp://localhost:%d", *port)
socket := zmq.NewPub(context.Background())
err := socket.Dial(addr)
if err != nil {
log.Fatalf("failed to dial: %v\n", err)
}
defer socket.Close()
log.Printf("listening on %s\n", addr)
for {
msg := zmq.NewMsgFromString([]string{*msg})
err := socket.Send(msg)
if err != nil {
panic(err) // Interrupted
}
log.Println("message sent")
time.Sleep(2 * time.Second) // Wait for 2 seconds
}
}
package main
import (
"context"
"flag"
"fmt"
"log"
zmq "github.com/go-zeromq/zmq4"
)
var (
pubPort *int
subPort *int
)
func init() {
}
func main() {
subPort = flag.Int("port-sub", 8080, "port to listen for publisher input on")
pubPort = flag.Int("port-pub", 8081, "port to publish output on")
flag.Parse()
ctx := context.Background()
subSocket := zmq.NewXSub(ctx)
subAddr := fmt.Sprintf("tcp://localhost:%v", *subPort)
err := subSocket.Listen(subAddr)
if err != nil {
log.Fatalf("failed to dial: %v\n", err)
}
err = subSocket.SetOption(zmq.OptionSubscribe, "")
if err != nil {
log.Fatalf("failed to set option: %v\n", err)
}
pubSocket := zmq.NewXPub(ctx)
pubAddr := fmt.Sprintf("tcp://localhost:%v", *pubPort)
err = pubSocket.Listen(pubAddr)
if err != nil {
log.Fatalf("failed to listen: %v\n", err)
}
listener := zmq.NewPair(ctx)
listener.Listen("inproc://pipe")
err = listener.SetOption(zmq.OptionSubscribe, "")
if err != nil {
log.Fatalf("failed to set option: %v\n", err)
}
go func() {
for {
log.Println("listener is waiting for a message")
msg, err := listener.Recv()
if err != nil {
log.Fatalf("failed to recv from socket: %v\n", err)
}
b := msg.Bytes()
fmt.Printf("message: %s, len: %v\n", b, len(b))
}
}()
proxy := zmq.NewProxy(ctx, subSocket, pubSocket, listener)
log.Printf("listening for publishers on %s\n", subAddr)
log.Printf("listening for subscribers on %s\n", pubAddr)
fmt.Println("running proxy...")
err = proxy.Run()
if err != nil {
log.Fatalln(err)
}
}
package main
import (
"context"
"flag"
"fmt"
"log"
zmq "github.com/go-zeromq/zmq4"
)
func main() {
port := flag.Int("port", 8081, "port to listen on")
flag.Parse()
socket := zmq.NewSub(context.Background())
defer socket.Close()
addr := fmt.Sprintf("tcp://localhost:%d", *port)
err := socket.Dial(addr)
if err != nil {
log.Fatalf("failed to dial: %v\n", err)
}
err = socket.SetOption(zmq.OptionSubscribe, "")
if err != nil {
log.Fatalf("failed to set option: %v\n", err)
}
fmt.Printf("dialing to %s\n", addr)
for {
msg, err := socket.Recv()
if err != nil {
log.Fatalf("failed to recv from socket: %v\n", err)
}
b := msg.Bytes()
fmt.Printf("message: %s, len: %v\n", b, len(b))
}
}
I'm pretty sure I'm using correct socket types and also enable subscribing all topics using socket.SetOption(zmq.OptionSubscribe, "")
. I don't have any more ideas on how to get this simple example to work.
Might be related to #108
Noticed that there may be problem to send 1st response over REP socket with v0.14.0
The issue seem that receiver listener goroutine starts prior connection added to repReader/repWriter, so it may happens that send over REP called prior writer get the connection registered.
The next test is failing:
func TestRepSocket(t *testing.T) {
defer goleak.VerifyNone(t)
assert := asserts.New(t)
ctx, cancel := context.WithCancel(context.Background())
rep := zmq4.NewRep(ctx)
ep := "ipc://@test.rep.socket"
err := rep.Listen(ep)
assert.NoError(err)
maxClients := 100
maxMsgs := 1000
wgClients := &sync.WaitGroup{}
wgServer := &sync.WaitGroup{}
client := func() {
defer wgClients.Done()
ping := zmq4.NewMsgString("ping")
for n := 0; n < maxMsgs; n++ {
func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
req := zmq4.NewReq(ctx)
err := req.Dial(ep)
assert.NoError(err)
err = req.Send(ping)
assert.NoError(err)
msg, err := req.Recv() // WARN the ctx is not supported yet!!!
assert.NoError(err)
assert.Equal(string(msg.Frames[0]), "pong")
err = req.Close()
assert.NoError(err)
}()
}
}
server := func() {
defer wgServer.Done()
pong := zmq4.NewMsgString("pong")
for {
msg, err := rep.Recv()
if errors.Is(err, context.Canceled) {
break
}
assert.NoError(err)
if err != nil {
break
}
assert.Equal(string(msg.Frames[0]), "ping")
err = rep.Send(pong)
assert.NoError(err)
}
}
wgServer.Add(1)
go server()
wgClients.Add(maxClients)
for n := 0; n < maxClients; n++ {
go client()
}
wgClients.Wait()
cancel()
wgServer.Wait()
rep.Close()
}
per http://api.zeromq.org/4-1:zmq-disconnect, it'll be convenient to be able to disconnect a specific endpoint on a socket rather than maintaining multiple sockets and using Socket.Close()
I've setup a subscriber socket with a given topic. However, it seems that sub.Recv()
is non blocking and constantly returns empty messages even when the publisher isn't sending any messages. Is this behavior wanted and if so, how can I make sub.Recv()
block until actually a message is available?
Here is a code snippet showing my intend:
len(msg.Frames) == 0
is constantly hit
// 2 second connect timeout
ctx, _ := context.WithTimeout(context.Background(), time.Duration(2)*time.Second)
sub := zmq4.NewSub(ctx)
if err := sub.Dial(fmt.Sprintf("tcp://%s", zmqURL)); err != nil {
collectLog.Errorf("coul not dial: %s", err)
return err
}
if err := sub.SetOption(zmq4.OptionSubscribe, "spent_address"); err != nil {
collectLog.Errorf("coul not dial: %s", err)
return err
}
collectLog.Info("connected")
for {
msg, err := sub.Recv()
if err != nil {
return err
}
if msg.Err() != nil {
collectLog.Error(msg.Err())
continue
}
if len(msg.Frames) == 0 {
continue
}
// handle message
}
Sorry for bit wacky title but step into problem that next test is fails.
func TestPullSocket1(t *testing.T) {
assert := asserts.New(t)
// Create pull socket
ctx, cancel := context.WithCancel(context.Background())
pull := zmq4.NewPull(ctx)
assert.NotNil(pull)
err := pull.Listen("ipc://@TestPullSocket1")
assert.NoError(err)
// Cancel and close socket
// This as well might be in time.AfterFunc with the same result!!!!
cancel()
err = pull.Close()
assert.NoError(err)
// Read out
msg, err := pull.Recv()
assert.Error(err)
_ = msg
// Final close
err = pull.Close()
_ = err
}
To me it looks like Close() should make qreader.sem.enable() as in this case qreader.addConn() is never called and qreader.read() stuck forever in q.sem.lock()
I tried using pebbe/zmq on windows (had great success of linux) but impossible to make it run as libzmq is need straight forward installable on windows.
How come go-zeromq works no problem without these issues?
in other words, how would i modify pebbe to work as go-zeromq does since the latter is not going to be maintained?
as noted in #76 (comment) there's a bit of a confusing ownership between who's responsible for closing net.Conn
between the underlying zmq4.socket
and the various {r,w}pool
implementations.
this leads to either non-executed code or "double-close" of connections.
we should probably make sure the {r,w}pool
implementations have the complete ownership/responsibility for closing connections.
goroutine stack behind, how to fix this problem?
1 @ 0x43a576 0x44aff3 0x44afcd 0x4661e5 0x481732 0x74fa67 0x7763d7 0x774a46 0x78c84d 0x78cd87 0x78cf78 0x46a141
# 0x4661e4 sync.runtime_Semacquire+0x24 /opt/go/src/runtime/sema.go:56
# 0x481731 sync.(*WaitGroup).Wait+0x51 /opt/go/src/sync/waitgroup.go:136
# 0x74fa66 golang.org/x/sync/errgroup.(*Group).Wait+0x26 /home/alexw/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:53
# 0x7763d6 github.com/go-zeromq/zmq4.(*routerMWriter).write+0x2d6 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:242
# 0x774a45 github.com/go-zeromq/zmq4.(*routerSocket).Send+0xe5 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:40
# 0x78c84c sdwan/ctrl/device.(*ZeroMQ).sendMsg+0x28c /home/alexw/workspace/hw-agent/ctrl/device/zmq.go:60
# 0x78cd86 sdwan/ctrl/device.(*ZeroMQ).runSend+0x126 /home/alexw/workspace/hw-agent/ctrl/device/zmq.go:102
# 0x78cf77 sdwan/ctrl/device.RunZmq+0xf7 /home/alexw/workspace/hw-agent/ctrl/device/zmq.go:124
1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x47fadb 0x778a99 0x771a02 0x7719ee 0x771b39 0x77117c 0x76fb2d 0x7764ba 0x74fc04 0x46a141
# 0x466304 sync.runtime_SemacquireMutex+0x24 /opt/go/src/runtime/sema.go:71
# 0x47fca4 sync.(*Mutex).lockSlow+0x164 /opt/go/src/sync/mutex.go:162
# 0x47fada sync.(*Mutex).Lock+0x3a /opt/go/src/sync/mutex.go:81
# 0x778a98 github.com/go-zeromq/zmq4.(*socket).scheduleRmConn+0x38 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:334
# 0x771a01 github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError+0x41 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:500
# 0x7719ed github.com/go-zeromq/zmq4.(*Conn).SetClosed+0x2d /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:472
# 0x771b38 github.com/go-zeromq/zmq4.(*Conn).checkIO+0xf8 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:492
# 0x77117b github.com/go-zeromq/zmq4.(*Conn).send+0x17b /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:357
# 0x76fb2c github.com/go-zeromq/zmq4.(*Conn).SendMsg+0x16c /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:190
# 0x7764b9 github.com/go-zeromq/zmq4.(*routerMWriter).write.func1+0x39 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:239
# 0x74fc03 golang.org/x/sync/errgroup.(*Group).Go.func1+0x63 /home/alexw/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:75
1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x47fadb 0x778a99 0x771a02 0x7719ee 0x771b39 0x77188f 0x7758c8 0x46a141
# 0x466304 sync.runtime_SemacquireMutex+0x24 /opt/go/src/runtime/sema.go:71
# 0x47fca4 sync.(*Mutex).lockSlow+0x164 /opt/go/src/sync/mutex.go:162
# 0x47fada sync.(*Mutex).Lock+0x3a /opt/go/src/sync/mutex.go:81
# 0x778a98 github.com/go-zeromq/zmq4.(*socket).scheduleRmConn+0x38 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:334
# 0x771a01 github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError+0x41 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:500
# 0x7719ed github.com/go-zeromq/zmq4.(*Conn).SetClosed+0x2d /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:472
# 0x771b38 github.com/go-zeromq/zmq4.(*Conn).checkIO+0xf8 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:492
# 0x77188e github.com/go-zeromq/zmq4.(*Conn).read+0x68e /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:385
# 0x7758c7 github.com/go-zeromq/zmq4.(*routerQReader).listen+0x1a7 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:161
1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x481296 0x481275 0x7782c5 0x777c5c 0x46a141
# 0x466304 sync.runtime_SemacquireMutex+0x24 /opt/go/src/runtime/sema.go:71
# 0x47fca4 sync.(*Mutex).lockSlow+0x164 /opt/go/src/sync/mutex.go:162
# 0x481295 sync.(*Mutex).Lock+0x35 /opt/go/src/sync/mutex.go:81
# 0x481274 sync.(*RWMutex).Lock+0x14 /opt/go/src/sync/rwmutex.go:139
# 0x7782c4 github.com/go-zeromq/zmq4.(*socket).addConn+0x64 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:285
# 0x777c5b github.com/go-zeromq/zmq4.(*socket).accept+0x1fb /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:227
1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x775ea5 0x775e75 0x77897c 0x778f1d 0x46a141
# 0x466304 sync.runtime_SemacquireMutex+0x24 /opt/go/src/runtime/sema.go:71
# 0x47fca4 sync.(*Mutex).lockSlow+0x164 /opt/go/src/sync/mutex.go:162
# 0x775ea4 sync.(*Mutex).Lock+0x64 /opt/go/src/sync/mutex.go:81
# 0x775e74 github.com/go-zeromq/zmq4.(*routerMWriter).rmConn+0x34 /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:211
# 0x77897b github.com/go-zeromq/zmq4.(*socket).rmConn+0x23b /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:329
# 0x778f1c github.com/go-zeromq/zmq4.(*socket).connReaper+0x17c /home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:393
Closing a Router socket without connections returns zmq4: invalid socket
error:
func TestRouterInvalidSocket(t *testing.T) {
router := zmq4.NewRouter(context.Background())
router.Listen("tcp://*:*")
err := router.Close()
if err != nil {
t.Errorf("want err %v, got %q", nil, err)
}
}
it is coming from socket.go
:
// Close closes the open Socket
func (sck *socket) Close() error {
...
sck.mu.RLock()
defer sck.mu.RUnlock()
if sck.conns == nil {
return errInvalidSocket
}
...
}
After the router node restarts,dealer node send messages normally, but router node cannot receive the message sent by the dealer
After disconnecting one of several SUBs, PUB starts returning the following error on Send:
zmq4: error sending frame 1/1: write tcp 127.0.0.1:1234->127.0.0.1:58966: use of closed network connection
Effectively, the server needs to be restarted after one client has been disconnected.
A minimal Go example for this would be the following, but it also happens with clients in other languages:
package main
import (
"context"
zmq "github.com/go-zeromq/zmq4"
"log"
"time"
)
// Send continuously
func send() {
sock := zmq.NewPub(context.Background())
defer sock.Close()
if err := sock.Listen("tcp://*:1234"); err != nil {
log.Fatal(err)
}
for {
if err := sock.Send(zmq.NewMsgString("test")); err != nil {
log.Println(err)
}
time.Sleep(time.Second)
}
}
// Receive once
func receive() {
sub := zmq.NewSub(context.Background())
defer sub.Close()
err := sub.Dial("tcp://localhost:1234")
if err != nil {
log.Fatal(err)
}
err = sub.SetOption(zmq.OptionSubscribe, "test")
if err != nil {
log.Fatal(err)
}
msg, err := sub.Recv()
if err != nil {
log.Fatal(err)
}
log.Println(msg)
}
func main() {
go send()
receive() // receive one and close
time.Sleep(5 * time.Second)
}
Could you consider to add a new field in socket like retryTimes
, to support modify retry times in socket.go , rather than use 10 as default, like:
func WithDialerRetryTime(retryTimes int) Option {
return func(s *socket) {
s.retryTimes = retryTimes
}
}
It would be awesome if you would add a channel interface based on the concept of vaughan0/go-zmq.
it would seem #47 introduced a data race on socket.closedConns
.
==================
WARNING: DATA RACE
Write at 0x00c000278130 by goroutine 87:
runtime.closechan()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/runtime/chan.go:327 +0x0
github.com/go-zeromq/zmq4.(*socket).Close.func1()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:92 +0x6e
github.com/go-zeromq/zmq4.(*socket).Close()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:116 +0x317
github.com/go-zeromq/zmq4.(*dealerSocket).Close()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/dealer.go:26 +0x67
runtime.call32()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/runtime/asm_amd64.s:522 +0x3a
testing.(*common).Fatalf()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:634 +0x94
github.com/go-zeromq/zmq4_test.TestRouterDealer.func1()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/zmq4_routerdealer_test.go:233 +0xf09
testing.tRunner()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:827 +0x162
Previous read at 0x00c000278130 by goroutine 50:
runtime.chansend()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/runtime/chan.go:140 +0x0
github.com/go-zeromq/zmq4.(*socket).scheduleRmConn()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:290 +0x105
github.com/go-zeromq/zmq4.(*socket).scheduleRmConn-fm()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:184 +0x4b
github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:443 +0xac
github.com/go-zeromq/zmq4.(*Conn).SetClosed()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:420 +0xab
github.com/go-zeromq/zmq4.(*Conn).checkIO()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:435 +0x150
github.com/go-zeromq/zmq4.(*Conn).read()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:333 +0xcf7
github.com/go-zeromq/zmq4.(*qreader).listen()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:103 +0xe7
Goroutine 87 (running) created at:
testing.(*T).Run()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:878 +0x659
github.com/go-zeromq/zmq4_test.TestRouterDealer()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/zmq4_routerdealer_test.go:96 +0x2b2
testing.tRunner()
/home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:827 +0x162
Goroutine 50 (running) created at:
github.com/go-zeromq/zmq4.(*qreader).addConn()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:66 +0xa2
github.com/go-zeromq/zmq4.(*socket).addConn()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:255 +0x40d
github.com/go-zeromq/zmq4.(*socket).Dial()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:241 +0x73d
github.com/go-zeromq/zmq4.(*dealerSocket).Dial()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/dealer.go:47 +0x7b
github.com/go-zeromq/zmq4_test.TestRouterDealer.func1.2.1()
/home/travis/gopath/src/github.com/go-zeromq/zmq4/zmq4_routerdealer_test.go:182 +0xc0
golang.org/x/sync/errgroup.(*Group).Go.func1()
/home/travis/gopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x64
==================
panic: send on closed channel
goroutine 494 [running]:
github.com/go-zeromq/zmq4.(*socket).scheduleRmConn(0xc000162500, 0xc0000e7220)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:290 +0x106
github.com/go-zeromq/zmq4.(*socket).scheduleRmConn-fm(0xc0000e7220)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:184 +0x4c
github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError(0xc0000e7220)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:443 +0xad
github.com/go-zeromq/zmq4.(*Conn).SetClosed(0xc0000e7220)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:420 +0xac
github.com/go-zeromq/zmq4.(*Conn).checkIO(0xc0000e7220, 0x6fbac0, 0xc000247a40)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:435 +0x151
github.com/go-zeromq/zmq4.(*Conn).read(0xc0000e7220, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:333 +0xcf8
github.com/go-zeromq/zmq4.(*qreader).listen(0xc000213b30, 0x6fcac0, 0xc000262c40, 0xc0000e7220)
/home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:103 +0xe8
created by github.com/go-zeromq/zmq4.(*qreader).addConn
/home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:66 +0xa3
FAIL github.com/go-zeromq/zmq4 19.152s
travis_time:end:0610ee78:start=1571132857856874905,finish=1571132886612396666,duration=28755521761,event=script
The reassembly dial address from string "tcp://[aaa:bbb::ccc]:55555" not working properly and tries to dual "aaa:bbb::ccc:55555" instead of "[aaa:bbb::ccc]:55555"
as specified in:
How can I send donations for this awesome project?
If you have nothing set up yet but are looking for an easy and transparent way, I can suggest OpenCollective.
go get: github.com/go-zeromq/[email protected] requires
github.com/zeromq/goczmq/[email protected]: reading github.com/zeromq/goczmq/go.mod at revision v4.2.1: unknown revision v4.2.1
How to dial with timeout ?
thanks !
we should have a way to expose something akin to zmq_proxy:
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
The zmq_proxy() function starts the built-in ØMQ proxy in the current application thread.
The proxy connects a frontend socket to a backend socket. Conceptually, data flows from
frontend to backend. Depending on the socket types, replies may flow in the opposite
direction.
The direction is conceptual only; the proxy is fully symmetric and there is no technical
difference between frontend and backend.
Before calling zmq_proxy() you must set any socket options, and connect or bind both
frontend and backend sockets. The two conventional proxy models are:
zmq_proxy() runs in the current thread and returns only if/when the current context is
closed.
If the capture socket is not NULL, the proxy shall send all messages, received on both
frontend and backend, to the capture socket. The capture socket should be
a ZMQ_PUB, ZMQ_DEALER, ZMQ_PUSH, or ZMQ_PAIR socket.
As soon as the subscriber connects(subscribes) the pub socket server starts logging:
zmq4: could not open a ZMTP connection with "tcp://127.0.0.1:5000": zmq4: could not initialize ZMTP connection: zmq4: could not exchange greetings: zmq4: could not recv greeting: could not read ZMTP greeting: unexpected EOF
The subscriber does not report any error, but also receives no events
I think 2.2.0 should be compatible with zmq4, but i can't get it to work.
http://wiki.zeromq.org/area:faq#toc9
Or am i just incorrectly assuming this should work? :)
testclient for github.com/pebbe/zmq2
package main
import (
"log"
"os"
"time"
zmq "github.com/pebbe/zmq2"
)
func main() {
const addr = "tcp://localhost:5000"
socket, err := zmq.NewSocket(zmq.SUB)
if err != nil {
log.Fatal(err)
}
if err := socket.Connect(addr); err != nil {
log.Fatal(err)
}
if err := socket.SetSubscribe(os.Args[1]); err != nil {
log.Fatal(err)
}
poller := zmq.NewPoller()
poller.Add(socket, zmq.POLLIN)
for {
sockets, err := poller.Poll(2 * time.Second)
if err != nil {
log.Fatal(err)
} else {
for _, socket := range sockets {
data, err := socket.Socket.RecvBytes(0)
if err != nil {
log.Fatal(err)
} else {
log.Println("GOT", string(data))
}
}
}
}
}
testserver for github.com/go-zeromq/zmq4
package main
import (
"context"
"log"
"os"
"time"
"github.com/go-zeromq/zmq4"
)
func main() {
const addr = "tcp://localhost:5000"
pub := zmq4.NewPub(context.Background())
defer pub.Close()
if err := pub.Listen(addr); err != nil {
log.Fatal(err)
}
event := func() zmq4.Msg {
return zmq4.NewMsgFrom([]byte(os.Args[1]), []byte("RANDOM_STRING"))
}
for {
select {
case <-time.After(1 * time.Second):
if err := pub.Send(event()); err != nil {
log.Fatal(err)
} else {
log.Println("SENT event")
}
}
}
}
while a slightly modified example can connect and receive events fine:
package main
import (
"context"
"log"
"os"
"github.com/go-zeromq/zmq4"
)
func main() {
log.SetPrefix("psenvsub: ")
// Prepare our subscriber
sub := zmq4.NewSub(context.Background())
defer sub.Close()
err := sub.Dial("tcp://localhost:5000")
if err != nil {
log.Fatalf("could not dial: %v", err)
}
err = sub.SetOption(zmq4.OptionSubscribe, os.Args[1])
if err != nil {
log.Fatalf("could not subscribe: %v", err)
}
for {
// Read envelope
msg, err := sub.Recv()
if err != nil {
log.Fatalf("could not receive message: %v", err)
}
log.Printf("[%s] %s\n", msg.Frames[0], msg.Frames[1])
}
}
I'm using the REP/REQ sockets in a single-server multi-client scenario. According to the ZMQ docs, this should work properly because the server will send back the reply to the client it received the request from.
In my case however, i'm seeing that subsequent connects to the REP socket by other REQ sockets results in replies message being sent to all REQ sockets, and it acting more like a publish.
Is this supposed to work?
Hi!
A new connReaper
go routine is started on every call to Connect/Listen in socket.go
.
connReaper
loops for the lifetime of the socket, so one instance is enough.
I'd classify that as a bug.
Cheers
Guido
in the zmq the server or client close never cause another one close
but the the below happen
i do not key why
could not receive message : read tcp 127.0.0.1:57509->127.0.0.1:5563: wsarecv: An existing connection was forcibly closed by the remote host.
server
package main
import (
"context"
"log"
"time"
"github.com/go-zeromq/zmq4"
)
func main() {
log.SetPrefix("psenvpub: ")
// prepare the publisher
pub := zmq4.NewPub(context.Background())
defer pub.Close()
err := pub.Listen("tcp://*:5563")
if err != nil {
log.Fatalf("could not listen: %v", err)
}
msgA := zmq4.NewMsgFrom(
[]byte("A"),
[]byte("We don't want to see this"),
)
msgB := zmq4.NewMsgFrom(
[]byte("B"),
[]byte("We would like to see this"),
)
for {
// Write two messages, each with an envelope and content
err = pub.Send(msgA)
if err != nil {
log.Fatal(err)
}
err = pub.Send(msgB)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}
client
package main
import (
"context"
"log"
"github.com/go-zeromq/zmq4"
)
func main() {
log.SetPrefix("psenvsub: ")
// Prepare our subscriber
sub := zmq4.NewSub(context.Background())
defer sub.Close()
err := sub.Dial("tcp://localhost:5563")
if err != nil {
log.Fatalf("could not dial: %v", err)
}
err = sub.SetOption(zmq4.OptionSubscribe, "B")
if err != nil {
log.Fatalf("could not subscribe: %v", err)
}
for {
// Read envelope
msg, err := sub.Recv()
if err != nil {
log.Println("could not receive message :", err)
}
log.Printf("[%s] %s\n", msg.Frames[0], msg.Frames[1])
}
}
in these 6 combinations,
./gozeromq.go --zmq=pebbe --dialfirst
./gozeromq.go --zmq=pebbe
./gozeromq.go --zmq=czmq --dialfirst
./gozeromq.go --zmq=czmq
./gozeromq.go --zmq=zmq --dialfirst
./gozeromq.go --zmq=zmq
./gozeromq.go --zmq=zmq
blocks forever, which does not correspond to czmq/pebbe
//usr/bin/env GO111MODULE=off go run $0 $@;exit
package main
import (
"context"
"flag"
"log"
"github.com/zeromq/goczmq"
"github.com/go-zeromq/zmq4"
pebbe "github.com/pebbe/zmq4"
)
func main(){
endpoint := "tcp://127.0.0.1:55555"
zmq := flag.String("zmq", "zmq", "zmq type (czmq / zmq / pebbe)")
fdialfirst := flag.Bool("dialfirst", false, "dial first")
flag.Parse()
if *zmq == "czmq" {
s := goczmq.NewSock(goczmq.Sub)
if *fdialfirst {
err := s.Attach(endpoint, false)
if err != nil { log.Fatal(err) }
log.Print("dial done")
s.SetOption(goczmq.SockSetSubscribe(""))
log.Print("set option done")
} else {
s.SetOption(goczmq.SockSetSubscribe(""))
log.Print("set option done")
err := s.Attach(endpoint, false)
if err != nil { log.Fatal(err) }
log.Print("dial done")
}
reply, err := s.RecvMessage()
if err != nil { log.Fatal(err) }
log.Print("recv done")
log.Print(string(reply[0]))
s.Destroy()
} else if *zmq == "zmq" {
s := zmq4.NewSub(context.Background())
defer s.Close()
if *fdialfirst {
err := s.Dial(endpoint)
if err != nil { log.Fatal(err) }
log.Print("dial done")
err = s.SetOption(zmq4.OptionSubscribe, "")
if err != nil { log.Fatal(err) }
log.Print("set option done")
} else {
/// this code path does not work (recv blocks forever) ///
err := s.SetOption(zmq4.OptionSubscribe, "")
if err != nil { log.Fatal(err) }
log.Print("set option done")
err = s.Dial(endpoint)
if err != nil { log.Fatal(err) }
log.Print("dial done")
}
raw, err := s.Recv()
if err != nil { log.Fatal(err) }
log.Print("recv done")
log.Print(raw.String())
} else if *zmq == "pebbe" {
// maybe pebbe can be refered as it is mentioned in https://zeromq.org/languages/go/
s, err := pebbe.NewSocket(pebbe.SUB)
defer s.Close()
if *fdialfirst {
err := s.Connect(endpoint)
if err != nil { log.Fatal(err) }
log.Print("dial done")
err = s.SetSubscribe("")
if err != nil { log.Fatal(err) }
log.Print("set option done")
} else {
err := s.SetSubscribe("")
if err != nil { log.Fatal(err) }
log.Print("set option done")
err = s.Connect(endpoint)
if err != nil { log.Fatal(err) }
log.Print("dial done")
}
reply, err := s.Recv(0)
if err != nil { log.Fatal(err) }
log.Print("recv done")
log.Print(reply)
}
}
/*
publisher:
#!/usr/bin/python
import zmq
import time
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:55555")
msgid = 0
while True:
pub.send(str(msgid))
msgid = (msgid+1)%100000
time.sleep(0.1)
*/
I don't think ZMQ connects should be timing out like you are doing. See http://api.zeromq.org/4-1:zmq-connect, the whole point is that pub/sub/bind/connect order doesn't matter. pebbe/zmq4 does not time out on connect/Dial.
instead of logging messages directly to log
's default logger, we should provide a WithLogger
option (that would default to log
)
When I create sub, and then Listen()
and set OptionSubscribe
, it never subscribe to accepted connections.
This is example of code which describe the issue:
ctx := context.Background()
sub := zmq4.NewSub(ctx)
err := sub.Listen("tcp://*:1234") // listen
if err != nil {
log.Fatalf("could not listen: %v", err)
}
err = sub.SetOption(zmq4.OptionSubscribe, "") // subscribe for all messages
if err != nil {
log.Fatalf("could not subscribe to topic %v", err)
}
go func() {
for {
msg, err := sub.Recv() // never gets message because OptionSubscribe was not applied after connection was accepted.
if err != nil {
log.Printf("could not recv message: %v", err)
} else {
fmt.Printf("recv %+v\n", msg)
}
}
}()
pub := zmq4.NewPub(ctx)
err = pub.Dial("tcp://*:1234")
if err != nil {
log.Fatalf("could not dial: %v", err)
}
for {
msg := zmq4.NewMsgString("test abc")
err := pub.Send(msg)
if err != nil {
log.Printf("could send message: %v", err)
}
fmt.Printf("msg send\n")
time.Sleep(time.Second)
}
Workaround: when I add one goroutine which in cycle call Subscribe()
it works - but it is not effective and unreliable ...
ctx := context.Background()
sub := zmq4.NewSub(ctx)
err := sub.Listen("tcp://*:1234")
if err != nil {
log.Fatalf("could not listen: %v", err)
}
err = sub.SetOption(zmq4.OptionSubscribe, "")
if err != nil {
log.Fatalf("could not subscribe to topic %v", err)
}
go func() {
for {
msg, err := sub.Recv()
if err != nil {
log.Printf("could not recv message: %v", err)
} else {
fmt.Printf("recv %+v\n", msg)
}
}
}()
// set subscription too all accepted socket
go func() {
for {
err = sub.SetOption(zmq4.OptionSubscribe, "")
if err != nil {
log.Printf("could not subscribe to topic %v", err)
}
fmt.Printf("subscribe\n")
time.Sleep(time.Second)
}
}()
pub := zmq4.NewPub(ctx)
err = pub.Dial("tcp://*:1234")
if err != nil {
log.Fatalf("could not dial: %v", err)
}
for {
msg := zmq4.NewMsgString("test abc")
err := pub.Send(msg)
if err != nil {
log.Printf("could send message: %v", err)
}
fmt.Printf("msg send\n")
time.Sleep(time.Second)
}
I have the following code:
socket := zmq4.NewPub(ctx)
socket.Dial(endpoint)
err := socket.SendMulti(zmq4.NewMsgFrom(partA, partB))
It works fine at first, but when I restart the other side of the connection at endpoint
then there is neither an automatic reconnect nor can I do it manually, because the send method will not return an error.
My current workaround is to use reflection to check in the unexported conns
field, if a connection is there, but I think there should be a better way to do this:
func isConnected(socket zmq4.Socket) bool {
return reflect.ValueOf(socket).Elem().FieldByName("sck").Elem().FieldByName("conns").Len() != 0
}
I could think of the following solutions:
IsConnected() bool
method to the Socket
interface so the user can reconnect on connection lossSendMulti
and Send
methods to let the user know that connection is brokenright now, the Socket
interface reads:
type Socket interface {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
Send(msg Msg) error
// Recv receives a complete message.
Recv() (Msg, error)
...
}
internally, each time somebody calls sck.Recv()
a new zmq4.Msg
is created, anew.
we could perhaps consider going with:
type Socket interface {
Recv(*Msg) error
}
so the Socket
implementation wouldn't have to allocate (and let the user optionally use a pool).
alternatively:
Socket
as is,Socket
implementation)Release
method to zmq4.Msg
(that calls back to sync.Pool.Put
)Close on ipc:// cause unix socket be removed.
I would believe that's not what is should do, otherway client reconnection wont work after 1st connection.
not sure if this is an issue or correct zmq semantics. In request response socket communication if the message is multiframe on sending side the first frame is lost on receiving side. To reproduce the issue just change the rrclient.go in example to send request like that:-
err := req.Send(zmq4.NewMsgFromString([]string {"Hello?","Hello!"}))
The result on receiving side will be
rrworker: 2020/11/02 08:02:42 received request: [Hello!]
The "Hello?" part of the message is lost
I have read that each request in zmq is prepended with the delimiter frame, may be that what confuses me, because I still expect the number of frames on sending side be equal to the number of frames receiving on side.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.