cmux's People

Contributors

acomagu, bufdev, dmitshur, ekle, emosbaugh, golint-fixer, jan25, maguro, soheilhy, tamird, tmm1, yaojingguo

cmux's Issues

gRPC and HTTP/2 health check with TLS

I can't get gRPC and HTTP/2 health checks (both with TLS) to work. I believe the prescribed approach is to follow the RecursiveCmux example: we need to handle the TLS handshake before we can match on HTTP2HeaderFieldPrefix for gRPC (all other TLS traffic would be routed to the HTTP/2 health check server).

The problem then arises that, for the HTTP/2 health check server, we can't use http.ListenAndServeTLS since the TLS is already resolved using tls.NewListener. So we must use http.Serve. http.Serve documentation says:

HTTP/2 support is only enabled if the Listener returns *tls.Conn connections and they
were configured with "h2" in the TLS Config.NextProtos.

Adding "h2" to the TLS Config.NextProtos is straightforward enough. However, even though tls.NewListener returns a listener that returns *tls.Conn connections, it appears that when this listener is wrapped in the CMux object, the resulting listener returns MuxConn connections rather than *tls.Conn. This breaks things. When I execute:

curl https://localhost:443 --http2 -Ik

I get a response of

curl: (16) Error in the HTTP2 framing layer

Is it possible to serve gRPC and HTTP/2 health checks (both with TLS) using the cmux package?
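To make the type problem described above concrete, here is a minimal sketch using only the standard library. The wrappedConn type is hypothetical and merely stands in for any wrapper that embeds a net.Conn (it is not cmux's MuxConn). It shows that a check of the form c.(*tls.Conn) stops succeeding as soon as the TLS connection is wrapped in another type.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net"
)

// wrappedConn is a hypothetical stand-in for any wrapper type that embeds a
// net.Conn, such as a multiplexer's own connection type.
type wrappedConn struct {
	net.Conn
}

// isTLSConn reports whether c is exactly a *tls.Conn. A server needs a check
// like this before it can look at the negotiated protocol.
func isTLSConn(c net.Conn) bool {
	_, ok := c.(*tls.Conn)
	return ok
}

// compare builds one bare *tls.Conn and one wrapped copy of it, then reports
// the result of the type check for each. No handshake is performed.
func compare() (bare, wrapped bool) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	tc := tls.Server(server, &tls.Config{NextProtos: []string{"h2"}})
	return isTLSConn(tc), isTLSConn(wrappedConn{Conn: tc})
}

func main() {
	bare, wrapped := compare()
	fmt.Println("bare *tls.Conn passes the check:", bare)
	fmt.Println("wrapped conn passes the check:", wrapped)
}
```

If that is what happens here, the "h2" entry in NextProtos alone would not be enough, because the server never gets to see the *tls.Conn underneath the wrapper.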

cmux.CMux stutters

I'm surprised that golint doesn't complain. Perhaps a better name is cmux.Server?

Compatibility issues with the HTTP Keep-Alive option and TCP connection reuse

I saw the documentation on the matchers: https://github.com/soheilhy/cmux/blob/master/matchers.go#L137 ("HTTP1HeaderFieldPrefix returns a matcher matching the header fields of the first request of an HTTP 1 connection. If the header with key name has a value prefixed with valuePrefix, this will match."). Matching only the first request causes issues with the HTTP Keep-Alive option and with some connection pools. I am already experiencing problems with Chrome and istio-proxy/envoy.

Explanation:

  1. The 1st server is an HTTP server with the matcher cmux.HTTP1HeaderFieldPrefix("User-Agent", "xxx/m.n") and the endpoint /api/a
  2. The 2nd server is an HTTP server with the matcher cmux.HTTP1Fast() and the endpoint /api/b
  3. The 3rd server is a gRPC server with the matcher cmux.HTTP2HeaderField("content-type", "application/grpc")

When the client turns on Keep-Alive, or has a connection pool that tags connections only by host+port, the underlying TCP connection is reused and the following happens:

  • The first request is sent WITH "User-Agent": "xxx/m.n" and calls /api/a. The TCP connection is established and routed to server 1 above, and it works fine.
  • The second request is sent WITHOUT "User-Agent": "xxx/m.n" and calls /api/b, which is supposed to go to server 2 above. Because the client reuses the first connection (same host+port, and no matcher check happens again), the request hits server 1, which results in a 404.
  • Something similar happens with server 3 as well.

I know the root cause is that the client doesn't tag the connection properly. For our own connection pool we can fix that, but for public clients such as Chrome or sidecar proxy products, if there is no way to configure how a connection is tagged, we can do nothing.

What is the solution for this kind of case? What are your suggestions? Please help.

@soheilhy @tamird @acomagu @tmm1
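One possible workaround for the two HTTP/1 servers, sketched here with the standard library only, is to make the User-Agent decision per request inside a single HTTP server instead of per connection at accept time. The handler names, the "xxx/" prefix, and the response bodies are all hypothetical, and this does not address the gRPC case in server 3.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// byUserAgent picks a handler for every request, so a reused keep-alive
// connection is evaluated again each time rather than once when accepted.
func byUserAgent(prefix string, matched, fallback http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasPrefix(r.Header.Get("User-Agent"), prefix) {
			matched.ServeHTTP(w, r)
			return
		}
		fallback.ServeHTTP(w, r)
	})
}

// newRouter wires two hypothetical backends that mirror servers 1 and 2.
func newRouter() http.Handler {
	a := http.NewServeMux()
	a.HandleFunc("/api/a", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "server 1")
	})
	b := http.NewServeMux()
	b.HandleFunc("/api/b", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "server 2")
	})
	return byUserAgent("xxx/", a, b)
}

// call sends one in-memory request and returns the status code and body.
func call(h http.Handler, userAgent, path string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, path, nil)
	if userAgent != "" {
		req.Header.Set("User-Agent", userAgent)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	h := newRouter()
	fmt.Println(call(h, "xxx/1.2", "/api/a"))
	fmt.Println(call(h, "", "/api/b"))
}
```

With this shape, the multiplexer would only need to separate HTTP/1 from everything else, and the header-based split would survive connection reuse because it runs on every request.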

panic due to index out of range if a request came in too early

Could be related to #29 and #30

I have a server (gRPC, HTTP, and HTTPs) that is intended to run on Kubernetes, relevant code:

// Code in main() simplified.
var serverClosed chan bool
var listenAddr = ":8091"

func main() {
    serverClosed = make(chan bool)
    srvListener, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatal(err)
    }
    mainServer := &http.Server{TLSConfig: &tls.Config{}} // TLS configured
    healthCheckServer := &http.Server{}
    go startServer(mainServer, healthCheckServer, srvListener)

    // Wait for the server to exit.
    // This is done because another goroutine renews the certificate
    // through Let's Encrypt, then closes srvListener, changes the TLSConfig on
    // mainServer and calls startServer again. It's not visible here because the server
    // crashes before that goroutine even runs (it runs twice a day).
    <-serverClosed
}

func startServer(mainServer, healthCheckServer *http.Server, srvListener net.Listener) {
  log.Printf("starting the HTTP/HTTPS/gRPC server bound to %q", listenAddr)
  // create a new connection multiplexer.
  m := cmux.New(srvListener)
  // we first match on HTTP 1.1 methods.
  httpl := m.Match(cmux.HTTP1Fast())
  // if not matched, we assume that its TLS.
  tlsl := m.Match(cmux.Any())
  // start the mainServer
  go mainServer.Serve(tls.NewListener(tlsl, mainServer.TLSConfig))
  // start the healthCheckServer
  go healthCheckServer.Serve(httpl)
  // boot the connection multiplexer, this should return once the srvListener
  // is closed.
  m.Serve()

  select {
  case serverClosed <- true:
  default:
  }
}

Having a pod health check setup as follows:

          livenessProbe:
            tcpSocket:
              port: 8091
            initialDelaySeconds: 5
            timeoutSeconds: 1
          readinessProbe:
            httpGet:
              port: 8091
              path: /healthz
            initialDelaySeconds: 5
            timeoutSeconds: 1

Given initialDelaySeconds=5, I assume the server has no time to set up all of the matchers, and it panics:

panic: runtime error: index out of range

goroutine 86 [running]:
panic(0xf203a0, 0xc820014010)
        /usr/local/go/src/runtime/panic.go:481 +0x3e6
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*ptNode).match(0xc820405590, 0xc82051d6b8, 0x0, 0x8, 0x1, 0x0)
        /go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/patricia.go:162 +0x1f0
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*patriciaTree).matchPrefix(0xc82040ea00, 0x7f5a1e877b08, 0xc820272550, 0x40cfca)
        /go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/patricia.go:52 +0x9f
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*patriciaTree).(github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.matchPrefix)-fm(0x7f5a1e877b08, 0xc820272550, 0x7f5a1e8675b0)
        /go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/matchers.go:37 +0x34
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.matchersToMatchWriters.func1(0x7f5a1e8675b0, 0xc820022508, 0x7f5a1e877b08, 0xc820272550, 0xc820022508)
        /go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/cmux.go:112 +0x32
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*cMux).serve(0xc82046a200, 0x7f5a1e866e90, 0xc820022508, 0xc8204691a0, 0xc82051d700)
        /go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/cmux.go:168 +0x33e
created by github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*cMux).Serve
        /go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/cmux.go:158 +0x188

Changing initialDelaySeconds to 20 seconds solves the problem.

It's hard to extend protocols

Some protocols are private and unusual, so I'd like a way to write a custom-defined protocol Matcher.

HTTP2SendSettings?

Methods exist for

  • HTTP2MatchHeaderFieldPrefixSendSettings()
  • HTTP2MatchHeaderFieldSendSettings()

But there is no SendSettings option for HTTP2(). Does the existing HTTP2 method not have the same conflict with Java gRPC clients as HTTP2MatchHeaderField and HTTP2MatchHeaderFieldPrefix (which require the SendSettings alternatives)?

I would like to match only on the protocol, but need to maintain support for Java clients. Is this possible?
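As background for the question above: every HTTP/2 client starts the connection with a fixed preface (RFC 7540, section 3.5), which it sends without waiting for anything from the server. The sketch below checks for that preface with the standard library only. The function names are hypothetical, and whether cmux's HTTP2() matcher works exactly this way is an assumption, not something confirmed here.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// clientPreface is the fixed byte sequence an HTTP/2 client sends first
// (RFC 7540, section 3.5).
const clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

// hasHTTP2Preface reads exactly len(clientPreface) bytes and compares them.
// It needs nothing from the server, so no SETTINGS frame has to be written
// before the decision is made.
func hasHTTP2Preface(r io.Reader) bool {
	buf := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(r, buf); err != nil {
		return false
	}
	return bytes.Equal(buf, []byte(clientPreface))
}

// sniff is a small convenience wrapper for trying the check on literal input.
func sniff(firstBytes string) bool {
	return hasHTTP2Preface(strings.NewReader(firstBytes))
}

func main() {
	fmt.Println(sniff(clientPreface + "frames follow here"))
	fmt.Println(sniff("GET / HTTP/1.1\r\nHost: example\r\n\r\n"))
}
```

A match on the preface alone never has to wait for a HEADERS frame, which suggests it would not run into the same wait-for-SETTINGS problem as the header-field matchers. It also cannot tell gRPC apart from other HTTP/2 traffic.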

Notice: grpc-go will stop sending RPCs before HTTP/2 SETTINGS frame is received

This is a notice that grpc-go intends to change in a way that we know will break the way cmux currently works by default. This will bring grpc-go in line with grpc-java's behavior, and C/wrapped languages will be following suit as well. Details and justification for the change can be found in grpc/grpc#17006. grpc-go's migration plan is proposed in grpc/grpc-go#2406. Please feel free to comment in the appropriate PR/issue for questions or concerns about this. Apologies in advance for the breaking change.

Matching an idle TCP connection blocks forever

This happens once a TCP connection is established but no data is ever sent, for example nc localhost 8080 with no further input.

This may cause the matching process to block forever, waiting until a minimum length of data is received, and the connection may never receive a stop signal (donec). The line in question:

cmux/cmux.go

Line 184 in 8a8ea3c

matched := s(muc.Conn, muc.startSniffing())
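A common way to bound a blocking read like this is a read deadline on the connection while it is being sniffed. The following is a minimal standard-library sketch under that assumption. sniffWithTimeout and idleClientTimesOut are hypothetical helpers, not part of cmux, and net.Pipe stands in for a real TCP connection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"time"
)

// sniffWithTimeout reads n bytes for matching but gives up after d, so an
// idle client cannot hold the matching goroutine forever.
func sniffWithTimeout(c net.Conn, n int, d time.Duration) ([]byte, error) {
	if err := c.SetReadDeadline(time.Now().Add(d)); err != nil {
		return nil, err
	}
	// Clear the deadline afterwards so the eventual handler is not affected.
	defer c.SetReadDeadline(time.Time{})

	buf := make([]byte, n)
	if _, err := io.ReadFull(c, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// idleClientTimesOut simulates a client that connects and never writes, and
// reports whether the sniffing read ended with a timeout error.
func idleClientTimesOut() bool {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	_, err := sniffWithTimeout(server, 8, 50*time.Millisecond)
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

func main() {
	fmt.Println("idle connection released by timeout:", idleClientTimesOut())
}
```

The caller would then close the connection on a timeout instead of leaving the goroutine parked in the read.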

Cmux example

Your example has a tiny bug, which tripped me up for a few days. Your matchers are case sensitive. So

// Match connections in order:
// First grpc, then HTTP, and otherwise Go RPC/TCP.
- grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
+ grpcL := m.Match(cmux.HTTP2HeaderField("Content-type", "application/grpc"))
httpL := m.Match(cmux.HTTP1Fast())
trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched.

metrics for mux usage

Many thanks for this mux. It is extremely useful and bulletproof.

I have a framework that muxes gRPC and HTTP over plaintext and TLS.
I'd like to add Prometheus metrics to count the number of incoming connections on each mux, but I can't figure out where to add that.
Can anyone steer me in the right direction?
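One place such a counter could live is a thin wrapper around each matched listener, since every listener is just a net.Listener. The sketch below uses only the standard library. countingListener and acceptN are hypothetical names, and the plain int64 counter is a placeholder for whatever metrics type is actually used.

```go
package main

import (
	"fmt"
	"net"
	"sync/atomic"
)

// countingListener wraps any net.Listener and counts accepted connections.
type countingListener struct {
	accepted int64 // updated atomically; kept first for 64-bit alignment
	net.Listener
}

// Accept delegates to the wrapped listener and bumps the counter on success.
func (l *countingListener) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err == nil {
		atomic.AddInt64(&l.accepted, 1)
	}
	return c, err
}

// Accepted returns the number of connections accepted so far.
func (l *countingListener) Accepted() int64 {
	return atomic.LoadInt64(&l.accepted)
}

// acceptN dials a loopback listener n times and returns the final count.
func acceptN(n int) (int64, error) {
	base, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	l := &countingListener{Listener: base}
	defer l.Close()

	for i := 0; i < n; i++ {
		c, err := net.Dial("tcp", base.Addr().String())
		if err != nil {
			return 0, err
		}
		s, err := l.Accept()
		if err != nil {
			c.Close()
			return 0, err
		}
		c.Close()
		s.Close()
	}
	return l.Accepted(), nil
}

func main() {
	fmt.Println(acceptN(3))
}
```

Each server would then be handed the wrapped listener instead of the matched one, giving one counter per protocol.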

Refactor cmux.CMux into a public struct?

I am partial to interfaces as well, but two things here:

  1. Interfaces are not backwards compatible, ie if you add a method to an interface, it breaks the SemVer minor/patch contract.
  2. It saves a vtable lookup (I know this doesn't matter that much in this context since none of the functions are in the hot path, but yea).

Thoughts?

cmux leaks memory over long term use

The leak happened again, under the same conditions.
I ran pprof, and this is what I found:
(cmux.serve) -> HTTP2MatchHeaderFieldSendSettings->matchHTTP2Field->ReadFrame->readFrameHeader->io.ReadFull->ReadAtLeast->bufferedReader.Read->s.buffer.Write->bytes.(*Buffer).grow->bytes.makeSlice

same path as before.

Do you think a rogue client could cause a connection leak even after the connection is closed? Or could many rogue clients be trying to connect and getting rejected due to bad authorization, with their buffered reader bytes never freed by the garbage collector even after the rejection?

What I have seen is in-use space, not alloc space, which means the memory is still live and is not being collected by the Go garbage collector. What is hard to reproduce is when this happens. The last fix you gave should terminate any bad connection immediately, since we now check the continuation frame as well. But we have seen the leak in production, with server nodes shutting down after 10 days due to running out of memory.

Merge the valuable bits of cockroachdb/cmux

cockroachdb/cmux is a strange mix of dubious changes and fixes

Would you mind looking at it and merging back what's worthwhile? I'd love to forget cockroachdb/cmux ever existed

cmux leaks goroutines

Hey there, we're using cmux to serve HTTP and gRPC together but our application is receiving a very large amount of requests (both gRPC and HTTP). We see tons of goroutines leaking around like:

goroutine 754 [IO wait, 215 minutes]:
internal/poll.runtime_pollWait(0x7eff86b282b0, 0x72, 0x0)
        /usr/lib/golang/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc420329718, 0x72, 0xffffffffffffff00, 0x341d980, 0x340d888)
        /usr/lib/golang/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc420329718, 0xc42172a000, 0x1000, 0x1000)
        /usr/lib/golang/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc420329700, 0xc42172a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/golang/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc420329700, 0xc42172a000, 0x1000, 0x1000, 0x13ce, 0xc420956000, 0x13ce)
        /usr/lib/golang/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc4207b2158, 0xc42172a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/golang/src/net/net.go:176 +0x6d
github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux.(*bufferedReader).Read(0xc4207be380, 0xc42172a000, 0x1000, 0x1000, 0xc42098f8c8, 0x74db11, 0xc4202e9688)
        /root/go/src/github.com/kubernetes-incubator/cri-o/_output/src/github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux/buffer.go:53 +0x144
github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux.(*MuxConn).Read(0xc4207be370, 0xc42172a000, 0x1000, 0x1000, 0x0, 0x13ce, 0x1c6e)
        /root/go/src/github.com/kubernetes-incubator/cri-o/_output/src/github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux/cmux.go:259 +0x4f
net/http.(*connReader).Read(0xc4202e9680, 0xc42172a000, 0x1000, 0x1000, 0xc420460380, 0x1cee4a0, 0xc420460380)
        /usr/lib/golang/src/net/http/server.go:753 +0x105
bufio.(*Reader).fill(0xc4200599e0)
        /usr/lib/golang/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4200599e0, 0x46860a, 0x48d7dd, 0xc42098fa20, 0xfffffffe1104c0ba, 0xc42098fa60, 0x48da77)
        /usr/lib/golang/src/bufio/bufio.go:338 +0x2c
bufio.(*Reader).ReadLine(0xc4200599e0, 0x100, 0xf8, 0x2157540, 0x468676, 0x22002098faa8, 0xf8)
        /usr/lib/golang/src/bufio/bufio.go:367 +0x34
net/textproto.(*Reader).readLineSlice(0xc4202e96b0, 0xc42098fb20, 0xc42098fb20, 0x41b6d8, 0x100, 0x2157540)
        /usr/lib/golang/src/net/textproto/reader.go:55 +0x70
net/textproto.(*Reader).ReadLine(0xc4202e96b0, 0xc420750800, 0x0, 0x0, 0xc42098fba8)
        /usr/lib/golang/src/net/textproto/reader.go:36 +0x2b
net/http.readRequest(0xc4200599e0, 0x0, 0xc420750800, 0x0, 0x0)
        /usr/lib/golang/src/net/http/request.go:925 +0x99
net/http.(*conn).readRequest(0xc42096c280, 0x3435900, 0xc4207e4880, 0x0, 0x0, 0x0)
        /usr/lib/golang/src/net/http/server.go:933 +0x17c
net/http.(*conn).serve(0xc42096c280, 0x3435900, 0xc4207e4880)
        /usr/lib/golang/src/net/http/server.go:1739 +0x50e
created by net/http.(*Server).Serve
        /usr/lib/golang/src/net/http/server.go:2720 +0x288

Do you guys know what's happening?

Doesn't work with grpc-go's hello world example

If I change https://github.com/grpc/grpc-go/blob/81b95b1854d7caf3cc21aed316fc222e1749cf31/examples/helloworld/greeter_server/main.go#L53 to:

m := cmux.New(lis)
go func() {
	if err := s.Serve(m.Match(cmux.Any())); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}()
m.Serve()

This works as expected. The client side prints 2020/12/12 09:37:44 Greeting: Hello world. But if I add the content-type matcher,

m := cmux.New(lis)
go func() {
	if err := s.Serve(m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}()
m.Serve()

The client cannot connect and blocks forever. In this switch

switch f := f.(type) {

I see that the case *http2.SettingsFrame: is being taken, but not case *http2.ContinuationFrame: or case *http2.HeadersFrame:, and I am guessing one of those two branches is supposed to be taken. Unit tests in this repo are passing. I'm not sure what is wrong.

Multiplexing gRPC & HTTP handlers with H2C+Prior Knowledge

Background: Services on Cloud Run can only expose 1 port. If you want to expose a combination of gRPC and a regular REST API, you have to multiplex them. (e.g. using cmux)

In a standard setup, Cloud Run downgrades HTTP/2 requests to HTTP/1, and the setup with cmux seems to work fine.
However, if you want to support gRPC streaming, you have to enable End-to-end HTTP/2 support on your Cloud Run service. The Cloud Run instructions say:

Your Cloud Run service must handle requests in HTTP/2 cleartext (h2c) format, because TLS is still terminated automatically by Cloud Run. To confirm that your service supports h2c requests, test the service locally using this cURL command: curl -i --http2-prior-knowledge http://localhost:PORT

So, my current setup is:

gsrv := grpc.NewServer(...)
hsrv := &http.Server{ Handler: h2c.NewHandler(..., &http2.Server{})}
srv := cmux.New(conn)
go gsrv.Serve(srv.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")))
go hsrv.Serve(srv.Match(cmux.Any()))

This unfortunately does not work (even locally, no Cloud Run involved):

  • Without the gRPC matcher, normal REST (i.e. hsrv, using curl --http2-prior-knowledge) works fine
  • Once I add the gRPC matcher, gRPC works, but normal REST (using curl --http2-prior-knowledge) no longer works: I get the error http2: server connection error from [::1]:64054: connection error: PROTOCOL_ERROR

Without knowing the code or HTTP/2 in depth, my guess from what I'm seeing is that the cmux matcher sends an HTTP/2 SETTINGS frame during negotiation. When it then fails to match gRPC and falls back to the next 'regular' HTTP/2 handler, that handler starts negotiation from scratch and sends its own SETTINGS frame, which causes problems.

Cannot check closed error with errors.Is

I have a gRPC server behind cmux. In this server, I follow graceful shutdown practices.

So when the program receives a termination signal, I stop servers in this order:

	go func() {
		<-ctx.Done()
		grpcServer.GracefulStop()
		mux.Close()
	}()

However, my grpcServer.Serve(grpcListener) receives an error like:

mux: Server closed

This is not great, because I am trying to do the following, and I seemingly have no way of checking this error with errors.Is or with something like err == cmux.ErrServerClosed, because no such error is defined.

 	go func() {
		defer wg.Done()
		if err := grpcServer.Serve(grpcLis); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatal("grpc: server failed", zap.Error(err))
		}
		log.Debug("grpc: server closed without error")
	}()

I know the docs say users can just do go grpcServer.Serve(grpcListener), but I do not find that to be reliable enough.
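For illustration, this is how the check would look if the mux exported a sentinel error. It is a standard-library sketch: errMuxClosed, serve, and lookalike are hypothetical and only stand in for a value the library would have to export. The last call shows why comparing against a freshly created error with the same text does not work.

```go
package main

import (
	"errors"
	"fmt"
)

// errMuxClosed is a hypothetical sentinel, standing in for an error value
// that a library would export for callers to compare against.
var errMuxClosed = errors.New("mux: server closed")

// serve pretends to be a Serve call that stops because the mux was closed.
// It wraps the sentinel with extra context using %w.
func serve() error {
	return fmt.Errorf("grpc listener: %w", errMuxClosed)
}

// lookalike returns a different error value that has the same message.
func lookalike() error {
	return errors.New("mux: server closed")
}

// isCleanShutdown treats nil and the sentinel (even when wrapped) as a
// normal stop. Anything else is a real failure.
func isCleanShutdown(err error) bool {
	return err == nil || errors.Is(err, errMuxClosed)
}

func main() {
	fmt.Println(isCleanShutdown(serve()))     // wrapped sentinel
	fmt.Println(isCleanShutdown(lookalike())) // same text, different value
}
```

errors.Is compares error values along the wrap chain, not message strings, so a stable exported value is what makes this pattern usable.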

Serve http + tcp with cmux: TCP server read incomplete message

Hi

I am trying to serve an HTTP server and a TCP server with cmux. The HTTP server works great, but I cannot fix a problem with the TCP server.

The TCP server used for testing is a simple echo service. The problem is that after the client connects and sends its first message, the server sees a fragmented message (it is read by two successive conn.Read() calls).

Attached the source code.
Server: cmux_http_tcp.go

package main

import (
	"github.com/soheilhy/cmux"
	"net"
	"net/http"
	"fmt"
	"time"
)

func tcpServer (l net.Listener) {
	// echo service
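	for {
		conn, err := l.Accept()
		if err != nil {
			return // listener closed
		}
		fmt.Println("tcp connected")
		go func() {
			defer conn.Close()
			for {
				data := make([]byte, 100)
				conn.SetReadDeadline(time.Now().Add(100*time.Second))
				n, err := conn.Read(data)
				if err != nil {
					return // stop on EOF, timeout, or any other read error
				}
				fmt.Println(string(data[:n]))
				conn.Write(data[:n]) // echo only the bytes that were read
			}
		}()
	}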
}

type exampleHTTPHandler struct{}

func (h *exampleHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "example http response")
}

func serveHTTP(l net.Listener) {
    s := &http.Server{
        Handler: &exampleHTTPHandler{},
    }
    if err := s.Serve(l); err != cmux.ErrListenerClosed {
        panic(err)
    }
}


func both(l net.Listener) {
	m := cmux.New(l)
	httpL := m.Match(cmux.HTTP1Fast())
	tcpL := m.Match(cmux.Any())

	go tcpServer(tcpL)
	//go httpServer(httpL)
	go serveHTTP(httpL)

	m.Serve()
}

func main() {
	l, err := net.Listen("tcp", ":3999")
	if err!=nil {
		panic(err.Error())
	}
	fmt.Println("server started")

	// only tcp
	//tcpServer(l)

	// only http
	//serveHTTP(l)

	// both
	both(l)

}

Client: cmux_tcp_client.go

package main

import (
	"net"
	"fmt"
	"time"
)

func keepWriting(conn net.Conn) {
	for {
		// request - send something
		_, err := conn.Write([]byte("hello tcp server"))
		fmt.Printf("sent: hello tcp server\n")
		if err != nil {
			fmt.Println(err.Error())
		}
		time.Sleep(5*time.Second)
	}
}

func keepReading(conn net.Conn) {
	for {
		response := make([]byte, 100)
		n, err := conn.Read(response)
		if err != nil {
			fmt.Println(err.Error())
		}
		fmt.Printf("received: %v\n", string(response[:n]))
	}
}

func main() {
	// make address
	tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:3999")
	if err!=nil {
		fmt.Println(err.Error())
	}

	// connect
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		fmt.Println(err.Error())
	}
	fmt.Println("connected to server")

	go keepWriting(conn)
	keepReading(conn)

}

the client log:

➜  networking git:(master) ✗ go run cmux_tcp_client.go
connected to server
//*** the first message is fragmented ***
sent: hello tcp server
received: hello tc
received: p server
//******* following messages are complete *******************
sent: hello tcp server
received: hello tcp server
^Csignal: interrupt

BTW I tested the tcp server separately (uncomment tcpServer(l) in server side) and everything works well. Please help.

Edit: I know TCP is not a packet protocol that provides a 1:1 packet relationship between server and client. My question is whether the fragmentation is caused by cmux. Why is only the first message fragmented while the following ones are not?
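Whatever the cause of the split (one plausible explanation, not confirmed here, is that the bytes consumed during matching are replayed separately from the rest), a reader that depends on message boundaries needs explicit framing. Below is a minimal standard-library sketch of length-prefixed framing. The 4-byte big-endian header and all names are assumptions made for illustration, and trickle only simulates a stream that arrives in small pieces.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame sends a 4-byte big-endian length followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one whole message, however the bytes are split across
// individual Read calls. A real server would also cap the length it accepts.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// trickle returns at most n bytes per Read to imitate a fragmented stream.
type trickle struct {
	r io.Reader
	n int
}

func (t trickle) Read(p []byte) (int, error) {
	if len(p) > t.n {
		p = p[:t.n]
	}
	return t.r.Read(p)
}

// roundTrip frames msg, reads it back through a fragmenting reader, and
// returns what the receiver reassembled.
func roundTrip(msg string, chunk int) (string, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, []byte(msg)); err != nil {
		return "", err
	}
	got, err := readFrame(trickle{r: &buf, n: chunk})
	return string(got), err
}

func main() {
	fmt.Println(roundTrip("hello tcp server", 8))
}
```

With framing like this, it no longer matters whether the first Read returns 8 bytes or 16, because io.ReadFull keeps reading until the declared length has arrived.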

Server unexpectedly doesn't receive traffic from cmux.Any()

I have a server that receives traffic from a standard tcp listener, but doesn't receive traffic from a cmux.Any() connection derived from the same listener. I have a small, reproducible case:

This works (i.e. can receive traffic):

  listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  if err != nil {
    logger.Fatal("Could not listen on port.")
  }

  go grpcServer.Start(listener) // Works - connections are received and processed.

This doesn't work (i.e. connection hangs when making a request):

  listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  if err != nil {
    logger.Fatal("Could not listen on port.")
  }
  m := cmux.New(listener)
  grpcListener := m.Match(cmux.Any())

  go grpcServer.Start(grpcListener) // Doesn't work - connections hang.

cmux fails to compile: what are the correct libs/versions for usage?

Hi,

I can't find the precise libs and respective versions required to run this project, so I've saved a godeps package with my current ones.

However, when I run the tests, I get the following compilation errors.
Could you advise me which libs I should have installed in order to bypass this?

Cheers.

$ godep go test
# cmux/vendor/google.golang.org/grpc/transport
vendor/google.golang.org/grpc/transport/http2_client.go:840: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http2_client.go:943: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http2_server.go:145: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http2_server.go:281: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http_util.go:382: f.fr.ReadMetaHeaders undefined (type *http2.Framer has no field or method ReadMetaHeaders)
vendor/google.golang.org/grpc/transport/http_util.go:512: f.fr.ErrorDetail undefined (type *http2.Framer has no field or method ErrorDetail)
FAIL    cmux [build failed]
godep: go exit status 2

This is the Godeps file I got after cloning master with git and running godep save:

{
	"ImportPath": "cmux",
	"GoVersion": "go1.7",
	"GodepVersion": "v74",
	"Deps": [
		{
			"ImportPath": "github.com/golang/protobuf/proto",
			"Rev": "7cc19b78d562895b13596ddce7aafb59dd789318"
		},
		{
			"ImportPath": "github.com/soheilhy/cmux",
			"Comment": "v0.1.2",
			"Rev": "bf4a8ede9e87c006fe1d4278c6c7f2b8be1fa84c"
		},
		{
			"ImportPath": "golang.org/x/net/context",
			"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
		},
		{
			"ImportPath": "golang.org/x/net/http2",
			"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
		},
		{
			"ImportPath": "golang.org/x/net/http2/hpack",
			"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
		},
		{
			"ImportPath": "golang.org/x/net/internal/timeseries",
			"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
		},
		{
			"ImportPath": "golang.org/x/net/trace",
			"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
		},
		{
			"ImportPath": "golang.org/x/net/websocket",
			"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
		},
		{
			"ImportPath": "google.golang.org/grpc",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/codes",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/credentials",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/examples/helloworld/helloworld",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/grpclog",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/internal",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/metadata",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/naming",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/peer",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		},
		{
			"ImportPath": "google.golang.org/grpc/transport",
			"Comment": "v1.0.2-30-g9eaed1a",
			"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
		}
	]
}

Ability to stop cmux listeners

Feature request:
Right now, once we start cmux.Serve, is there a way to cleanly shut down the cmux listeners and all the internal listeners it starts? That way, during a whole-service shutdown, we could call cmux.Stop and make sure all the listeners are shut down cleanly.
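For reference, this is the shutdown behaviour the standard library already gives a plain listener: closing it unblocks a pending Accept with net.ErrClosed, which an accept loop can treat as a clean stop. The sketch below shows only that baseline with hypothetical helper names. It makes no claim about how cmux propagates a close to its matched listeners.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// acceptLoop accepts until the listener is closed. It returns nil for a
// clean stop and the underlying error for anything else.
func acceptLoop(l net.Listener) error {
	for {
		c, err := l.Accept()
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				return nil
			}
			return err
		}
		c.Close()
	}
}

// stopCleanly starts the loop on a loopback listener, closes the listener,
// and waits for the loop to exit.
func stopCleanly() error {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() { done <- acceptLoop(l) }()

	if err := l.Close(); err != nil {
		return err
	}
	return <-done
}

func main() {
	fmt.Println("shutdown error:", stopCleanly())
}
```

A Stop method on the mux would presumably need to give every matched listener the same guarantee: a blocked Accept returns promptly with a recognisable error.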

socks5 matcher not working

I wrote a SOCKS5 matcher modeled on the TLS one. It just matches one byte:

func SOCKS5(versions ...int) Matcher {
	if len(versions) == 0 {
		versions = []int{
			0x05,
		}
	}
	prefixes := [][]byte{}
	for _, v := range versions {
		prefixes = append(prefixes, []byte{byte(v)})
	}
	return prefixByteMatcher(prefixes...)
}

When I put the SOCKS5 matcher after the HTTP1Fast matcher, it does not work: connections never reach the SOCKS5 server handler.
When I put it before the HTTP matcher, it works.

// failed!
httpL := m.Match(cmux.HTTP1FastOptions())
socksL := m.Match(cmux.SOCKS5())

CMux doesn't match Vanilla Java gRPC client

I created a cmux server as described in the example to match the application/grpc content-type HTTP2 header tag. It successfully matches against C++ and Go gRPC clients.

However, the Java client from https://github.com/grpc/grpc-java is not matched. The implementation of matchHTTP2Field blocks on framer.ReadFrame(). It reads a SETTINGS and a WINDOW_UPDATE frame, but doesn't get to the HEADERS frame. Perhaps it needs to negotiate the HTTP/2 connection before it gets to the HEADERS frame?

How to serve gRPC + SSH with cmux?

Hi,

I'm trying to use cmux to serve gRPC + SSH connections, but I cannot make it work.

lis, _ := net.Listen("tcp", ":8080")

l := cmux.New(lis)
grpcL := l.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
sshL := l.Match(cmux.Any())

It serves gRPC well, but it blocks when I connect with an SSH client.

If I comment out the gRPC Match, the SSH connections work. Am I doing something wrong?

Thanks

Multiplexing gRPC and Prometheus metrics services using Google Cloud Run

cmux is very useful.

Thank you!

Google Cloud Run is a serverless solution built atop Knative and it limits deployed services to a single port.

This is a challenge when running, e.g. gRPC services that also expose Prometheus metrics.

Using cmux, I'm able to multiplex both services onto the single port and serve gRPC and Prometheus metrics with little effort.

I wrote up the solution here: Multiplexing gRPC and HTTP (Prometheus) endpoints with Cloud Run

NOTE: I did need to use the Java gRPC client solution to get this to work (locally and when deployed to Cloud Run) with Golang clients (my own and gRPCurl).

Close not working properly on gRPC graceful stop

example code:

package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/soheilhy/cmux"
	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}

	mux := cmux.New(lis)
	grpcL := mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
	_ = grpcL

	s := grpc.NewServer()
	// xxx.RegisterXXXServiceServer(s, XXXIMPL)

	go func() {
		// FIXME: using muxed grpcL here can not graceful stop
		err = s.Serve(lis)
		fmt.Println(err)
	}()

	fmt.Println("wait for sig")
	c := make(chan os.Signal, 1)
	signal.Notify(c, []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT}...)
	<-c

	// FIXME: hangs when the server is started with grpcL
	s.GracefulStop()

	fmt.Println("stopped")
	time.Sleep(time.Second * 1)
}

cmux leaks memory over long term use

The connection multiplexer leaks memory over long-term use. I ran pprof, and this is what I found:
(cmux.serve) -> HTTP2MatchHeaderFieldSendSettings->matchHTTP2Field->ReadFrame->readFrameHeader->io.ReadFull->ReadAtLeast->bufferedReader.Read->s.buffer.Write->bytes.(*Buffer).grow->bytes.makeSlice

I have seen this slice keep growing, and this is in-use space, which doesn't get freed. It causes our application to use too much memory, and after a couple of days it eventually shuts down.

Is there something we can do to address this?

I was thinking that once we have determined whether the SETTINGS frame is present or not, we should release this buffer, right? But for some reason that doesn't happen.

Thanks

HTTP1Fast matcher omits PATCH

Hi, cmux has been working great for us. Recently we added a PATCH endpoint to our application and were surprised the HTTP1Fast matcher doesn't automatically match on PATCH.
Is there a particular reason for not including PATCH or is this an oversight?

All other methods are in there except PATCH and OPTIONS.

cmux/matchers.go

Lines 46 to 55 in 5ec6847

var defaultHTTPMethods = []string{
	"OPTIONS",
	"GET",
	"HEAD",
	"POST",
	"PUT",
	"DELETE",
	"TRACE",
	"CONNECT",
}

Advice for cmux TLS gRPC + plaintext http/1

Are there material downsides to supporting TLS gRPC + plaintext http using a configuration as below?

listener, _ := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%v", port))

combinedMux := cmux.New(listener)
routeListener := combinedMux.Match(cmux.HTTP1Fast())
grpcListener := combinedMux.Match(cmux.Any())

creds, _ := credentials.NewServerTLSFromFile(serverPemFilePath, serverKeyFilePath)
grpcServer := grpc.NewServer(grpc.Creds(creds))

routeMux := http.NewServeMux()
routeMux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
	// some handler
})
s := &http.Server{
	Handler: routeMux,
}

go grpcServer.Serve(grpcListener)

go s.Serve(routeListener)

combinedMux.Serve()
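
A note on why this ordering can separate the two kinds of traffic at all: a TLS connection opens with a handshake record (content type 0x16, major version 0x03), which never looks like the start of an HTTP method name, so a method-prefix matcher will not claim it and it falls through to the catch-all. A small sketch of that first-bytes check, with looksLikeTLS as a made-up helper name:

```go
package main

import "fmt"

// looksLikeTLS (hypothetical) checks for a TLS handshake record header:
// content type 0x16 followed by major version 0x03.
func looksLikeTLS(first []byte) bool {
	return len(first) >= 2 && first[0] == 0x16 && first[1] == 0x03
}

func main() {
	// Typical start of a ClientHello record: handshake type, version bytes, length.
	clientHello := []byte{0x16, 0x03, 0x01, 0x02, 0x00}
	plainHTTP := []byte("GET / HTTP/1.1\r\n")

	fmt.Println("ClientHello looks like TLS:", looksLikeTLS(clientHello))
	fmt.Println("plain HTTP looks like TLS: ", looksLikeTLS(plainHTTP))
}
```

The flip side is that anything that is neither HTTP/1 nor TLS also lands on the gRPC listener via the catch-all, where the TLS handshake will simply fail.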

Release?

You're probably on it but not sure how to communicate with you otherwise :-) - can we get a release for the latest changes? I want to get off pinning to master and use SemVer.

Please tag releases

Please consider assigning version numbers and tagging releases. Tags/releases are useful for downstream package maintainers (in Debian and other distributions) to export source tarballs, automatically track new releases and to declare dependencies between packages. Read more in the Debian Upstream Guide.

Versioning provides additional benefits to encourage vendoring of a particular (e.g. latest stable) release contrary to random unreleased snapshots.

Thank you.


websocket support

Your work is very helpful to me.
Do you have any plans to support WebSocket in cmux?
If possible, please give examples using HTTP/1, HTTPS, HTTP/2, and WebSocket.

Cannot get Matchers to mux h2c gRPC and HTTP2(REST) requests

Hi there,

Situation: my project runs in Istio + k8s. When an HTTP/1.1 request comes into Istio, it is upgraded to HTTP/2 and passed to the k8s pod. I also need to run non-TLS gRPC and HTTP servers on the same port, e.g. :9000.

Issue: I tried to use cmux to create matchers that split gRPC and REST requests, but I failed.

Could you please provide an example to achieve that?

Thanks,

badly-behaved client can deadlock server stopping

Documenting my findings debugging a production issue:

tl;dr is that a client can mess with stopping of a server, because the sniffing mechanism has no notion of draining for connections that have yet to be matched to a sub-listener. The specific scenario I encountered is:

  • grpc.Server.GracefulStop() is called, closes the wrapped net.Listener, and waits for the grpc Server's WaitGroup (note grpc.Server.Stop() would exhibit the same behavior, since it waits on the same WG)
  • That grpc WG can't finish until grpc.Serve finishes, which won't happen until its listener.Accept() returns (a cmux listener).
  • That Accept doesn't return until the cmux WG is done / cmux.Serve() returns.
  • cmux.Serve() has identified that the wrapped listener error'd, and wants to return, but is blocked in a defer waiting for its WG.
  • The cmux WG is held hostage by a client which has connected but hasn't transmitted an opening frame for cmux to sniff.

Net effect is that grpc.Server.Stop/GracefulStop() & cmux.Serve() can't return until the client connection is remotely closed.

Not entirely sure what the right behavior here is. My gut take is that cmux Accept() should preserve the exit semantics of the wrapped listener Accept, and return its error even though there are outstanding, still-to-be-sniffed connections.

Collected traces:

cmux.Serve has found that the wrapped listener Accept has error’d.
It’s trying to return, but is blocked on its own WG within a defer:

goroutine 1798 [semacquire, 5 minutes]:
sync.runtime_Semacquire(0xc00066c008)
        /usr/local/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00066c000)
        /usr/local/go/src/sync/waitgroup.go:130 +0x64
github.com/soheilhy/cmux.(*cMux).Serve.func1(0xc00012a4b0, 0xc00066c000)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:150 +0x55
github.com/soheilhy/cmux.(*cMux).Serve(0xc00012a4b0, 0x1497b00, 0xc006278640)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:165 +0x120
go.gazette.dev/core/server.(*Server).QueueTasks.func1(0x20, 0x20)
        /gazette/server/server.go:124 +0x40
go.gazette.dev/core/task.(*Group).GoRun.func1(0x0, 0x0)
        /gazette/task/group.go:72 +0x43
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000530a80, 0xc001390080)
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66

That WG can't finish because a connection thread is stuck waiting to sniff an HTTP/2 header:

goroutine 491775429 [IO wait, 31 minutes]:
internal/poll.runtime_pollWait(0x7f7a20ed50f8, 0x72, 0xffffffffffffffff)
        /usr/local/go/src/runtime/netpoll.go:203 +0x55
internal/poll.(*pollDesc).wait(0xc0065afb98, 0x72, 0x0, 0x9, 0xffffffffffffffff)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Read(0xc0065afb80, 0xc001e66498, 0x9, 0x9, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:169 +0x19b
net.(*netFD).Read(0xc0065afb80, 0xc001e66498, 0x9, 0x9, 0x865b8e, 0x10401, 0xc000000000)
        /usr/local/go/src/net/fd_unix.go:202 +0x4f
net.(*conn).Read(0xc0013b8020, 0xc001e66498, 0x9, 0x9, 0x0, 0x0, 0x0)
        /usr/local/go/src/net/net.go:184 +0x8e
github.com/soheilhy/cmux.(*bufferedReader).Read(0xc004b59820, 0xc001e66498, 0x9, 0x9, 0xc000184a80, 0x7f7a9f3327d0, 0x0)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/buffer.go:53 +0x12c
io.ReadAtLeast(0x1496d40, 0xc004b59820, 0xc001e66498, 0x9, 0x9, 0x9, 0x85db95, 0xc0091bf440, 0xc0091b0004)
        /usr/local/go/src/io/io.go:310 +0x87
io.ReadFull(...)
        /usr/local/go/src/io/io.go:329
golang.org/x/net/http2.readFrameHeader(0xc001e66498, 0x9, 0x9, 0x1496d40, 0xc004b59820, 0x0, 0x0, 0xc0091bf440, 0x0)
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/http2/frame.go:237 +0x87
golang.org/x/net/http2.(*Framer).ReadFrame(0xc001e66460, 0x14a4d60, 0xc0091bf440, 0x0, 0x0)
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/http2/frame.go:492 +0xa1
github.com/soheilhy/cmux.matchHTTP2Field(0x1497b80, 0xc0013b8020, 0x1496d40, 0xc004b59820, 0x121d9ea, 0xc, 0xc0054e9ec0, 0x415285)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/matchers.go:236 +0x148
github.com/soheilhy/cmux.HTTP2MatchHeaderFieldSendSettings.func1(0x1497b80, 0xc0013b8020, 0x1496d40, 0xc004b59820, 0xc0013b8020)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/matchers.go:173 +0xca
github.com/soheilhy/cmux.(*cMux).serve(0xc00012a4b0, 0x14c1ea0, 0xc0013b8020, 0xc00015c120, 0xc00066c000)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:184 +0x1d3
created by github.com/soheilhy/cmux.(*cMux).Serve
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:171 +0x19d

Meanwhile, gRPC Serve() is blocked waiting for Accept to return. It must do so before it can notify the gRPC server’s own WG, which is a prerequisite for GracefulStop or Stop to return:

goroutine 1800 [chan receive, 5 minutes]:
github.com/soheilhy/cmux.muxListener.Accept(...)
        /gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:229
google.golang.org/grpc.(*Server).Serve(0xc0001661a0, 0x14b2060, 0xc000113820, 0x0, 0x0)
        /gazette/.build/go-path/pkg/mod/google.golang.org/[email protected]/server.go:621 +0x210
go.gazette.dev/core/server.(*Server).QueueTasks.func3(0x0, 0x1)
        /gazette/server/server.go:136 +0x44
go.gazette.dev/core/task.(*Group).GoRun.func1(0x0, 0x0)
        /gazette/task/group.go:72 +0x43
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000530a80, 0xc0013900c0)
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66

For completeness, here's where GracefulStop is wedged waiting on its WG, held hostage by grpc.Serve:

goroutine 1802 [semacquire, 5 minutes]:
sync.runtime_Semacquire(0xc00016631c)
        /usr/local/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00016631c)
        /usr/local/go/src/sync/waitgroup.go:130 +0x64
google.golang.org/grpc.(*Server).GracefulStop(0xc0001661a0)
        /gazette/.build/go-path/pkg/mod/google.golang.org/[email protected]/server.go:1551 +0x1b1
go.gazette.dev/core/broker.(*Service).QueueTasks.func2(0xc00030eb90, 0x50)
        /gazette/broker/service.go:71 +0xa7
go.gazette.dev/core/task.(*Group).GoRun.func1(0x14b6660, 0xc0004b03c0)
        /gazette/task/group.go:72 +0x43
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000530a80, 0xc001390100)
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
        /gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66

Using cmux with `MatchWithWriters` consumes too much CPU

We are using cmux with gRPC and grpc-gateway like this example, but we found that using
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) consumes too much CPU.

$ ~> ps axo pid,etime,%cpu,%mem,cmd | grep 'server-combine' | grep -v grep
7054 00:07 123 0.1 server-combine

Using m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) is OK, but using the line at L28 takes too much CPU.

Is there any fix for this?
