
kratos-transport's Introduction

kratos-transport

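Implements message queues, task queues, and network protocols such as WebSocket and HTTP3 as transport.Server implementations for the Kratos microservice framework.

To use one, pass it to the kratos.Server() method to register it as a Server.

It stitches all sorts of things together, so call me the stitch monster.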

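Supported services (Server)

Message queues

RPC

Task queues

Network protocols

Supported message brokers (Broker)

Examples

The examples above can also be found in the official Kratos examples repository.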

kratos-transport's People

Contributors

daixijun, exuan, ezi4zy, haifeiwu, liukaho, madou-shinni, qlinhz, ryan961, shuqingzai, slone123c, sohenk, tx7do, xiongpan828, zoulux


kratos-transport's Issues

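go mod tidy selects a default version with the wrong parameter type

When running go mod tidy, the default version selected for github.com/tx7do/kratos-transport/transport/kafka is v1.0.2.
In this version kafka.WithCodec() takes an encoding.Codec rather than a string, but the kratos-cqrs sample code passes a string, so the build fails.

Manually pinning the version to v0.0.0-20221220110912-9db1b79d385c in go.mod works around the problem. Could go mod tidy be made to pick the correct version automatically?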

Can the pulsar transport support kratos 2.7?

The Add method is not implemented.

# github.com/tx7do/kratos-transport/transport/pulsar
/Users/caster/go/pkg/mod/github.com/tx7do/kratos-transport/transport/[email protected]/transport.go:40:9: cannot use tr.reqHeader (variable of type headerCarrier) as transport.Header value in return statement: headerCarrier does not implement transport.Header (missing method Add)
/Users/caster/go/pkg/mod/github.com/tx7do/kratos-transport/transport/[email protected]/transport.go:45:9: cannot use tr.replyHeader (variable of type headerCarrier) as transport.Header value in return statement: headerCarrier does not implement transport.Header (missing method Add)

Related issue: #45

Subpackages cannot be upgraded with go get -u

Running go get -u github.com/tx7do/kratos-transport/broker/rabbitmq does not update the version of the rabbitmq subpackage, and the code then fails to compile.

rabbitmq with exchange name

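Hello,
This library does not seem to allow setting some of RabbitMQ's advanced parameters, such as the exchange name or priority. I can see the relevant wrappers in the code, but there is no way to inject these parameters from the outside. How can this be solved?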

[BUG] Couldn't apply Options to asynq.Server

transport/asynq/server.go
Server.init
A custom ServerOption, such as the Redis address, is not applied to asynq.Server.
The init() function creates the asynq.Server first and only then applies the ServerOptions to the kratos Server, so the options never reach asynq.Server. It always uses the default Redis address '127.0.0.1:6379'.

func NewServer(opts ...ServerOption) *Server {
	srv := &Server{
		baseCtx: context.Background(),
		started: false,
		redisOpt: asynq.RedisClientOpt{
			Addr: defaultRedisAddress,
			DB:   0,
		},
		asynqConfig: asynq.Config{
			Concurrency: 10,
			Logger:      newLogger(),
		},
		mux: asynq.NewServeMux(),
	}

	srv.init(opts...)

	return srv
}
func (s *Server) init(opts ...ServerOption) {
	_ = s.createAsynqServer()

	for _, o := range opts {
		o(s)
	}
}
func (s *Server) createAsynqServer() error {
	if s.asynqServer != nil {
		log.Errorf("[asynq] asynq server already created")
		return errors.New("asynq server already created")
	}

	s.asynqServer = asynq.NewServer(s.redisOpt, s.asynqConfig)
	if s.asynqServer == nil {
		log.Errorf("[asynq] create asynq server failed")
		return errors.New("create asynq server failed")
	}
	return nil
}
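
One way to address the ordering problem described above is to apply every option before the dependent object is built. The following is a minimal, self-contained sketch of that idea. The backend type and the WithAddress option are stand-ins invented for illustration; they are not the real asynq or kratos-transport API.

```go
package main

import "fmt"

// backend stands in for asynq.Server; it only records the address it was built with.
type backend struct{ addr string }

// Server is a simplified stand-in for the transport server from the issue.
type Server struct {
	redisAddr string
	backend   *backend
}

// ServerOption mutates the Server configuration before the backend is built.
type ServerOption func(*Server)

// WithAddress is a hypothetical option mirroring a "redis address" setting.
func WithAddress(addr string) ServerOption {
	return func(s *Server) { s.redisAddr = addr }
}

// NewServer applies every option first and only then creates the backend,
// so the backend sees the customized configuration.
func NewServer(opts ...ServerOption) *Server {
	s := &Server{redisAddr: "127.0.0.1:6379"}
	for _, o := range opts {
		o(s)
	}
	s.backend = &backend{addr: s.redisAddr}
	return s
}

func main() {
	fmt.Println(NewServer().backend.addr)
	fmt.Println(NewServer(WithAddress("10.0.0.5:6379")).backend.addr)
}
```

In terms of the quoted code, this would correspond to running the options loop before the call to createAsynqServer() inside init().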

kitex support

Could a kitex transport be added, as a replacement for kratos's grpc?

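[BUG] Not enough arguments when declaring a consumer

In the latest code change
https://github.com/tx7do/kratos-transport/blob/5cbe362109/broker/rabbitmq/connection.go#L251
a new autoDel parameter was added, so Consume now takes 7 parameters, but the caller was not updated.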

func (r *rabbitConnection) Consume(queueName, routingKey string, bindArgs amqp.Table, qArgs amqp.Table, autoAck, durableQueue, autoDel bool) (*rabbitChannel, <-chan amqp.Delivery, error) {
	consumerChannel, err := newRabbitChannel(r.Connection, r.qos)
	if err != nil {
		return nil, nil, err
	}

	if err = consumerChannel.DeclareQueue(queueName, qArgs, durableQueue, autoDel); err != nil {
		return nil, nil, err
	}

	deliveries, err := consumerChannel.ConsumeQueue(queueName, autoAck)
	if err != nil {
		return nil, nil, err
	}

	if err = consumerChannel.BindQueue(queueName, routingKey, r.exchange.Name, bindArgs); err != nil {
		return nil, nil, err
	}

	return consumerChannel, deliveries, nil
}

Caller
The call passes only 6 arguments, so it fails with a not-enough-arguments error.
https://github.com/tx7do/kratos-transport/blob/5cbe362109/broker/rabbitmq/subscriber.go#L75

		ch, sub, err := s.r.conn.Consume(
			s.opts.Queue,
			s.topic,
			s.headers,
			s.queueArgs,
			s.opts.AutoAck,
			s.durableQueue,
		)

kafka tracing has no effect when integrated into kratos

Is this the right way to write it?

srv := kafka.NewServer(
		kafka.WithAddress(c.Kafka.Addrs),
		kafka.WithCodec(encoding.GetCodec("json")),
		kafka.WithTracerProvider(tp, "kafka-tracer"),
	)

redis conn error

server: docker
redis version: 6.2.7
err msg: === RUN TestServer
ReadTimeout set error
IdleTimeout set error
ReadTimeout set error
IdleTimeout set error
INFO msg=[redis] server listening on: redis://127.0.0.1:6379
redis recv error: read tcp 127.0.0.1:49996->127.0.0.1:6379: i/o timeout

mqtt does not support QoS configuration

func (m *mqttBroker) publish(topic string, buf []byte, opts ...broker.PublishOption) error {
	if !m.client.IsConnected() {
		return errors.New("not connected")
	}

	options := broker.PublishOptions{
		Context: context.Background(),
	}
	for _, o := range opts {
		o(&options)
	}

	var qos byte = 1
	const retained bool = false

	ret := m.client.Publish(topic, qos, retained, buf)
	return ret.Error()
}

func (m *mqttBroker) Subscribe(topic string, handler broker.Handler, binder broker.Binder, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	if !m.client.IsConnected() {
		return nil, errors.New("not connected")
	}

	var options broker.SubscribeOptions
	for _, o := range opts {
		o(&options)
	}

	var qos byte = 1

	t := m.client.Subscribe(topic, qos, func(c MQTT.Client, mq MQTT.Message) {
		var msg broker.Message

		p := &publication{topic: mq.Topic(), msg: &msg}

		if binder != nil {
			msg.Body = binder()
		} else {
			msg.Body = mq.Payload()
		}

		if err := broker.Unmarshal(m.opts.Codec, mq.Payload(), &msg.Body); err != nil {
			p.err = err
			log.Error(err)
			return
		}

		if err := handler(m.opts.Context, p); err != nil {
			p.err = err
			log.Error(err)
		}
	})

	if rs, err := checkClientToken(t); !rs {
		return nil, err
	}

	return &subscriber{
		opts:   options,
		client: m.client,
		topic:  topic,
	}, nil
}
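
Both functions above parse their options and then use a hard-coded qos of 1. A per-call QoS could be carried through the options' context and read back before publishing. The sketch below shows the pattern with stand-in types; WithPublishQos, qosKey, and resolveQos are hypothetical names, and the real broker package may expose something different.

```go
package main

import (
	"context"
	"fmt"
)

// PublishOptions and PublishOption are stand-ins for the broker package types.
type PublishOptions struct{ Context context.Context }
type PublishOption func(*PublishOptions)

// qosKey is an unexported context key, so other packages cannot collide with it.
type qosKey struct{}

// WithPublishQos is a hypothetical option that stores the QoS level in the context.
func WithPublishQos(qos byte) PublishOption {
	return func(o *PublishOptions) {
		o.Context = context.WithValue(o.Context, qosKey{}, qos)
	}
}

// resolveQos returns the QoS requested through the options, or def when no
// level (or an invalid level above 2) was supplied.
func resolveQos(def byte, opts ...PublishOption) byte {
	options := PublishOptions{Context: context.Background()}
	for _, o := range opts {
		o(&options)
	}
	if v, ok := options.Context.Value(qosKey{}).(byte); ok && v <= 2 {
		return v
	}
	return def
}

func main() {
	fmt.Println(resolveQos(1))
	fmt.Println(resolveQos(1, WithPublishQos(2)))
}
```

The same lookup could serve the subscribe path with a matching subscribe option, keeping 1 as the fallback so existing callers behave as before.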

Queues cannot be consumed when using the rabbitmq server

Problem:

When I register consumers for multiple queues in a loop, the queues cannot be consumed and are not created in the management console.

The queues I need to register:

[image]

The queues actually shown in the console:

[image]

Versions used:

github.com/tx7do/kratos-transport v1.0.5 // indirect
github.com/tx7do/kratos-transport/broker/rabbitmq v0.0.0-20221220110912-9db1b79d385c // indirect
github.com/tx7do/kratos-transport/transport/rabbitmq v0.0.0-20221220110912-9db1b79d385c // indirect

The actual code:

func NewMqServer(
	c *conf.Data,
	logger log.Logger,
	bk broker.Broker,
	executors *sender.TaskExecutor,
	hs *sender.Handle,
) *rabbitmq.Server {


	srv := rabbitmq.NewServer(
		rabbitmq.WithAddress([]string{c.Rabbitmq.URL}),
		rabbitmq.WithExchange("austin.biz.exchange", true),
		rabbitmq.WithCodec("json"),
	)
	logic := NewMqHandler(logger, bk, executors, hs)

	for _, groupId := range groups.GetAllGroupIds() {
		fmt.Println(`subscriber`,fmt.Sprintf("austin.biz.%s", groupId))
		_ = srv.RegisterSubscriber(context.Background(),
			"austin.biz.routing",
			logic.registerMessageHandler(logic.onMassage),
			func() broker.Any {
				return []*types.TaskInfo{}
			},
			broker.WithQueueName(fmt.Sprintf("austin.biz.%s", groupId)),
			brokermq.WithDurableQueue(),
			brokermq.WithAckOnSuccess(),
		)
	}

	return srv
}



type MessageHandler func(_ context.Context, topic string, headers broker.Headers, msg []*types.TaskInfo) error

type MqHandler struct {
	logger   log.Logger
	broker   broker.Broker
	executor *sender.TaskExecutor
	hs       *sender.Handle
}

func NewMqHandler(
	logger log.Logger,
	broker broker.Broker,
	executor *sender.TaskExecutor,
	hs *sender.Handle,
) *MqHandler {
	return &MqHandler{
		logger:   logger,
		broker:   broker,
		executor: executor,
		hs:       hs,
	}
}

func (m *MqHandler) onMassage(ctx context.Context, topic string, headers broker.Headers, taskList []*types.TaskInfo) error {
	for _, taskInfo := range taskList {
		fmt.Println(taskInfo)
		channel := channelType.TypeCodeEn[taskInfo.SendChannel]
		msgType := messageType.TypeCodeEn[taskInfo.MsgType]
		err := m.executor.Submit(ctx, fmt.Sprintf("%s.%s", channel, msgType), sender.NewTask(taskInfo, m.hs, m.logger))
		if err != nil {
			l := log.NewHelper(log.With(m.logger, "module", "MqHandler/onMassage"))
			l.Error("err", err, "task_list", taskList)
		}
	}
	return nil
}

func (m *MqHandler) registerMessageHandler(fnc MessageHandler) broker.Handler {
	return func(ctx context.Context, event broker.Event) error {
		switch t := event.Message().Body.(type) {
		case []*types.TaskInfo:
			if err := fnc(ctx, event.Topic(), event.Message().Headers, t); err != nil {
				return err
			}
		default:
			return fmt.Errorf("unsupported type: %T", t)
		}
		return nil
	}
}

[BUG] rabbitmq ackSuccessKey cannot be set

In the latest commit, ackSuccess is the negation of the ackSuccessKey value
https://github.com/tx7do/kratos-transport/blob/main/broker/rabbitmq/rabbitmq.go#L197

	var ackSuccess = false
	if val, ok := options.Context.Value(ackSuccessKey{}).(bool); ok {
		options.AutoAck = val
		ackSuccess = !val
	}

However, the option that sets it always passes true, so when the handler returns no error the manual ack never happens
https://github.com/tx7do/kratos-transport/blob/main/broker/rabbitmq/options.go#L88

func WithAckOnSuccess() broker.SubscribeOption {
	return broker.SubscribeContextWithValue(ackSuccessKey{}, true)
}

The first if below never takes effect
https://github.com/tx7do/kratos-transport/blob/main/broker/rabbitmq/rabbitmq.go#L221

		p.err = handler(r.opts.Context, p)
		if p.err == nil && ackSuccess && !options.AutoAck {
			_ = msg.Ack(false)
		} else if p.err != nil && !options.AutoAck {
			_ = msg.Nack(false, requeueOnError)
		}
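
To make the interaction of the flags easier to reason about, the decision can be isolated in a small pure function. The sketch below is one possible reading of the intended behavior, not the library's implementation. It assumes that enabling ack-on-success should mean manual acknowledgement, that is autoAck = false and ackOnSuccess = true. The names decide, action, and errHandler are invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// action names what the subscriber does after the handler returns.
type action string

const (
	doNothing action = "none"
	doAck     action = "ack"
	doNack    action = "nack"
)

// errHandler is a sample handler failure used in the demonstration.
var errHandler = errors.New("handler failed")

// decide returns the acknowledgement to send for one delivery.
// With autoAck the broker has already acknowledged, so nothing is sent.
func decide(handlerErr error, autoAck, ackOnSuccess bool) action {
	if autoAck {
		return doNothing
	}
	if handlerErr != nil {
		return doNack
	}
	if ackOnSuccess {
		return doAck
	}
	return doNothing
}

func main() {
	// Assumed mapping for the ack-on-success option: autoAck=false, ackOnSuccess=true.
	fmt.Println(decide(nil, false, true))
	fmt.Println(decide(errHandler, false, true))
	// The combination produced by the quoted code: autoAck=true, ackOnSuccess=false.
	fmt.Println(decide(nil, true, false))
}
```

With the values from the quoted snippet (val is true, so AutoAck becomes true and ackSuccess becomes false), both branches are skipped, which matches the behavior reported in this issue.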

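Subscriptions are not restored after MQTT reconnects

Problem: after MQTT disconnects and reconnects, previously subscribed topics no longer receive messages.
Expected: after MQTT disconnects and reconnects, the previous subscriptions receive messages normally.
The code is as follows: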

func NewMQTTServer(
	c *conf.Bootstrap, svc *service.OperationStatService,
) *mqtt.Server {
	ctx := context.Background()
	var opts = []mqtt.ServerOption{
		mqtt.WithAddress([]string{c.Server.Mqtt.Addr}),
		mqtt.WithCodec("json"),
		mqtt.WithAuth(c.Server.Mqtt.Username, c.Server.Mqtt.Password),
		mqtt.WithClientId(c.Server.Mqtt.ClientId),
		mqtt.WithCleanSession(true),
	}
	srv := mqtt.NewServer(opts...)
	_ = srv.RegisterSubscriber(
		ctx,
		c.OperationStat.SubTopic,
		biz.RegisterDeviceAckJsonHandler(svc.DeviceAckHandler),
		biz.DeviceAckCreator,
	)
	svc.SetMqttBroker(srv)
	return srv
}
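
A possible cause, given WithCleanSession(true) in the code above, is that an MQTT server discards a clean session's subscriptions when the connection drops, so they have to be issued again after reconnecting. One common approach is to remember each subscription and replay it from the client's on-connect callback. The sketch below shows only that bookkeeping with stand-in types. The registry type and subscribeFunc are hypothetical, and wiring it into the real client is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// subscribeFunc stands in for the client call that sends SUBSCRIBE to the server.
type subscribeFunc func(topic string, qos byte) error

// registry remembers every subscription so it can be replayed after a reconnect.
type registry struct {
	mu     sync.Mutex
	topics map[string]byte
}

func newRegistry() *registry {
	return &registry{topics: make(map[string]byte)}
}

// Add records a subscription and its QoS level.
func (r *registry) Add(topic string, qos byte) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topics[topic] = qos
}

// Resubscribe replays all recorded subscriptions and returns the first error.
// It works on a snapshot so the callback may call Add without deadlocking.
func (r *registry) Resubscribe(sub subscribeFunc) error {
	r.mu.Lock()
	snapshot := make(map[string]byte, len(r.topics))
	for t, q := range r.topics {
		snapshot[t] = q
	}
	r.mu.Unlock()

	for topic, qos := range snapshot {
		if err := sub(topic, qos); err != nil {
			return fmt.Errorf("resubscribe %q: %w", topic, err)
		}
	}
	return nil
}

func main() {
	r := newRegistry()
	r.Add("device/ack", 1)
	_ = r.Resubscribe(func(topic string, qos byte) error {
		fmt.Println("subscribe", topic, qos)
		return nil
	})
}
```

Alternatively, connecting with a persistent session (clean session disabled and a stable client ID) lets the server keep the subscriptions itself, at the cost of server-side session state.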

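Version numbers of submodule Go dependencies

Hello, I installed the dependency with go get github.com/tx7do/kratos-transport/transport/websocket@latest, and the resulting version is a date-plus-hash pseudo-version:
github.com/tx7do/kratos-transport/transport/websocket v0.0.0-20230803222306-0904a461c75e
This makes the caller field in log records very long:
INFO ts=2023-08-14T10:28:18+08:00 [email protected]/server.go:267 service.id=xxx service.name= service.version= trace.id= span.id= msg=[websocket] server listening on: [::]:9002
Could a tag be added to improve this?
P.S. I am not very familiar with publishing Go packages and am not sure whether version management for submodules is possible.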

go get github.com/tx7do/kratos-transport/codec/proto
go: module github.com/golang/protobuf is deprecated: Use the "google.golang.org/protobuf" module instead.

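[BUG] RabbitMQ tracing does not work

The latest commits add tracing support for some MQs, but in my tests it has no effect.

The context seems to be detached, which makes the trace inconsistent. I tried passing options.Context in manually, and the span was recorded: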

span := b.startConsumerSpan(options.Context, options.Queue, &msg)
defer span.End()

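[BUG] rabbitmq: what is the intent of setting autoDelete = true when declaring a queue?

In the latest commit, queues are declared with autoDelete = true. This makes marking a queue durable pointless: once the queue has no consumers it is deleted automatically, and any unconsumed messages are lost. What is the intent behind this design?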

https://github.com/tx7do/kratos-transport/blob/main/broker/rabbitmq/channel.go#L69

	_, err := r.channel.QueueDeclare(
		queueName,
		durable,
		true,
		false,
		false,
		args,
	)

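rabbitmq: topic question with multiple consumers on queues

The rabbitmq consumer service groups subscribers by routingKey and uses topic mode. If several queues are bound to one routingKey, how should they be told apart?
Example:
[image]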

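Graceful shutdown when integrating with Kratos

Hi,
I am using the Kratos framework, with kratos-transport's kafka as the server.
I have the following questions:

  1. When shutting down a consumer gracefully, kratos-transport relies on the context mechanism (the server.Stop() function has no effect). The subscriber's context is set through kafkaSrv.RegisterSubscriber(). When combined with Kratos, the subscriber's context should be tied to Kratos.App.ctx, but in the Kratos layout the Server lives in its own directory, internal/server/xxx.go, so tying them together is not convenient. Is there a good way to do this?
  2. Closing the subscriber through the context causes a problem. If a message is being consumed (business logic in progress) when the graceful shutdown signal arrives, the context is cancelled and this error may occur: ERROR msg=[kafka]: unable to commit msg: context canceled. After the service restarts, the message is consumed a second time.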
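
For the second question, one common pattern is to use the shutdown context only to stop fetching new messages, and to commit the in-flight message with a separate, short-lived context. The sketch below illustrates this with the standard library only. The consume and demo functions, the channel of strings, and the one-second commit timeout are all assumptions made for illustration, not kratos-transport or Kafka client APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume reads messages until ctx is cancelled or msgs is closed.
// The commit context is NOT derived from ctx, so cancelling ctx stops the
// loop but still lets the message being handled finish and commit.
func consume(
	ctx context.Context,
	msgs <-chan string,
	handle func(string),
	commit func(context.Context, string) error,
) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case m, ok := <-msgs:
			if !ok {
				return nil
			}
			handle(m)
			commitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
			err := commit(commitCtx, m)
			cancel()
			if err != nil {
				return err
			}
		}
	}
}

// demo cancels the consumer context while a message is being handled and
// returns whatever error the commit step reported.
func demo() error {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	msgs := make(chan string, 1)
	msgs <- "m1"

	return consume(ctx, msgs,
		func(string) { stop() }, // the shutdown signal arrives mid-handling
		func(c context.Context, _ string) error { return c.Err() },
	)
}

func main() {
	fmt.Println("commit error:", demo())
}
```

A Stop() implementation built on this idea would cancel ctx and then wait for consume to return, so the process exits only after the last offset has been committed.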

[BUG] runAsynqServer function mistakenly asserts that the server is nil

file: transport/asynq/server.go
Server.runAsynqServer
When asynqServer is not nil, which is the normal case here, the function returns an error.

func (s *Server) runAsynqServer() error {
	if s.asynqServer != nil {
		log.Errorf("[asynq] asynq server is nil")
		return errors.New("asynq server is nil")
	}

	if err := s.asynqServer.Run(s.mux); err != nil {
		log.Errorf("[asynq] asynq server run failed: %s", err.Error())
		return err
	}
	return nil
}
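
The condition in the quoted function is inverted relative to its error message. A minimal sketch of the corrected guard, using a stand-in runner type in place of *asynq.Server (the type and the errNilBackend name are invented for illustration):

```go
package main

import (
	"errors"
	"fmt"
)

// runner stands in for *asynq.Server; only the Run step matters here.
type runner struct{ ran bool }

// Run marks the runner as started.
func (r *runner) Run() error {
	r.ran = true
	return nil
}

// Server is a simplified stand-in for the transport server.
type Server struct{ backend *runner }

var errNilBackend = errors.New("asynq server is nil")

// run fails only when the backend is missing (== nil), which is what the
// error message describes; otherwise it starts the backend.
func (s *Server) run() error {
	if s.backend == nil {
		return errNilBackend
	}
	return s.backend.Run()
}

func main() {
	fmt.Println((&Server{}).run())
	fmt.Println((&Server{backend: &runner{}}).run())
}
```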

cannot use tr.reqHeader error, please help

C:\Users\xxx\go\pkg\mod\github.com\tx7do\kratos-transport\transport\[email protected]\transport.go:41:9: cannot use tr.reqHeader (variable of type headerCarrier) as transport.Header value in return statement: headerCarrier does not implement transport.Header (missing method Add)
C:\Users\xxx\go\pkg\mod\github.com\tx7do\kratos-transport\transport\[email protected]\transport.go:46:9: cannot use tr.replyHeader (variable of type headerCarrier) as transport.Header value in return statement: headerCarrier does not implement transport.Header (missing method Add)

Mismatched kratos versions cause a tr.reqHeader error

github.com/go-kratos/kratos/v2 v2.6.2
github.com/tx7do/kratos-transport v1.0.5

Using kafka with kratos-transport produces an error:
cannot use tr.reqHeader (variable of type headerCarrier) as transport.Header value in return statement: headerCarrier does not implement transport.Header (missing method Add)

createProducer authentication issue

if credentials.AccessKey != "" && credentials.SecretKey != "" {
	producer.WithCredentials(*credentials)
}

Why is this not written as opts = append(opts, producer.WithCredentials(*credentials))?
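
The concern is easiest to see with a small model of the functional-options pattern. The sketch below assumes that producer.WithCredentials returns an option value rather than mutating anything itself. The config and Option types and both build functions are stand-ins written for illustration.

```go
package main

import "fmt"

// config and Option stand in for the producer package's option types.
type config struct{ accessKey, secretKey string }
type Option func(*config)

// WithCredentials returns an Option; calling it has no effect until the
// returned function is applied to a config.
func WithCredentials(ak, sk string) Option {
	return func(c *config) { c.accessKey, c.secretKey = ak, sk }
}

// build applies the collected options to a fresh config.
func build(opts ...Option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

// buildDiscarding mirrors the quoted snippet: the option is created and dropped.
func buildDiscarding(ak, sk string) config {
	var opts []Option
	if ak != "" && sk != "" {
		_ = WithCredentials(ak, sk) // result dropped: credentials never reach the config
	}
	return build(opts...)
}

// buildAppending keeps the option so that build can apply it.
func buildAppending(ak, sk string) config {
	var opts []Option
	if ak != "" && sk != "" {
		opts = append(opts, WithCredentials(ak, sk))
	}
	return build(opts...)
}

func main() {
	fmt.Printf("%+v\n", buildDiscarding("ak", "sk"))
	fmt.Printf("%+v\n", buildAppending("ak", "sk"))
}
```

Under that assumption, the appended form is the one that actually passes the credentials on to the producer.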
