pulsar-client-go's Introduction

Apache Pulsar Go Client Library

A Go client library for Apache Pulsar. For the supported Pulsar features, see Client Feature Matrix.

Purpose

This project is a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.

Once feature parity and stability are reached, this will supersede the current CGo-based library.

Requirements

  • Go 1.20+

Note:

While this library should work with Go versions as early as 1.16, bugs specific to versions earlier than 1.18 may not be fixed.

Status

See the Projects page at https://github.com/apache/pulsar-client-go/projects to track status and progress.

Usage

Import the client library:

import "github.com/apache/pulsar-client-go/pulsar"

Create a Producer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-topic",
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

// Publish a single message and report the outcome.
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("hello"),
})
if err != nil {
	fmt.Println("Failed to publish message", err)
} else {
	fmt.Println("Published message")
}

Create a Consumer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "my-topic",
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

// Block until one message arrives.
msg, err := consumer.Receive(context.Background())
if err != nil {
	log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
	msg.ID(), string(msg.Payload()))

Create a Reader:

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          "topic-1",
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
}

Build and Test

Build the sources:

make build

Run the tests:

make test

Run the tests with specific versions of Go and Pulsar:

make test GO_VERSION=1.20 PULSAR_VERSION=2.10.0

Contributing

Contributions are welcome and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.

If your contribution adds Pulsar features for Go clients, you need to update both the Pulsar docs and the Client Feature Matrix. See Contribution Guide for more details.

Community

Mailing lists

Name              Scope
[email protected]  User-related discussions
[email protected]  Development-related discussions

Slack

The Pulsar Slack channel is #dev-go at https://apache-pulsar.slack.com/

You can self-register at https://apache-pulsar.herokuapp.com/

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Troubleshooting

Go module 'ambiguous import' error

If you've upgraded from a previous version of this library, you may run into an 'ambiguous import' error when building.

github.com/apache/pulsar-client-go/oauth2: ambiguous import: found package github.com/apache/pulsar-client-go/oauth2 in multiple modules

To fix this, make sure your go.mod file contains no references to the old oauth2 module path: remove any lines similar to the following, then run go mod tidy.

github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220630195735-e95cf0633348 // indirect

pulsar-client-go's Issues

[Code format] use golangci-lint check code of master

Expected behavior

golangci-lint run succeeds.

Actual behavior

pkg/compression/compression_test.go:44:8: Using the variable on range scope `p` in function literal (scopelint)
pkg/compression/compression_test.go:49:18: Using the variable on range scope `p` in function literal (scopelint)
pkg/compression/compression_test.go:50:25: Using the variable on range scope `p` in function literal (scopelint)
pulsar/internal/commands.go:61:41: `smm` can be `github.com/golang/protobuf/proto.Message` (interfacer)
pulsar/internal/commands.go:71:32: `cmdSend` can be `github.com/golang/protobuf/proto.Message` (interfacer)
pulsar/internal/connection.go:464:52: unnecessary conversion (unconvert)
pulsar/internal/connection.go:454: G402: TLS InsecureSkipVerify may be true. (gosec)
pulsar/impl_client.go:33:2: `options` is unused (structcheck)
pulsar/impl_client.go:42:2: `consumerIdGenerator` is unused (structcheck)
pulsar/impl_client.go:38:2: `auth` is unused (structcheck)
pulsar/impl_client.go:41:2: `producerIdGenerator` is unused (structcheck)
perf/perf-consumer.go:94:16: Error return value of `consumer.Ack` is not checked (errcheck)
perf/pulsar-perf-go.go:48:17: Error return value of `rootCmd.Execute` is not checked (errcheck)
pkg/compression/zlib.go:40:9: Error return value of `w.Write` is not checked (errcheck)
pkg/compression/zlib.go:53:8: Error return value of `r.Read` is not checked (errcheck)
pulsar/internal/hash.go:35:9: Error return value of `h.Write` is not checked (errcheck)
pkg/auth/token.go:34:2: ifElseChain: rewrite if-else to switch statement (gocritic)
pulsar/impl_client.go:57:2: ifElseChain: rewrite if-else to switch statement (gocritic)
pulsar/internal/batch_builder.go:101:2: ifElseChain: rewrite if-else to switch statement (gocritic)
pulsar/impl_producer.go:81:46: loopclosure: loop variable partition captured by func literal (govet)
pulsar/impl_producer.go:82:23: loopclosure: loop variable partitionIdx captured by func literal (govet)
pulsar/impl_client.go:65:10: nilness: impossible condition: nil != nil (govet)
pulsar/producer_test.go:46:3: shadow: declaration of "err" shadows declaration at line 33 (govet)
pulsar/producer_test.go:136:5: shadow: declaration of "err" shadows declaration at line 122 (govet)
pulsar/producer_test.go:168:3: shadow: declaration of "err" shadows declaration at line 153 (govet)
pulsar/internal/lookup_service.go:98:9: ineffectual assignment to `err` (ineffassign)
pkg/auth/disabled.go:24: File is not `gofmt`-ed with `-s` (gofmt)
pkg/auth/provider.go:23: File is not `gofmt`-ed with `-s` (gofmt)
perf/perf-consumer.go:26: File is not `gofmt`-ed with `-s` (gofmt)
pkg/auth/token.go:24: File is not `goimports`-ed (goimports)
perf/perf-consumer.go:25: File is not `goimports`-ed (goimports)
pkg/compression/compression_test.go:23: File is not `goimports`-ed (goimports)
pkg/auth/disabled.go:42:17: method GetTlsCertificate should be GetTLSCertificate (golint)
pkg/auth/tls.go:53:27: method GetTlsCertificate should be GetTLSCertificate (golint)
pkg/auth/token.go:48:11: `if` block ends with a `return` statement, so drop this `else` and outdent its block (golint)
pkg/auth/token.go:66:11: `if` block ends with a `return` statement, so drop this `else` and outdent its block (golint)
pkg/auth/token.go:83:29: method GetTlsCertificate should be GetTLSCertificate (golint)
pkg/auth/token.go:91:9: `if` block ends with a `return` statement, so drop this `else` and outdent its block (golint)
perf/perf-consumer.go:84:26: should drop = 0 from declaration of var msgReceived; it is the zero value (golint)
perf/perf-consumer.go:85:28: should drop = 0 from declaration of var bytesReceived; it is the zero value (golint)
perf/perf-producer.go:96:39: should drop = nil from declaration of var rateLimiter; it is the zero value (golint)
perf/perf-producer.go:143:4: should replace `messagesPublished += 1` with `messagesPublished++` (golint)
pulsar/impl_message.go:27:6: type `messageId` should be `messageID` (golint)
pulsar/impl_message.go:34:6: func newMessageId should be newMessageID (golint)
pulsar/impl_message.go:44:2: var `msgId` should be `msgID` (golint)
pulsar/impl_message.go:54:6: func deserializeMessageId should be deserializeMessageID (golint)
pulsar/impl_message.go:55:2: var `msgId` should be `msgID` (golint)
pulsar/impl_partition_producer.go:52:2: struct field `producerId` should be `producerID` (golint)
pulsar/impl_partition_producer.go:54:2: struct field `sequenceIdGenerator` should be `sequenceIDGenerator` (golint)
pulsar/impl_partition_producer.go:150:3: var `nextSequenceId` should be `nextSequenceID` (golint)
pulsar/impl_partition_producer.go:248:2: var `sequenceId` should be `sequenceID` (golint)
pulsar/impl_partition_producer.go:268:2: struct field `sequenceId` should be `sequenceID` (golint)
pulsar/impl_partition_producer.go:273:13: var `sequenceId` should be `sequenceID` (golint)
pulsar/impl_producer.go:139:18: should drop = nil from declaration of var err; it is the zero value (golint)
pulsar/impl_producer.go:149:18: should drop = nil from declaration of var err; it is the zero value (golint)
pulsar/message.go:83:2: comment on exported var `EarliestMessage` should be of the form `EarliestMessage ...` (golint)
pulsar/message.go:84:18: should omit type MessageID from declaration of var EarliestMessage; it will be inferred from the right-hand side (golint)
pulsar/message.go:86:2: comment on exported var `LatestMessage` should be of the form `LatestMessage ...` (golint)
pulsar/message.go:87:16: should omit type MessageID from declaration of var LatestMessage; it will be inferred from the right-hand side (golint)
pulsar/test_helper.go:28:2: const `serviceUrl` should be `serviceURL` (golint)
pulsar/test_helper.go:29:2: const `serviceUrlTls` should be `serviceURLTLS` (golint)
pulsar/internal/batch_builder.go:48:2: struct field `producerId` should be `producerID` (golint)
pulsar/internal/batch_builder.go:57:61: func parameter `producerId` should be `producerID` (golint)
pulsar/internal/batch_builder.go:84:15: should replace `errors.New(fmt.Sprintf(...))` with `fmt.Errorf(...)` (golint)
pulsar/internal/batch_builder.go:99:65: method parameter `sequenceId` should be `sequenceID` (golint)
pulsar/internal/batch_builder.go:125:2: should replace `bb.numMessages += 1` with `bb.numMessages++` (golint)
pulsar/internal/batch_builder.go:137:52: method result `sequenceId` should be `sequenceID` (golint)
pulsar/internal/commands.go:128:3: should replace `i += 1` with `i++` (golint)
pulsar/internal/connection.go:54:14: interface method parameter `requestId` should be `requestID` (golint)
pulsar/internal/connection.go:66:2: const `connectionTcpConnected` should be `connectionTCPConnected` (golint)
pulsar/internal/connection.go:95:2: struct field `requestIdGenerator` should be `requestIDGenerator` (golint)
pulsar/internal/connection.go:348:34: method parameter `requestId` should be `requestID` (golint)
pulsar/internal/connection.go:361:37: method parameter `requestId` should be `requestID` (golint)
pulsar/internal/connection.go:374:2: var `producerId` should be `producerID` (golint)
pulsar/internal/connection.go:448:22: method newRequestId should be newRequestID (golint)
pulsar/internal/connection.go:452:22: method getTlsConfig should be getTLSConfig (golint)
pulsar/internal/connection_pool.go:58:10: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary) (golint)
pulsar/internal/connection_pool.go:75:9: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary) (golint)
pulsar/internal/lookup_service.go:42:2: struct field `serviceUrl` should be `serviceURL` (golint)
pulsar/internal/lookup_service.go:45:44: func parameter `serviceUrl` should be `serviceURL` (golint)
pulsar/internal/lookup_service.go:127:16: should replace `errors.New(fmt.Sprintf(...))` with `fmt.Errorf(...)` (golint)
pulsar/internal/lookup_service_test.go:30:6: type `mockedRpcClient` should be `mockedRPCClient` (golint)
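
The loopclosure and scopelint findings above (for example pulsar/impl_producer.go:81) concern closures that capture a loop variable. The following is a minimal, self-contained sketch of the usual fix, not the project's actual code: collectPartitions and its variables are hypothetical names chosen for illustration. Each iteration copies the loop variable before the goroutine captures it. Since Go 1.22 each iteration already gets its own variable, so the copy matters mainly for older toolchains.

```go
package main

import (
	"fmt"
	"sync"
)

// collectPartitions is a hypothetical helper: it starts one goroutine per
// partition and records the index each goroutine observed.
func collectPartitions(n int) []int {
	seen := make([]int, n)
	var wg sync.WaitGroup
	for idx := 0; idx < n; idx++ {
		idx := idx // per-iteration copy, so the closure does not share the loop variable
		wg.Add(1)
		go func() {
			defer wg.Done()
			seen[idx] = idx // each goroutine writes only its own slot
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(collectPartitions(4))
}
```

Without the copy, on toolchains older than Go 1.22 every goroutine may observe the final value of the loop variable, which is the situation the linters warn about.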

Steps to reproduce

Install golangci-lint and run golangci-lint run

System configuration

Pulsar version: latest

Error in the Flush logic

Expected behavior

With batching enabled, the consumer should not receive messages from a batch that has not yet reached its configured size. Until that size is reached, only an explicit Flush should send the buffered messages.

producer, err := client.CreateProducer(ProducerOptions{
		Topic:                   topicName,
		DisableBatching:         false,
		BatchingMaxMessages:     uint(numOfMessages),
		BatchingMaxPublishDelay: time.Second * 10,
		BlockIfQueueFull:        true,
	})

Actual behavior

With batching enabled, the consumer receives the batched messages even though the batch has not reached its configured size.

Steps to reproduce

func TestFlushInProducer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: serviceURL,
	})
	assert.NoError(t, err)
	defer client.Close()

	topicName := "test-flush-in-producer"
	subName := "subscription-name"
	numOfMessages := 10
	ctx := context.Background()

	// Batch up to numOfMessages messages, with a max publish delay of 10s.
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:                   topicName,
		DisableBatching:         false,
		BatchingMaxMessages:     uint(numOfMessages),
		BatchingMaxPublishDelay: time.Second * 10,
		BlockIfQueueFull:        true,
		Properties: map[string]string{
			"producer-name": "test-producer-name",
			"producer-id":   "test-producer-id",
		},
	})
	assert.Nil(t, err)
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:            topicName,
		SubscriptionName: subName,
	})
	assert.Nil(t, err)
	defer consumer.Close()

	prefix := "msg-batch-async"
	msgCount := 0

	// Send only half a batch, so the size threshold is never reached.
	for i := 0; i < numOfMessages/2; i++ {
		messageContent := prefix + fmt.Sprintf("%d", i)
		err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(messageContent),
		})
		assert.Nil(t, err)
	}

	// If these receives succeed, the partial batch was sent before it filled up.
	for i := 0; i < numOfMessages/2; i++ {
		_, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		msgCount++
	}
	assert.Equal(t, msgCount, numOfMessages/2)
}

System configuration

Pulsar version: x.y

Data race

Expected behaviour

Running go test ./... -race should not fail

Actual behaviour

There are race conditions when running the tests.
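
The trace below reports a write in (*connection).connect() racing with a read in (*connection).waitUntilReady(). As a hedged illustration of how such a pair is commonly made race-free, the sketch below guards a shared state field with a mutex and a condition variable. The conn type, connState values and method bodies are hypothetical stand-ins; which field actually races in the library is not confirmed here.

```go
package main

import (
	"fmt"
	"sync"
)

type connState int

const (
	connInit connState = iota
	connReady
)

// conn is a hypothetical stand-in for a connection whose state is written by
// a background goroutine and read by callers.
type conn struct {
	mu    sync.Mutex
	cond  *sync.Cond
	state connState
}

func newConn() *conn {
	c := &conn{}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// connect simulates the background goroutine marking the connection ready.
// The write to c.state happens while holding the lock.
func (c *conn) connect() {
	c.mu.Lock()
	c.state = connReady
	c.mu.Unlock()
	c.cond.Broadcast()
}

// waitUntilReady blocks until connect has run. The read of c.state also
// happens under the lock, so the race detector sees the accesses as ordered.
func (c *conn) waitUntilReady() connState {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.state != connReady {
		c.cond.Wait()
	}
	return c.state
}

func main() {
	c := newConn()
	go c.connect()
	fmt.Println(c.waitUntilReady() == connReady)
}
```

Because the state is re-checked in a loop while holding the lock, the waiter does not miss the wake-up even if connect finishes before waitUntilReady starts.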

$ go test ./... -race
?   	github.com/apache/pulsar-client-go/examples/consumer	[no test files]
?   	github.com/apache/pulsar-client-go/examples/consumer-listener	[no test files]
?   	github.com/apache/pulsar-client-go/examples/producer	[no test files]
?   	github.com/apache/pulsar-client-go/perf	[no test files]
?   	github.com/apache/pulsar-client-go/pkg/auth	[no test files]
ok  	github.com/apache/pulsar-client-go/pkg/compression	(cached)
?   	github.com/apache/pulsar-client-go/pkg/pb	[no test files]
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c000154070 by goroutine 9:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000154070 by goroutine 8:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestProducerConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:50 +0x25e
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 9 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestProducerConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:50 +0x25e
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c000154010 by goroutine 9:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000154010 by goroutine 8:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestProducerConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:50 +0x25e
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 9 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestProducerConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:50 +0x25e
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65393" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0001c4070 by goroutine 17:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c4070 by goroutine 15:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:174 +0xa67
  github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:127 +0x5a9
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:104 +0xab

Goroutine 17 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:174 +0xa67
  github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:127 +0x5a9
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:104 +0xab

Goroutine 15 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:103 +0x3b2
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestProducerConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:50 +0x25e
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
==================
==================
WARNING: DATA RACE
Write at 0x00c0001c4010 by goroutine 17:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c4010 by goroutine 15:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:174 +0xa67
  github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:127 +0x5a9
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:104 +0xab

Goroutine 17 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:174 +0xa67
  github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:127 +0x5a9
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:104 +0xab

Goroutine 15 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:103 +0x3b2
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestProducerConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:50 +0x25e
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65394" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/my-topic"
time="2019-10-11T14:29:21+01:00" level=info msg="Created producer" name=standalone-0-85 topic="persistent://public/default/my-topic"
==================
WARNING: DATA RACE
Read at 0x00c0001f0360 by goroutine 22:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:365 +0x13a
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:412 +0x19e
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).receivedCommand()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:354 +0x327
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_reader.go:54 +0x1de

Previous write at 0x00c0001f0360 by goroutine 26:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalFlush()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:297 +0x26e
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:207 +0x28b

Goroutine 22 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:256 +0x82
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:150 +0x66

Goroutine 26 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:114 +0xace
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70
==================
==================
WARNING: DATA RACE
Read at 0x00c0001ec2b0 by goroutine 22:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:365 +0x181
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:412 +0x19e
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).receivedCommand()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:354 +0x327
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_reader.go:54 +0x1de

Previous write at 0x00c0001ec2b0 by goroutine 26:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalFlush()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:297 +0x22d
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:207 +0x28b

Goroutine 22 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:256 +0x82
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:150 +0x66

Goroutine 26 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:114 +0xace
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70
==================
==================
WARNING: DATA RACE
Read at 0x00c0001da340 by goroutine 22:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:367 +0x1bc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:412 +0x19e
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).receivedCommand()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:354 +0x327
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_reader.go:54 +0x1de

Previous write at 0x00c0001da340 by goroutine 26:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalFlush()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:299 +0xd5
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:207 +0x28b

Goroutine 22 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:256 +0x82
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:150 +0x66

Goroutine 26 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:114 +0xace
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70
==================
==================
WARNING: DATA RACE
Read at 0x00c0001da348 by goroutine 22:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:372 +0x1e1
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:412 +0x19e
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).receivedCommand()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:354 +0x327
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_reader.go:54 +0x1de

Previous write at 0x00c0001da348 by goroutine 26:
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalFlush()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:299 +0xd5
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:207 +0x28b

Goroutine 22 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:256 +0x82
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:150 +0x66

Goroutine 26 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:114 +0xace
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Closing producer" name=standalone-0-85 topic="persistent://public/default/my-topic"
time="2019-10-11T14:29:21+01:00" level=info msg="Closed producer" name=standalone-0-85 topic="persistent://public/default/my-topic"
time="2019-10-11T14:29:21+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/my-topic"
time="2019-10-11T14:29:21+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/my-topic"
--- FAIL: TestProducerConsumer (0.10s)
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://invalid-hostname:6650"
time="2019-10-11T14:29:21+01:00" level=error msg="Error: MetadataError, Error Message: Consumer not found" laddr="[::1]:65394" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connection closed" laddr="[::1]:65394" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Error reading from connection" error="Short read when reading frame size" laddr="[::1]:65394" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=warning msg="Failed to connect to broker." error="dial tcp: lookup invalid-hostname: no such host" raddr="pulsar://invalid-hostname:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connection closed" raddr="pulsar://invalid-hostname:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0001c4150 by goroutine 32:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c4150 by goroutine 31:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestBatchMessageReceive()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:144 +0x28a
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 32 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestBatchMessageReceive()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:144 +0x28a
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 31 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0001c40f0 by goroutine 32:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c40f0 by goroutine 31:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestBatchMessageReceive()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:144 +0x28a
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 32 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestBatchMessageReceive()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:144 +0x28a
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 31 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65395" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0001c4230 by goroutine 39:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c4230 by goroutine 38:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:128 +0x7ac
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:104 +0x753
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70

Goroutine 39 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:128 +0x7ac
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:104 +0x753
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70

Goroutine 38 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:81 +0x35b
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestBatchMessageReceive()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:144 +0x28a
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
==================
==================
WARNING: DATA RACE
Write at 0x00c0001c41d0 by goroutine 39:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c41d0 by goroutine 38:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:128 +0x7ac
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:104 +0x753
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70

Goroutine 39 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).grabCnx()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:128 +0x7ac
  github.com/apache/pulsar-client-go/pulsar.newPartitionProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:104 +0x753
  github.com/apache/pulsar-client-go/pulsar.newProducer.func2()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:82 +0x70

Goroutine 38 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:81 +0x35b
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestBatchMessageReceive()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:144 +0x28a
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65396" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Created producer" name=standalone-0-86 topic="persistent://public/default/receive-batch"
time="2019-10-11T14:29:21+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/receive-batch"
time="2019-10-11T14:29:21+01:00" level=info msg="Closing producer" name=standalone-0-86 topic="persistent://public/default/receive-batch"
time="2019-10-11T14:29:21+01:00" level=info msg="Closed producer" name=standalone-0-86 topic="persistent://public/default/receive-batch"
time="2019-10-11T14:29:21+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/receive-batch"
time="2019-10-11T14:29:21+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/receive-batch"
--- FAIL: TestBatchMessageReceive (0.41s)
    testing.go:853: race detected during execution of test
subscription name is required for consumer: Result(29)
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c00039c070 by goroutine 50:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c00039c070 by goroutine 49:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_SubscriptionEarliestPos()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:235 +0x30b
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 50 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_SubscriptionEarliestPos()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:235 +0x30b
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 49 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c00039c010 by goroutine 50:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c00039c010 by goroutine 49:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_SubscriptionEarliestPos()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:235 +0x30b
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 50 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_SubscriptionEarliestPos()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:235 +0x30b
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 49 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65397" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65398" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Created producer" name=standalone-0-87 topic="persistent://public/default/testSeek-1570800561"
time="2019-10-11T14:29:21+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/testSeek-1570800561"
time="2019-10-11T14:29:21+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/testSeek-1570800561"
==================
WARNING: DATA RACE
Write at 0x00c0003646c0 by goroutine 67:
  runtime.mapdelete_fast64()
      /usr/local/Cellar/go/1.13/libexec/src/runtime/map_fast64.go:272 +0x0
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).DeleteConsumeHandler()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:582 +0xb7
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).internalClose()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:590 +0x3e0
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:557 +0x2fe

Previous read at 0x00c0003646c0 by goroutine 64:
  runtime.mapaccess2_fast64()
      /usr/local/Cellar/go/1.13/libexec/src/runtime/map_fast64.go:52 +0x0
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:421 +0x178
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).receivedCommand()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:359 +0x449
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_reader.go:54 +0x1de

Goroutine 67 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_partition_consumer.go:147 +0x999
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:104 +0xab

Goroutine 64 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:256 +0x82
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:150 +0x66
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/testSeek-1570800561"
time="2019-10-11T14:29:21+01:00" level=info msg="Closing producer" name=standalone-0-87 topic="persistent://public/default/testSeek-1570800561"
time="2019-10-11T14:29:21+01:00" level=info msg="Closed producer" name=standalone-0-87 topic="persistent://public/default/testSeek-1570800561"
--- FAIL: TestConsumer_SubscriptionEarliestPos (0.06s)
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0001c4310 by goroutine 69:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c4310 by goroutine 68:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerKeyShared()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:296 +0x27d
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 69 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerKeyShared()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:296 +0x27d
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 68 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0001c42b0 by goroutine 69:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c42b0 by goroutine 68:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerKeyShared()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:296 +0x27d
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 69 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerKeyShared()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:296 +0x27d
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 68 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65399" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Connection is ready" laddr="[::1]:65400" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:21+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:21+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:21+01:00" level=info msg="Created producer" name=standalone-0-88 topic="persistent://public/default/test-topic-6"
consumer1 key is: key-shared-0, value is: value-0
consumer1 key is: key-shared-1, value is: value-1
consumer1 key is: key-shared-2, value is: value-2
consumer1 key is: key-shared-0, value is: value-3
consumer1 key is: key-shared-1, value is: value-4
consumer1 key is: key-shared-2, value is: value-5
consumer1 key is: key-shared-0, value is: value-6
consumer1 key is: key-shared-1, value is: value-7
consumer1 key is: key-shared-2, value is: value-8
consumer1 key is: key-shared-0, value is: value-9
time="2019-10-11T14:29:23+01:00" level=info msg="Closing producer" name=standalone-0-88 topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:23+01:00" level=info msg="Closed producer" name=standalone-0-88 topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:23+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:23+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:23+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/test-topic-6"
time="2019-10-11T14:29:23+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/test-topic-6"
--- FAIL: TestConsumerKeyShared (2.07s)
    testing.go:853: race detected during execution of test
--- FAIL: TestPartitionTopicsConsumerPubSub (0.00s)
    consumer_test.go:282: Put http://localhost:8080/admin/v2/persistent/public/default/testGetPartitions/partitions: dial tcp [::1]:8080: connect: connection refused
time="2019-10-11T14:29:23+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0003bc150 by goroutine 12:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0003bc150 by goroutine 13:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsync()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:431 +0x269
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 12 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsync()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:431 +0x269
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 13 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0003bc0f0 by goroutine 12:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0003bc0f0 by goroutine 13:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsync()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:431 +0x269
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 12 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsync()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:431 +0x269
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 13 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:23+01:00" level=info msg="Connection is ready" laddr="[::1]:65403" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:23+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:23+01:00" level=info msg="Connection is ready" laddr="[::1]:65404" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:23+01:00" level=info msg="Created producer" name=standalone-0-89 topic="persistent://public/default/receive-async"
time="2019-10-11T14:29:23+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/receive-async"
receive message payload is:hello-0
receive message payload is:hello-1
receive message payload is:hello-2
receive message payload is:hello-3
receive message payload is:hello-4
receive message payload is:hello-5
receive message payload is:hello-6
receive message payload is:hello-7
receive message payload is:hello-8
receive message payload is:hello-9
time="2019-10-11T14:29:23+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/receive-async"
time="2019-10-11T14:29:23+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/receive-async"
time="2019-10-11T14:29:23+01:00" level=info msg="Closing producer" name=standalone-0-89 topic="persistent://public/default/receive-async"
time="2019-10-11T14:29:23+01:00" level=info msg="Closed producer" name=standalone-0-89 topic="persistent://public/default/receive-async"
--- FAIL: TestConsumer_ReceiveAsync (0.07s)
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:23+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0003bc230 by goroutine 28:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0003bc230 by goroutine 29:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerAckTimeout()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:490 +0x2a9
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 28 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerAckTimeout()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:490 +0x2a9
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 29 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0003bc1d0 by goroutine 28:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0003bc1d0 by goroutine 29:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerAckTimeout()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:490 +0x2a9
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 28 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.singleTopicSubscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:86 +0x22f
  github.com/apache/pulsar-client-go/pulsar.newConsumer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_consumer.go:73 +0x27e
  github.com/apache/pulsar-client-go/pulsar.(*client).Subscribe()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:100 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumerAckTimeout()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:490 +0x2a9
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 29 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:23+01:00" level=info msg="Connection is ready" laddr="[::1]:65405" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:23+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:23+01:00" level=info msg="Connection is ready" laddr="[::1]:65406" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:23+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:23+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:23+01:00" level=info msg="Created producer" name=standalone-0-90 topic="persistent://public/default/test-ack-timeout-topic-1"
start redeliver messages...
redeliver messages, payload is:hello-0
redeliver messages, payload is:hello-1
redeliver messages, payload is:hello-2
redeliver messages, payload is:hello-3
redeliver messages, payload is:hello-4
time="2019-10-11T14:29:35+01:00" level=info msg="Closing producer" name=standalone-0-90 topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:35+01:00" level=info msg="Closed producer" name=standalone-0-90 topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:35+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:35+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:35+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/test-ack-timeout-topic-1"
time="2019-10-11T14:29:35+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/test-ack-timeout-topic-1"
--- FAIL: TestConsumerAckTimeout (12.03s)
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:35+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c000214310 by goroutine 31:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000214310 by goroutine 47:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsyncWithCallback()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:614 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 31 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsyncWithCallback()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:614 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 47 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0002142b0 by goroutine 31:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0002142b0 by goroutine 47:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsyncWithCallback()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:614 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 31 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_ReceiveAsyncWithCallback()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:614 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 47 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:35+01:00" level=info msg="Connection is ready" laddr="[::1]:65416" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:35+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:35+01:00" level=info msg="Connection is ready" laddr="[::1]:65417" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Created producer" name=standalone-0-91 topic="persistent://public/default/receive-async-with-callback"
time="2019-10-11T14:29:36+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/receive-async-with-callback"
receive message payload is:hello-0
receive message payload is:hello-1
receive message payload is:hello-2
receive message payload is:hello-3
receive message payload is:hello-4
receive message payload is:hello-5
receive message payload is:hello-6
receive message payload is:hello-7
receive message payload is:hello-8
receive message payload is:hello-9
time="2019-10-11T14:29:36+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/receive-async-with-callback"
time="2019-10-11T14:29:36+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/receive-async-with-callback"
time="2019-10-11T14:29:36+01:00" level=info msg="Closing producer" name=standalone-0-91 topic="persistent://public/default/receive-async-with-callback"
time="2019-10-11T14:29:36+01:00" level=info msg="Closed producer" name=standalone-0-91 topic="persistent://public/default/receive-async-with-callback"
--- FAIL: TestConsumer_ReceiveAsyncWithCallback (0.08s)
    testing.go:853: race detected during execution of test
--- FAIL: TestConsumer_Shared (0.00s)
    consumer_test.go:282: Put http://localhost:8080/admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions: dial tcp [::1]:8080: connect: connection refused
--- FAIL: TestConsumer_Seek (0.00s)
    consumer_test.go:282: Put http://localhost:8080/admin/v2/persistent/public/default/testSeek: dial tcp [::1]:8080: connect: connection refused
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0002143f0 by goroutine 83:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0002143f0 by goroutine 86:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_EventTime()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:805 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 83 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_EventTime()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:805 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 86 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c000214390 by goroutine 83:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000214390 by goroutine 86:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_EventTime()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:805 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 83 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_EventTime()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:805 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 86 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:36+01:00" level=info msg="Connection is ready" laddr="[::1]:65422" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection is ready" laddr="[::1]:65423" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Created producer" name=standalone-0-92 topic="persistent://public/default/test-event-time"
time="2019-10-11T14:29:36+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/test-event-time"
time="2019-10-11T14:29:36+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/test-event-time"
time="2019-10-11T14:29:36+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/test-event-time"
time="2019-10-11T14:29:36+01:00" level=info msg="Closing producer" name=standalone-0-92 topic="persistent://public/default/test-event-time"
time="2019-10-11T14:29:36+01:00" level=warning msg="Got unexpected message: ledgerId:25 entryId:2 partition:-1 " consumerID=1 laddr="[::1]:65423" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Closed producer" name=standalone-0-92 topic="persistent://public/default/test-event-time"
--- FAIL: TestConsumer_EventTime (0.03s)
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c000154230 by goroutine 21:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000154230 by goroutine 20:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_Flow()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:842 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 21 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_Flow()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:842 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 20 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0001541d0 by goroutine 21:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001541d0 by goroutine 20:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_Flow()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:842 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 21 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestConsumer_Flow()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:842 +0x243
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 20 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:36+01:00" level=info msg="Connection is ready" laddr="[::1]:65424" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection is ready" laddr="[::1]:65425" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Created producer" name=standalone-0-93 topic="persistent://public/default/test-received-since-flow"
time="2019-10-11T14:29:36+01:00" level=info msg="Created consumer" name="<nil>" topic="persistent://public/default/test-received-since-flow"
time="2019-10-11T14:29:36+01:00" level=info msg="Closing producer" name=standalone-0-93 topic="persistent://public/default/test-received-since-flow"
time="2019-10-11T14:29:36+01:00" level=info msg="Closed producer" name=standalone-0-93 topic="persistent://public/default/test-received-since-flow"
time="2019-10-11T14:29:36+01:00" level=info msg="Closing consumer" name="<nil>" topic="persistent://public/default/test-received-since-flow"
time="2019-10-11T14:29:36+01:00" level=info msg="Closed consumer" name="<nil>" topic="persistent://public/default/test-received-since-flow"
--- FAIL: TestConsumer_Flow (0.51s)
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:36+01:00" level=warning msg="Got unexpected message: ledgerId:27 entryId:101 partition:-1 " consumerID=1 laddr="[::1]:65425" raddr="pulsar://localhost:6650"
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp [::1]:6651: connect: connection refused" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp [::1]:6651: connect: connection refused" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://localhost:6651"
--- FAIL: TestTLSInsecureConnection (0.00s)
    impl_client_test.go:65: 
        	Error Trace:	impl_client_test.go:65
        	Error:      	Received unexpected error:
        	            	connection error
        	Test:       	TestTLSInsecureConnection
    impl_client_test.go:66: 
        	Error Trace:	impl_client_test.go:66
        	Error:      	Expected value not to be nil.
        	Test:       	TestTLSInsecureConnection
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp [::1]:6651: connect: connection refused" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://localhost:6651"
--- FAIL: TestTLSConnection (0.00s)
    impl_client_test.go:83: 
        	Error Trace:	impl_client_test.go:83
        	Error:      	Received unexpected error:
        	            	connection error
        	Test:       	TestTLSConnection
    impl_client_test.go:84: 
        	Error Trace:	impl_client_test.go:84
        	Error:      	Expected value not to be nil.
        	Test:       	TestTLSConnection
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp [::1]:6651: connect: connection refused" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://localhost:6651"
--- FAIL: TestTLSConnectionHostNameVerification (0.00s)
    impl_client_test.go:102: 
        	Error Trace:	impl_client_test.go:102
        	Error:      	Received unexpected error:
        	            	connection error
        	Test:       	TestTLSConnectionHostNameVerification
    impl_client_test.go:103: 
        	Error Trace:	impl_client_test.go:103
        	Error:      	Expected value not to be nil.
        	Test:       	TestTLSConnectionHostNameVerification
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://127.0.0.1:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp 127.0.0.1:6651: connect: connection refused" raddr="pulsar+ssl://127.0.0.1:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://127.0.0.1:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp [::1]:6651: connect: connection refused" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=warning msg="Failed to connect to broker." error="dial tcp [::1]:6651: connect: connection refused" raddr="pulsar+ssl://localhost:6651"
time="2019-10-11T14:29:36+01:00" level=info msg="Connection closed" raddr="pulsar+ssl://localhost:6651"
--- FAIL: TestTLSAuth (0.00s)
    impl_client_test.go:158: 
        	Error Trace:	impl_client_test.go:158
        	Error:      	Received unexpected error:
        	            	connection error
        	Test:       	TestTLSAuth
    impl_client_test.go:159: 
        	Error Trace:	impl_client_test.go:159
        	Error:      	Expected value not to be nil.
        	Test:       	TestTLSAuth
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c000214690 by goroutine 128:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000214690 by goroutine 19:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuth()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:175 +0x2b1
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 128 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuth()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:175 +0x2b1
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 19 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c000214630 by goroutine 128:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c000214630 by goroutine 19:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuth()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:175 +0x2b1
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 128 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuth()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:175 +0x2b1
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 19 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:36+01:00" level=info msg="Connection is ready" laddr="[::1]:65439" raddr="pulsar://localhost:6650"
--- FAIL: TestTokenAuth (0.01s)
    impl_client_test.go:179: 
        	Error Trace:	impl_client_test.go:179
        	Error:      	Received unexpected error:
        	            	MetadataError: LookupError
        	Test:       	TestTokenAuth
    impl_client_test.go:180: 
        	Error Trace:	impl_client_test.go:180
        	Error:      	Expected value not to be nil.
        	Test:       	TestTokenAuth
    testing.go:853: race detected during execution of test
time="2019-10-11T14:29:36+01:00" level=info msg="Connecting to broker" raddr="pulsar://localhost:6650"
==================
WARNING: DATA RACE
Write at 0x00c0001c4850 by goroutine 131:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:189 +0x7df
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c4850 by goroutine 23:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xa9
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuthFromFile()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:193 +0x1fc
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 131 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuthFromFile()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:193 +0x1fc
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 23 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
==================
WARNING: DATA RACE
Write at 0x00c0001c47f0 by goroutine 131:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).connect()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:191 +0x888
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:148 +0x3c

Previous read at 0x00c0001c47f0 by goroutine 23:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).waitUntilReady()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:235 +0xcc
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:75 +0x437
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuthFromFile()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:193 +0x1fc
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 131 (running) created at:
  github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:147 +0x4c
  github.com/apache/pulsar-client-go/pulsar/internal.(*connectionPool).GetConnection()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/connection_pool.go:72 +0x4cf
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).Request()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:75 +0x8a
  github.com/apache/pulsar-client-go/pulsar/internal.(*rpcClient).RequestToAnyBroker()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/internal/rpc_client.go:69 +0xaf
  github.com/apache/pulsar-client-go/pulsar.(*client).TopicPartitions()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:120 +0x246
  github.com/apache/pulsar-client-go/pulsar.newProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:64 +0x1ef
  github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client.go:92 +0xa6
  github.com/apache/pulsar-client-go/pulsar.TestTokenAuthFromFile()
      /Users/filcho/go/src/github.com/apache/pulsar-client-go/pulsar/impl_client_test.go:193 +0x1fc
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199

Goroutine 23 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:960 +0x651
  testing.runTests.func1()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1202 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:909 +0x199
  testing.runTests()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1200 +0x521
  testing.(*M).Run()
      /usr/local/Cellar/go/1.13/libexec/src/testing/testing.go:1117 +0x2ff
  main.main()
      _testmain.go:120 +0x223
==================
time="2019-10-11T14:29:36+01:00" level=info msg="Connection is ready" laddr="[::1]:65440" raddr="pulsar://localhost:6650"
--- FAIL: TestTokenAuthFromFile (0.01s)
    impl_client_test.go:197: 
        	Error Trace:	impl_client_test.go:197
        	Error:      	Received unexpected error:
        	            	MetadataError: LookupError
        	Test:       	TestTokenAuthFromFile
    impl_client_test.go:198: 
        	Error Trace:	impl_client_test.go:198
        	Error:      	Expected value not to be nil.
        	Test:       	TestTokenAuthFromFile
    testing.go:853: race detected during execution of test
2019/10/11 14:29:36 Put http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions: dial tcp [::1]:8080: connect: connection refused
FAIL	github.com/apache/pulsar-client-go/pulsar	15.415s
ok  	github.com/apache/pulsar-client-go/pulsar/internal	(cached)
ok  	github.com/apache/pulsar-client-go/util	(cached)
FAIL

Steps to reproduce

go test ./... -race
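
Every race report above has the same shape: a write in `(*connection).connect()` running in its own goroutine, and an unsynchronized read of the same address in `(*connection).waitUntilReady()`. A common way to remove this kind of race is to guard the shared state with a mutex and a condition variable. The sketch below illustrates that pattern only; the `conn` type, its fields, and the state names are assumptions made for the example and are not the library's actual code or the fix that was applied upstream.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// connState is a hypothetical stand-in for the connection's internal state.
type connState int

const (
	connInit connState = iota
	connReady
	connClosed
)

// conn is a simplified, hypothetical connection used only for this example.
type conn struct {
	mu    sync.Mutex
	cond  *sync.Cond
	state connState
}

func newConn() *conn {
	c := &conn{state: connInit}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// setState is what the connecting goroutine would call.
// The write happens under the lock, then all waiters are woken.
func (c *conn) setState(s connState) {
	c.mu.Lock()
	c.state = s
	c.mu.Unlock()
	c.cond.Broadcast()
}

// waitUntilReady blocks until the state leaves connInit.
// Every read of c.state happens under the same lock as the write.
func (c *conn) waitUntilReady() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.state == connInit {
		c.cond.Wait()
	}
	if c.state != connReady {
		return errors.New("connection error")
	}
	return nil
}

func main() {
	c := newConn()
	go c.setState(connReady) // plays the role of the connect goroutine
	fmt.Println("waitUntilReady returned:", c.waitUntilReady())
}
```

Because both the write and the read go through `c.mu`, running this under `go run -race` should not produce a report for `state`.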

System configuration

Pulsar package version: v0.0.0-20191008060812-61388933d356

consumer.Receive() is blocking: availablePermits is decreasing, but msgBacklog is increasing
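
The symptom in this report can be checked mechanically by comparing two topic-stats snapshots: if a subscription's `msgBacklog` grows while its consumer's `availablePermits` shrinks, the broker is counting messages as dispatched that the application is not receiving. The sketch below assumes the stats JSON has the field layout shown in this report; the `looksStalled` and `parse` helpers are hypothetical, and the two inline snapshots are trimmed to the fields being compared.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Only the fields needed for the comparison are declared.
type consumerStats struct {
	ConsumerName     string `json:"consumerName"`
	AvailablePermits int    `json:"availablePermits"`
}

type subscriptionStats struct {
	MsgBacklog int64           `json:"msgBacklog"`
	Consumers  []consumerStats `json:"consumers"`
}

type topicStats struct {
	Subscriptions map[string]subscriptionStats `json:"subscriptions"`
}

// parse decodes one stats snapshot.
func parse(s string) (topicStats, error) {
	var t topicStats
	err := json.Unmarshal([]byte(s), &t)
	return t, err
}

// looksStalled reports whether, between two snapshots, the backlog of sub grew
// while the permits of its first consumer shrank.
func looksStalled(prev, cur topicStats, sub string) bool {
	p, okPrev := prev.Subscriptions[sub]
	c, okCur := cur.Subscriptions[sub]
	if !okPrev || !okCur || len(p.Consumers) == 0 || len(c.Consumers) == 0 {
		return false
	}
	return c.MsgBacklog > p.MsgBacklog &&
		c.Consumers[0].AvailablePermits < p.Consumers[0].AvailablePermits
}

func main() {
	const sub = "response:proxy:8787:9:1577364969737"
	// Values trimmed from the first two snapshots in this report.
	prev, err := parse(`{"subscriptions":{"response:proxy:8787:9:1577364969737":{"msgBacklog":0,"consumers":[{"consumerName":"lxxlc","availablePermits":1024}]}}}`)
	if err != nil {
		panic(err)
	}
	cur, err := parse(`{"subscriptions":{"response:proxy:8787:9:1577364969737":{"msgBacklog":1,"consumers":[{"consumerName":"lxxlc","availablePermits":1021}]}}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println("stalled:", looksStalled(prev, cur, sub))
}
```

The check only looks at the first consumer of the subscription, which is enough for the single-consumer Failover subscription in this report.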

Expected behavior

The topic stats are as follows:

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 0,
  "publishers" : [ ],
  "subscriptions" : {
    "response:proxy:8787:9:1577364969737" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "lxxlc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "lxxlc",
        "availablePermits" : 1024,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-12-26T20:56:09.832+08:00",
        "address" : "/10.32.68.213:57108"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 2967,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "producerId" : 1,
    "metadata" : { },
    "producerName" : "qa-pulsar-ten-5-279",
    "connectedSince" : "2019-12-26T20:56:40.736+08:00",
    "address" : "/10.32.68.213:43156"
  } ],
  "subscriptions" : {
    "response:proxy:8787:9:1577364969737" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 1,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "lxxlc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "lxxlc",
        "availablePermits" : 1021,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-12-26T20:56:09.832+08:00",
        "address" : "/10.32.68.213:57108"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 5681,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "producerId" : 1,
    "metadata" : { },
    "producerName" : "qa-pulsar-ten-5-279",
    "connectedSince" : "2019-12-26T20:56:40.736+08:00",
    "address" : "/10.32.68.213:43156"
  } ],
  "subscriptions" : {
    "response:proxy:8787:9:1577364969737" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 3,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "lxxlc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "lxxlc",
        "availablePermits" : 1019,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-12-26T20:56:09.832+08:00",
        "address" : "/10.32.68.213:57108"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 7169,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "producerId" : 1,
    "metadata" : { },
    "producerName" : "qa-pulsar-ten-5-279",
    "connectedSince" : "2019-12-26T20:56:40.736+08:00",
    "address" : "/10.32.68.213:43156"
  } ],
  "subscriptions" : {
    "response:proxy:8787:9:1577364969737" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 5,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "lxxlc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "lxxlc",
        "availablePermits" : 1017,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-12-26T20:56:09.832+08:00",
        "address" : "/10.32.68.213:57108"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 8718,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "producerId" : 1,
    "metadata" : { },
    "producerName" : "qa-pulsar-ten-5-279",
    "connectedSince" : "2019-12-26T20:56:40.736+08:00",
    "address" : "/10.32.68.213:43156"
  } ],
  "subscriptions" : {
    "response:proxy:8787:9:1577364969737" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 8,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "lxxlc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "lxxlc",
        "availablePermits" : 1014,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-12-26T20:56:09.832+08:00",
        "address" : "/10.32.68.213:57108"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 6,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "producerId" : 1,
    "metadata" : { },
    "producerName" : "qa-pulsar-ten-5-279",
    "connectedSince" : "2019-12-26T20:56:40.736+08:00",
    "address" : "/10.32.68.213:43156"
  } ],
  "subscriptions" : {
    "response:proxy:8787:9:1577364969737" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Failover",
      "activeConsumerName" : "lxxlc",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "lxxlc",
        "availablePermits" : 1011,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2019-12-26T20:56:09.832+08:00",
        "address" : "/10.32.68.213:57108"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}

System configuration

Pulsar version: 2.4.0
type : Failover

Messages sent by the Go client cannot be received normally by the Java client's consumer

Expected behavior

Messages sent by the Go client can be received normally by the Java client's consumer.

Actual behavior

The Java client's consumer cannot receive the messages normally.

Steps to reproduce

Producer of Go client:

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })

    if err != nil {
        log.Fatal(err)
    }

    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic-2",
    })
    if err != nil {
        log.Fatal(err)
    }

    defer producer.Close()

    ctx := context.Background()

    for i := 0; i < 10; i++ {
        if err := producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        }); err != nil {
            log.Fatal(err)
        }
    }
}

Java client consumer:

public static void main(String[] args) throws PulsarClientException, InterruptedException {
        final String topic = "persistent://public/default/my-topic-2";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName(UUID.randomUUID().toString())
                .subscribe();

        for (int i = 0; i < 10; i++) {
            Message<byte[]> message = consumer.receive();
            System.out.println(new String(message.getValue(), Charset.defaultCharset()));
        }
    }

The broker logs the following error:

17:15:48.785 [pulsar-io-37-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56912][persistent://public/default/my-topic-2] Creating producer. producerId=1
17:15:48.786 [ForkJoinPool.commonPool-worker-11] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56912]-1 persistent://public/default/my-topic-2 configured with schema false
17:15:48.787 [ForkJoinPool.commonPool-worker-11] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56912] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-topic-2}, client=/127.0.0.1:56912, producerName=standalone-0-3, producerId=1}
17:15:48.820 [pulsar-io-37-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/my-topic-2}][standalone-0-3] Closing producer on cnx /127.0.0.1:56912
17:15:48.821 [pulsar-io-37-12] INFO  org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/my-topic-2}][standalone-0-3] Closed producer on cnx /127.0.0.1:56912
17:15:48.822 [pulsar-io-37-12] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56912
17:15:48.822 [pulsar-io-37-11] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56910
17:15:48.830 [pulsar-io-37-10] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56906] Got exception IOException : Connection reset by peer
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_201]
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_201]
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_201]
	at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_201]
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:1.8.0_201]
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
17:15:48.834 [pulsar-io-37-10] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:56906
17:15:48.834 [pulsar-io-37-10] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/my-topic-2, name=a01fed02-2baf-4617-89e7-611dd08633cc}, consumerId=0, consumerName=04824, address=/127.0.0.1:56906}
17:15:48.940 [pulsar-io-37-13] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /127.0.0.1:56913
17:15:48.943 [pulsar-io-37-13] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56913] Subscribing on topic persistent://public/default/my-topic-2 / a01fed02-2baf-4617-89e7-611dd08633cc
17:15:48.943 [pulsar-io-37-13] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-2-a01fed02-2baf-4617-89e7-611dd08633cc] Rewind from 7:10 to 7:0
17:15:48.944 [pulsar-io-37-13] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic-2] There are no replicated subscriptions on the topic
17:15:48.944 [pulsar-io-37-13] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic-2][a01fed02-2baf-4617-89e7-611dd08633cc] Created new subscription for 0
17:15:48.944 [pulsar-io-37-13] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:56913] Created subscription on topic persistent://public/default/my-topic-2 / a01fed02-2baf-4617-89e7-611dd08633cc

System configuration

Pulsar version: x.y

[Bug Fix] About go func(){} usage

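In impl_producer.go#L79-L84, I think the loop variables should be passed into the goroutine as arguments:

	for partitionIdx, partition := range partitions {
		// Pass both loop variables as arguments so that each goroutine
		// works on its own copy instead of the shared loop variables.
		go func(partitionIdx int, partition string) {
			prod, err := newPartitionProducer(client, partition, options, partitionIdx)
			c <- ProducerError{partitionIdx, prod, err}
		}(partitionIdx, partition)
	}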

test code:

package main

import (
	"fmt"
	"time"
)

func main() {
	s := []string{"a", "b", "c"}
	for _, v := range s {
		go func() {
			fmt.Println(v)
		}()
	}
	time.Sleep(time.Second * 1000)
}

output:

c
c
c

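As you can see, every goroutine prints the value from the last iteration, because the anonymous function captures the shared loop variable v instead of receiving a copy of it as an argument. (This applies to Go versions before 1.22; in modules that declare Go 1.22 or later, each iteration gets its own copy of the loop variable.)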
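
A minimal sketch of the fix, independent of the client code: the loop variables are passed to the goroutine as arguments, and each goroutine writes to its own slice index so the result is deterministic. The function name collect is made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// collect starts one goroutine per item and passes the loop variables as
// arguments, so each goroutine works on its own copy.
func collect(items []string) []string {
	out := make([]string, len(items))
	var wg sync.WaitGroup
	for i, v := range items {
		wg.Add(1)
		go func(i int, v string) {
			defer wg.Done()
			out[i] = v // each goroutine writes to a distinct index
		}(i, v)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(collect([]string{"a", "b", "c"})) // prints [a b c]
}
```

Using a sync.WaitGroup instead of a long time.Sleep also lets the program exit as soon as all goroutines have finished.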

Support multi-topics and pattern topics logic

Is your feature request related to a problem? Please describe.

The current consumer only supports specifying a specific topic name. We can consider supporting both multi-topics and pattern topics.

pulsar-client-go producer_partition crash

Expected behavior

Tell us what should happen

Actual behavior

worker_1  | panic: runtime error: index out of range [1] with length 1
worker_1  |
worker_1  | goroutine 316 [running]:
worker_1  | github.com/apache/pulsar-client-go/pulsar/internal.ConvertFromStringMap(0xc000f21620, 0xc000ac9430, 0xc00014f788, 0x2)
worker_1  | 	/data/code/Multi-SiteHighAvailabilitySignalServer/thirdparty/src/github.com/apache/pulsar-client-go/pulsar/internal/commands.go:258 +0x265
worker_1  | github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalSend(0xc000bd6280, 0xc000f21650)
worker_1  | 	/data/code/Multi-SiteHighAvailabilitySignalServer/thirdparty/src/github.com/apache/pulsar-client-go/pulsar/producer_partition.go:248 +0xb0c
worker_1  | github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop(0xc000bd6280)
worker_1  | 	/data/code/Multi-SiteHighAvailabilitySignalServer/thirdparty/src/github.com/apache/pulsar-client-go/pulsar/producer_partition.go:205 +0x144
worker_1  | created by github.com/apache/pulsar-client-go/pulsar.newPartitionProducer
worker_1  | 	/data/code/Multi-SiteHighAvailabilitySignalServer/thirdparty/src/github.com/apache/pulsar-client-go/pulsar/producer_partition.go:114 +0x5d8
signal_dockerfiles_worker_1 exited with code 2

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

The flow of the multi-partition consumer does not work

Expected behavior

The flow of the multi-partition consumer works correctly.

Actual behavior

The Receive call blocks.

Steps to reproduce

func TestPartitionTopicsConsumerPubSub(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	topic := "persistent://public/default/testGetPartitions-2"
	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions-2/partitions"

	makeHTTPCall(t, http.MethodPut, testURL, "3")

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	assert.Nil(t, err)
	defer producer.Close()

	topics, err := client.TopicPartitions(topic)
	assert.Nil(t, err)
	assert.Equal(t, topic+"-partition-0", topics[0])
	assert.Equal(t, topic+"-partition-1", topics[1])
	assert.Equal(t, topic+"-partition-2", topics[2])

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:             topic,
		SubscriptionName:  "my-sub-1",
		Type:              Exclusive,
		ReceiverQueueSize: 10,
	})
	assert.Nil(t, err)
	defer consumer.Close()

	ctx := context.Background()
	for i := 0; i < 500; i++ {
		err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		assert.Nil(t, err)
	}

	msgs := make([]string, 0)

	for i := 0; i < 500; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		msgs = append(msgs, string(msg.Payload()))

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))
		//
		//if err := consumer.Ack(msg); err != nil {
		//	assert.Nil(t, err)
		//}
	}

	// testify expects the arguments in the order (expected, actual).
	assert.Equal(t, 500, len(msgs))
}

System configuration

Pulsar version: x.y

Producer does not close all partitioned producers

When a producer is closed, all of its partitioned producers should be closed.

The current code closes only the first partitioned producer, because it returns from inside the loop:

func (p *producer) Close() error {
	for _, pp := range p.producers {
		return pp.Close()
	}
	return nil
}

The Pulsar Go client loses data during transfer

I have found a problem related to the Pulsar Go client.

My whole pipeline is: csv file -> filebeat -> pulsar -> flume -> hive

When I verified the record counts, I found that this path loses about 0.46% of the data compared with the logstash -> kafka path.

I do not know the reason, so I tried the steps below:

1 Reduced the amount of data from 100% to 50% to rule out loss caused by too much pressure on the Pulsar cluster; the data loss was still 0.46%
2 Reduced the amount of data from 50% to 10% so that the Pulsar cluster was stress-free; the data loss was still 0.46%
3 Used bin/pulsar-perf to send a fixed amount of test data to the Pulsar cluster; 100% was received, which rules out a problem in the Pulsar cluster
4 Sent a fixed amount of test data to the Pulsar cluster with filebeat; 99.6% was received (sent 36077, received 35091)

I have put the data file in the attachment. Please take a look at this problem. The loss rate is now about five thousandths, which is a little too large.

One more thing: I use the asynchronous send of the Go client, which has been confirmed with @tuteng.

I look forward to your response.

Add seek by msgID on consumer

Describe the solution you'd like

Add seek by msgID on the consumer, the interface as follows:

Seek(msgID MessageID) error

Transfer data is blocked by flush

I use filebeat with the Go client to pump data into a Pulsar cluster.

I found that after a message has been sent to Pulsar successfully, the producer blocks in the flush function.

The code is below:

func (p *partitionProducer) internalFlush(fr *flushRequest) {
	p.internalFlushCurrentBatch()

	pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
	if !ok {
		p.log.Error("internalFlush block ,Flush error")
		fr.waitGroup.Done()
		return
	}
	p.log.Info("InternalFlush corrctly: ")  
	pi.sendRequests = append(pi.sendRequests, &sendRequest{
		msg: nil,
		callback: func(id MessageID, message *ProducerMessage, e error) {
			fr.err = e
			fr.waitGroup.Done()
		},
	})
}

func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) error {
	wg := sync.WaitGroup{}
	wg.Add(1)

	var err error

	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
		err = e
		wg.Done()
	}, true)

	// When sending synchronously we flush immediately to avoid
	// the increased latency and reduced throughput of batching
	p.log.Info("-------------------beginning flsuh ----------------")
	if err = p.Flush(); err != nil {
		return err
	}
	p.log.Info("-------------------end flsuh ----------------")
	wg.Wait()
	return err
}

The log is below:

time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="-------------------beginning flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="-------------------beginning flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="-------------------beginning flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"

The problem occurs in the section below:

p.log.Info("InternalFlush corrctly: ")  
	pi.sendRequests = append(pi.sendRequests, &sendRequest{
		msg: nil,
		callback: func(id MessageID, message *ProducerMessage, e error) {
			fr.err = e
			fr.waitGroup.Done()
		},
	})

I do not understand the logic, but I am sure the error occurs there.

Test killed with quit: ran too long (10m0s)

Expected behavior

Tell us what should happen

Actual behavior

SIGQUIT: quit
PC=0x465113 m=0 sigcode=0

goroutine 169 [syscall]:
runtime.notetsleepg(0xd018c0, 0x70c335, 0x0)
	/usr/local/go/src/runtime/lock_futex.go:227 +0x34 fp=0xc00049a760 sp=0xc00049a730 pc=0x411f84
runtime.timerproc(0xd018a0)
	/usr/local/go/src/runtime/time.go:311 +0x2ea fp=0xc00049a7d8 sp=0xc00049a760 pc=0x452d5a
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1337 +0x1 fp=0xc00049a7e0 sp=0xc00049a7d8 pc=0x4632c1
created by runtime.(*timersBucket).addtimerLocked
	/usr/local/go/src/runtime/time.go:169 +0x10e

.....


goroutine 834 [select, 9 minutes]:
github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).Receive(0xc0001a6000, 0x9e18e0, 0xc000024038, 0x0, 0x0, 0x0, 0x0)
	/pulsar-client-go/pulsar/impl_partition_consumer.go:251 +0x196
github.com/apache/pulsar-client-go/pulsar.(*consumer).Receive.func1(0x9e18e0, 0xc000024038, 0xc000178240, 0x9e6780, 0xc0001a6000)
	/pulsar-client-go/pulsar/impl_consumer.go:162 +0xaf
created by github.com/apache/pulsar-client-go/pulsar.(*consumer).Receive
	/pulsar-client-go/pulsar/impl_consumer.go:160 +0xe3

rax    0xfffffffffffffffc
rbx    0x70c335
rcx    0x465113
rdx    0x0
rdi    0xd018c0
rsi    0x80
rbp    0xc00049a6e8
rsp    0xc00049a6a0
r8     0x0
r9     0x0
r10    0xc00049a6d8
r11    0x206
r12    0x59f2b5db530dd2
r13    0x1
r14    0xc00020bf80
r15    0xc00020b6e0
rip    0x465113
rflags 0x206
cs     0x33
fs     0x0
gs     0x0
*** Test killed with quit: ran too long (10m0s).
FAIL	github.com/apache/pulsar-client-go/pulsar	600.013s
ok  	github.com/apache/pulsar-client-go/pulsar/internal	0.008s	coverage: 19.3% of statements
ok  	github.com/apache/pulsar-client-go/util	0.210s	coverage: 81.7% of statements
Build step 'Execute shell' marked build as failure
Archiving artifacts
Setting status of 1ce53f89100f4d9ae5717ee84b3d8b05ecaf0ae4 to FAILURE with url https://builds.apache.org/job/pulsar_precommit_client_go/57/ and message: 'FAILURE

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

Catch up with the Java client's feature list

Unsupported features:

  • Schema
  • seek by time
  • nack
  • replication
  • Interceptor
  • DeadLetterPolicy
  • receive with timeout
  • multi topics
  • topics pattern
  • auto update partitions
  • read compact
  • auth

Can't install go module.

Expected behavior

go get github.com/apache/pulsar-client-go@latest

should download and install the latest commit on the master branch

Steps to reproduce

$ go get github.com/apache/pulsar-client-go@latest
go: finding github.com/apache/pulsar-client-go latest
go build github.com/apache/pulsar-client-go: no non-test Go files in /Users/jimlambrt/go/pkg/mod/github.com/apache/[email protected]

I think this is happening because you only have one .go file in the parent directory https://github.com/apache/pulsar-client-go and it's a test file.

System configuration

$ go version
go version go1.13.4 darwin/amd64

Support partition-consumer receive async logic

Is your feature request related to a problem? Please describe.

Currently, the partition consumer does not implement async receive logic.

Describe the solution you'd like

We should support async receive logic in the partition consumer.

Fatal error: concurrent map writes

Expected behavior

The program should work fine when we create multiple partitions.

Actual behavior

fatal error: concurrent map writes

goroutine 119 [running]:
runtime.throw(0x4501a47, 0x15)
	/usr/local/Cellar/go/1.12.5/libexec/src/runtime/panic.go:617 +0x72 fp=0xc0003dfe88 sp=0xc0003dfe58 pc=0x402fbf2
runtime.mapassign_fast64(0x4478040, 0xc000277a40, 0x34, 0x0)
	/usr/local/Cellar/go/1.12.5/libexec/src/runtime/map_fast64.go:101 +0x35f fp=0xc0003dfec8 sp=0xc0003dfe88 pc=0x4013cdf
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00016e2a0)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:261 +0x19d fp=0xc0003dffb8 sp=0xc0003dfec8 pc=0x430c55d
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc00016e2a0)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:146 +0x59 fp=0xc0003dffd8 sp=0xc0003dffb8 pc=0x4311b09
runtime.goexit()
	/usr/local/Cellar/go/1.12.5/libexec/src/runtime/asm_amd64.s:1337 +0x1 fp=0xc0003dffe0 sp=0xc0003dffd8 pc=0x405e331
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:143 +0x3f

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000466000, 0x45075c9, 0x21, 0x4516718, 0x407cb01)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:917 +0x381
testing.runTests.func1(0xc00015e000)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1157 +0x78
testing.tRunner(0xc00015e000, 0xc0000e7e30)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:865 +0xc0
testing.runTests(0xc0000cc480, 0x48a7d00, 0x23, 0x23, 0x0)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1155 +0x2a9
testing.(*M).Run(0xc000140000, 0x0)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1072 +0x162
main.main()
	_testmain.go:110 +0x13e

goroutine 81 [chan receive]:
github.com/apache/pulsar-client-go/pulsar.newProducer(0xc000222780, 0xc000141480, 0xc0003803f0, 0xc000466100, 0xc000315ef0)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:92 +0x1e4
github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer(0xc000222780, 0x450cf17, 0x2d, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/impl_client.go:91 +0x7a
github.com/apache/pulsar-client-go/pulsar.TestPartitionTopicsConsumerPubSub(0xc000466000)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:367 +0x277
testing.tRunner(0xc000466000, 0x4516718)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:865 +0xc0
created by testing.(*T).Run
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:916 +0x35a

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

Messages sent by the Go client's producer cannot be consumed normally by the cgo client's consumer

Expected behavior

Messages sent by the Go client's producer can be consumed normally by the cgo client's consumer.

Actual behavior

go client producer:

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })

    if err != nil {
        log.Fatal(err)
    }

    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic-1",
    })
    if err != nil {
        log.Fatal(err)
    }

    defer producer.Close()

    ctx := context.Background()

    for i := 0; i < 10; i++ {
        if err := producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        }); err != nil {
            log.Fatal(err)
        }
    }
}

cgo client consumer:

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic-1",
		SubscriptionName: "my-subscription",
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))

		consumer.Ack(msg)
	}
}

The payload received by the cgo client is empty:

Received message  msgId: (4,0,-1,0) -- content: ''
Received message  msgId: (4,1,-1,0) -- content: ''
Received message  msgId: (4,2,-1,0) -- content: ''
Received message  msgId: (4,3,-1,0) -- content: ''
Received message  msgId: (4,4,-1,0) -- content: ''
Received message  msgId: (4,5,-1,0) -- content: ''
Received message  msgId: (4,6,-1,0) -- content: ''
Received message  msgId: (4,7,-1,0) -- content: ''
Received message  msgId: (4,8,-1,0) -- content: ''
Received message  msgId: (4,9,-1,0) -- content: ''

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

For a pulsar.Client instance, once Client.Subscribe() times out, all subsequent Client.Subscribe() calls fail

Expected behavior

When the network is restored, the subscription is created successfully.

Actual behavior

When the network is restored, creating a subscription still fails.

Steps to reproduce

1. Code file test.go
func main(){
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://10.32.32.20:6650",
		ConnectionTimeout:time.Second * time.Duration(2),
		OperationTimeout: time.Second * time.Duration(2),
	})
	if err != nil {
		log.Fatal(err)
	}

	GConsumerClient = client

	defer client.Close()
	for i := 0; i < 10; i++ {
		index := i
		time.Sleep(time.Second*2)
		roomId := strconv.Itoa(index)
		subscriptionName := fmt.Sprintf("sub-name-%s", roomId)
		c, e := initRoomPulsarConsumer(roomId,subscriptionName)
		if e != nil {
			fmt.Printf("create faild  -- %d\n", index)
			//wg.Done()
			//log.Fatal(e)
			continue
		}
		time.Sleep(time.Second*1)
		go func(con pulsar.Consumer,sb string) {
			fmt.Printf("to close room %s.....\n",sb)
			err := con.Unsubscribe()
			if err != nil {
				fmt.Printf("******* Unsubscribe error  %s.....\n",err.Error())
			}
			con.Close()
			fmt.Printf("to close room [%s close done***]\n",sb)
		}(c,subscriptionName)
		fmt.Printf("wait group -- %d done\n", index)	
	}
}
func initRoomPulsarConsumer(roomId string, subscriptionName string) (consumer pulsar.Consumer, err error) {
	fmt.Printf("to creat room consumer SubscriptionName:%s .....\n",subscriptionName)
	cfg := pulsar.ConsumerOptions{
		Topic:               fmt.Sprintf("dev_pid2/signal_geo/%s", roomId),
		SubscriptionName:    subscriptionName,
		Type:                pulsar.Failover,
	}
   
	consumer, err = GConsumerClient.Subscribe(cfg)
	if err != nil {
		fmt.Printf("*****   could not establish subscription %v, err: %s", roomId, err.Error())
		return nil, err
	}
	fmt.Printf("creat room consumer done, SubscriptionName:%s  ***creat done\n",cfg.SubscriptionName)
	return
}
2. Steps
  • go run test.go

  • After the first call to Subscribe(), the network packet loss simulation is started.

iptables -I INPUT -s 172.23.111.49  -m statistic --mode random --probability 0.6 -j DROP
iptables -I OUTPUT -d 172.23.111.49  -m statistic --mode random --probability 0.6 -j DROP

  • After the fourth call to Subscribe(), the network packet loss simulation is stopped.

iptables -D INPUT -s 172.23.111.49 -m statistic --mode random --probability 0.6 -j DROP
iptables -D OUTPUT -d 172.23.111.49  -m statistic --mode random --probability 0.6 -j DROP
3. Logs

$ go run test.go

to creat room consumer SubscriptionName:sub-name-0 .....
INFO[0002] Connecting to broker                          remote_addr="pulsar://10.32.32.20:6650"
INFO[0002] TCP connection established                    local_addr="172.23.111.52:63844" remote_addr="pulsar://10.32.32.20:6650"
INFO[0002] Connection is ready                           local_addr="172.23.111.52:63844" remote_addr="pulsar://10.32.32.20:6650"
INFO[0002] Connecting to broker                          remote_addr="pulsar://10.32.32.20:6652"
INFO[0002] TCP connection established                    local_addr="172.23.111.52:63845" remote_addr="pulsar://10.32.32.20:6652"
INFO[0002] Connection is ready                           local_addr="172.23.111.52:63845" remote_addr="pulsar://10.32.32.20:6652"
INFO[0002] Connected consumer                            name=cuvse subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
INFO[0002] Created consumer                              name=cuvse subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
creat room consumer done, SubscriptionName:sub-name-0  ***creat done
wait group -- 0 done
to close room sub-name-0.....
INFO[0003] Closing consumer=1                            name=cuvse subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
WARN[0003] Failed to close consumer                      error="server error: MetadataError: Consumer not found" name=cuvse subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
INFO[0003] exiting events loop                           name=cuvse subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
to close room [sub-name-0 close done***]
INFO[0003] exiting dispatch loop                         name=cuvse subscription=sub-name-0 topic="persistent://dev_pid2/signal_geo/0"
to creat room consumer SubscriptionName:sub-name-1 .....
WARN[0007] Failed to lookup topic                        error="request timed out" name=gqivy subscription=sub-name-1 topic="persistent://dev_pid2/signal_geo/1"
ERRO[0007] Failed to create consumer                     error="request timed out"
*****   could not establish subscription 1, err: request timed outcreate faild  -- 1
to creat room consumer SubscriptionName:sub-name-2 .....
*****   could not establish subscription 2, err: request timed outcreate faild  -- 2
to creat room consumer SubscriptionName:sub-name-3 .....
*****   could not establish subscription 3, err: request timed outcreate faild  -- 3
to creat room consumer SubscriptionName:sub-name-4 .....
*****   could not establish subscription 4, err: request timed outcreate faild  -- 4
to creat room consumer SubscriptionName:sub-name-5 .....
*****   could not establish subscription 5, err: request timed outcreate faild  -- 5
to creat room consumer SubscriptionName:sub-name-6 .....
*****   could not establish subscription 6, err: request timed outcreate faild  -- 6
to creat room consumer SubscriptionName:sub-name-7 .....
*****   could not establish subscription 7, err: request timed outcreate faild  -- 7
to creat room consumer SubscriptionName:sub-name-8 .....
*****   could not establish subscription 8, err: request timed outcreate faild  -- 8
to creat room consumer SubscriptionName:sub-name-9 .....
*****   could not establish subscription 9, err: request timed outcreate faild  -- 9

System configuration

Pulsar version: 2.4.0

The client batch is full, resulting in data loss or client is blocked

This is the root cause of issues #76 and #114.

Expected behavior

When the batch is full, the current data should be retried to ensure that the data can be successfully sent.

Actual behavior

Steps to reproduce

  1. Use SendAsync to send data.
  2. Make sure each message is large enough.
  3. Send several messages until the batch becomes full.

I think the problem is here: https://github.com/apache/pulsar-client-go/blob/master/pulsar/producer_partition.go#L259. When the current message would exceed the batch size, p.batchBuilder.Add returns false and a Flush is triggered, but the current message is not processed after the flush. If the client uses a WaitGroup to synchronize, it therefore blocks waiting for a callback that never fires. Without synchronization, the message is never sent, which causes data loss. I think the client should retry the current message after the flush so that it is sent correctly when the batch is full, as the Java client does: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L396
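The flush-then-retry behavior described above can be illustrated with a small, self-contained sketch. The batchBuilder and producer types here are hypothetical stand-ins written for this example, not the client's real implementation; the only point is that a message rejected by Add is added again after the flush instead of being dropped.

```go
package main

import "fmt"

// batchBuilder is a hypothetical stand-in for the client's batch builder.
type batchBuilder struct {
	maxBytes int
	size     int
	msgs     [][]byte
}

// Add reports false when the payload does not fit in the current batch.
// An empty batch always accepts one payload, even an oversized one.
func (b *batchBuilder) Add(payload []byte) bool {
	if len(b.msgs) > 0 && b.size+len(payload) > b.maxBytes {
		return false
	}
	b.msgs = append(b.msgs, payload)
	b.size += len(payload)
	return true
}

// Flush returns the buffered payloads and resets the batch.
func (b *batchBuilder) Flush() [][]byte {
	out := b.msgs
	b.msgs, b.size = nil, 0
	return out
}

// producer records what was flushed in sent (a stand-in for the broker).
type producer struct {
	batch *batchBuilder
	sent  [][]byte
}

// send adds the payload; if the batch is full it flushes and retries,
// so the current payload is not lost.
func (p *producer) send(payload []byte) {
	if !p.batch.Add(payload) {
		p.sent = append(p.sent, p.batch.Flush()...)
		p.batch.Add(payload) // retry on the now-empty batch
	}
}

// sendAll sends every payload, flushes the tail, and returns how many
// payloads reached the simulated broker.
func sendAll(maxBytes int, payloads [][]byte) int {
	p := &producer{batch: &batchBuilder{maxBytes: maxBytes}}
	for _, pl := range payloads {
		p.send(pl)
	}
	p.sent = append(p.sent, p.batch.Flush()...)
	return len(p.sent)
}

func main() {
	payloads := [][]byte{make([]byte, 60), make([]byte, 60), make([]byte, 60)}
	fmt.Println(sendAll(100, payloads)) // prints 3
}
```

Without the retry line in send, the second and third payloads in this example would be dropped, which mirrors the data loss described in the issue.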

System configuration

Pulsar version: master
go client: master

Add seek by time on consumer

Describe the solution you'd like

Add seek-by-time to the consumer, with the following interface:

Seek(time time.Time) error

When we set the MessageRouter policy, its value is nil

Expected behavior

Users can customize the routing policy of messages, as follows:

producer, err := client.CreateProducer(ProducerOptions{
	Topic: "my-partitioned-topic",
	MessageRouter: func(msg Message, tm TopicMetadata) int {
		fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
		return 2
	},
})

Actual behavior

When we set the MessageRouter policy of ProducerOptions, the MessageRouter is nil.

The consumer can't Unsubscribe correctly in Failover sub mode

Steps to reproduce

Run the following code:

func TestSubFailOver(t *testing.T) {
    client, err := NewClient(ClientOptions{
        URL: lookupURL,
    })
    if err != nil {
        t.Fatal(err)
    }

    topic := "test-failover-topic-4"
    subName := "test-sub-4"

    producer, err := client.CreateProducer(ProducerOptions{
        Topic: topic,
    })
    assert.Nil(t, err)
    for i := 0; i < 30; i++ {
        err = producer.Send(context.Background(), &ProducerMessage{
            Payload: []byte(fmt.Sprintf("message-%d", i)),
        })

        assert.Nil(t, err)
    }

    cfg := ConsumerOptions{
        Topic:            topic,
        SubscriptionName: subName,
        Type:             Failover,
        SubscriptionInitialPosition: SubscriptionPositionEarliest,
    }

    consumer1, err := client.Subscribe(cfg)
    assert.Nil(t, err)

    consumer2, err := client.Subscribe(cfg)
    assert.Nil(t, err)

    consumer3, err := client.Subscribe(cfg)
    assert.Nil(t, err)

    go func(consumer1 Consumer) {
        for {
            msg, err := consumer1.Receive(context.Background())
            assert.Nil(t, err)
            fmt.Printf("consumer-1 receive : %s\n", string(msg.Payload()))
        }
    }(consumer1)

    go func(consumer2 Consumer) {
        for {
            msg, err := consumer2.Receive(context.Background())
            assert.Nil(t, err)
            fmt.Printf("consumer-2 receive : %s\n", string(msg.Payload()))
        }
    }(consumer2)

    go func(consumer3 Consumer) {
        for {
            msg, err := consumer3.Receive(context.Background())
            assert.Nil(t, err)
            fmt.Printf("consumer-3 receive : %s\n", string(msg.Payload()))
        }
    }(consumer3)

    time.Sleep(time.Second * 10)

    consumer1.Unsubscribe()
    consumer1.Close()

    consumer2.Unsubscribe()
    consumer2.Close()

    consumer3.Unsubscribe()
    consumer3.Close()
}


System configuration

Pulsar version: 2.5.0

Data race: the client.handlers map is not concurrent safe

Expected behavior

Client should be concurrent safe. Writes to client.handlers marking them closable should never cause a panic.

Actual behavior

The current implementation allows a race condition because no sync.Mutex or similar mechanism protects concurrent writes to the map.

Steps to reproduce

Run two goroutines creating subscribers or producers... or just run some tests using the race detector.

I'll submit a PR for this in a few minutes.
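As a minimal sketch of the kind of fix described here, the map can be wrapped in a type that holds a sync.Mutex for every access. The names handlerRegistry and demoHandler are made up for this example and are not the client's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// demoHandler is a hypothetical stand-in for a producer or consumer handler.
type demoHandler struct{ id int }

// handlerRegistry guards its map with a mutex so that concurrent
// registrations cannot corrupt it.
type handlerRegistry struct {
	mu       sync.Mutex
	handlers map[*demoHandler]bool
}

func newHandlerRegistry() *handlerRegistry {
	return &handlerRegistry{handlers: make(map[*demoHandler]bool)}
}

// Add marks a handler as closable while holding the lock.
func (r *handlerRegistry) Add(h *demoHandler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[h] = true
}

// Len returns the number of registered handlers.
func (r *handlerRegistry) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.handlers)
}

// registerConcurrently adds n handlers from n goroutines and returns
// how many ended up in the registry.
func registerConcurrently(n int) int {
	reg := newHandlerRegistry()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			reg.Add(&demoHandler{id: id})
		}(i)
	}
	wg.Wait()
	return reg.Len()
}

func main() {
	fmt.Println(registerConcurrently(100)) // prints 100
}
```

Running a program like this with go run -race should report no data race, whereas the same writes to an unguarded map would.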

The function Send() performance

Expected behavior

I want to send a batch of messages with pulsar-client-go, and I expect it to finish in less than 1 second.

Actual behavior

Calling Send(ctx, msg) about 2000 times in a loop takes 6 seconds in total.
Each message is about 60 bytes.

Steps to reproduce

for i := 0; i < 2000; i++ {
	..........
	msg, err := proto.Marshal(&xxx)
	if err != nil {
		fmt.Println(err)
		return
	}

	if err := producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: msg}); err != nil {
		fmt.Println(err)
	} else {
		//fmt.Printf("[%d]success\n", i)
	}
}

System configuration

OS: macOS 10.14
docker in localhost: apachepulsar/pulsar-standalone:2.3.1 bin/pulsar standalone

Add ack timeout test case for consumer

Is your feature request related to a problem? Please describe.

Currently, the ack timeout test case covers only a few usage scenarios; we should add more comprehensive tests.

Describe the solution you'd like

Add tests that acknowledge only some of the messages and verify that only the unacknowledged messages are redelivered.
Also add the same kind of test for a shared consumer on a multi-partition topic.

fatal error: concurrent map read and map write

Expected behavior

In the connection struct, the pendingReqs map should be safe for concurrent access.

pendingReqs      map[uint64]*request

Actual behavior

fatal error: concurrent map read and map write

goroutine 27 [running]:
runtime.throw(0x450438e, 0x21)
	/usr/local/Cellar/go/1.12.5/libexec/src/runtime/panic.go:617 +0x72 fp=0xc0000d3ca0 sp=0xc0000d3c70 pc=0x402fc42
runtime.mapaccess2_fast64(0x4475040, 0xc0000a0210, 0x7, 0x404f168, 0x3a2c156a93503)
	/usr/local/Cellar/go/1.12.5/libexec/src/runtime/map_fast64.go:61 +0x1c2 fp=0xc0000d3cc8 sp=0xc0000d3ca0 pc=0x40139c2
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleResponse(0xc000166000, 0x7, 0xc0004d6160)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:377 +0x4f fp=0xc0000d3d40 sp=0xc0000d3cc8 pc=0x430d19f
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).receivedCommand(0xc000166000, 0xc0004d6160, 0x0, 0x0, 0x0)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:319 +0x266 fp=0xc0000d3db0 sp=0xc0000d3d40 pc=0x430cf06
github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection(0xc000138100)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection_reader.go:54 +0x17d fp=0xc0000d3fd8 sp=0xc0000d3db0 pc=0x430efdd
runtime.goexit()
	/usr/local/Cellar/go/1.12.5/libexec/src/runtime/asm_amd64.s:1337 +0x1 fp=0xc0000d3fe0 sp=0xc0000d3fd8 pc=0x405e381
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/internal/connection.go:250 +0x58

goroutine 1 [chan receive]:
testing.(*T).Run(0xc00012e100, 0x45042a7, 0x21, 0x4513230, 0x407cb01)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:917 +0x381
testing.runTests.func1(0xc00012e000)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1157 +0x78
testing.tRunner(0xc00012e000, 0xc0000d7e30)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:865 +0xc0
testing.runTests(0xc00000e4c0, 0x48a2a00, 0x20, 0x20, 0x0)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1155 +0x2a9
testing.(*M).Run(0xc000126000, 0x0)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:1072 +0x162
main.main()
	_testmain.go:104 +0x13e

goroutine 4 [chan receive]:
github.com/apache/pulsar-client-go/pulsar.newProducer(0xc0000e0b40, 0xc000136100, 0xc0001223f0, 0xc00012e200, 0xc0000b97a0)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/impl_producer.go:92 +0x1e4
github.com/apache/pulsar-client-go/pulsar.(*client).CreateProducer(0xc0000e0b40, 0x450a013, 0x2e, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/impl_client.go:91 +0x7a
github.com/apache/pulsar-client-go/pulsar.TestPartitionTopicsConsumerPubSub(0xc00012e100)
	/Users/wolf4j/github.com/apache/pulsar-client-go/pulsar/consumer_test.go:369 +0x277
testing.tRunner(0xc00012e100, 0x4513230)
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:865 +0xc0
created by testing.(*T).Run
	/usr/local/Cellar/go/1.12.5/libexec/src/testing/testing.go:916 +0x35a

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

The consumer is closed, but consumer.Receive() blocks instead of returning an error

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "topic1",
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}

if err := consumer.Unsubscribe(); err != nil {
	log.Fatal(err)
} 
consumer.Close()

for  {
	fmt.Printf("receive msg ...")
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))

	consumer.Ack(msg)
}
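One way to get the behavior the title asks for is to have Receive watch a channel that Close closes. The sketch below uses a hypothetical demoConsumer type and error value invented for this example; it only illustrates the pattern and says nothing about how the real client is implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errConsumerClosed is a hypothetical error value for this sketch.
var errConsumerClosed = errors.New("consumer closed")

// demoConsumer is a toy consumer that delivers strings from a channel.
type demoConsumer struct {
	messages  chan string
	closeCh   chan struct{}
	closeOnce sync.Once
}

func newDemoConsumer() *demoConsumer {
	return &demoConsumer{
		messages: make(chan string, 1),
		closeCh:  make(chan struct{}),
	}
}

// Close signals every current and future Receive call to return.
func (c *demoConsumer) Close() {
	c.closeOnce.Do(func() { close(c.closeCh) })
}

// Receive returns an error instead of blocking once the consumer is closed.
func (c *demoConsumer) Receive(ctx context.Context) (string, error) {
	// Check for closure first so that a closed consumer never hands out
	// buffered messages.
	select {
	case <-c.closeCh:
		return "", errConsumerClosed
	default:
	}
	select {
	case <-c.closeCh:
		return "", errConsumerClosed
	case <-ctx.Done():
		return "", ctx.Err()
	case m := <-c.messages:
		return m, nil
	}
}

// receiveAfterClose closes a fresh consumer and returns what Receive reports.
func receiveAfterClose() error {
	c := newDemoConsumer()
	c.Close()
	_, err := c.Receive(context.Background())
	return err
}

func main() {
	fmt.Println(receiveAfterClose()) // prints "consumer closed"
}
```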

Fields other than payload in ProducerMessage cannot be received normally.

Is your feature request related to a problem? Please describe.

ProducerMessage contains these fields:

type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	// Key sets the key of the message for routing policy
	Key string

	// Properties attach application defined properties on the message
	Properties map[string]string

	// EventTime set the event time for a given message
	EventTime *time.Time

	// ReplicationClusters override the replication clusters for this message.
	ReplicationClusters []string

	// SequenceID set the sequence id to assign to the current message
	SequenceID *int64
}

Fields other than payload in ProducerMessage cannot be received normally.

Describe the solution you'd like

When receiving a message, if the user sets other fields such as PartitionKey or Properties, these fields should be received correctly.

Talk about `grabcnx` logic

Is your feature request related to a problem? Please describe.

In grabcnx(), the lookupTopic logic is bundled together with the specific BaseCmd request. As a result, the producer and the consumer each have to implement it separately.

Describe the solution you'd like

Can we extract it into a standalone function so that the producer and the consumer can share it?

Check interface is nil?

Expected behavior

In Go, an interface value consists of two parts:

  • type
  • value

An interface value is nil only if both parts are nil. An interface that holds a nil pointer of a concrete type is therefore not equal to nil, so a plain != nil check cannot detect it.

In impl_producer.go#L95

for _, producer := range p.producers {
	if producer != nil {
		_ = producer.Close()
	}
}

This issue will be fixed in pull#16, where we can use reflection to check for nil.
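The pitfall and a reflection-based check can be shown in a few lines. The closer interface, partitionProducer type, and isNil helper below are written for this example only; how pull#16 actually performs the check is not shown in this document.

```go
package main

import (
	"fmt"
	"reflect"
)

// closer and partitionProducer are hypothetical stand-ins for this sketch.
type closer interface{ Close() error }

type partitionProducer struct{}

func (p *partitionProducer) Close() error { return nil }

// isNil reports whether v is a nil interface or wraps a nil pointer,
// map, slice, channel, or function.
func isNil(v interface{}) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Chan, reflect.Func:
		return rv.IsNil()
	}
	return false
}

func main() {
	var p *partitionProducer // nil pointer
	var c closer = p         // interface now has a type but a nil value

	fmt.Println(c != nil) // prints true: the plain check is fooled
	fmt.Println(isNil(c)) // prints true: reflection sees the nil pointer
}
```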

Data race: type connection_pool.go GetConnection() is not concurrent safe

Expected behavior

The internal func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) should be concurrent safe. It reaches into the connection type and reads attributes that are protected by a mutex without locking or unlocking it.

Actual behavior

The current implementation allows a race condition because it reads the private attributes of the connection object without locking or unlocking that object's sync.Mutex:

log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)

Steps to reproduce

Run some tests using the race detector.

I'll submit a PR for this in a few minutes.

Talk about `send` interface logic

Is your feature request related to a problem? Please describe.

In Producer interface, Send is defined as follows:

Send(context.Context, *ProducerMessage) error

SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))

In fact, when the pulsar client receives the sendReceipt, we don't assign a value to the ProducerMessage. The returned value is always nil.

Describe the solution you'd like

We can define the interface as follows:

Send(context.Context, *ProducerMessage) (MessageID, error)

SendAsync(context.Context, *ProducerMessage, func(MessageID, error))

This makes the interface definition consistent with the Java client, and the msgID is the only field we actually need. If I have overlooked anything, please correct me.

in go mod 1.12 compile error

Expected behavior

in go program with go mod 1.12

go.mod

module assets
go 1.12
require (
	github.com/apache/pulsar-client-go v0.0.0-20191110190923-af5c6d7e1f89
	github.com/go-sql-driver/mysql v1.4.0
	github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
	github.com/spf13/cobra v0.0.5
	golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 // indirect
	golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 // indirect
	google.golang.org/grpc v1.24.0
)

error output

go: extracting github.com/pierrec/lz4 v2.0.5+incompatible
....
....
....
# github.com/klauspost/compress/zstd
....
....
....
/go/pkg/mod/github.com/klauspost/compress@.../zstd/enc_fast.go:30:15: undefined: xxhash.Digest

producer cannot connect to broker through pulsar proxy

pulsar-client-go cannot connect to a Pulsar broker through the Pulsar proxy. The connection fails when the client sends the PRODUCER command to the proxy: the proxy throws UnsupportedOperationException, and the client gets a "Short read when reading frame size" error.

After reviewing the code, I found that once the client gets a LOOKUP result with ProxyThroughServiceUrl == true, it should open a new connection to the proxy with a CONNECT command that has ProxyToBrokerUrl set. Then, according to https://github.com/apache/pulsar/blob/master/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java#L207 , the Pulsar proxy tunnels commands to the broker.

Currently, pulsar-client-go ignores the LOOKUP result when ProxyThroughServiceUrl is true, so all commands are handled by the proxy itself and are not tunneled to the broker.
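The decision described above can be sketched as a small pure function. The lookupResult and connTarget types and the resolveTarget helper are hypothetical names chosen for this example; only the ProxyThroughServiceUrl and ProxyToBrokerUrl concepts come from the issue text.

```go
package main

import "fmt"

// lookupResult holds the two LOOKUP response fields this sketch needs.
type lookupResult struct {
	BrokerServiceURL       string
	ProxyThroughServiceURL bool
}

// connTarget says where to open the TCP connection and what to put in
// the CONNECT command's ProxyToBrokerUrl field (empty means direct).
type connTarget struct {
	PhysicalAddr     string
	ProxyToBrokerURL string
}

// resolveTarget picks the connection target for a lookup result.
// When the lookup asks to go through the service URL, the client dials
// the proxy and names the broker it wants to be tunneled to.
func resolveTarget(serviceURL string, r lookupResult) connTarget {
	if r.ProxyThroughServiceURL {
		return connTarget{
			PhysicalAddr:     serviceURL,
			ProxyToBrokerURL: r.BrokerServiceURL,
		}
	}
	return connTarget{PhysicalAddr: r.BrokerServiceURL}
}

func main() {
	proxy := "pulsar://proxy:6650"
	viaProxy := resolveTarget(proxy, lookupResult{
		BrokerServiceURL:       "pulsar://broker-1:6650",
		ProxyThroughServiceURL: true,
	})
	fmt.Printf("%+v\n", viaProxy)

	direct := resolveTarget(proxy, lookupResult{
		BrokerServiceURL: "pulsar://broker-1:6650",
	})
	fmt.Printf("%+v\n", direct)
}
```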

In ZLIB compress, we shouldn't ignore error info

Expected behavior

pkg/compress/zlib.go

func (zlibProvider) Compress(data []byte) []byte {
	var b bytes.Buffer
	w := zlib.NewWriter(&b)
	_, err := w.Write(data)
	if err != nil {
		return nil
	}
	err = w.Close()
	if err != nil {
		return nil
	}

	return b.Bytes()
}

func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewBuffer(compressedData))
	if err != nil {
		return nil, err
	}

	uncompressed := make([]byte, originalSize)
	// io.ReadFull keeps reading until the buffer is full, so a short Read
	// cannot overwrite data that was already decompressed.
	if _, err = io.ReadFull(r, uncompressed); err != nil {
		return nil, err
	}
	err = r.Close()
	if err != nil {
		return nil, err
	}
	return uncompressed, nil
}

Here, we should handle the errors.

Actual behavior

pkg/compress/zlib.go

func (zlibProvider) Compress(data []byte) []byte {
	var b bytes.Buffer
	w := zlib.NewWriter(&b)
	w.Write(data)
	w.Close()

	return b.Bytes()
}
func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewBuffer(compressedData))
	if err != nil {
		return nil, err
	}

	uncompressed := make([]byte, originalSize)
	r.Read(uncompressed)
	r.Close()

	return uncompressed, nil
}

Here, the errors are ignored, so even if compression or decompression fails, we have no way to detect it.
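For comparison, here is a minimal standalone round trip with the standard compress/zlib package in which every error is surfaced. The free functions compress and decompress are illustrative only; in particular, returning an error from compress is an assumption of this sketch and differs from the Compress signature shown above.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"
)

// compress returns the zlib-compressed form of data, reporting any
// write or close error instead of discarding it.
func compress(data []byte) ([]byte, error) {
	var b bytes.Buffer
	w := zlib.NewWriter(&b)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

// decompress inflates compressed into a buffer of originalSize bytes.
func decompress(compressed []byte, originalSize int) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	out := make([]byte, originalSize)
	if _, err := io.ReadFull(r, out); err != nil {
		_ = r.Close() // the read error is the one worth reporting
		return nil, err
	}
	if err := r.Close(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	in := []byte("hello pulsar")
	c, err := compress(in)
	if err != nil {
		fmt.Println("compress failed:", err)
		return
	}
	out, err := decompress(c, len(in))
	fmt.Println(string(out), err) // prints "hello pulsar <nil>"

	// Corrupt input is now reported rather than silently ignored.
	_, err = decompress([]byte("not zlib"), 8)
	fmt.Println(err != nil) // prints true
}
```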

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

Producer deadlocks after Pulsar downtime

Expected behavior

Producer works normally after pulsar restart

Actual behavior

producer#flush deadlock after pulsar restart

Steps to reproduce

for  {
	asyncMsg := pulsar.ProducerMessage{
		Payload: []byte(fmt.Sprintf("async-message")),
	}

	producer.SendAsync(ctx, &asyncMsg, func(mid pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
		if err != nil { log.Fatal(err) }
	})

	// deadlock
	producer.Flush() 
}

Running the above program and then restarting Pulsar causes producer#flush to deadlock.

System configuration

Pulsar version: latest

Use ConfigOption withxxxx for simple usage

Is your feature request related to a problem? Please describe.
Currently, when we use a client, producer, or consumer, we must provide ClientOptions, ProducerOptions, or ConsumerOptions, but these option structs have many parameters, which can confuse users.
According to Rob Pike (https://commandcenter.blogspot.com/2014/01/self-referential-functions-and-design.html) and Dave Cheney (https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis), we should use functional options of the form WithXXX. For example:

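// ClientOption takes a pointer so that the change it makes is visible to the caller.
type ClientOption func(opts *ClientOptions)

// WithURL returns an option that sets the service URL.
func WithURL(URL string) ClientOption {
    return func(opts *ClientOptions) {
         .....
    }
}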
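A fuller sketch of the functional options pattern from the two linked articles might look like the following. The trimmed-down ClientOptions struct, the default values, WithOperationTimeout, and NewClientOptions are assumptions made for illustration; this is a proposal-style example, not the library's current API.

```go
package main

import (
	"fmt"
	"time"
)

// ClientOptions is a reduced, hypothetical version of the options struct.
type ClientOptions struct {
	URL              string
	OperationTimeout time.Duration
}

// ClientOption mutates the options through a pointer.
type ClientOption func(opts *ClientOptions)

// WithURL sets the service URL.
func WithURL(url string) ClientOption {
	return func(opts *ClientOptions) { opts.URL = url }
}

// WithOperationTimeout sets the operation timeout.
func WithOperationTimeout(d time.Duration) ClientOption {
	return func(opts *ClientOptions) { opts.OperationTimeout = d }
}

// NewClientOptions starts from defaults (chosen for this sketch) and
// applies each option in order.
func NewClientOptions(opts ...ClientOption) ClientOptions {
	o := ClientOptions{
		URL:              "pulsar://localhost:6650",
		OperationTimeout: 30 * time.Second,
	}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := NewClientOptions(WithURL("pulsar://broker:6650"))
	fmt.Println(o.URL, o.OperationTimeout) // prints "pulsar://broker:6650 30s"
}
```

Callers only name the settings they want to change, and new options can be added later without breaking existing call sites.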


consumer.Subscription() returns "" since options are never set in internalTopicSubscribe()

Expected behavior

Subscriber.Subscription() should return the subscription name

Actual behavior

returns ""

Steps to reproduce

Create a subscriber with pulsar.ConsumerOptions{
SubscriptionName: "anything-at-all"
}

Then call sub.Subscription().

It appears you pass the options correctly when creating the newPartitionConsumer, but you forgot to set them in the consumer you return at:

return consumer, nil

Whether the error return value of Close() should be removed

In Go, errors generally should not be ignored, but in the client, producer, and consumer we ignore many return values of type error. Are these return values necessary?

For example:

defer client.Close()

In many cases, Close is called as shown above. If the error returned by Close is negligible, we should remove the error return value from the interface definition.

[Consumer && Message Impl] Thinking about some code design

Hello @merlimat, I am implementing the consumer and message types, and there are several code design questions I would like to discuss with you.

First: is it more reasonable to define Message as a struct or as an interface?

Second: can the ConsumerMessage struct be replaced with a Message struct?

These two questions are related. My idea is to define Message as a struct that contains the fields that make up a message, for example:

type Message struct {
	ID          MessageID
	Topic       string
	Properties  map[string]string
	Payload     []byte
	PublishTime time.Time
	EventTime   time.Time
	Key         string
}

fmt code

Is your feature request related to a problem? Please describe.

fmt code

Describe the solution you'd like

gofmt


Setting the SequenceID has no effect on message deduplication

Expected behavior

Preparation: enable message deduplication.
When initializing ProducerMessage{}, I set SequenceID to a specific value before calling SendAsync(). In theory, the broker should accept the first message and discard the following messages because they repeat the same SequenceID.

Actual behavior

I tested this several times, but no messages were discarded.

Steps to reproduce

for i := int64(0); i < 100; i++ {
	...
	producer.SendAsync(ctx, &pulsar.ProducerMessage{Payload: msg, SequenceID: &i}, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) {...})
}

I ran the code a few times, but the consumer received all messages.

for i := int64(0); i < 100; i++ {
	...
	k := int64(0)
	producer.SendAsync(ctx, &pulsar.ProducerMessage{Payload: msg, SequenceID: &k}, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) {...})
}

With code like this, it still does not work.

System configuration

docker: apachepulsar/pulsar-standalone:2.3.1 bin/pulsar standalone
