hashicorp/yamux — Golang connection multiplexing library
License: Mozilla Public License 2.0
The server returns a ping ack with header [0 2 0 2 0 0 0 0 0 0 0 1], but the client receives [0 2 0 2 0 0 0 0 0 0 0 0].
Is the ping ID not correct?
Consider a situation where we have a client-server connection and a keepalive on the client side fails. The underlying connection gets closed without sending a GoAway message.
Now, on the server side, there's a race over who notices the closed connection first and calls (*Session).exitErr(). If (*Session).recv wins (I think it also applies to (*Session).send) while (*Session).recvLoop is in the io.ReadFull call, (*Session).exitErr() is called with a "connection reset by peer" net.Error. That error is written to Session.shutdownErr and will be returned by (*Session).Accept on all subsequent calls.
And here's the problem. If that server Session is used as a net.Listener with http.Server, it will cause an infinite error loop. That's because "connection reset by peer" is a temporary error, and those are retried indefinitely, see here.
Ideally, all yamux errors should implement net.Error and set Temporary() and Timeout() appropriately, but that would be a breaking API change. The backward-compatible solution is to never return errors that implement net.Error, because the context where they originally happen and the context where they are returned by yamux differ, leading to erroneous situations like this one.
In practical terms, it simply means wrapping errors passed to (*Session).exitErr() if they are not yamux errors.
I can submit a PR for this but I would first like to know if that's something you would accept.
I use yamux just for control flow, one stream per connection, and rely on the net.Conn interface to communicate. net.Conn in this case is a yamux.Stream.
Any code that relies on net.Conn.Close() to do the right thing can't really do so, as the session, and hence the underlying connection, is not closed, so things like closing and reopening a connection do not work.
Usually I end up wrapping yamux.Stream with something that closes the session as you close the stream, but it would be nice if there were an option in the config to deal with that.
Hi @armon, @mitchellh, I was looking at this package in a little more detail and was wondering what the reasoning behind the deadline implementation is. I noticed that when setting a deadline it expects a certain point in time instead of a duration, and I fail to understand why it is implemented this way.
Looking at this part of the code for example, I see that readDeadline is checked to see if it has been set, and if so the delay is calculated by subtracting the current time from the configured deadline time.
So when I open a new stream and then call stream.SetDeadline(time.Now().Add(30 * time.Second)), the readDeadline would be set to 30 seconds in the future. So if my program starts sending and receiving data within 30 seconds from the time I opened the stream, all should be good, right?
But say I communicate using the newly opened stream for about 60 seconds, after which some local processing is done before sending another request over the stream. It will then get to that same code path again, but this time it will subtract the current time from a time in the past, making the duration negative, and so time.After(delay) will fire immediately, giving me a timeout error.
So is it expected and designed to behave this way? Or do I misread/misunderstand how this is meant to be used? I would think that by changing the deadlines to durations it could function as a generic timeout which works not only when the stream is opened and waiting for its first communication, but also in between communications. Additionally, it could be used to make sure streams are cleaned up after not being used for the configured duration (when parallelism or the program layout makes it hard to determine when Close() can safely be called on the stream).
Thanks,
Sander
This may be a misuse of yamux instead of a yamux issue, but I just wanted to let you know that the following code hangs for me in hijack() (really abortPendingRead() in go/src/net/http/server.go:3341) on go version devel +3067376 Mon Feb 26 22:10:51 2018 +0000 linux/amd64.
package main

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/hashicorp/yamux"
)

var endpoint = flag.String("ep", "127.0.0.1:8000", "endpoint")

func client() {
	// Get a TCP connection
	conn, err := net.Dial("tcp", *endpoint)
	if err != nil {
		panic(err)
	}
	// Setup client side of yamux
	session, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}
	http.Serve(session, nil)
}

func server() {
	// Accept a TCP connection
	listener, err := net.Listen("tcp", *endpoint)
	if err != nil {
		panic(err)
	}
	conn, err := listener.Accept()
	if err != nil {
		panic(err)
	}
	// Setup server side of yamux
	session, err := yamux.Server(conn, nil)
	if err != nil {
		panic(err)
	}
	tr := &http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return session.Open()
		},
	}
	client := &http.Client{Transport: tr}
	resp, err := client.Get("http://127.0.0.1:8000/works")
	if err != nil {
		panic(err)
	}
	fmt.Println(resp)
	fmt.Println("hanging in abortPendingRead()...")
	resp, err = client.Get("http://127.0.0.1:8000/bug")
	if err != nil {
		fmt.Println("we should not see this because it should hang in abortPendingRead()")
		fmt.Println(err)
	}
	fmt.Println(resp)
}

func main() {
	flag.Parse()
	http.HandleFunc("/works", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "this works")
	})
	http.HandleFunc("/bug", func(w http.ResponseWriter, r *http.Request) {
		h, ok := w.(http.Hijacker)
		if !ok {
			fmt.Printf("does not support hijack")
		}
		c, _, err := h.Hijack()
		if err != nil {
			panic(err)
		}
		fmt.Fprintf(c, "this works")
		c.Close()
	})
	go server()
	time.Sleep(90 * time.Millisecond)
	client()
}
MPL 2.0 requires anyone who modifies this library's files to disclose those modifications. Apache 2.0 would encourage more adoption.
I'm wondering whether authentication is something yamux should/can support, and if so, how it should work. I'm willing to submit a PR if guidance is provided.
I am working on a port-forwarding service to replace an existing sshd-based service, as that one encrypts and decrypts data, which is not needed in my case. I started working with yamux, and it looks like I have it basically working. My test scenario so far is putting a forwarded HTTP server under siege, and it fails after a couple of transfers. The number itself varies; usually after 20 to 100 MB yamux returns ErrRecvWindowExceeded. I looked into this, and in that case atomic.LoadUint32(&s.recvWindow) is always 0.
If I just disable the check below for ErrRecvWindowExceeded, everything works perfectly. What could I be doing wrong here?
if length > atomic.LoadUint32(&s.recvWindow) {
	s.session.logger.Printf("[ERR] yamux: receive window exceeded")
	return ErrRecvWindowExceeded
}
I have been testing yamux and it does just what I need when looking at 5 to around 100 streams. However, the application use case will require 10k or more streams per connection. When I simulate this by connecting a client that just opens streams and starts sending as fast as it can on each stream (with an accept backlog of 5k), the accept rate for yamux is painfully slow, taking around 3m to accept 10k streams. From what I have profiled, it seems the locking interplay between data and accept is painful. Any plans to refactor or perhaps avoid some or all of the locking? I watched your video on the plugin arch and the pain you went through to build yamux, so I don't want to have to re-implement it myself ;)
Any pointers would be helpful.
Thx
Leif
While spelunking I noticed that there's a loop that attempts to write all bytes of a header:
https://github.com/hashicorp/yamux/blob/master/session.go#L358
Looking at the Go documentation, a Write() call that doesn't get all the bytes out must always return a non-nil error, so the loop body can never execute more than once. We should just delete the loop.
at stream.readData():
if _, err := io.Copy(s.recvBuf, conn); err != nil {
	s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
	s.recvLock.Unlock()
	return err
}
// Decrement the receive window
s.recvWindow -= length // <-- this should be the number of bytes read
s.recvLock.Unlock()
When the connection breaks, io.Copy() will return a partial read.
I need to recover from errors when the connection is broken, so there should be some way to check why the session was closed on the client side, e.g.:
[ERR] yamux: Failed to read header: read tcp 10.0.1.10:55190->10.0.3.20:443: read: connection timed out
Right now read() and write() just return io.EOF.
Hi all, I ran into an issue, and I wanted to get your opinion on what the correct behavior should be.
I have a yamux client that writes data to a Stream, then closes the Stream and also the Session.
The problem is that sometimes all the data arrives at the other end, and other times some data remains in the send queue when the underlying connection actually closes.
First, this causes this warning message:
[ERR] yamux: Failed to write header: tls: use of closed connection
Second, the data does not arrive at the other end.
This commit contains a unit test that shows what happens; the failure happens only sometimes because it is a race condition: yipal@a5eed1e
I noticed that the existing code finishes sending the data when the client calls Stream.Close(), which makes me think the correct behavior is for Session.Close() to also finish draining the send queue before actually closing the underlying connection.
go version: go1.11.1 linux/amd64
go test ./...
output:
2018/10/29 06:57:50 [ERR] yamux: keepalive failed: i/o deadline reached
2018/10/29 06:57:50 [ERR] yamux: keepalive failed: i/o deadline reached
2018/10/29 06:57:50 [WARN] yamux: failed to send ping reply: session shutdown
--- FAIL: TestSendData_Large (0.58s)
session_test.go:396: err: EOF
session_test.go:427: err: session shutdown
2018/10/29 06:58:06 [WARN] yamux: failed to send ping reply: connection write timeout
FAIL
FAIL github.com/hashicorp/yamux 17.765s
=== RUN TestGoAway
session_test.go:571: err: <nil>
2023/01/15 18:56:28 [WARN] yamux: frame for missing stream: Vsn:0 Type:1 Flags:1 StreamID:1 Length:0
--- FAIL: TestGoAway (0.00s)
FAIL
FAIL github.com/hashicorp/yamux 0.005s
FAIL
Hi, I'm having this weird problem when using yamux with http.ReverseProxy. What I want to achieve:
- the server listens on :8888 for the client to connect
- the client reverse-proxies requests to example.org
- the server listens on :8080, which proxies to the client
The weird part is that the first curl localhost:8080 succeeds (with a 404 from example.org), but all following curl localhost:8080 return 502 Bad Gateway, with the client printing http: proxy error: context canceled for each request sent.
Here's a clue: if I add DisableKeepAlives: true to the server's transport, the problem goes away, because each request then Dials a new connection.
Below are the detailed code and steps to reproduce the problem:
server code:

package main

import (
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"time"

	"github.com/hashicorp/yamux"
)

func main() {
	lis, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(err)
	}
	conn, err := lis.Accept()
	if err != nil {
		panic(err)
	}
	sess, err := yamux.Server(conn, nil)
	if err != nil {
		panic(err)
	}
	go func() {
		u, err := url.Parse("http://whatever")
		if err != nil {
			panic(err)
		}
		rp := httputil.NewSingleHostReverseProxy(u)
		rp.Transport = &http.Transport{
			Dial: func(network string, addr string) (net.Conn, error) {
				return sess.Open()
			},
		}
		log.Println("serving at :8080")
		if err := http.ListenAndServe(":8080", rp); err != nil {
			panic(err)
		}
	}()
	for {
		time.Sleep(time.Second)
	}
}
client code:

package main

import (
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"

	"github.com/hashicorp/yamux"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:8888")
	if err != nil {
		panic(err)
	}
	sess, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}
	u, err := url.Parse("http://example.org")
	if err != nil {
		panic(err)
	}
	log.Println("serving")
	if err := http.Serve(sess, httputil.NewSingleHostReverseProxy(u)); err != nil {
		panic(err)
	}
}
And do below in terminal to see the problem:
curl localhost:8080 // success
curl localhost:8080 // 502 bad gateway
I'm working on a reverse proxy server via websocket, and this great library is used as a multiplexer over a single ws connection. It works fine when proxying small HTTP requests but will panic if the body is large.
panic: runtime error: index out of range [29934] with length 7680
goroutine 13 [running]:
bufio.(*Reader).Read(0xc000443920, {0xc0005fa000, 0x1e00, 0xc00014bcc0})
/usr/local/Cellar/go/1.17/libexec/src/bufio/bufio.go:218 +0x319
io.(*LimitedReader).Read(0xc00048e2a0, {0xc0005fa000, 0x153bb86, 0xc00014bd00})
/usr/local/Cellar/go/1.17/libexec/src/io/io.go:473 +0x45
bytes.(*Buffer).ReadFrom(0xc0000a11d0, {0x18eb320, 0xc00048e2a0})
/usr/local/Cellar/go/1.17/libexec/src/bytes/buffer.go:204 +0x98
io.copyBuffer({0x18ea9a0, 0xc0000a11d0}, {0x18eb320, 0xc00048e2a0}, {0x0, 0x0, 0x0})
/usr/local/Cellar/go/1.17/libexec/src/io/io.go:409 +0x14b
io.Copy(...)
/usr/local/Cellar/go/1.17/libexec/src/io/io.go:382
github.com/hashicorp/yamux.(*Stream).readData(0xc0000a20d0, {0xc00011c040, 0xc000000002, 0xc}, 0xc998, {0x18ea940, 0xc000443920})
/Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/stream.go:482 +0x21a
github.com/hashicorp/yamux.(*Session).handleStreamMessage(0xc0000d9080, {0xc00011c040, 0xc00011c040, 0xc})
/Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:550 +0x285
github.com/hashicorp/yamux.(*Session).recvLoop(0xc0000d9080)
/Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:501 +0x114
github.com/hashicorp/yamux.(*Session).recv(0xc0005148a0)
/Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:462 +0x1e
created by github.com/hashicorp/yamux.newSession
/Users/hey/go/pkg/mod/github.com/hashicorp/[email protected]/session.go:113 +0x49b
exit status 2
Lines 471 to 475 in 26ff87c
Maybe we should grow its recv buffer large enough to avoid the panic? I have tried the modification below and it never panics again, but I don't know whether it is the right fix.
if s.recvBuf.Cap() < int(length) {
	s.recvBuf.Grow(int(length) - s.recvBuf.Cap())
}
I've been having some issues with yamux writes hanging lately. See goroutine 14008165 in: https://ipfs.io/ipfs/QmQn8YDTHWJzF7GVKUKanBzoxbzd5jQpwsgM2PfFVTKuGR
I think it's also correlated with the internet connection of the machine it's running on dropping during a write. Here's what I suspect is happening: the stream gets closed while a write is blocked in the WAIT code (where we see the write being stuck in the linked stack trace). I'm not sure if the above is exactly what's happening, but I'm quite confident that if we somehow ended up in the write wait loop after the stream has been closed, it's possible that the sendNotifyCh signal got missed and we will block forever. To address that possibility, I think we should close the notify channels when the streams get closed, so that they are always ready to receive on.
cc @slackpad
On the yamux documentation page, the descriptions of the Accept and AcceptStream functions are as follows:
func (s *Session) Accept() (net.Conn, error)
Accept is used to block until the next available stream is ready to be accepted.
func (s *Session) AcceptStream() (*Stream, error)
AcceptStream is used to block until the next available stream is ready to be accepted.
Both descriptions are the same. Maybe the right description of Accept is:
Accept is used to block until the next available connection is ready to be accepted.
Having yamux log errors means that software using this library has no real control over what messages are logged where. Since yamux already returns named errors, which basically contain the message being logged, it would be much nicer if those errors could be handled at a higher level and logged there.
Hi everyone,
I just reviewed the library a little bit since I'm also interested in stream multiplexing (actually want something for websockets instead of plain TCP, but I guess yamux would work there too).
Overall it looks really well written and I like the fact that you have a spec :-)
One thing that I stumbled upon during review is that there might be situations where the send timeout for the session corrupts outgoing data. If we look at the waitForSendErr function, it starts the timeout and tries to send the data in parallel by handing it over to the send channel.
The select/timeout logic looks like it introduces a data race to me: if the timeout expires here, the send will still be in progress. That means the data (header and body) will still be in use in the send routine. However, waitForSendErr returns with an error, and the calling stream could think the data is no longer required and reuse it. It might not do so for sendHdr, as it sees that writes fail and no further writes or sendHdr mutations follow, but the payload data slice might be reused by the application and mutated while the send is still in progress. As an end result, you could send data to the wire that was not desired.
Fixing this is probably not that easy. I guess there could be some cancellation or sendInProgress flag in the enqueued sendReady struct. If the message has not yet been fetched by the writing proc, it can be marked as cancelled and will then no longer be processed. However, as soon as the writer goroutine catches it, the safest thing is to let it write to completion, as there is no kind of cancellation token that can be passed to the Write or Copy methods.
After the session is closed, session.NumStreams() still returns the stream count. Isn't that a problem?
After a client connects, two streams are created; then the client terminates the program with ctrl+c. After the server automatically closes the session, session.NumStreams() still returns 2.
It's doing what it is supposed to be doing so we should squelch this to cut down on log spam.
At present, yamux.Config.Logger is a pointer to a concrete struct. This makes it impossible to use popular libraries such as https://github.com/sirupsen/logrus.
Fortunately, most logging libraries implicitly satisfy the following interface:
type Logger interface {
	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})
	Fatalln(v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
	Panicln(v ...interface{})
	Prefix() string
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
Config should take an interface like the one above in lieu of *log.Logger.
In relation to discussion with @mitchellh in hashicorp/go-plugin#27, I'm investigating the possibility of writing a NodeJS implementation of the protocol described in spec.md.
I'm not a licensing expert, but I get the basics. It's my understanding that the MPL applies to the implementation code only, and not necessarily the protocol. However, I'm also aware that ports where the developer has access to the source code are in a bit of a fuzzy area with regard to licensing.
I'd like to publish my port under the MIT License, since this is somewhat customary for the NPM package community, but will maintain the Mozilla Public License for this module if it is the desire of this module's author.
I will provide attribution for the protocol design no matter what license I publish the code under.
Can I publish my port under the MIT License?
After reading the stream receive window code, I found that when a stream's receive window is exceeded, the error causes the session's recvLoop() to return, and then the session shuts down.
If the above is correct: since streams are independent, why does one stream's receive window being exceeded (usually caused by that stream's consumer being stuck) shut down the session and stop all other streams? Could we close only the problem stream and return the error message to its consumer?
Is this library thread-safe? If I Accept()
multiple streams from a single session, can I access different streams from different goroutines concurrently?
Hi
A Session instance returns a Stream from an Accept call as a net.Conn here. Yet net.Conn, among other things, defines:
// LocalAddr returns the local network address, if known.
LocalAddr() Addr
// RemoteAddr returns the remote network address, if known.
RemoteAddr() Addr
which Stream doesn't seem to implement. So I'm not really sure how this compiles at all (while it indeed does)?
But my main concern is how the accepted connection can be passed to other consumers which really expect a pure net.Conn. Should I wrap the connections with some stubs myself? Or is there an intended solution for that?
Thx.
run TestPing on Linux
$ go test . -run '^TestPing$' -v
=== RUN TestPing
--- PASS: TestPing (0.00s)
PASS
ok github.com/hashicorp/yamux 0.002s
run TestPing on Windows
PS C:\Users\jinmiaoluo\repo\yamux> go test . -run '^TestPing$' -v
=== RUN TestPing
session_test.go:98: bad: 0s
--- FAIL: TestPing (0.00s)
FAIL
FAIL github.com/hashicorp/yamux 0.178s
FAIL
PS C:\Users\jinmiaoluo\repo\yamux>
debug on Linux
$ dlv test .
Type 'help' for list of commands.
(dlv) b ./session_test.go:94
Breakpoint 1 set at 0x56eb44 for github.com/hashicorp/yamux.TestPing() ./session_test.go:94
(dlv) c
> github.com/hashicorp/yamux.TestPing() ./session_test.go:94 (hits goroutine(8):1 total:1) (PC: 0x56eb44)
89: client, server := testClientServer()
90: defer client.Close()
91: defer server.Close()
92:
93: rtt, err := client.Ping()
=> 94: if err != nil {
95: t.Fatalf("err: %v", err)
96: }
97: if rtt == 0 {
98: t.Fatalf("bad: %v", rtt)
99: }
(dlv) p rtt
196033
(dlv)
debug on Windows
(dlv) b c:/users/jinmiaoluo/repo/yamux/session_test.go:94
Breakpoint 1 set at 0x5b8544 for github.com/hashicorp/yamux.TestPing() c:/users/jinmiaoluo/repo/yamux/session_test.go:94
(dlv) c
> github.com/hashicorp/yamux.TestPing() c:/users/jinmiaoluo/repo/yamux/session_test.go:94 (hits goroutine(8):1 total:1) (PC: 0x5b8544)
89: client, server := testClientServer()
90: defer client.Close()
91: defer server.Close()
92:
93: rtt, err := client.Ping()
=> 94: if err != nil {
95: t.Fatalf("err: %v", err)
96: }
97: if rtt == 0 {
98: t.Fatalf("bad: %v", rtt)
99: }
(dlv) p rtt
0
(dlv)
OpenStream calls sendWindowUpdate, and sendWindowUpdate syncs the window from client to server.
incrSendWindow and write can race. If incrSendWindow runs first, it is fine; otherwise sendWindow ends up with a different value than recvWindow, and they go out of sync.
// session_test.go
func TestSendData_Large(t *testing.T) {
	cfg := testConf()
	cfg.MaxStreamWindowSize = 4 * 1024
	// ...
}

// const.go
const (
	// initialStreamWindow is the initial stream window size
	initialStreamWindow uint32 = 1 * 1024
)
=== RUN TestSendData_Large
--- FAIL: TestSendData_Large (5.00s)
session_test.go:415: short read: 1024
session_test.go:441: err: stream reset
panic: timeout [recovered]
panic: timeout
The initialStreamWindow const in yamux is 256 * 1024.
In some cases, the initial 256K window is too small and you have to wait for the window update, which results in longer buffering waits.
So how can I enlarge initialStreamWindow gracefully, other than by modifying the yamux code?
Even if Config -> EnableKeepAlive and Config -> KeepAliveInterval are set and connectivity is lost (i.e. the internet goes down), Session.Accept() returns no error.
I'm wondering if this is by design or a bug. If Session.Accept doesn't return any error, you can't implement a fallback mechanism (e.g. redialing the connection), can you?
I wrote a simple tcp tunnel with yamux and tested the speed:
- direct (client --> server): 64mbps ~ 75mbps
- tunneled (client --> yamux client --stream--> yamux server --> server): 24mbps ~ 28mbps
From the rough test above, we may conclude that yamux costs a lot of bandwidth. Are there any methods to improve yamux throughput?
Isn't it possible to have multiple sessions on the same underlying connection? If so, why would one session close all the others?
How do I fix this issue? A regular io.Copy with net.Listen and net.Dial works, but when trying net.Listen with session.Open(), I get the error yamux: Invalid protocol version: 123.
io.Copy(conn, remote); go io.Copy(remote, conn)
works
io.Copy(conn, stream); go io.Copy(stream, conn)
yamux: Invalid protocol version: 123
The library currently uses bytes.Buffer for buffering data received from the network connection until it is fetched by the user through a Read() call.
The problem with that is that all writes from the network append to the end of the buffer. Reads read from the current position and shift the read pointer toward the end of the buffer, but the already-read data still stays at the beginning of the buffer until the stream is closed.
This means, e.g., that transferring a 100MB file through yamux will grow the stream's receive buffer to 100MB.
This could be fixed by using a ring buffer big enough to hold the maximum window size, or a buffer of window size where each read copies the remaining data to the front, resets the read pointer to 0, and adjusts the write pointer accordingly.
Currently GoAway is handled only in Session.Open. However, there is also the case where the client sends a GoAway request; in that case the server should handle it in Accept(). A common use case is when the client acts as a "server" (also known as a back connection / reverse connection).
This minimal example hangs on go 1.10 when I add log.Printf to hijackLocked() (net/http/server.go:300). It hangs on abortPendingRead() (net/http/server.go:692) when I can get it to hang. What are some possible issues with hijacking?
package main

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/hashicorp/yamux"
)

var endpoint = flag.String("ep", "127.0.0.1:8000", "endpoint")

func client() {
	// Get a TCP connection
	conn, err := net.Dial("tcp", *endpoint)
	if err != nil {
		panic(err)
	}
	// Setup client side of yamux
	session, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}
	http.Serve(session, nil)
}

func server() {
	// Accept a TCP connection
	listener, err := net.Listen("tcp", *endpoint)
	if err != nil {
		panic(err)
	}
	conn, err := listener.Accept()
	if err != nil {
		panic(err)
	}
	// Setup server side of yamux
	session, err := yamux.Server(conn, nil)
	if err != nil {
		panic(err)
	}
	tr := &http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return session.Open()
		},
	}
	client := &http.Client{Transport: tr}
	resp, err := client.Get("http://127.0.0.1:8000/works")
	if err != nil {
		panic(err)
	}
	fmt.Println(resp)
	fmt.Println("hanging in abortPendingRead()...")
	resp, err = client.Get("http://127.0.0.1:8000/bug")
	if err != nil {
		fmt.Println("we should not see this because it should hang in abortPendingRead()")
		fmt.Println(err)
	}
	fmt.Println(resp)
}

func main() {
	flag.Parse()
	http.HandleFunc("/works", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "this works")
	})
	http.HandleFunc("/bug", func(w http.ResponseWriter, r *http.Request) {
		h, ok := w.(http.Hijacker)
		if !ok {
			fmt.Printf("does not support hijack")
		}
		c, _, err := h.Hijack()
		if err != nil {
			panic(err)
		}
		fmt.Fprintf(c, "this works")
		c.Close()
	})
	go server()
	time.Sleep(90 * time.Millisecond)
	client()
}
Client()
and Server()
indicate "There must be at most one ...-side connection." Is this just per-conn, or does it mean something different? I don't see anything that would prevent there being multiple sessions on different conns, but hate to miss something basic and critical.
This is in relation to hashicorp/nomad#4455
I had a look at this today while I was on the hunt for dependency issues in our products.
Turns out the commit in question does exist and govendor fetch github.com/hashicorp/yamux@2658be15c5f05e76244154714161f17e3e77de2e
works fine: 2658be1
It came from this PR: (#53) which was merged and has a parent of:
However, the tree for master vs the tree for https://github.com/hashicorp/yamux/commits/2658be15c5f05e76244154714161f17e3e77de2e is different. Not sure what happened here, but 683f491 and 3fc4056 are swapped in the two trees, so all the hashes of the commits before them differ.
Hope this helps but I don't know the code base well enough to do surgery on it so I will leave it to the pros :). Looks like it might just be a rebase error.
If I read and understand this code correctly, it seems to impose a hard limit here.
So what are the options if you would like to use yamux in a long-running process that could use more than math.MaxUint32-1 streams? Most likely not all at the same time, of course, but as the preferred way to manage stream IDs seems to be to increment the ID by 1, you will hit the limit sooner or later.
So could this logic be updated/changed without breaking existing code? And/or are there other ways to (safely) reuse the old IDs of streams that are no longer used?
I am trying to build a small client-server setup where the server is reachable via the client, but the client is not reachable via the server. We need duplex streams both ways, where either side can initiate a connection and send messages. Looking at the documentation, it seems yamux will fit this use case, but I don't see how this could be achieved. Any guidance would be helpful.
Over on the ipfs project we updated the version of yamux we were using to the latest master from 9feabe6 about a month ago. Since then we started noticing random hanging issues, which I got around to debugging today, and it looks like the culprit is the synCh channel on the session. Commits that don't contain that logic work just fine, but every commit I've tried with it has hung.
It looks to be an issue with not replacing tokens in the semaphore in all needed cases; maybe a call to Close isn't actually releasing a stream the way it should?
Increasing the AcceptBacklog in the config 'fixes' the problem, but that seems like it's just buying me more time until it hangs again.
I'm trying yamux.
here's my yamux client
package main

import (
	"log"
	"net"
	"strconv"
	"time"

	"github.com/hashicorp/yamux"
)

func main() {
	addr := "127.0.0.1:3333"
	conn, _ := net.Dial("tcp", addr)
	// Setup client side of yamux
	session, err := yamux.Client(conn, nil)
	if err != nil {
		panic(err)
	}
	for i := 1; i <= 100; i++ {
		go testC(session, strconv.Itoa(i))
	}
	time.Sleep(60 * time.Second)
	conn.Close()
}

func testC(session *yamux.Session, st string) {
	// Open a new stream
	stream, err := session.Open()
	if err != nil {
		panic(err)
	}
	// stream.SetDeadline(time.Now().Add(3))
	_, err = stream.Write([]byte(st))
	if err != nil {
		panic(err)
	}
	reply := make([]byte, 7000)
	_, err = stream.Read(reply)
	if err != nil {
		panic(err)
	}
	log.Println("STRING " + st + ":" + string(reply))
}
and here's my server
package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
)

func handleConnection(conn net.Conn) {
	fmt.Println("Handling new connection...")
	// Close connection when this function ends
	defer func() {
		fmt.Println("Closing connection...")
		conn.Close()
	}()
	// timeoutDuration := 5 * time.Second
	bufReader := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
	for {
		// Set a deadline for reading. Read operation will fail if no data
		// is received after deadline.
		// conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
		// Read tokens delimited by newline
		bytes, err := bufReader.ReadString('\n')
		if err != nil {
			fmt.Println(err)
			return
		}
		_, err = bufReader.WriteString(bytes)
		if err != nil {
			log.Println(err)
			return
		}
		err = bufReader.Flush()
		if err != nil {
			log.Println("Flush failed.", err)
		}
		if string(bytes) == "exit" {
			log.Println("should be exit")
			conn.Close()
		}
		fmt.Printf("%s", bytes)
	}
}

func main() {
	// Start listening to port 3333 for TCP connection
	listener, err := net.Listen("tcp", ":3333")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		listener.Close()
		fmt.Println("Listener closed")
	}()
	for {
		// Get net.TCPConn object
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println(err)
			break
		}
		go handleConnection(conn)
	}
}
But why can't it send messages to the server? Should both server and client implement yamux, or is there an issue in my code? Thanks.
Essentially providing Temporary() and Timeout()
Please, what is the intended graceful disconnection flow? Trying to close just the client side leads to EOFs and keepalive timeouts. Is there a designed scenario for this, or do I have to manage it in my app explicitly?
Thx
I don't think I fully understand something. I telnet to the code below, but I get the error [ERR] yamux: Invalid protocol version: 102. What exactly is wrong?
Here is my code:
package main

import (
	"net"

	"github.com/hashicorp/yamux"
)

func main() {
	server()
}

func server() {
	listener, _ := net.Listen("tcp", "0.0.0.0:1234")
	for {
		// Accept a TCP connection
		conn, err := listener.Accept()
		if err != nil {
			panic(err)
		}
		// Setup server side of smux
		session, err := yamux.Server(conn, nil)
		if err != nil {
			panic(err)
		}
		// Accept a stream
		stream, err := session.Accept()
		if err != nil {
			panic(err)
		}
		// Listen for a message
		buf := make([]byte, 4)
		stream.Read(buf)
		stream.Close()
		session.Close()
	}
}