zmq4's People

Contributors

bartekpacia, bryanpaluch, dewei-verkada, egorse, encse, guidog, ijt, inphi, iotmod, jwgcarlson, sbinet, speakyourcode, superfashi, thielepaul, ziyan

zmq4's Issues

High/Low Water mark support?

I want to be able to PUSH only a limited number of packets to avoid losing them if the PULL side crashes. Is this possible or planned? Looking at the code, I see that Go's io.ReadWriteCloser is used underneath, so it seems no message-level limiting is possible.
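For illustration, here is a minimal sketch of an application-level high-water mark, assuming the limit is enforced in user code before messages reach the socket. It does not use any zmq4 API; `boundedQueue`, `TrySend`, and `Next` are hypothetical names, and the buffered channel's capacity stands in for the mark.

```go
package main

import "fmt"

// boundedQueue is a hypothetical application-level send queue.
// Its channel capacity plays the role of a high-water mark.
type boundedQueue struct {
	ch chan []byte
}

func newBoundedQueue(hwm int) *boundedQueue {
	return &boundedQueue{ch: make(chan []byte, hwm)}
}

// TrySend enqueues msg unless the high-water mark has been reached.
// It never blocks and reports whether msg was accepted.
func (q *boundedQueue) TrySend(msg []byte) bool {
	select {
	case q.ch <- msg:
		return true
	default:
		return false
	}
}

// Next dequeues the oldest message. A sender goroutine would call it
// and pass the result to the socket's Send method.
func (q *boundedQueue) Next() []byte { return <-q.ch }

func main() {
	q := newBoundedQueue(2)
	fmt.Println(q.TrySend([]byte("a"))) // true
	fmt.Println(q.TrySend([]byte("b"))) // true
	fmt.Println(q.TrySend([]byte("c"))) // false: mark reached
	fmt.Printf("%s\n", q.Next())        // a
	fmt.Println(q.TrySend([]byte("c"))) // true: one slot freed
}
```

This only bounds how many messages wait inside the application; anything already handed to the socket can still be lost if the peer crashes.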

No messages getting through XPUB/XSUB proxy

I wrote the following small test server, which creates XPUB and XSUB sockets, and a proxy to send all messages received on the XSUB port to the XPUB port:

package main

import (
    "context"
    "log"
    "time"
    zmq "github.com/go-zeromq/zmq4"
)

func main() {
    pub_conn_str := "tcp://*:9002"
    sub_conn_str := "tcp://*:9003"
    pub_sock := zmq.NewXPub(context.Background())
    sub_sock := zmq.NewXSub(context.Background())
    if err := pub_sock.Listen(pub_conn_str); err != nil {
        log.Fatalf("Error binding pub to %s: %v\n", pub_conn_str, err)
    }
    if err := sub_sock.Listen(sub_conn_str); err != nil {
        log.Fatalf("Error binding sub to %s: %v\n", sub_conn_str, err)
    }
    log.Println("Sleeping 5s...")
    time.Sleep(5*time.Second)
    log.Println("Starting proxy")
    p := zmq.NewProxy(context.Background(), sub_sock, pub_sock, nil) // send anything received on sub_sock to pub_sock
    p.Run()
}

And a test client which connects to both ports, sends a series of 10 messages to the pub port (which is the server's sub port), and listens for them on the sub port (the server's pub port):

package main

import (
    "context"
    "log"
    "strconv"
    "sync"
    "time"
    zmq "github.com/go-zeromq/zmq4"
)

func main() {
    var wg sync.WaitGroup
    sub_conn_str := "tcp://localhost:9002"  // server's pub socket is our sub socket
    pub_conn_str := "tcp://localhost:9003"  //  ...and vice versa
    pub_sock := zmq.NewPub(context.Background())
    sub_sock := zmq.NewSub(context.Background())
    if err := pub_sock.Dial(pub_conn_str); err != nil {
        log.Fatalf("Error connecting pub to %s: %v\n", pub_conn_str, err)
    }
    if err := sub_sock.Dial(sub_conn_str); err != nil {
        log.Fatalf("Error connecting sub to %s: %v\n", sub_conn_str, err)
    }
    if err := sub_sock.SetOption(zmq.OptionSubscribe, ""); err != nil { // subscribe to all messages
        log.Fatalf("Subscribe error: %v\n", err)
        return
    }
    log.Printf("Sleeping 5s...")
    time.Sleep(5*time.Second)

    wg.Add(1)
    go func() {  // receiver goroutine - wait for incoming messages
        defer wg.Done()
        log.Println("Receiver starting")
        for {
            if msg, err := sub_sock.Recv(); err != nil {
                log.Printf("Recv error: %v\n", err)
            } else {
                log.Printf("Received msg [%s] %s\n", msg.Frames[0], msg.Frames[1])
            }
        }
    }()

    // Just send numbers 0-9, 1s apart
    for i := 0; i < 10; i++ {
        msg := zmq.NewMsgFrom([]byte("TEST"), []byte(strconv.FormatInt(int64(i), 16)))
        log.Printf("Sending message %d\n", i)
        if err := pub_sock.Send(msg); err != nil {
            log.Printf("Send error: %v\n", err)
        }
        time.Sleep(1*time.Second)
    }
    wg.Wait()
}

When I run the server in one window and the client in another, the server binds the ports properly and the client seems to be connecting and sending the messages, but the receiver in the client never gets them and never prints anything. I'm expecting to see a series of "Sending message [0-9]" and "Received msg [TEST] [0-9]". I see the "Sending message" prints, but not the "Received" ones.

I'm using Go 1.13 on Ubuntu 18.04.

One more note: I have almost exactly this same code using https://github.com/pebbe/zmq4 (with adjustments for API differences like Bind() vs. Listen(), etc.) and it works fine.

connection not properly closed

In REQ/REP mode, the connections used by zmq4 are not properly closed. They all stay in CLOSE_WAIT, whereas a closed connection should be in TIME_WAIT:

netstat -nat | grep 1234

tcp 0 0 127.0.0.1:49232 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49230 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49224 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49236 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49228 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49226 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49240 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49238 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49222 127.0.0.1:1234 FIN_WAIT2
tcp 0 0 127.0.0.1:49234 127.0.0.1:1234 FIN_WAIT2
tcp6 0 0 :::1234 :::* LISTEN
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49222 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49236 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49238 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49226 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49224 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49232 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49230 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49234 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49228 CLOSE_WAIT
tcp6 0 0 127.0.0.1:1234 127.0.0.1:49240 CLOSE_WAIT

the following code can reproduce the issue

test code for server, golang

package main

import (
    "context"
    "github.com/go-zeromq/zmq4"
    "log"
)

func main() {
    sock := zmq4.NewRep(context.Background())
    if err := sock.Listen("tcp://*:1234"); err != nil {
        log.Fatal(err)
    }

    for {
        msg, err := sock.Recv()
        if err != nil {
            log.Println("recv", err)
            continue
        }

        log.Printf("recv: %s", msg.Frames[0])
        if err := sock.Send(zmq4.NewMsg(msg.Frames[0])); err != nil {
            log.Println("send", err)
        }
    }
}

test code for client, python

import zmq
import threading
import time


def f():
    ctx = zmq.Context()
    c = ctx.socket(zmq.REQ)
    c.connect("tcp://127.0.0.1:1234")

    c.send(b"1234")  # pyzmq on Python 3 requires bytes, not str
    print(c.recv())
    time.sleep(1)
    c.close()


threads = []
for i in range(10):
    t = threading.Thread(target=f)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Greeting crash

If I use NetMQ (https://github.com/zeromq/netmq) to connect to a socket hosted by zmq4, the zmq4 host crashes with "zmq4: invalid greeting received" (the "return errGreeting" code path).

It seems that NetMQ does follow the negotiation protocol correctly (https://rfc.zeromq.org/spec:23/ZMTP/#version-negotiation), but zmq4 does not...

  • zmq4 should handle the greeting as specified in the ZMTP protocol
  • zmq4 should not give panic errors

Unfortunately I lack Go skills to fix this bug.

Multipart messages and TCP packets

Hello - I'm new to zmq, but when inspecting packets using Wireshark over a loopback connection, I see that the multipart message is being sent out in separate TCP frames. On the same machine, libzmq sends a multipart message within a single frame with the correct delimiters & length octets between each part of the multipart message. But in zmq4, it appears that you're sending the flags and length within a single packet and then the message part followed by the next flag and length in a TCP packet followed by the next part and so on.

Is this expected? I haven't dug far enough into the implementation yet, but was wondering if 1) this is known and 2) this is expected?

Thx!

consider using net.Buffers for zmq4.Msg.Frames

$> go doc net.Buffers
type Buffers [][]byte
    Buffers contains zero or more runs of bytes to write.

    On certain machines, for certain types of connections, this is optimized
    into an OS-specific batch write operation (such as "writev").


func (v *Buffers) Read(p []byte) (n int, err error)
func (v *Buffers) WriteTo(w io.Writer) (n int64, err error)

implement native Go ZeroMQ transport

  • implement push/pull
  • implement req/rep
  • implement dealer/router
  • implement pub/sub
  • implement xpub
  • implement xsub
  • implement pair
  • implement bus
  • implement multi-part messages

Send blocks on PUB

  1. The following code blocks on Send if there are no subscribers. I think Send should never block on a PUB socket.

pub := zmq4.NewPub(context.Background())
err := pub.Listen("tcp://*:3000")
err = pub.Send(zmq4.NewMsg([]byte("MSG")))

  2. Also, after one of many SUBs disconnects, Send on the PUB side starts returning an error even though the data is sent correctly.
    How do I correctly publish data to an unknown number of subscribers with go-zeromq?
    I can't find any pure Go alternative.

Can't get a proxy to work (XSUB/XPUB)

I'm trying to recreate this system (from the ZMQ Guide), but I can't get it to work:

[Screenshot: diagram of the XSUB/XPUB proxy setup from the ZMQ Guide]

I've got 3 programs: a publisher in pub/main.go, a proxy in proxy/main.go, and a subscriber in sub/main.go. The full code of each program is below:

pub

package main

import (
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/docker/distribution/context"
	zmq "github.com/go-zeromq/zmq4"
)

var defaultMsg = `{` +
	`"action": 2,` +
	`"container_id": "0123456789AB"` +
	`}`

var (
	port *int
	msg  *string
)

func init() {
	port = flag.Int("port", 8080, "port to dial to")
	msg = flag.String("msg", defaultMsg, "default message to send")
	flag.Parse()
}

func main() {
	addr := fmt.Sprintf("tcp://localhost:%d", *port)
	socket := zmq.NewPub(context.Background())
	err := socket.Dial(addr)
	if err != nil {
		log.Fatalf("failed to dial: %v\n", err)
	}
	defer socket.Close()

	log.Printf("listening on %s\n", addr)
	for {
		msg := zmq.NewMsgFromString([]string{*msg})
		err := socket.Send(msg)
		if err != nil {
			panic(err) //  Interrupted
		}

		log.Println("message sent")
		time.Sleep(2 * time.Second) //  Wait for 2 seconds
	}
}

proxy

package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	zmq "github.com/go-zeromq/zmq4"
)

var (
	pubPort *int
	subPort *int
)

func init() {
}

func main() {
	subPort = flag.Int("port-sub", 8080, "port to listen for publisher input on")
	pubPort = flag.Int("port-pub", 8081, "port to publish output on")
	flag.Parse()

	ctx := context.Background()

	subSocket := zmq.NewXSub(ctx)
	subAddr := fmt.Sprintf("tcp://localhost:%v", *subPort)
	err := subSocket.Listen(subAddr)
	if err != nil {
		log.Fatalf("failed to dial: %v\n", err)
	}
	err = subSocket.SetOption(zmq.OptionSubscribe, "")
	if err != nil {
		log.Fatalf("failed to set option: %v\n", err)
	}

	pubSocket := zmq.NewXPub(ctx)
	pubAddr := fmt.Sprintf("tcp://localhost:%v", *pubPort)
	err = pubSocket.Listen(pubAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v\n", err)
	}

	listener := zmq.NewPair(ctx)
	listener.Listen("inproc://pipe")
	err = listener.SetOption(zmq.OptionSubscribe, "")
	if err != nil {
		log.Fatalf("failed to set option: %v\n", err)
	}

	go func() {
		for {
			log.Println("listener is waiting for a message")
			msg, err := listener.Recv()
			if err != nil {
				log.Fatalf("failed to recv from socket: %v\n", err)
			}
			b := msg.Bytes()
			fmt.Printf("message: %s, len: %v\n", b, len(b))
		}
	}()

	proxy := zmq.NewProxy(ctx, subSocket, pubSocket, listener)

	log.Printf("listening for publishers on %s\n", subAddr)
	log.Printf("listening for subscribers on %s\n", pubAddr)

	fmt.Println("running proxy...")
	err = proxy.Run()
	if err != nil {
		log.Fatalln(err)
	}
}

sub

package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	zmq "github.com/go-zeromq/zmq4"
)

func main() {
	port := flag.Int("port", 8081, "port to listen on")
	flag.Parse()

	socket := zmq.NewSub(context.Background())
	defer socket.Close()

	addr := fmt.Sprintf("tcp://localhost:%d", *port)
	err := socket.Dial(addr)
	if err != nil {
		log.Fatalf("failed to dial: %v\n", err)
	}

	err = socket.SetOption(zmq.OptionSubscribe, "")
	if err != nil {
		log.Fatalf("failed to set option: %v\n", err)
	}

	fmt.Printf("dialing to %s\n", addr)

	for {
		msg, err := socket.Recv()
		if err != nil {
			log.Fatalf("failed to recv from socket: %v\n", err)
		}

		b := msg.Bytes()
		fmt.Printf("message: %s, len: %v\n", b, len(b))
	}
}

I'm pretty sure I'm using the correct socket types, and I also subscribe to all topics using socket.SetOption(zmq.OptionSubscribe, ""). I don't have any more ideas on how to get this simple example to work.

Might be related to #108

REP socket races on client connection

I noticed that sending the first response over a REP socket may fail with v0.14.0.

The issue seems to be that the receiver's listener goroutine starts before the connection is added to repReader/repWriter, so Send on the REP socket may be called before the writer has the connection registered.

The following test fails:

func TestRepSocket(t *testing.T) {
	defer goleak.VerifyNone(t)
	assert := asserts.New(t)

	ctx, cancel := context.WithCancel(context.Background())
	rep := zmq4.NewRep(ctx)
	ep := "ipc://@test.rep.socket"
	err := rep.Listen(ep)
	assert.NoError(err)

	maxClients := 100
	maxMsgs := 1000
	wgClients := &sync.WaitGroup{}
	wgServer := &sync.WaitGroup{}
	client := func() {
		defer wgClients.Done()
		ping := zmq4.NewMsgString("ping")
		for n := 0; n < maxMsgs; n++ {
			func() {
				ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
				defer cancel()
				req := zmq4.NewReq(ctx)
				err := req.Dial(ep)
				assert.NoError(err)
				err = req.Send(ping)
				assert.NoError(err)
				msg, err := req.Recv() // WARN the ctx is not supported yet!!!
				assert.NoError(err)
				assert.Equal(string(msg.Frames[0]), "pong")
				err = req.Close()
				assert.NoError(err)
			}()
		}
	}
	server := func() {
		defer wgServer.Done()
		pong := zmq4.NewMsgString("pong")
		for {
			msg, err := rep.Recv()
			if errors.Is(err, context.Canceled) {
				break
			}
			assert.NoError(err)
			if err != nil {
				break
			}
			assert.Equal(string(msg.Frames[0]), "ping")
			err = rep.Send(pong)
			assert.NoError(err)
		}
	}

	wgServer.Add(1)
	go server()
	wgClients.Add(maxClients)
	for n := 0; n < maxClients; n++ {
		go client()
	}
	wgClients.Wait()
	cancel()
	wgServer.Wait()
	rep.Close()
}

Blocking receive

I've set up a subscriber socket with a given topic. However, it seems that sub.Recv() is non-blocking and constantly returns empty messages even when the publisher isn't sending any. Is this behavior intended, and if so, how can I make sub.Recv() block until a message is actually available?

Here is a code snippet showing my intent; the len(msg.Frames) == 0 branch is constantly hit:

// 2 second connect timeout
ctx, _ := context.WithTimeout(context.Background(), time.Duration(2)*time.Second)
sub := zmq4.NewSub(ctx)

if err := sub.Dial(fmt.Sprintf("tcp://%s", zmqURL)); err != nil {
	collectLog.Errorf("could not dial: %s", err)
	return err
}

if err := sub.SetOption(zmq4.OptionSubscribe, "spent_address"); err != nil {
	collectLog.Errorf("could not subscribe: %s", err)
	return err
}

collectLog.Info("connected")

for {
	msg, err := sub.Recv()
	if err != nil {
		return err
	}

	if msg.Err() != nil {
		collectLog.Error(msg.Err())
		continue
	}

	if len(msg.Frames) == 0 {
		continue
	}

   // handle message
}

Pull socket can not be properly closed, if no clients ever connected

Sorry for the slightly wacky title, but I ran into a problem: the following test fails.

func TestPullSocket1(t *testing.T) {
	assert := asserts.New(t)

	// Create pull socket
	ctx, cancel := context.WithCancel(context.Background())
	pull := zmq4.NewPull(ctx)
	assert.NotNil(pull)
	err := pull.Listen("ipc://@TestPullSocket1")
	assert.NoError(err)

	// Cancel and close socket
	// This as well might be in time.AfterFunc with the same result!!!!
	cancel()
	err = pull.Close()
	assert.NoError(err)

	// Read out
	msg, err := pull.Recv()
	assert.Error(err)
	_ = msg

	// Final close
	err = pull.Close()
	_ = err
}

To me it looks like Close() should call qreader.sem.enable(): in this case qreader.addConn() is never called, so qreader.read() is stuck forever in q.sem.lock().

Why is go-zeromq/zmq4 not needing libzmq on windows?

I tried using pebbe/zmq on Windows (I had great success with it on Linux), but it is impossible to make it run because libzmq is not straightforward to install on Windows.
How come go-zeromq works without these issues?

In other words, how would I modify pebbe to work the way go-zeromq does, since the latter is not going to be maintained?

zmq4: socket close and {w,r}pool close

as noted in #76 (comment) there's a bit of a confusing ownership between who's responsible for closing net.Conn between the underlying zmq4.socket and the various {r,w}pool implementations.

this leads to either non-executed code or "double-close" of connections.

we should probably make sure the {r,w}pool implementations have the complete ownership/responsibility for closing connections.
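One way to make the "double-close" harmless while ownership is being sorted out is to wrap each connection so that only the first Close reaches it. The sketch below is an assumption about how that could look, not the project's code: `onceCloser` is a hypothetical wrapper and `countingCloser` is a stand-in for net.Conn.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// onceCloser is a hypothetical wrapper: only the first Close call is
// forwarded to the wrapped closer, and later calls return its result.
type onceCloser struct {
	c    io.Closer
	once sync.Once
	err  error
}

func (o *onceCloser) Close() error {
	o.once.Do(func() { o.err = o.c.Close() })
	return o.err
}

// countingCloser is a stand-in for net.Conn that counts Close calls
// and fails on the second one, like a real connection would.
type countingCloser struct{ n int }

func (c *countingCloser) Close() error {
	c.n++
	if c.n > 1 {
		return errors.New("use of closed network connection")
	}
	return nil
}

func main() {
	conn := &countingCloser{}
	oc := &onceCloser{c: conn}

	// Both the socket and a pool could call Close without harm.
	err1 := oc.Close()
	err2 := oc.Close()
	fmt.Println(err1, err2, conn.n) // <nil> <nil> 1
}
```

Giving the {r,w}pool implementations sole ownership, as proposed above, would remove the need for such a wrapper.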

Dead lock, how to fix?

The goroutine stacks are below. How can I fix this problem?

1 @ 0x43a576 0x44aff3 0x44afcd 0x4661e5 0x481732 0x74fa67 0x7763d7 0x774a46 0x78c84d 0x78cd87 0x78cf78 0x46a141
#	0x4661e4	sync.runtime_Semacquire+0x24				/opt/go/src/runtime/sema.go:56
#	0x481731	sync.(*WaitGroup).Wait+0x51				/opt/go/src/sync/waitgroup.go:136
#	0x74fa66	golang.org/x/sync/errgroup.(*Group).Wait+0x26		/home/alexw/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:53
#	0x7763d6	github.com/go-zeromq/zmq4.(*routerMWriter).write+0x2d6	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:242
#	0x774a45	github.com/go-zeromq/zmq4.(*routerSocket).Send+0xe5	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:40
#	0x78c84c	sdwan/ctrl/device.(*ZeroMQ).sendMsg+0x28c		/home/alexw/workspace/hw-agent/ctrl/device/zmq.go:60
#	0x78cd86	sdwan/ctrl/device.(*ZeroMQ).runSend+0x126		/home/alexw/workspace/hw-agent/ctrl/device/zmq.go:102
#	0x78cf77	sdwan/ctrl/device.RunZmq+0xf7				/home/alexw/workspace/hw-agent/ctrl/device/zmq.go:124

1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x47fadb 0x778a99 0x771a02 0x7719ee 0x771b39 0x77117c 0x76fb2d 0x7764ba 0x74fc04 0x46a141
#	0x466304	sync.runtime_SemacquireMutex+0x24				/opt/go/src/runtime/sema.go:71
#	0x47fca4	sync.(*Mutex).lockSlow+0x164					/opt/go/src/sync/mutex.go:162
#	0x47fada	sync.(*Mutex).Lock+0x3a						/opt/go/src/sync/mutex.go:81
#	0x778a98	github.com/go-zeromq/zmq4.(*socket).scheduleRmConn+0x38		/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:334
#	0x771a01	github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError+0x41	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:500
#	0x7719ed	github.com/go-zeromq/zmq4.(*Conn).SetClosed+0x2d		/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:472
#	0x771b38	github.com/go-zeromq/zmq4.(*Conn).checkIO+0xf8			/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:492
#	0x77117b	github.com/go-zeromq/zmq4.(*Conn).send+0x17b			/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:357
#	0x76fb2c	github.com/go-zeromq/zmq4.(*Conn).SendMsg+0x16c			/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:190
#	0x7764b9	github.com/go-zeromq/zmq4.(*routerMWriter).write.func1+0x39	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:239
#	0x74fc03	golang.org/x/sync/errgroup.(*Group).Go.func1+0x63		/home/alexw/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:75

1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x47fadb 0x778a99 0x771a02 0x7719ee 0x771b39 0x77188f 0x7758c8 0x46a141
#	0x466304	sync.runtime_SemacquireMutex+0x24				/opt/go/src/runtime/sema.go:71
#	0x47fca4	sync.(*Mutex).lockSlow+0x164					/opt/go/src/sync/mutex.go:162
#	0x47fada	sync.(*Mutex).Lock+0x3a						/opt/go/src/sync/mutex.go:81
#	0x778a98	github.com/go-zeromq/zmq4.(*socket).scheduleRmConn+0x38		/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:334
#	0x771a01	github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError+0x41	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:500
#	0x7719ed	github.com/go-zeromq/zmq4.(*Conn).SetClosed+0x2d		/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:472
#	0x771b38	github.com/go-zeromq/zmq4.(*Conn).checkIO+0xf8			/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:492
#	0x77188e	github.com/go-zeromq/zmq4.(*Conn).read+0x68e			/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/conn.go:385
#	0x7758c7	github.com/go-zeromq/zmq4.(*routerQReader).listen+0x1a7		/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:161

1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x481296 0x481275 0x7782c5 0x777c5c 0x46a141
#	0x466304	sync.runtime_SemacquireMutex+0x24			/opt/go/src/runtime/sema.go:71
#	0x47fca4	sync.(*Mutex).lockSlow+0x164				/opt/go/src/sync/mutex.go:162
#	0x481295	sync.(*Mutex).Lock+0x35					/opt/go/src/sync/mutex.go:81
#	0x481274	sync.(*RWMutex).Lock+0x14				/opt/go/src/sync/rwmutex.go:139
#	0x7782c4	github.com/go-zeromq/zmq4.(*socket).addConn+0x64	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:285
#	0x777c5b	github.com/go-zeromq/zmq4.(*socket).accept+0x1fb	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:227

1 @ 0x43a576 0x44aff3 0x44afcd 0x466305 0x47fca5 0x775ea5 0x775e75 0x77897c 0x778f1d 0x46a141
#	0x466304	sync.runtime_SemacquireMutex+0x24			/opt/go/src/runtime/sema.go:71
#	0x47fca4	sync.(*Mutex).lockSlow+0x164				/opt/go/src/sync/mutex.go:162
#	0x775ea4	sync.(*Mutex).Lock+0x64					/opt/go/src/sync/mutex.go:81
#	0x775e74	github.com/go-zeromq/zmq4.(*routerMWriter).rmConn+0x34	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/router.go:211
#	0x77897b	github.com/go-zeromq/zmq4.(*socket).rmConn+0x23b	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:329
#	0x778f1c	github.com/go-zeromq/zmq4.(*socket).connReaper+0x17c	/home/alexw/go/pkg/mod/github.com/go-zeromq/[email protected]/socket.go:393

zmq4: router.Close() returns invalid socket error

Closing a Router socket without connections returns zmq4: invalid socket error:

func TestRouterInvalidSocket(t *testing.T) {
	router := zmq4.NewRouter(context.Background())
	router.Listen("tcp://*:*")
	err := router.Close()
	if err != nil {
		t.Errorf("want err %v, got %q", nil, err)
	}
}

it is coming from socket.go:

// Close closes the open Socket
func (sck *socket) Close() error {
        ...
	sck.mu.RLock()
	defer sck.mu.RUnlock()
	if sck.conns == nil {
		return errInvalidSocket
	}
        ...
}

router node restart recv block

After the router node restarts, the dealer node sends messages normally, but the router node cannot receive the messages sent by the dealer.

use of closed network connection

After disconnecting one of several SUBs, PUB starts returning the following error on Send:
zmq4: error sending frame 1/1: write tcp 127.0.0.1:1234->127.0.0.1:58966: use of closed network connection
Effectively, the server needs to be restarted after one client has been disconnected.

A minimal Go example for this would be the following, but it also happens with clients in other languages:

package main

import (
	"context"
	zmq "github.com/go-zeromq/zmq4"
	"log"
	"time"
)

// Send continuously
func send() {
	sock := zmq.NewPub(context.Background())
	defer sock.Close()

	if err := sock.Listen("tcp://*:1234"); err != nil {
		log.Fatal(err)
	}

	for {
		if err := sock.Send(zmq.NewMsgString("test")); err != nil {
			log.Println(err)
		}

		time.Sleep(time.Second)
	}
}

// Receive once
func receive() {
	sub := zmq.NewSub(context.Background())
	defer sub.Close()

	err := sub.Dial("tcp://localhost:1234")
	if err != nil {
		log.Fatal(err)
	}

	err = sub.SetOption(zmq.OptionSubscribe, "test")
	if err != nil {
		log.Fatal(err)
	}

	msg, err := sub.Recv()
	if err != nil {
		log.Fatal(err)
	}
	log.Println(msg)
}

func main() {
	go send()
	receive() // receive one and close
	time.Sleep(5 * time.Second)
}

support to modify dial retry times.

Could you consider adding a new field to socket, such as retryTimes, so that the number of dial retries in socket.go can be changed rather than always using the default of 10? For example:

func WithDialerRetryTime(retryTimes int) Option {
	return func(s *socket) {
		s.retryTimes = retryTimes
	}
}

subSocket.Topics() is not accessible

Because NewSub returns an interface and subSocket is not exported, the Topics() function is not accessible.

NewSub:

zmq4/sub.go

Line 16 in 2ae9328

func NewSub(ctx context.Context, opts ...Option) Socket {

Topics:

zmq4/sub.go

Lines 120 to 129 in 2ae9328

func (sub *subSocket) Topics() []string {
	sub.mu.RLock()
	var topics = make([]string, 0, len(sub.topics))
	for topic := range sub.topics {
		topics = append(topics, topic)
	}
	sub.mu.RUnlock()
	sort.Strings(topics)
	return topics
}
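As a possible workaround, Go allows a type assertion to a caller-defined interface even when the concrete type is unexported, as long as the method name itself is exported. The sketch below demonstrates the language rule with reduced stand-ins (`Socket`, `subSocket`, `NewSub` here are simplified copies, and `topicser` and `topicsOf` are hypothetical names); whether the value returned by the real NewSub satisfies the assertion is an assumption based on the snippets quoted above.

```go
package main

import (
	"fmt"
	"sort"
)

// Socket and subSocket are reduced stand-ins for the library types.
type Socket interface{ Close() error }

type subSocket struct{ topics map[string]struct{} }

func (s *subSocket) Close() error { return nil }

func (s *subSocket) Topics() []string {
	topics := make([]string, 0, len(s.topics))
	for t := range s.topics {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	return topics
}

// NewSub returns the interface type, hiding the concrete type.
func NewSub() Socket {
	return &subSocket{topics: map[string]struct{}{"b": {}, "a": {}}}
}

// topicser is declared by the caller; it names only the method needed.
type topicser interface{ Topics() []string }

// topicsOf reports the topics if the socket exposes a Topics method.
func topicsOf(s Socket) ([]string, bool) {
	t, ok := s.(topicser)
	if !ok {
		return nil, false
	}
	return t.Topics(), true
}

func main() {
	topics, ok := topicsOf(NewSub())
	fmt.Println(topics, ok) // [a b] true
}
```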

zmq4: data race in socket.Close

it would seem #47 introduced a data race on socket.closedConns.

==================
WARNING: DATA RACE
Write at 0x00c000278130 by goroutine 87:
  runtime.closechan()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/runtime/chan.go:327 +0x0
  github.com/go-zeromq/zmq4.(*socket).Close.func1()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:92 +0x6e
  github.com/go-zeromq/zmq4.(*socket).Close()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:116 +0x317
  github.com/go-zeromq/zmq4.(*dealerSocket).Close()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/dealer.go:26 +0x67
  runtime.call32()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/runtime/asm_amd64.s:522 +0x3a
  testing.(*common).Fatalf()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:634 +0x94
  github.com/go-zeromq/zmq4_test.TestRouterDealer.func1()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/zmq4_routerdealer_test.go:233 +0xf09
  testing.tRunner()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:827 +0x162

Previous read at 0x00c000278130 by goroutine 50:
  runtime.chansend()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/runtime/chan.go:140 +0x0
  github.com/go-zeromq/zmq4.(*socket).scheduleRmConn()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:290 +0x105
  github.com/go-zeromq/zmq4.(*socket).scheduleRmConn-fm()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:184 +0x4b
  github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:443 +0xac
  github.com/go-zeromq/zmq4.(*Conn).SetClosed()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:420 +0xab
  github.com/go-zeromq/zmq4.(*Conn).checkIO()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:435 +0x150
  github.com/go-zeromq/zmq4.(*Conn).read()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:333 +0xcf7
  github.com/go-zeromq/zmq4.(*qreader).listen()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:103 +0xe7

Goroutine 87 (running) created at:
  testing.(*T).Run()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:878 +0x659
  github.com/go-zeromq/zmq4_test.TestRouterDealer()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/zmq4_routerdealer_test.go:96 +0x2b2
  testing.tRunner()
      /home/travis/.gimme/versions/go1.11.13.linux.amd64/src/testing/testing.go:827 +0x162

Goroutine 50 (running) created at:
  github.com/go-zeromq/zmq4.(*qreader).addConn()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:66 +0xa2
  github.com/go-zeromq/zmq4.(*socket).addConn()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:255 +0x40d
  github.com/go-zeromq/zmq4.(*socket).Dial()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:241 +0x73d
  github.com/go-zeromq/zmq4.(*dealerSocket).Dial()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/dealer.go:47 +0x7b
  github.com/go-zeromq/zmq4_test.TestRouterDealer.func1.2.1()
      /home/travis/gopath/src/github.com/go-zeromq/zmq4/zmq4_routerdealer_test.go:182 +0xc0
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /home/travis/gopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x64
==================
panic: send on closed channel

goroutine 494 [running]:
github.com/go-zeromq/zmq4.(*socket).scheduleRmConn(0xc000162500, 0xc0000e7220)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:290 +0x106
github.com/go-zeromq/zmq4.(*socket).scheduleRmConn-fm(0xc0000e7220)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/socket.go:184 +0x4c
github.com/go-zeromq/zmq4.(*Conn).notifyOnCloseError(0xc0000e7220)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:443 +0xad
github.com/go-zeromq/zmq4.(*Conn).SetClosed(0xc0000e7220)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:420 +0xac
github.com/go-zeromq/zmq4.(*Conn).checkIO(0xc0000e7220, 0x6fbac0, 0xc000247a40)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:435 +0x151
github.com/go-zeromq/zmq4.(*Conn).read(0xc0000e7220, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/conn.go:333 +0xcf8
github.com/go-zeromq/zmq4.(*qreader).listen(0xc000213b30, 0x6fcac0, 0xc000262c40, 0xc0000e7220)
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:103 +0xe8
created by github.com/go-zeromq/zmq4.(*qreader).addConn
	/home/travis/gopath/src/github.com/go-zeromq/zmq4/msgio.go:66 +0xa3
FAIL	github.com/go-zeromq/zmq4	19.152s
travis_time:end:0610ee78:start=1571132857856874905,finish=1571132886612396666,duration=28755521761,event=script
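The "send on closed channel" panic above is the usual result of closing a channel that other goroutines still send on. A common pattern, sketched here under the assumption that it would fit this code, is to never close the data channel and to signal shutdown through a separate done channel instead. `reaper`, `schedule`, and the field names are hypothetical and do not reflect the layout of socket.go.

```go
package main

import (
	"fmt"
	"sync"
)

// reaper is a hypothetical stand-in for the part of the socket that
// collects closed connections.
type reaper struct {
	closed chan int      // connection ids to remove; never closed
	done   chan struct{} // closed exactly once by Close
	once   sync.Once
}

func newReaper() *reaper {
	return &reaper{closed: make(chan int), done: make(chan struct{})}
}

// schedule hands id to the consumer goroutine (assumed to be reading
// from closed). It returns false if the reaper was closed, instead of
// panicking on a closed channel.
func (r *reaper) schedule(id int) bool {
	select {
	case r.closed <- id:
		return true
	case <-r.done:
		return false
	}
}

// Close is safe to call multiple times and from any goroutine.
func (r *reaper) Close() {
	r.once.Do(func() { close(r.done) })
}

func main() {
	r := newReaper()
	r.Close()
	r.Close()                  // second call is a no-op
	fmt.Println(r.schedule(1)) // false
}
```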

Unable to dial ipv6 addresses

Reassembling the dial address from the string "tcp://[aaa:bbb::ccc]:55555" does not work properly: it tries to dial "aaa:bbb::ccc:55555" instead of "[aaa:bbb::ccc]:55555".
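For reference, the standard library already handles the brackets: net.SplitHostPort strips them and net.JoinHostPort puts them back whenever the host contains a colon. The helper below is a hypothetical sketch of endpoint parsing built on those two functions, not the library's code.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitEndpoint is a hypothetical helper that turns an endpoint such
// as "tcp://[aaa:bbb::ccc]:55555" into a network name and an address
// suitable for net.Dial.
func splitEndpoint(ep string) (string, string, error) {
	network, rest, ok := strings.Cut(ep, "://")
	if !ok {
		return "", "", fmt.Errorf("invalid endpoint %q", ep)
	}
	// SplitHostPort removes the brackets around an IPv6 literal.
	host, port, err := net.SplitHostPort(rest)
	if err != nil {
		return "", "", err
	}
	// JoinHostPort restores the brackets when host contains a colon.
	return network, net.JoinHostPort(host, port), nil
}

func main() {
	network, addr, err := splitEndpoint("tcp://[aaa:bbb::ccc]:55555")
	fmt.Println(network, addr, err) // tcp [aaa:bbb::ccc]:55555 <nil>
}
```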

zmq4: provide zmq4.Proxy

we should have a way to expose something akin to zmq_proxy:

int zmq_proxy (const void *frontend, const void *backend, const void *capture);

The zmq_proxy() function starts the built-in ØMQ proxy in the current application thread.

The proxy connects a frontend socket to a backend socket. Conceptually, data flows from
frontend to backend. Depending on the socket types, replies may flow in the opposite
direction.
The direction is conceptual only; the proxy is fully symmetric and there is no technical
difference between frontend and backend.

Before calling zmq_proxy() you must set any socket options, and connect or bind both
frontend and backend sockets. The two conventional proxy models are:

zmq_proxy() runs in the current thread and returns only if/when the current context is
closed.

If the capture socket is not NULL, the proxy shall send all messages, received on both
frontend and backend, to the capture socket. The capture socket should be
 a ZMQ_PUB, ZMQ_DEALER, ZMQ_PUSH, or ZMQ_PAIR socket.

github.com/pebbe/zmq2 (2.2.0) sub socket cannot connect to github.com/go-zeromq/zmq4 pub socket

As soon as the subscriber connects (subscribes), the pub socket server starts logging:

zmq4: could not open a ZMTP connection with "tcp://127.0.0.1:5000": zmq4: could not initialize ZMTP connection: zmq4: could not exchange greetings: zmq4: could not recv greeting: could not read ZMTP greeting: unexpected EOF

The subscriber does not report any error, but it also receives no events.

I think 2.2.0 should be compatible with zmq4, but I can't get it to work.
http://wiki.zeromq.org/area:faq#toc9

Or am I just incorrectly assuming this should work? :)

Test client for github.com/pebbe/zmq2:

package main

import (
	"log"
	"os"
	"time"

	zmq "github.com/pebbe/zmq2"
)

func main() {

	const addr = "tcp://localhost:5000"

	socket, err := zmq.NewSocket(zmq.SUB)
	if err != nil {
		log.Fatal(err)
	}
	if err := socket.Connect(addr); err != nil {
		log.Fatal(err)
	}

	if err := socket.SetSubscribe(os.Args[1]); err != nil {
		log.Fatal(err)
	}

	poller := zmq.NewPoller()
	poller.Add(socket, zmq.POLLIN)

	for {
		sockets, err := poller.Poll(2 * time.Second)
		if err != nil {
			log.Fatal(err)
		} else {
			for _, socket := range sockets {
				data, err := socket.Socket.RecvBytes(0)
				if err != nil {
					log.Fatal(err)
				} else {
					log.Println("GOT", string(data))
				}
			}
		}
	}
}

Test server for github.com/go-zeromq/zmq4:

package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/go-zeromq/zmq4"
)

func main() {

	const addr = "tcp://localhost:5000"

	pub := zmq4.NewPub(context.Background())
	defer pub.Close()

	if err := pub.Listen(addr); err != nil {
		log.Fatal(err)
	}

	event := func() zmq4.Msg {
		return zmq4.NewMsgFrom([]byte(os.Args[1]), []byte("RANDOM_STRING"))
	}
	for {
		select {
		case <-time.After(1 * time.Second):
			if err := pub.Send(event()); err != nil {
				log.Fatal(err)
			} else {
				log.Println("SENT event")
			}
		}
	}
}

A slightly modified example, however, can connect and receive events fine:

package main

import (
	"context"
	"log"
	"os"

	"github.com/go-zeromq/zmq4"
)

func main() {
	log.SetPrefix("psenvsub: ")

	//  Prepare our subscriber
	sub := zmq4.NewSub(context.Background())
	defer sub.Close()

	err := sub.Dial("tcp://localhost:5000")
	if err != nil {
		log.Fatalf("could not dial: %v", err)
	}

	err = sub.SetOption(zmq4.OptionSubscribe, os.Args[1])
	if err != nil {
		log.Fatalf("could not subscribe: %v", err)
	}

	for {
		// Read envelope
		msg, err := sub.Recv()
		if err != nil {
			log.Fatalf("could not receive message: %v", err)
		}
		log.Printf("[%s] %s\n", msg.Frames[0], msg.Frames[1])
	}
}

zmq4: implement proper REQ/REP semantics

I'm using the REP/REQ sockets in a single-server multi-client scenario. According to the ZMQ docs, this should work properly because the server will send back the reply to the client it received the request from.

In my case, however, I'm seeing that subsequent connections to the REP socket by other REQ sockets result in reply messages being sent to all REQ sockets, so it acts more like a publisher.

Is this supposed to work?
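
A toy model of the expected behaviour, offered as a sketch only: the conn and rep types are hypothetical and do not correspond to zmq4 internals. It shows a REP socket remembering which peer sent the request being served, so that the reply goes to that peer alone.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for one REQ peer connected to the REP socket.
type conn struct {
	name  string
	inbox []string // replies delivered to this peer
}

// rep is a toy REP socket: it remembers which peer sent the request that is
// currently being served.
type rep struct {
	pending *conn
}

// recv records the origin of the request and returns its payload.
func (r *rep) recv(from *conn, body string) string {
	r.pending = from
	return body
}

// send delivers the reply to the recorded peer only, then clears the state
// so that a second send without a new request is rejected.
func (r *rep) send(body string) error {
	if r.pending == nil {
		return errors.New("send without a pending request")
	}
	r.pending.inbox = append(r.pending.inbox, body)
	r.pending = nil
	return nil
}

func main() {
	a, b := &conn{name: "A"}, &conn{name: "B"}
	var r rep
	r.recv(a, "ping from A")
	if err := r.send("pong"); err != nil {
		fmt.Println("send failed:", err)
	}
	fmt.Println("A replies:", len(a.inbox), "B replies:", len(b.inbox))
}
```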

pub/sub error: when the server closes, the client gets an error and closes

In ZeroMQ, closing the server or the client never causes the other side to close,
but here the following happens, and
I do not know why:
could not receive message : read tcp 127.0.0.1:57509->127.0.0.1:5563: wsarecv: An existing connection was forcibly closed by the remote host.

server

package main

import (
	"context"
	"log"
	"time"

	"github.com/go-zeromq/zmq4"
)

func main() {
	log.SetPrefix("psenvpub: ")

	// prepare the publisher
	pub := zmq4.NewPub(context.Background())
	defer pub.Close()

	err := pub.Listen("tcp://*:5563")
	if err != nil {
		log.Fatalf("could not listen: %v", err)
	}

	msgA := zmq4.NewMsgFrom(
		[]byte("A"),
		[]byte("We don't want to see this"),
	)
	msgB := zmq4.NewMsgFrom(
		[]byte("B"),
		[]byte("We would like to see this"),
	)
	for {
		//  Write two messages, each with an envelope and content
		err = pub.Send(msgA)
		if err != nil {
			log.Fatal(err)
		}
		err = pub.Send(msgB)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second)
	}
}

client

package main

import (
	"context"
	"log"

	"github.com/go-zeromq/zmq4"
)

func main() {
	log.SetPrefix("psenvsub: ")

	//  Prepare our subscriber
	sub := zmq4.NewSub(context.Background())
	defer sub.Close()

	err := sub.Dial("tcp://localhost:5563")
	if err != nil {
		log.Fatalf("could not dial: %v", err)
	}

	err = sub.SetOption(zmq4.OptionSubscribe, "B")
	if err != nil {
		log.Fatalf("could not subscribe: %v", err)
	}

	for {
		// Read envelope
		msg, err := sub.Recv()
		if err != nil {
			log.Println("could not receive message :", err)
			// msg is not usable after an error; indexing msg.Frames would panic.
			return
		}
		log.Printf("[%s] %s\n", msg.Frames[0], msg.Frames[1])
	}
}

SUB socket SetOption must come after Dial, goczmq/pebbe don't have such limitation

Of these six combinations:

./gozeromq.go --zmq=pebbe --dialfirst
./gozeromq.go --zmq=pebbe
./gozeromq.go --zmq=czmq --dialfirst
./gozeromq.go --zmq=czmq
./gozeromq.go --zmq=zmq --dialfirst
./gozeromq.go --zmq=zmq

only ./gozeromq.go --zmq=zmq blocks forever, unlike the czmq and pebbe variants.

//usr/bin/env GO111MODULE=off go run $0 $@;exit

package main
import (
    "context"
    "flag"
    "log"

    "github.com/zeromq/goczmq"
    "github.com/go-zeromq/zmq4"
    pebbe "github.com/pebbe/zmq4"
)

func main(){
    endpoint := "tcp://127.0.0.1:55555"
    zmq := flag.String("zmq", "zmq", "zmq type (czmq / zmq / pebbe)")
    fdialfirst := flag.Bool("dialfirst", false, "dial first")
    flag.Parse()
    if *zmq == "czmq" {
        s := goczmq.NewSock(goczmq.Sub)
        if *fdialfirst {
            err := s.Attach(endpoint, false)
            if err != nil { log.Fatal(err) }
            log.Print("dial done")
            s.SetOption(goczmq.SockSetSubscribe(""))
            log.Print("set option done")
        } else {
            s.SetOption(goczmq.SockSetSubscribe(""))
            log.Print("set option done")
            err := s.Attach(endpoint, false)
            if err != nil { log.Fatal(err) }
            log.Print("dial done")
        }
        reply, err := s.RecvMessage()
        if err != nil { log.Fatal(err) }
        log.Print("recv done")
        log.Print(string(reply[0]))
        s.Destroy()
    } else if *zmq == "zmq" {
        s := zmq4.NewSub(context.Background())
        defer s.Close()
        if *fdialfirst {
            err := s.Dial(endpoint)
            if err != nil { log.Fatal(err) }
            log.Print("dial done")
            err = s.SetOption(zmq4.OptionSubscribe, "")
            if err != nil { log.Fatal(err) }
            log.Print("set option done")
        } else {
            /// this code path does not work (recv blocks forever) ///
            err := s.SetOption(zmq4.OptionSubscribe, "")
            if err != nil { log.Fatal(err) }
            log.Print("set option done")
            err = s.Dial(endpoint)
            if err != nil { log.Fatal(err) }
            log.Print("dial done")
        }
        raw, err := s.Recv()
        if err != nil { log.Fatal(err) }
        log.Print("recv done")
        log.Print(raw.String())
    } else if *zmq == "pebbe" {
        // pebbe may serve as a reference, since it is mentioned in https://zeromq.org/languages/go/
        s, err := pebbe.NewSocket(pebbe.SUB)
        if err != nil { log.Fatal(err) }
        defer s.Close()
        if *fdialfirst {
            err := s.Connect(endpoint)
            if err != nil { log.Fatal(err) }
            log.Print("dial done")
            err = s.SetSubscribe("")
            if err != nil { log.Fatal(err) }
            log.Print("set option done")
        } else {
            err := s.SetSubscribe("")
            if err != nil { log.Fatal(err) }
            log.Print("set option done")
            err = s.Connect(endpoint)
            if err != nil { log.Fatal(err) }
            log.Print("dial done")
        }
        reply, err := s.Recv(0)
        if err != nil { log.Fatal(err) }
        log.Print("recv done")
        log.Print(reply)
    }
}

/*
publisher:

#!/usr/bin/python
import zmq
import time
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:55555")

msgid = 0
while True:
    pub.send(str(msgid))
    msgid = (msgid+1)%100000
    time.sleep(0.1)
*/

zmq4: provide a log.Logger

instead of logging messages directly to log's default logger, we should provide a WithLogger option (that would default to log)

Subscribe() after Listen() doesn't work

When I create a sub socket, call Listen() and then set OptionSubscribe, the subscription is never applied to connections accepted afterwards.

This example code reproduces the issue:

	ctx := context.Background()
	sub := zmq4.NewSub(ctx)
	err := sub.Listen("tcp://*:1234") // listen
	if err != nil {
		log.Fatalf("could not listen: %v", err)
	}
	err = sub.SetOption(zmq4.OptionSubscribe, "") // subscribe for all messages
	if err != nil {
		log.Fatalf("could not subscribe to topic %v", err)
	}
	go func() {
		for {
			msg, err := sub.Recv() // never gets message because OptionSubscribe was not applied after connection was accepted.
			if err != nil {
				log.Printf("could not recv message: %v", err)
			} else {
				fmt.Printf("recv %+v\n", msg)
			}
		}
	}()

	pub := zmq4.NewPub(ctx)
	err = pub.Dial("tcp://*:1234")
	if err != nil {
		log.Fatalf("could not dial: %v", err)
	}
	for {
		msg := zmq4.NewMsgString("test abc")
		err := pub.Send(msg)
		if err != nil {
			log.Printf("could not send message: %v", err)
		}
		fmt.Printf("msg sent\n")
		time.Sleep(time.Second)
	}

Workaround: adding a goroutine that calls SetOption(OptionSubscribe) in a loop makes it work, but this is inefficient and unreliable:

	ctx := context.Background()
	sub := zmq4.NewSub(ctx)
	err := sub.Listen("tcp://*:1234")
	if err != nil {
		log.Fatalf("could not listen: %v", err)
	}
	err = sub.SetOption(zmq4.OptionSubscribe, "")
	if err != nil {
		log.Fatalf("could not subscribe to topic %v", err)
	}
	go func() {
		for {
			msg, err := sub.Recv()
			if err != nil {
				log.Printf("could not recv message: %v", err)
			} else {
				fmt.Printf("recv %+v\n", msg)
			}
		}
	}()

	// re-apply the subscription to all accepted connections
	go func() {
		for {
			// use a goroutine-local err to avoid racing with the outer variable
			if err := sub.SetOption(zmq4.OptionSubscribe, ""); err != nil {
				log.Printf("could not subscribe to topic %v", err)
			}
			fmt.Printf("subscribe\n")
			time.Sleep(time.Second)
		}
	}()

	pub := zmq4.NewPub(ctx)
	err = pub.Dial("tcp://*:1234")
	if err != nil {
		log.Fatalf("could not dial: %v", err)
	}
	for {
		msg := zmq4.NewMsgString("test abc")
		err := pub.Send(msg)
		if err != nil {
			log.Printf("could not send message: %v", err)
		}
		fmt.Printf("msg sent\n")
		time.Sleep(time.Second)
	}
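
One possible direction, sketched under assumptions: the sub and peer types below are hypothetical and are not zmq4's internals. The idea is for the socket to remember its topics and replay them on every new connection, whether dialed or accepted, so that the order of SetOption and Dial/Listen stops mattering and no polling goroutine is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// peer is a hypothetical stand-in for one connection to a publisher.
type peer struct {
	sent []string // subscriptions written to this connection
}

func (p *peer) subscribe(topic string) { p.sent = append(p.sent, topic) }

// sub is a toy SUB socket that stores its topics so they can be replayed.
type sub struct {
	mu     sync.Mutex
	topics []string
	peers  []*peer
}

// Subscribe records the topic and forwards it to every current peer.
func (s *sub) Subscribe(topic string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.topics = append(s.topics, topic)
	for _, p := range s.peers {
		p.subscribe(topic)
	}
}

// addPeer is called for each dialed or accepted connection; it replays the
// topics recorded so far before registering the peer.
func (s *sub) addPeer(p *peer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, t := range s.topics {
		p.subscribe(t)
	}
	s.peers = append(s.peers, p)
}

func main() {
	s := &sub{}
	s.Subscribe("B") // no connection exists yet

	late := &peer{}
	s.addPeer(late) // connection accepted after the subscription
	fmt.Println("late peer subscriptions:", late.sent)
}
```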

no reconnect possible when using `zmq4.NewPub` with `socket.Dial`

I have the following code:

socket := zmq4.NewPub(ctx)
socket.Dial(endpoint)
err := socket.SendMulti(zmq4.NewMsgFrom(partA, partB))

It works fine at first, but when I restart the other side of the connection at endpoint then there is neither an automatic reconnect nor can I do it manually, because the send method will not return an error.

My current workaround is to use reflection to check in the unexported conns field, if a connection is there, but I think there should be a better way to do this:

func isConnected(socket zmq4.Socket) bool {
	return reflect.ValueOf(socket).Elem().FieldByName("sck").Elem().FieldByName("conns").Len() != 0
}

I could think of the following solutions:

  • handle the reconnect internally
  • add an IsConnected() bool method to the Socket interface so the user can reconnect on connection loss
  • return an error from the SendMulti and Send methods to let the user know that connection is broken

consider using a pool of zmq4.Msg to reduce alloc pressure

right now, the Socket interface reads:

type Socket interface {
	// Send puts the message on the outbound send queue.
	// Send blocks until the message can be queued or the send deadline expires.
	Send(msg Msg) error

	// Recv receives a complete message.
	Recv() (Msg, error)

	...
}

internally, each time somebody calls sck.Recv(), a new zmq4.Msg is allocated.

we could perhaps consider going with:

type Socket interface {
	Recv(*Msg) error
}

so the Socket implementation wouldn't have to allocate (and let the user optionally use a pool).

alternatively:

  • leave the API of Socket as is,
  • use a pool internally (ie: inside the Socket implementation)
  • add a Release method to zmq4.Msg (that calls back to sync.Pool.Put)
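
A minimal sketch of the second alternative, assuming a simplified local Msg type that holds only frames. The names acquire, recv and msgPool are hypothetical; the point is only to show how Release could hand a message back to a sync.Pool.

```go
package main

import (
	"fmt"
	"sync"
)

// Msg is a local stand-in for zmq4.Msg, reduced to its frames.
type Msg struct {
	Frames [][]byte
}

// msgPool recycles Msg values between receive calls.
var msgPool = sync.Pool{
	New: func() interface{} { return new(Msg) },
}

// acquire returns an empty message, reusing a pooled one when available.
func acquire() *Msg {
	return msgPool.Get().(*Msg)
}

// Release resets the message and hands it back to the pool.
// The caller must not use m (or its Frames slice) afterwards.
func (m *Msg) Release() {
	m.Frames = m.Frames[:0] // keep the backing array for reuse
	msgPool.Put(m)
}

// recv simulates a socket filling in a pooled message.
func recv(payload string) *Msg {
	m := acquire()
	m.Frames = append(m.Frames, []byte(payload))
	return m
}

func main() {
	m := recv("hello")
	fmt.Printf("%d frame(s): %s\n", len(m.Frames), m.Frames[0])
	m.Release()
}
```

The trade-off is an ownership rule: a message must not be retained after Release, which the current value-based API does not require.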

Socket can only Listen to one endpoint

Hi!

Discovered while working on #85.

The socket struct only allows one listener per socket.

IMHO this is against the ZeroMQ idea.
Would classify that as a bug.

Could use that as the reason to revamp the whole listen/connect/reconnect mechanic.

Relates to #40.

Cheers
Guido
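
A sketch of what tracking several listeners might look like, assuming a hypothetical socket type that keys its listeners by bound address. The accept loops and the ZMTP handshake are left out; this only shows the bookkeeping.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// socket is a hypothetical stand-in that keeps one listener per endpoint.
type socket struct {
	mu        sync.Mutex
	listeners map[string]net.Listener
}

// Listen binds an additional TCP endpoint and returns the bound address.
func (s *socket) Listen(addr string) (net.Addr, error) {
	l, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.listeners == nil {
		s.listeners = make(map[string]net.Listener)
	}
	s.listeners[l.Addr().String()] = l
	return l.Addr(), nil
}

// Close shuts down every listener and reports the first error, if any.
func (s *socket) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	var first error
	for addr, l := range s.listeners {
		if err := l.Close(); err != nil && first == nil {
			first = err
		}
		delete(s.listeners, addr)
	}
	return first
}

func main() {
	s := &socket{}
	defer s.Close()
	for i := 0; i < 2; i++ {
		// Port 0 asks the OS for any free port.
		addr, err := s.Listen("127.0.0.1:0")
		if err != nil {
			fmt.Println("listen failed:", err)
			return
		}
		fmt.Println("listening on", addr)
	}
}
```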

The frame is lost in request response

Not sure if this is an issue or correct zmq semantics. In request/response socket communication, if the message is multi-frame on the sending side, the first frame is lost on the receiving side. To reproduce the issue, change rrclient.go in the examples to send the request like this:

err := req.Send(zmq4.NewMsgFromString([]string{"Hello?", "Hello!"}))

The result on receiving side will be
rrworker: 2020/11/02 08:02:42 received request: [Hello!]

The "Hello?" part of the message is lost

I have read that each request in zmq is prepended with a delimiter frame. Maybe that is what confuses me, because I still expect the number of frames on the sending side to equal the number of frames on the receiving side.
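
For reference, a small model of the envelope handling that the ZeroMQ request-reply pattern is generally described as using: the REQ side prepends one empty delimiter frame, and the REP side strips everything up to and including the first empty frame. The function names reqWrap and repUnwrap are hypothetical, and the fallback for a missing delimiter is an assumption of this sketch. Under this model both application frames survive the round trip.

```go
package main

import "fmt"

// reqWrap models what a REQ socket puts on the wire: an empty delimiter
// frame followed by the application frames.
func reqWrap(body [][]byte) [][]byte {
	wire := make([][]byte, 0, len(body)+1)
	wire = append(wire, []byte{})
	return append(wire, body...)
}

// repUnwrap models the REP side: everything up to and including the first
// empty frame is the envelope; the rest is handed to the application.
func repUnwrap(wire [][]byte) (envelope, body [][]byte) {
	for i, f := range wire {
		if len(f) == 0 {
			return wire[:i+1], wire[i+1:]
		}
	}
	// Assumption for this sketch: without a delimiter, treat all frames as body.
	return nil, wire
}

func main() {
	sent := [][]byte{[]byte("Hello?"), []byte("Hello!")}
	_, got := repUnwrap(reqWrap(sent))
	fmt.Printf("%d frames received: %q\n", len(got), got)
}
```

If an implementation instead drops the first frame unconditionally, a two-frame request arrives as one frame, which matches the symptom described above.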
