
pulsar-beat-output's Introduction

Beat Output Pulsar


This is an output implementation for Elastic Beats that ships events from Filebeat, Metricbeat, Functionbeat, Winlogbeat, and Auditbeat to Apache Pulsar.

Compatibility

This output is developed and tested using Apache Pulsar Client 2.4.0 and Beats 7.3.1

Download pulsar-beat-output

mkdir -p $GOPATH/src/github.com/streamnative/
cd $GOPATH/src/github.com/streamnative/
git clone https://github.com/streamnative/pulsar-beat-output
cd pulsar-beat-output

Build

Build beats

go build -o filebeat filebeat/filebeat.go
go build -o functionbeat functionbeat/functionbeat.go
go build -o winlogbeat winlogbeat/winlogbeat.go
go build -o packetbeat packetbeat/packetbeat.go

Usage

In this section, you can either use the sample config file in the ./sample/config/ directory or create one by following the steps below.

Example

Add the following configuration to beat.yml:

output.pulsar:
  url: "pulsar://localhost:6650"
  topic: my_topic
  name: test123

Start filebeat

./filebeat modules enable system
./filebeat modules list
./filebeat -c filebeat.yml -e

Build and test with docker

Requirements

Build Beat images

docker build -t pulsar-beat .

Create network

docker network create pulsar-beat

Start Pulsar service

docker run -d -it --network pulsar-beat -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-beat-standalone apachepulsar/pulsar:2.7.0 bin/pulsar standalone

Add the following configuration to filebeat.yml:

output.pulsar:
  url: "pulsar://pulsar-beat-standalone:6650"
  topic: my_topic
  name: test123

Start Filebeat

docker pull golang:1.17
docker run -it --network pulsar-beat --name filebeat golang:1.17 /bin/bash
git clone https://github.com/streamnative/pulsar-beat-output
cd pulsar-beat-output
go build -o filebeat filebeat/filebeat.go
chown -R root:root filebeat.yml test_module/modules.d/system.yml test_module/module/system
cp test_module/module/system/auth/test/test.log /var/log/messages.log
cp filebeat/filebeat filebeat.yml test_module
cd test_module
./filebeat modules enable system
./filebeat -c filebeat.yml -e

Open a new window to consume messages

docker cp pulsar-client.py pulsar-beat-standalone:/pulsar
docker exec -it pulsar-beat-standalone /bin/bash
python pulsar-client.py

Now you can see the information collected from filebeat.
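
If you prefer Go over the Python script, a minimal consumer sketch using the Pulsar Go client is shown below. This file is not part of the repository; the broker address and topic name are assumptions matching the configuration above.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Broker address assumed: the standalone broker started above.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Subscribe to the topic configured in filebeat.yml.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my_topic",
		SubscriptionName: "beat-test-sub",
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Print every event published by Filebeat.
	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Received: %s\n", string(msg.Payload()))
		consumer.Ack(msg)
	}
}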

Configurations

Client

| Name | Description | Default |
| --- | --- | --- |
| url | The service URL of the Pulsar service | pulsar://localhost:6650 |
| certificate_path | Path of the TLS certificate file | "" |
| private_key_path | Path of the TLS private key file | "" |
| use_tls | Whether to enable TLS; when enabled, use the pulsar+ssl protocol in url | false |
| token | Access token for the cluster | "" |
| token_file_path | Path of the file in which the token is saved | "" |
| log_level | The log level; available options: panic, fatal, error, warn, info, debug, trace | info |
| oauth2.enabled | Enable or disable OAuth2 authentication | false |
| oauth2.clientId | Client ID | "" |
| oauth2.issuerUrl | URL of the authentication provider that allows the Pulsar client to obtain an access token | "" |
| oauth2.privateKey | URL of a JSON credentials file | "" |
| oauth2.audience | The audience: either the application (client ID) for an ID token, or the API being called (API identifier) for an access token | "" |
| oauth2.scope | The OAuth 2.0 scope, which limits the application's access to a user's account | "" |
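
For example, a minimal sketch of a TLS-and-token setup based on the options above (all paths and the TLS port are placeholders for your own deployment):

output.pulsar:
  url: "pulsar+ssl://localhost:6651"
  use_tls: true
  certificate_path: "/path/to/client-cert.pem"
  private_key_path: "/path/to/client-key.pem"
  token_file_path: "/path/to/token.txt"
  topic: my_topic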

Producer

| Name | Description | Default |
| --- | --- | --- |
| topic | The topic this producer publishes to. The topic can be set dynamically with a format string that accesses any event field, for example %{[fields.log_topic]}. | "" |
| partition_key | The message key. The key can be set dynamically with a format string that accesses any event field, for example %{[fields.partition_key]}. | "" |
| name | The name of the producer | "" |
| send_timeout | The send timeout | 30s |
| block_if_queue_full | Whether send and sendAsync operations block when the outgoing message queue is full | false |
| batching_max_messages | Maximum number of messages in a batch | 1000 |
| batching_max_publish_delay | The batch publish delay | 1ms |
| message_routing_mode | The message routing mode: SinglePartition, RoundRobinPartition, CustomPartition (0, 1, 2) | 1 |
| hashing_schema | The hashing scheme: JavaStringHash, Murmur3_32Hash (0, 1) | 0 |
| compression_type | The compression type: NONE, LZ4, ZLIB, ZSTD (0, 1, 2, 3) | 0 |
| max_cache_producers | Maximum number of cached (LRU) producers for dynamic topics | 8 |
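
For example, a sketch of a dynamic topic and message key driven by event fields (the field names are illustrative and must be set on the events, e.g. via the fields option of an input):

output.pulsar:
  url: "pulsar://localhost:6650"
  topic: "%{[fields.log_topic]}"
  partition_key: "%{[fields.partition_key]}"
  max_cache_producers: 8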

FAQ

case-insensitive import collision: "github.com/datadog/zstd" and "github.com/DataDog/zstd"

/root/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/compression/zstd_cgo.go:27:2: case-insensitive import collision: "github.com/datadog/zstd" and "github.com/DataDog/zstd"

Replace zstd_cgo.go file

cp zstd_cgo.go /root/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/compression/zstd_cgo.go

Install Pulsar Go Client

Refer to https://pulsar.apache.org/docs/en/client-libraries-go/.

If you encounter problems with dynamic libraries, please refer to https://pulsar.apache.org/docs/en/client-libraries-cpp/.

Build Packetbeat

Reference elastic/beats#11054.

Build auditbeat.go

vendor/github.com/elastic/beats/x-pack/auditbeat/module/system/package/rpm_linux.go:23:24: fatal error: rpm/rpmlib.h: No such file or directory
apt-get install librpm-dev

Start beat

Exiting: error loading config file: config file ("filebeat.yml") must be owned by the user identifier (uid=0) or root
chown -R root:root filebeat.yml




pulsar-beat-output's Issues

Producer with name '' is already connected to topic

server error: ProducerBusy: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name '' is already connected to topic" producerID=1 producer_name= topic=""
How can this problem be resolved?
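
Pulsar allows only one connected producer with a given name on a topic at a time, so this error typically appears when several Beat instances share the same configured producer name, or when a restarted instance reconnects before the broker has released the previous producer session. One thing worth trying (an assumption about this particular setup, not a confirmed fix) is to give each instance a unique producer name, or to leave name unset so the broker assigns one; a sketch:

output.pulsar:
  url: "pulsar://localhost:6650"
  topic: my_topic
  # Use a distinct producer name per Beat instance, or omit this option entirely.
  name: filebeat-host-01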

Conditional topic selection

Hi Everyone,

Is there a way to conditionally apply a topic to a certain message? For example, if an event has a field like level, can it be published to a different topic depending on whether it is an info or a warning?
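
One approach, based on the format-string support of the topic option documented in the Producer table above, is to tag each input with its own log_topic field and reference that field in the output. A sketch (paths, field values, and topic names are illustrative):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/info.log
    fields:
      log_topic: app_info
  - type: log
    paths:
      - /var/log/app/error.log
    fields:
      log_topic: app_errors

output.pulsar:
  url: "pulsar://localhost:6650"
  topic: "%{[fields.log_topic]}"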

Unable to use additional modules such as o365

So I'm trying to use the o365 module with this so that I can get Office365 audit data pushed into pulsar, but I am unable to accomplish this.

I have stood up a fresh instance of filebeat, copied the o365 module and config from the modules/ and module.d/ folders respectively into a fresh install of this project. I then used the modules enable command to successfully enable the module. When I attempt to run this project though, I keep getting the following error every 30 seconds or so: Error creating runner from config: failed to create input: Error creating input. No such input type exist: 'o365audit'

Very probably I'm just missing something obvious, but I do need assistance to get this solved. Thank you.

beats stop collecting logs suddenly

Hi, I built pulsar-beat-output and got the binary files. When I use Filebeat and Metricbeat to collect logs, I find that they suddenly stop collecting. The logs are actually still being written by the other services. There is no error log, and I get debug logs like the following:

2020-12-22T04:00:57.538Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 0, After: 0, Pending: 0
2020-12-22T04:00:57.538Z DEBUG [input] log/input.go:187 Start next scan
2020-12-22T04:00:57.538Z DEBUG [input] input/input.go:152 Run input
2020-12-22T04:00:57.538Z DEBUG [input] log/input.go:187 Start next scan
2020-12-22T04:00:57.539Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 0, After: 0, Pending: 0
2020-12-22T04:00:57.539Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 0, After: 0, Pending: 0
2020-12-22T04:00:57.539Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 0, After: 0, Pending: 0
2020-12-22T04:00:57.539Z DEBUG [input] input/input.go:152 Run input
2020-12-22T04:00:57.539Z DEBUG [input] log/input.go:187 Start next scan
2020-12-22T04:00:57.539Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 0, After: 0, Pending: 0
2020-12-22T04:00:57.540Z DEBUG [input] input/input.go:152 Run input
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:187 Start next scan
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:417 Check file for harvesting: /opt/container/edge/push/logs/push-running.log
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:507 Update existing file for harvesting: /opt/container/edge/push/logs/push-running.log, offset: 7733110
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:559 Harvester for file is still running: /opt/container/edge/push/logs/push-running.log
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 1, After: 1, Pending: 0
2020-12-22T04:00:57.540Z DEBUG [input] input/input.go:152 Run input
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:187 Start next scan
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:417 Check file for harvesting: /opt/container/ccops/metricbeat/logs/Metricbeat-runing.log
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:507 Update existing file for harvesting: /opt/container/ccops/metricbeat/logs/Metricbeat-runing.log, offset: 9928682
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:559 Harvester for file is still running: /opt/container/ccops/metricbeat/logs/Metricbeat-runing.log
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:417 Check file for harvesting: /opt/container/ccops/metricbeat/logs/metricbeat.log
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:507 Update existing file for harvesting: /opt/container/ccops/metricbeat/logs/metricbeat.log, offset: 4231944
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:559 Harvester for file is still running: /opt/container/ccops/metricbeat/logs/metricbeat.log
2020-12-22T04:00:57.540Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 2, After: 2, Pending: 0
2020-12-22T04:01:09.725Z DEBUG [input] input/input.go:152 Run input
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:187 Start next scan
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:417 Check file for harvesting: /opt/filebeat/logs/Filebeat-runing.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:507 Update existing file for harvesting: /opt/filebeat/logs/Filebeat-runing.log, offset: 27382
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:559 Harvester for file is still running: /opt/filebeat/logs/Filebeat-runing.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:417 Check file for harvesting: /opt/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:507 Update existing file for harvesting: /opt/iot/filebeat/logs/filebeat.log, offset: 112387
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:559 Harvester for file is still running: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:208 input states cleaned up. Before: 8, After: 8, Pending: 0
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:239 State for file not removed because harvester not finished: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:229 Remove state for file as file removed or renamed: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:239 State for file not removed because harvester not finished: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:229 Remove state for file as file removed or renamed: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:239 State for file not removed because harvester not finished: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:229 Remove state for file as file removed or renamed: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:239 State for file not removed because harvester not finished: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:229 Remove state for file as file removed or renamed: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:239 State for file not removed because harvester not finished: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:229 Remove state for file as file removed or renamed: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:239 State for file not removed because harvester not finished: /opt/iot/filebeat/logs/filebeat.log
2020-12-22T04:01:09.725Z DEBUG [input] log/input.go:229 Remove state for file as file removed or renamed: /opt/iot/filebeat/logs/filebeat.log

Here is my filebeat.yml:

filebeat.inputs:
#=========================== ccops ELK log config ============================
- type: log
  enabled: true
  paths:
    - /opt/container/ccops/metricbeat/logs/*.log
  fields:
    log_topic: ${LOG_TOPIC}
    logtype: ccops_metricbeat_log
    microservice: ccopsmetricbeat
  fields_under_root: true
  max_bytes: 512000
  close_timeout: 300m
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  spool_size: 2048
  harvester_buffer_size: 102400
  scan_frequency: 30s
  idle_timeout: 20s
  registry_file: registry
  ignore_older: 1h
- type: log
  enabled: true
  paths:
    - /opt/filebeat/logs/*.log
  fields:
    log_topic: ${LOG_TOPIC}
    logtype: ccops_filebeat_log
    microservice: ccopsfilebeat
  fields_under_root: true
  max_bytes: 512000
  close_timeout: 300m
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  spool_size: 2048
  harvester_buffer_size: 102400
  scan_frequency: 30s
  idle_timeout: 20s
  registry_file: registry
  ignore_older: 1h

output.pulsar:
  enabled: true
  url: "pulsar://service-pulsar-ops:6650"
  topic: ${LOG_TOPIC}

#================================ Logging ======================================
logging.level: debug
logging.to_files: true
logging.files:
  path: /opt/filebeat/logs/
  name: filebeat.log
  keepfiles: 5
  rotateeverybytes: 53687091
  permissions: 0644
logging.json: false

Could it be that too many logs are causing the problem?

Output tenant and namespace

I noticed there is only an option for setting the output topic. Is it possible to change which tenant and namespace that topic belongs to?
Thank you 😃
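
Pulsar topic names can be fully qualified as persistent://<tenant>/<namespace>/<topic>, so one option, assuming the output passes the configured topic name straight through to the Pulsar client, is to set a fully qualified topic (tenant and namespace names below are placeholders):

output.pulsar:
  url: "pulsar://localhost:6650"
  topic: "persistent://my-tenant/my-namespace/my_topic"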

dial tcp: address xxx too many colons in address

Can the Filebeat Pulsar output url be configured with multiple addresses? Below is my configuration:

output.pulsar:
    url: "pulsar://pulsar1:6650,pulsar2:6650,pulsar3:6650"

Error message:

[Failed to cibbect to broker.] error="dial tcp: address "pulsar1:6650,pulsar2:6650,pulsar3:6650: too many colons in address remote_address="pulsar://pulsar1:6650,pulsar2:6650,pulsar3:6650"

Compilation error

Following README.md, I created a main.go file in the directory $GOPATH/src/github.com/streamnative/ and ran go build -o filebeat main.go, which reported the following error:

go: downloading github.com/go-logr/logr v0.2.0
go: downloading github.com/Azure/go-autorest/autorest/date v0.2.0
go: downloading github.com/jpillora/backoff v1.0.0
go: downloading github.com/urso/go-bin v0.0.0-20180220135811-781c575c9f0e
go: downloading github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7
go: downloading github.com/hashicorp/golang-lru v0.5.2-0.20190520140433-59383c442f7d
go: downloading github.com/Azure/go-autorest/autorest/validation v0.2.0
#github.com/elastic/beats/libbeat/common/kubernetes
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:53:18: not enough arguments in call to p.List
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:57:19: not enough arguments in call to p.Watch
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:66:18: not enough arguments in call to e.List
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:69:19: not enough arguments in call to e.Watch
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:79:18: not enough arguments in call to n.List
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:83:19: not enough arguments in call to n.Watch
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:93:19: not enough arguments in call to ns.List
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:97:20: not enough arguments in call to ns.Watch
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:106:18: not enough arguments in call to d.List
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:109:19: not enough arguments in call to d.Watch
have ("k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
want (context.Context, "k8s.io/apimachinery/pkg/apis/meta/v1".ListOptions)
/root/go/pkg/mod/github.com/elastic/[email protected]+incompatible/libbeat/common/kubernetes/informer.go:109:19: too many errors

Compile filebeat Error

Error encountered

[screenshot of the compile error]

When I compile pulsar-beat-output, this error appears.

[root@af9dfd50cdaa pulsar-beat-output]# uname -a
Linux af9dfd50cdaa 4.19.121-linuxkit #1 SMP Tue Dec 1 17:50:32 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
[root@af9dfd50cdaa pulsar-beat-output]# go version
go version go1.15.6 linux/amd64

How to reproduce

docker run -it ttbb/base:goc bash
export GOPATH="/root/go"
mkdir -p $GOPATH/src/github.com/streamnative/
cd $GOPATH/src/github.com/streamnative/
git clone https://github.com/streamnative/pulsar-beat-output
cd pulsar-beat-output
go build -o filebeat main.go

[bug report] BatchingMaxPublishDelay conf init error

File: config.go

It happens at:

if config.BatchingMaxPublishDelay > 0 {
producerOptions.BatchingMaxPublishDelay = config.BatchingMaxPublishDelay * time.Second
}

Description

The delay is amplified by a factor of 1,000,000,000 (the configured duration is multiplied by time.Second), so the timer does not fire as expected.

Solution

Remove the multiplication by time.Second:

if config.BatchingMaxPublishDelay > 0 {
producerOptions.BatchingMaxPublishDelay = config.BatchingMaxPublishDelay
}
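
A minimal sketch illustrating the amplification, assuming the configured value is already parsed as a time.Duration:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Configured batching delay, e.g. the 1ms default from the Producer table.
	configured := 1 * time.Millisecond

	// time.Second is the integer 1_000_000_000 (nanoseconds), so multiplying an
	// existing Duration by it scales the delay by a factor of 1e9.
	amplified := configured * time.Second

	fmt.Println(configured) // 1ms
	fmt.Println(amplified)  // 277h46m40s
}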

The Filebeat instance breaks

When I use pulsar-beat-output, I run into two issues.

1. The instance sometimes breaks.

The log is as follows:

panic: interface conversion: interface {} is nil, not *pulsar.pendingItem

goroutine 121 [running]:
github.com/streamnative/pulsar-beat-output/vendor/github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).internalFlush(0xc0002f60a0, 0xc000538160)
/home/halo_op/gocode/src/github.com/streamnative/pulsar-beat-output/vendor/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:291 +0x1c6
github.com/streamnative/pulsar-beat-output/vendor/github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).runEventsLoop(0xc0002f60a0)
/home/halo_op/gocode/src/github.com/streamnative/pulsar-beat-output/vendor/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:207 +0x1f5
created by github.com/streamnative/pulsar-beat-output/vendor/github.com/apache/pulsar-client-go/pulsar.newPartitionProducer
/home/halo_op/gocode/src/github.com/streamnative/pulsar-beat-output/vendor/github.com/apache/pulsar-client-go/pulsar/impl_partition_producer.go:114 +0x5f2

2. Another issue: the instance stays alive, but it no longer transfers data from the source to Pulsar. No logs are fetched, and the Filebeat monitoring log shows no error for this. It happens roughly 10-30 minutes after the instance starts up.

You can see that after 40 minutes, only 5 instances are still transferring data:

{
"msgRateIn" : 3064.133757314516,
"msgThroughputIn" : 2207192.7056091167,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"averageMsgSize" : 720.3317088688569,
"storageSize" : 0,
"publishers" : [ {
"msgRateIn" : 597.9667858215149,
"msgThroughputIn" : 431634.90267711494,
"averageMsgSize" : 721.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds11",
"connectedSince" : "2019-09-27T17:01:27.228+08:00",
"address" : "/10.131.0.58:32339"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds7",
"connectedSince" : "2019-09-27T16:54:00.551+08:00",
"address" : "/10.131.0.132:43490"
}, {
"msgRateIn" : 609.9334467402922,
"msgThroughputIn" : 439671.21508286794,
"averageMsgSize" : 720.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds9",
"connectedSince" : "2019-09-27T16:57:08.634+08:00",
"address" : "/10.131.0.5:49475"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds3",
"connectedSince" : "2019-09-27T16:46:45.404+08:00",
"address" : "/10.131.0.90:12283"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds5",
"connectedSince" : "2019-09-27T16:49:34.111+08:00",
"address" : "/10.131.0.102:12117"
}, {
"msgRateIn" : 631.9334194658584,
"msgThroughputIn" : 455381.5954018448,
"averageMsgSize" : 720.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds6",
"connectedSince" : "2019-09-27T17:29:45.74+08:00",
"address" : "/10.131.0.122:47279"
}, {
"msgRateIn" : 618.0000732742087,
"msgThroughputIn" : 445361.76947172714,
"averageMsgSize" : 720.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds8",
"connectedSince" : "2019-09-27T17:35:16.464+08:00",
"address" : "/10.131.0.138:5258"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds12",
"connectedSince" : "2019-09-27T17:13:00.555+08:00",
"address" : "/10.131.0.60:61939"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds2",
"connectedSince" : "2019-09-24T09:53:40.368+08:00",
"address" : "/10.131.0.66:21422"
}, {
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds10",
"connectedSince" : "2019-09-27T16:59:22.512+08:00",
"address" : "/10.131.0.30:35196"
}, {
"msgRateIn" : 606.3000320126417,
"msgThroughputIn" : 435143.22297556215,
"averageMsgSize" : 717.0,
"producerId" : 1,
"metadata" : { },
"producerName" : "dc_ds13",
"connectedSince" : "2019-09-27T17:25:48.978+08:00",
"address" : "/10.131.0.70:57855"
} ],
"subscriptions" : { },
"replication" : { },
"deduplicationStatus" : "Disabled"

This is very common; by the next day, none of them are transferring data.

Support Pulsar SSL?

Hi, does it support Pulsar SSL? I have created my Pulsar client with SSL.

@tuteng can you help with this issue?

Thanks.

Configuring JWT for standalone throws 401 Unauthorized exceptions

The error messages are as follows:

18:49:29.181 [pulsar-web-68-1] WARN  org.apache.pulsar.broker.web.AuthenticationFilter - [127.0.0.1] Failed to authenticate HTTP request: Failed to authentication token: Illegal base64url character: '"'
18:49:29.190 [pulsar-web-68-1] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Jun/2021:18:49:29 +0800] "PUT /admin/v2/persistent/public/functions/assignments HTTP/1.1" 401 0 "-" "Pulsar-Java-v2.7.2" 22
18:49:29.199 [AsyncHttpClient-87-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/persistent/public/functions/assignments] Failed to perform http put request: javax.ws.rs.NotAuthorizedException: HTTP 401 Unauthorized
18:49:29.204 [main] ERROR org.apache.pulsar.functions.worker.WorkerService - Error Starting up in worker
org.apache.pulsar.client.admin.PulsarAdminException$NotAuthorizedException: HTTP 401 Unauthorized
        at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:218) ~[org.apache.pulsar-pulsar-client-admin-original-2.7.2.jar:2.7.2]

The standalone.conf configuration is as follows:

authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
brokerClientTlsEnabledWithKeyStore=true
brokerClientTlsEnabled=true
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.T6KZUYQfPgYCAuLDnDoqUOfowTFJmrsYwE8HopAKAWA"}
brokerClientTrustCertsFilePath=/home/wangyimin/source/apache-pulsar-2.7.2/certs/ca.cert.pem
authenticateOriginalAuthData=true
tokenSecretKey=file:///home/wangyimin/source/apache-pulsar-2.7.2/my-secret.key

The proxy.conf configuration is as follows:

tokenSecretKey=file:///home/wangyimin/source/apache-pulsar-2.7.2/my-secret.key
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
tokenSecretKey=file:///home/wangyimin/source/apache-pulsar-2.7.2/my-secret.key
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.T6KZUYQfPgYCAuLDnDoqUOfowTFJmrsYwE8HopAKAWA"}
forwardAuthorizationCredentials=true

The client.conf configuration is as follows:

webServiceUrl=http://localhost:8080/
brokerServiceUrl=pulsar://localhost:6650/
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.T6KZUYQfPgYCAuLDnDoqUOfowTFJmrsYwE8HopAKAWA

The functions_worker.yml configuration is as follows:

authenticationProviders: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
superUserRoles:
    - admin
authenticationEnabled: true
authorizationEnabled: true
