Giter VIP home page Giter VIP logo

delayqueue's Introduction

DelayQueue

license Build Status Coverage Status Go Report Card Go Reference

中文版

DelayQueue is a message queue supporting delayed/scheduled delivery based on redis. It is designed to be reliable, scalable and easy to get started.

Core Advantages:

  • Guaranteed at least once consumption
  • Auto retry failed messages
  • Works out of the box, Config Nothing and Deploy Nothing, A Redis is all you need.
  • Natively adapted to the distributed environment, messages processed concurrently on multiple machines . Workers can be added, removed or migrated at any time
  • Support Redis Cluster or clusters of most cloud service providers. see chapter Cluster
  • Easy to use monitoring data exporter, see Monitoring

Install

DelayQueue requires a Go version with modules support. Run following command line in your project with go.mod:

go get github.com/hdt3213/delayqueue

Get Started

package main

import (
	"github.com/redis/go-redis/v9"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		// callback returns true to confirm successful consumption.
		// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
		return true
	}).WithConcurrent(4) // set the number of concurrent consumers 
	// send delay message
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// send schedule message
	for i := 0; i < 10; i++ {
		err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
		if err != nil {
			panic(err)
		}
	}
	// start consume
	done := queue.StartConsume()
	<-done
}

if you are using github.com/go-redis/redis/v8 please use go get github.com/hdt3213/delayqueue@redisv8

Please note that redis/v8 is not compatible with redis cluster 7.x. detail

If you are using redis client other than go-redis, you could wrap your redis client into RedisCli interface

If you don't want to set the callback during initialization, you can use func WithCallback.

Producer consumer distributed deployment

By default, delayqueue instances can be both producers and consumers.

If your program only need producers and consumers are placed elsewhere, delayqueue.NewPublisher is a good option for you.

func consumer() {
	queue := NewQueue("test", redisCli, cb)
	queue.StartConsume()
}

func producer() {
	publisher := NewPublisher("test", redisCli)
	publisher.SendDelayMsg(strconv.Itoa(i), 0)
}

Options

func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue

WithCallback set callback for queue to receives and consumes messages callback returns true to confirm successfully consumed, false to re-deliver this message.

If there is no callback set, StartConsume will panic

queue := NewQueue("test", redisCli)
queue.WithCallback(func(payload string) bool {
	return true
})
func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue

WithLogger customizes logger for queue

func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue

WithConcurrent sets the number of concurrent consumers

func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue

WithFetchInterval customizes the interval at which consumer fetch message from redis

func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue

WithMaxConsumeDuration customizes max consume duration

If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again

func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue

WithFetchLimit limits the max number of unack (processing) messages

UseHashTagKey()

UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.

If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, you should add this option to NewQueue: NewQueue("test", redisCli, cb, UseHashTagKey()). This Option cannot be changed after DelayQueue has been created.

WARNING! CHANGING(add or remove) this option will cause DelayQueue failing to read existed data in redis

see more: https://redis.io/docs/reference/cluster-spec/#hash-tags

WithDefaultRetryCount(count uint)  *DelayQueue

WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue

use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message

Monitoring

We provides Monitor to monitor the running status.

monitor := delayqueue.NewMonitor("example", redisCli)

Monitor.ListenEvent can register a listener that can receive all internal events, so you can use it to implement customized data reporting and metrics.

The monitor can receive events from all workers, even if they are running on another server.

type EventListener interface {
	OnEvent(*Event)
}

// returns: close function, error
func (m *Monitor) ListenEvent(listener EventListener) (func(), error) 

The definition of event could be found in events.go.

Besides, We provide a demo that uses EventListener to monitor the production and consumption amount per minute.

The complete demo code can be found in example/monitor.

type MyProfiler struct {
	List  []*Metrics
	Start int64
}

func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
	sinceUptime := event.Timestamp - p.Start
	upMinutes := sinceUptime / 60
	if len(p.List) <= int(upMinutes) {
		p.List = append(p.List, &Metrics{})
	}
	current := p.List[upMinutes]
	switch event.Code {
	case delayqueue.NewMessageEvent:
		current.ProduceCount += event.MsgCount
	case delayqueue.DeliveredEvent:
		current.DeliverCount += event.MsgCount
	case delayqueue.AckEvent:
		current.ConsumeCount += event.MsgCount
	case delayqueue.RetryEvent:
		current.RetryCount += event.MsgCount
	case delayqueue.FinalFailedEvent:
		current.FailCount += event.MsgCount
	}
}

func main() {
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		return true
	})
	start := time.Now()
	// IMPORTANT: EnableReport must be called so monitor can do its work
	queue.EnableReport() 

	// setup monitor
	monitor := delayqueue.NewMonitor("example", redisCli)
	listener := &MyProfiler{
		Start: start.Unix(),
	}
	monitor.ListenEvent(listener)

	// print metrics every minute
	tick := time.Tick(time.Minute)
	go func() {
		for range tick {
			minutes := len(listener.List)-1
			fmt.Printf("%d: %#v", minutes, listener.List[minutes])
		}
	}()
}

Monitor use redis pub/sub to collect data, so it is important to call DelayQueue.EnableReport of all workers, to enable events reporting for monitor.

If you do not want to use redis pub/sub, you can use DelayQueue.ListenEvent to collect data yourself.

Please be advised, DelayQueue.ListenEvent can only receive events from the current instance, while monitor can receive events from all instances in the queue.

Once DelayQueue.ListenEvent is called, the monitor's listener will be overwritten unless EnableReport is called again to re-enable the monitor.

Get Status

You could get Pending Count, Ready Count and Processing Count from the monitor:

func (m *Monitor) GetPendingCount() (int64, error) 

GetPendingCount returns the number of which delivery time has not arrived.

func (m *Monitor) GetReadyCount() (int64, error)

GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered yet

func (m *Monitor) GetProcessingCount() (int64, error)

GetProcessingCount returns the number of messages which are being processed

Cluster

If you are using Redis Cluster, please use NewQueueOnCluster

redisCli := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{
        "127.0.0.1:7000",
        "127.0.0.1:7001",
        "127.0.0.1:7002",
    },
})
callback := func(s string) bool {
    return true
}
queue := NewQueueOnCluster("test", redisCli, callback)

If you are using transparent clusters, such as codis, twemproxy, or the redis of cluster architecture on aliyun, tencentcloud, just use NewQueue and enable hash tag

redisCli := redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
})
callback := func(s string) bool {
    return true
}
queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey())

More Details

Here is the complete flowchart:

  • pending: A sorted set of messages pending for delivery. member is message id, score is delivery unix timestamp.
  • ready: A list of messages ready to deliver. Workers fetch messages from here.
  • unack: A sorted set of messages waiting for ack (successfully consumed confirmation) which means the messages here is being processing. member is message id, score is the unix timestamp of processing deadline.
  • retry: A list of messages which processing exceeded deadline and waits for retry
  • garbage: A list of messages reaching max retry count and waits for cleaning

delayqueue's People

Contributors

by1an avatar hdt3213 avatar tic8 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

delayqueue's Issues

consume error: pending2ReadyScript failed: ERR Error running script

当key比较多的时候会报如下错误
consume error: pending2ReadyScript failed: ERR Error running script (call to f_7450a9e992a62833484b2567e7d1b1f70c56eb20): @user_script:8: user_script:8: too many results to unpack
好像是需要限制unpack接收的参数数量?

关于大量任务

func (q *DelayQueue) SendScheduleMsgs(payloads []string, t []time.Time, opts ...interface{}) error {
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}
	pipe := q.redisCli.TxPipeline() // 这里是在warpper里添加的
	now := time.Now()
	ctx := context.Background()
	for i := 0; i < len(t); i++ {
		idStr := uuid.Must(uuid.NewRandom()).String()
		msgTTL := t[i].Sub(now) + q.msgTTL
		pipe.Set(ctx, q.genMsgKey(idStr), payloads[i], msgTTL)
		pipe.HSet(ctx, q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
		values := map[string]float64{idStr: float64(t[i].Unix())}
		var zs []redis.Z
		for member, score := range values {
			zs = append(zs, redis.Z{
				Score:  score,
				Member: member,
			})
		}
		pipe.ZAdd(ctx, q.pendingKey, zs...)
	}
	_, err := pipe.Exec(ctx)
	if err != nil {
		return fmt.Errorf("push to pending failed: %v", err)
	}
	return nil
}

就不提pr了,作者看下,这样写是否有问题.如果可行的话可以手动修改下

callbak开放出来

1.希望 callback处理函数增加 WithCallback 开放出来
2.希望能增加队列阻塞监控

感谢~

没有删除吗

一般来讲,如果到达时间点消费的时候订单状态已经支付了,在消费的时候查表判断吗,感觉不太优雅,不能在支付回调中主动删除队列任务吗

ttl设置很长时间会不会有问题

我是当定时队列使用的,我们的需求是T(工作日)+1执行,所以消息在redis里面的时候可能会有点长
例如国庆是插入消息后的第八天取出消息

担心ttl设置的比较长之后会不会被自动清理掉了

consume error: pending2ReadyScript failed: ERR bad lua script for redis cluster, first parameter of redis.call/redis.pcall must be a single literal string

使用时报错

各个版本

  • github.com/redis/go-redis/v9 v9.4.0
  • github.com/hdt3213/delayqueue v1.0.4
  • Redis: 阿里云集群版 Redis 6.0 | 集群版(16G,共2分片) | 云原生

使用方式

初始化时使用了UseHashTagKey()
delayQueue := delayqueue.NewQueue(queueName, RedisInstance, consumer, delayqueue.UseHashTagKey()).WithConcurrent(concurrent)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.