
lmstfy's People

Contributors

1ckpwee, calvinxiao, chensunny, dependabot[bot], git-hulk, joker-hh, lance726, liguangbo, napoleon0621, oif, ruoshan, testwill, yesphet


lmstfy's Issues

FIX: Prometheus Summary uses too much memory

Describe the bug

  • When there are many queues and namespaces, memory usage is excessive
  • OS: CentOS 7
  • Version: master

To Reproduce
Steps to reproduce the behavior:

  1. Create 1k queues
  2. Memory usage exceeds 1 GB, mostly from the Prometheus summary metrics


Automatically set GOMAXPROCS according to container quota

Search before asking

  • I had searched in the issues and found no similar issues.

Motivation

Currently, GOMAXPROCS is set from the physical instance (or node), which is too large
since most lmstfy servers run on Kubernetes. So we should set it automatically
to match the container's CPU quota.

Solution

Import https://github.com/uber-go/automaxprocs to solve this issue.
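
A minimal sketch of how the library is typically wired in: a blank import in main is its documented usage (the surrounding main function is only a placeholder for lmstfy's real entry point):

package main

import (
	// The blank import adjusts GOMAXPROCS to the container's CPU quota at init time.
	_ "go.uber.org/automaxprocs"
)

func main() {
	// Start the lmstfy server as usual; GOMAXPROCS has already been capped
	// to the cgroup CPU limit by the import above.
}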

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Proposal: priority queue

Introduction

Business teams want to be able to assign a priority to jobs, so that jobs with a higher weight are consumed first.
At the same time, delay time should take precedence, with weight coming second.

Problem

Current lmstfy data is split into four parts:

  1. timer_zset: a zset used to handle delays; each member is {namespace len}{namespace}{queue len}{queue}{tries}{jobID len}{jobID}
  2. ready_queue: a list; jobs whose delay has expired are pushed into this queue to wait for consumption; each element is {tries}{jobID len}{jobID}
  3. data_pool: a plain key-value entry storing the job body, keyed by {namespace}{queue}{jobID}
  4. deadletter: a list; jobs whose remaining tries reach 0 go into this queue to be destroyed or re-run

Implementing a priority queue requires ordering ready_queue by weight.

#30 proposes removing tries from the members of timer_set and ready_queue, so that ack can check whether the job has been consumed, and the job can also be removed from the zset directly.

Design

To solve the problems above, the plan is as follows:

  1. Change ready_queue to a zset. The list commands in the timer_set Lua script must be replaced with zset commands, and the consume path needs zpopmax and bzpopmax, which require Redis >= 5.0.
  2. Remove tries from the members of timer_set, ready_queue and deadletter, and store tries, priority and the job body together.
  3. Replace data_pool's plain key-value entries with hashes, and fetch tries and priority via hgetall.

Since the required features need Redis 5.0+, and both the data structures and the data contents are breaking changes, the plan is to implement a new engine that a pool can opt into via configuration. By distinguishing key prefixes it can share an instance with existing pools, and data can be migrated via the migrateTo configuration.

Timer Lua script

local zset_key = KEYS[1]
local output_queue_prefix = KEYS[2]
local pool_prefix = KEYS[3]
local output_deadletter_prefix = KEYS[4]
local now = ARGV[1]
local limit = ARGV[2]

local expiredMembers = redis.call("ZRANGEBYSCORE", zset_key, 0, now, "LIMIT", 0, limit)

if #expiredMembers == 0 then
	return 0
end

for _,v in ipairs(expiredMembers) do
	local ns, q, job_id = struct.unpack("Hc0Hc0Hc0", v)
	local metadata = redis.call("HGETALL", table.concat({pool_prefix, ns, q, job_id}, "/"))  -- fetch tries/priority from the job hash
	if #metadata > 0 then
		local result = {}
		for i = 1, #metadata, 2 do
			result[metadata[i]] = metadata[i + 1]
		end
		-- only pump job to ready queue/dead letter if the job did not expire
		if tonumber(result['tries']) == 0 then
			-- no more tries, move to dead letter
			local val = struct.pack("Hc0",  #job_id, job_id)
			redis.call("PERSIST", table.concat({pool_prefix, ns, q, job_id}, "/"))  -- remove ttl
			redis.call("LPUSH", table.concat({output_deadletter_prefix, ns, q}, "/"), val)
		else
			-- move to ready queue
			local val = struct.pack("Hc0", #job_id, job_id)
			redis.call("ZADD", table.concat({output_queue_prefix, ns, q}, "/"), result['priority'], val)
		end
	end
end
redis.call("ZREM", zset_key, unpack(expiredMembers))
return #expiredMembers

Publish pseudocode

hmset(job_id, metadata)
if job.delay_time > 0
	timer.Add(job)
else
	queue.Add(job)
end

Consume pseudocode

if timeout > 0
	job_id = bzpopmax(ready_queue, timeout)
else
	job_id = zpopmax(ready_queue)
end
job_metadata = hgetall(job_id)
if job_metadata["tries"] <= 0
	log.error
else
	hincrby(job_id, "tries", -1)
	timer.Add(job)
end
return job_metadata
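
A minimal Go sketch of this consume path, assuming a go-redis (v8) client; the key names and the omitted re-add to the timer set are placeholders, not the real engine code:

package engine

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

// consumeOne pops the highest-priority job from the ready zset and consumes one
// try from its metadata hash, mirroring the pseudocode above.
func consumeOne(ctx context.Context, rdb *redis.Client, readyQueue string, timeout time.Duration) (string, error) {
	var jobID string
	if timeout > 0 {
		z, err := rdb.BZPopMax(ctx, timeout, readyQueue).Result()
		if err != nil {
			return "", err // redis.Nil: no job became ready before the timeout
		}
		jobID = z.Member.(string)
	} else {
		zs, err := rdb.ZPopMax(ctx, readyQueue, 1).Result()
		if err != nil || len(zs) == 0 {
			return "", err
		}
		jobID = zs[0].Member.(string)
	}
	meta, err := rdb.HGetAll(ctx, jobID).Result() // tries / priority / body
	if err != nil {
		return "", err
	}
	tries, _ := strconv.Atoi(meta["tries"])
	if tries <= 0 {
		return "", fmt.Errorf("job %s has no tries left", jobID)
	}
	if err := rdb.HIncrBy(ctx, jobID, "tries", -1).Err(); err != nil {
		return "", err
	}
	// timer.Add(job) would go here, re-adding the job to the timer set for TTR handling.
	return jobID, nil
}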

deadletter

The dead letter queue can remain a list. On Respawn, the job's metadata needs to be updated and its weight fetched.

The Go client SDK does not support deleting dead letter jobs

Scenario:
After compensating for jobs in the dead letter queue, we would like to delete those jobs from it.

Using the lmstfy Go client SDK, I couldn't find a method for deleting dead letter jobs. Could this be supported?

Problems caused by non-atomic handling on the consumer side

Describe the bug

The overall design relies on retries to guarantee data reliability:
when consuming, the consumer SDK does a pop followed by a timer add.

However, these two operations are not made atomic with a Lua script. If the process panics during the pop,
the message is lost: it can neither be consumed by another process nor enter the dead letter queue.

So it might be better to make the BRPop and timer.Add in PollQueues atomic with a Lua script.
Although this is a low-probability event, data lost in this scenario is hard to trace and recover.
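
A hedged sketch of what an atomic "pop + timer.Add" could look like, with the two steps wrapped in one Lua script evaluated from Go (go-redis assumed). Blocking commands such as BRPOP cannot run inside Lua, so the script uses a non-blocking RPOP and the caller would have to poll; key names and the member layout are illustrative, not lmstfy's real encoding:

package engine

import (
	"context"

	"github.com/go-redis/redis/v8"
)

// popAndRequeue pops a job from the ready list and immediately schedules its
// retry in the timer zset, as a single atomic step on the Redis side.
var popAndRequeue = redis.NewScript(`
local job = redis.call("RPOP", KEYS[1])        -- ready queue (list)
if not job then
    return false
end
redis.call("ZADD", KEYS[2], ARGV[1], job)      -- timer zset, score = now + TTR
return job
`)

func atomicConsume(ctx context.Context, rdb *redis.Client, readyKey, timerKey string, retryAt int64) (string, error) {
	res, err := popAndRequeue.Run(ctx, rdb, []string{readyKey, timerKey}, retryAt).Result()
	if err != nil {
		return "", err // redis.Nil when the ready queue is empty
	}
	return res.(string), nil
}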

Does the admin API have any access control?

For example, a simple password scheme such as basic auth. As far as I can tell, everything is currently accessible. Restricting access to local IPs only makes fetching monitoring metrics and performing admin operations inconvenient, but exposing the API externally is rather dangerous.

DesignQuestion: deletion strategy for jobs

I'd like to ask about the strategy used when deleting a job (Delete API). Deleting a job only removes it from the pool; the corresponding job handle in the timer zset is not removed. What is the reasoning behind this?
Referring to #26, it seems that in practice the handle is cleaned up automatically the next time it is consumed.

But looking at the code (https://github.com/bitleak/lmstfy/blob/master/engine/redis/queue.go#L224-L225), when PollQueues() pulls from the ready queue it seems to always decrement retry by one and put the job back into the timer zset, waiting to be scheduled to the dead letter queue or the ready queue again.
The case where the job has already been deleted is not handled there.

As I understand it, when an already ACKed/deleted job is scheduled back to the ready queue and consumed (since it is no longer in the pool, another job will be consumed instead), its retry keeps being decremented and it keeps being re-added to the timer zset, then scheduled back to the ready queue, over and over, until retry reaches 0 and the timer zset moves it to the dead letter queue.

Maybe I'm missing some key code; thanks for any clarification!

It seems all jobs are currently put into a single zset

It looks like all tasks are put into the single zset with key = timer_set by default. When there are many jobs that have not yet come due, this zset becomes very large and the traversal/sorting latency grows. Won't that cause problems?
Is this intentional in the design?

[Feature] Report the distribution of the delay queue so that applications can do monitoring and prediction

Is your feature request related to a problem? Please describe.

The current backlog metric counts everything in the backlog, including jobs to be delivered a day later.
What I need is the number of jobs that are due now but have not been delivered yet.

For example, I would like to see the distribution of jobs coming due at each point in time over the next three days in lmstfy, e.g.:

  today 3pm-4pm: 10k jobs
  today 5pm-6pm: 100k jobs
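
Since the timer zset's scores are due timestamps, one way to compute such a distribution would be to bucket the set with ZCOUNT over time windows. A rough Go sketch, assuming go-redis and hourly buckets; the key name and bucket size are assumptions:

package stats

import (
	"context"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

// delayDistribution counts the jobs coming due in each hourly bucket over the
// next `hours` hours, using ZCOUNT on the timer zset whose scores are due timestamps.
func delayDistribution(ctx context.Context, rdb *redis.Client, timerSetKey string, hours int) (map[time.Time]int64, error) {
	dist := make(map[time.Time]int64, hours)
	start := time.Now().Truncate(time.Hour)
	for i := 0; i < hours; i++ {
		from := start.Add(time.Duration(i) * time.Hour)
		to := from.Add(time.Hour)
		n, err := rdb.ZCount(ctx, timerSetKey,
			strconv.FormatInt(from.Unix(), 10),
			"("+strconv.FormatInt(to.Unix(), 10), // "(" makes the upper bound exclusive
		).Result()
		if err != nil {
			return nil, err
		}
		dist[from] = n
	}
	return dist, nil
}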


Proposal: remove the retry count (tries) from members of the timer ZSET

Introduction

A delayed job in lmstfy mainly passes through three data structures:

  1. The timer queue, which handles scheduling, implemented as a zset. Each member of the zset encodes the queue, the jobID and the remaining number of retries; the member's score is the job's due time. At a fixed interval, the system moves jobs whose due time has passed from the timer queue to the ready queue, and moves jobs that have exhausted their retries without being ACKed to the dead letter queue.
  2. The ready queue, implemented as a list. Jobs in the ready queue are handed out to consume requests.
  3. The dead letter queue, implemented as a list. It keeps jobs that can no longer be processed (e.g. expired without being ACKed) so they can be inspected later.

There are several recent requirements:

  1. Tighten the consume/ack semantics so that only consumed jobs can be ACKed. The current ack is effectively a delete: it removes only the job body, leaves the job metadata untouched, and relies on the system failing to fetch the body to slowly drain jobs that were deleted ahead of time.
  2. fastack: after a job has been consumed once and failed, retry it immediately. This requires moving the job from the timer queue to the ready queue ahead of schedule.

Both requirements need to operate on the timer queue. Since the timer queue's members embed the retry count (tries), and logic such as ack has no way to know tries, the current timer queue design cannot satisfy them.

Problem

The operation that processes jobs in the timer queue is called pump. Pump uses the value of tries to decide whether a job moves to the dead letter queue or the ready queue, so obtaining tries is mandatory.

In the original design, tries is stored inside the member name in the timer queue. Each zset member is composed as:

{namespace len}{namespace}{queue len}{queue}{tries}{jobID len}{jobID}

This avoids an extra lookup for tries, but it makes it impossible to operate on jobs inside the timer queue: doing so requires knowing the zset member, which is built from tries, and in many cases tries cannot be obtained and logically should not be needed. This probably needs to change.

Design

The current timer queue structure is:

ZADD test_queue member timestamp
member = {namespace len}{namespace}{queue len}{queue}{tries}{jobID len}{jobID}
timestamp = now + delaySeconds (or TTR)

So the pump command that fetches due jobs is:

local expiredMembers = redis.call("ZRANGEBYSCORE", zset_key, 0, now, "LIMIT", 0, limit)

We need to remove tries from the member. This proposal moves tries out of the member and into the score, as its fractional part. The change does not affect the consumption of old jobs, and jobs can still be pumped at the specified time.
The new timer queue structure becomes:

ZADD test_queue member timestamp
member = {namespace len}{namespace}{queue len}{queue}{jobID len}{jobID}
timestamp = {now + delaySeconds (or TTR)}.{printf("%05d", tries)}

The pump command becomes:

local expiredMembersAndScores = redis.call("ZRANGEBYSCORE", zset_key, 0, "(" .. (now + 1), "WITHSCORES", "LIMIT", 0, limit)

We still need to be compatible with old-format members. Whether a member contains tries can be determined from its layout; if it does not, the retry count is read from the score. The retry count is fixed at 5 decimal digits (the original retry limit was 65535); to avoid precision problems, the fractional part is truncated to 6 digits and rounded up (ceil).

With this scheme the transition is smooth: old data is still consumed on time, new data is stored in the new format, and new requirements such as fastack become possible.
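
A small Go sketch of the score encoding/decoding described above (function names are made up for illustration):

package engine

import "math"

// encodeScore packs the due timestamp and the remaining tries (at most 65535,
// i.e. 5 decimal digits) into one zset score: <dueAt>.<tries padded to 5 digits>.
func encodeScore(dueAt int64, tries uint16) float64 {
	return float64(dueAt) + float64(tries)/1e5
}

// decodeScore recovers both parts. As proposed above, the fractional part is
// truncated to 6 decimal digits and rounded up to absorb float64 precision loss.
func decodeScore(score float64) (dueAt int64, tries uint16) {
	dueAt = int64(math.Floor(score))
	frac := (score - math.Floor(score)) * 1e5 // roughly the tries value
	frac = math.Trunc(frac*1e6) / 1e6         // keep 6 digits after the decimal point
	return dueAt, uint16(math.Ceil(frac))
}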

Migrate and refactor the Go client library to make it easy to use

Currently, the lmstfy client is hard to use since it directly mirrors the HTTP API, so users must understand all of the APIs before using it. We should offer a friendlier, easier option for new users, which we call the high-level SDK, while keeping the old low-level client for those who need flexibility.

BTW, the Go client library lives in the same repo as the lmstfy server, which makes evolving the client more difficult, so we decided to move it into a separate repo: https://github.com/bitleak/go-lmstfy-client .

Engine v2 metadata manager design

Lmstfy v2 engine metadata manager design


Introduction

A manager for namespace and queue metadata. It uses Redis publish/subscribe to trigger a full cache reload in every manager.

Motivation or Background

According to the v2 design, we will allocate a timer_set to every queue, so namespace and queue metadata must be managed properly.

In lmstfy v2, a queue must be registered before use. Each lmstfy instance should discover changes and reload them for the pumper to use.

Detailed Design

MetaManager contains existCache (map[namespace]map[queue]bool) and cache (a sorted []meta):
existCache is used to check whether a namespace and queue exist;
cache is used by the pumper, which selects queues from it to pump.

MetaManager's public API:
Add: add a namespace and queue to the Redis set, then publish an update to notify all lmstfy instances to reload
Remove: remove a queue from the namespace set, then publish an update to notify all lmstfy instances to reload
Exist: check whether a namespace and queue have been registered
Dump: dump the metadata manager's cached info
Close: stop the subscription and the watch goroutine

MetaManager's principle:
MetaManager subscribes to the update channel in Redis when initializing, then starts a watch goroutine to receive messages from the channel. When a message is received, it triggers a reload and applies the data to the meta manager.
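
A rough Go sketch of the structure and watch loop described above, assuming a go-redis client; the type layout, channel name and key layout are assumptions, not the final implementation:

package meta

import (
	"context"
	"sync"

	"github.com/go-redis/redis/v8"
)

// MetaManager keeps a local cache of registered namespaces/queues and reloads
// it whenever any instance publishes an update notification.
type MetaManager struct {
	rdb        *redis.Client
	mu         sync.RWMutex
	existCache map[string]map[string]bool // namespace -> queue -> registered
	pubsub     *redis.PubSub
}

func NewMetaManager(rdb *redis.Client) *MetaManager {
	m := &MetaManager{rdb: rdb, existCache: map[string]map[string]bool{}}
	m.pubsub = rdb.Subscribe(context.Background(), "lmstfy:meta:update") // channel name is illustrative
	go m.watch()
	return m
}

// watch reloads the full cache each time an update notification arrives.
func (m *MetaManager) watch() {
	for range m.pubsub.Channel() {
		m.reload()
	}
}

// Add registers a queue and notifies every instance to reload.
func (m *MetaManager) Add(ctx context.Context, ns, queue string) error {
	if err := m.rdb.SAdd(ctx, "lmstfy:meta:"+ns, queue).Err(); err != nil { // key layout is illustrative
		return err
	}
	return m.rdb.Publish(ctx, "lmstfy:meta:update", ns+"/"+queue).Err()
}

// Exist checks only the local cache; it never touches Redis on the hot path.
func (m *MetaManager) Exist(ns, queue string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.existCache[ns][queue]
}

func (m *MetaManager) reload() { /* SMEMBERS each namespace set and rebuild existCache under m.mu */ }

func (m *MetaManager) Close() { _ = m.pubsub.Close() } // stops the watch goroutine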

Test Design

  • Add and Remove should be discovered by other instances
  • updates should be applied properly
  • Exist should return the correct result

What kind of jobs does the scheduling here target? Is there a discussion group for learning more about lmstfy?


Is there a discussion group? I'd like to get a deeper understanding of lmstfy.


Sanitize the Redis configuration to prevent data loss

Search before asking

  • I had searched in the issues and found no similar issues.

Motivation

Currently, we use Redis as the backend storage but never check its configuration,
which may cause data loss if it is wrongly configured. So I think we MUST check settings
like maxmemory_policy and aof_enabled and refuse to start the server if they don't have the expected values.

aof_enabled must be enabled and maxmemory_policy must be noeviction.

Solution

Get the configuration through the INFO command and check the values.
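
A hedged sketch of such a check with go-redis: parse the flat key:value lines returned by INFO and validate the two fields (wiring this into lmstfy's startup path is left out):

package config

import (
	"context"
	"errors"
	"strings"

	"github.com/go-redis/redis/v8"
)

// sanitizeRedisConfig refuses to start if the Redis instance could silently drop
// data: AOF must be enabled and the eviction policy must be noeviction.
func sanitizeRedisConfig(ctx context.Context, rdb *redis.Client) error {
	info, err := rdb.Info(ctx).Result()
	if err != nil {
		return err
	}
	fields := map[string]string{}
	for _, line := range strings.Split(info, "\n") {
		if kv := strings.SplitN(strings.TrimSpace(line), ":", 2); len(kv) == 2 {
			fields[kv[0]] = kv[1]
		}
	}
	if fields["aof_enabled"] != "1" {
		return errors.New("appendonly (AOF) must be enabled")
	}
	if fields["maxmemory_policy"] != "noeviction" {
		return errors.New("maxmemory-policy must be noeviction")
	}
	return nil
}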

Are you willing to submit a PR?

  • I'm willing to submit a PR!

FIX: Push consumes too much CPU

Describe the bug

  • At a push QPS of 1k, CPU usage approaches 6 cores
  • OS: CentOS 7
  • Version: master

To Reproduce
Steps to reproduce the behavior:

  1. Create 10k workers whose push target returns 404


How can expired jobs be cleaned up proactively?

How do I manually clean up expired jobs? I deleted them with delete, but they still exist in Redis; the attached screenshot shows the logs printed by lmstfy-server.

Request queue pause API to stop consuming

Is your feature request related to a problem? Please describe.
There is no way to stop individual queues when something goes wrong with their consumption.

Describe the solution you'd like
Allow pausing a queue via the admin API.

Docker image can run on multiple cpu platforms

Search before asking

  • I had searched in the issues and found no similar issues.

Motivation

Currently, we only build and release the Docker image for amd64,
so it can't run on the arm64 platform. We want to support arm64 as well
to make arm64 users happy.

Solution

Use Travis CI to build multi-platform Docker images.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Engine v2 timer manager design

Lmstfy v2 engine timer manager design


Introduction

A manager for timers. It manages a goroutine pool that pumps specific queues, registers itself in Redis, and uses a Lua script to check which timer managers are alive.

Motivation or Background

According to the v2 design, we will allocate a timer_set to every queue, so there are thousands of timer_sets to pump, and every timer_set must be pumped every second. A single pumper cannot handle all of them; we need a timer manager to manage the pumper goroutines.

Lmstfy is a distributed system and every instance is identical. In lmstfy v1 there is only one timer_set to pump, so having every instance pump it every second causes no problem. But now, with thousands of timer_sets, deploying 10+ lmstfy instances would mean tens of thousands of Lua scripts running in Redis every second, which would be a disaster. The timer manager therefore reduces duplicate pumping of the timer_sets.

Detailed Design

TimerManager structure:
sequence: int, this timer's sequence number, used to pick the queues to pump
pool: a pool of pumper goroutines, planned to use https://github.com/Jeffail/tunny/tree/1b37d6cb867a93f37f313022f6b7cb5bfe6cdc1b
name: this timer manager's name, generated from ip+uuid
queues: the queueManager (metaManager), used to get the queues to pump

TimerManager's public API:
Add: add a job ID to the timer_set
Remove: remove a job ID from the timer_set
Name: return the timer manager's name
Size: calculate the timer_set size
Shutdown: stop the pump pool and the sequence calculation

TimerManager's principle:

  1. Timer manager registration
    ┌───────────────────┐    
    │                   │    
    │   TimerManager    │    
    │                   │    
    └───────────────────┘    
              │              
              │              
   HSET key name now()+ttl   
     repeat every second     
              │              
              │              
              ▼              
    ┌───────────────────┐    
    │                   │    
    │       Redis       │    
    │                   │    
    └───────────────────┘    
  2. Timer sequence number calculation
     ┌───────────────────┐      
     │                   │      
     │   TimerManager    │      
     │                   │      
     └───────────────────┘      
               │                
               │                
               │                
┌──────────────┴───────────────┐
│run lua script(Pseudo code):  │
│servers = {}                  │
│for k,v in HGETALL key {      │
│    if v < now() {            │
│        HDEL key k            │
│    } else {                  │
│        server.insert(k)      │
│    }                         │
│}                             │
│return table.sort(servers)    │
└──────────────┬───────────────┘
               │                
               ▼                
     ┌───────────────────┐      
     │                   │      
     │       Redis       │      
     │                   │      
     └───────────────────┘      

The position of a timerManager in the returned sorted servers list is its temporary sequence number.
Sequence number 0 pumps all queues; the other timerManagers divide the queues equally among themselves.

  3. Each second, use the sequence number to filter the queues, then distribute them to the pumper pool to be handled by its goroutines (see the sketch below).
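
A rough Go sketch of the registration heartbeat and the alive-check script from the diagrams above, assuming go-redis; the hash key name is an assumption:

package timer

import (
	"context"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
)

// heartbeat registers this timer manager and refreshes its expiry every second,
// as in the registration diagram (HSET key name now()+ttl).
func heartbeat(ctx context.Context, rdb *redis.Client, name string, ttl time.Duration) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rdb.HSet(ctx, "lmstfy:timer_managers", name, time.Now().Add(ttl).Unix())
		}
	}
}

// aliveManagers implements the pseudo Lua above: drop expired entries and return
// the surviving manager names sorted, so each instance can derive its sequence
// number from its own position in the list.
var aliveManagers = redis.NewScript(`
local names = {}
local all = redis.call("HGETALL", KEYS[1])
for i = 1, #all, 2 do
    if tonumber(all[i + 1]) < tonumber(ARGV[1]) then
        redis.call("HDEL", KEYS[1], all[i])
    else
        table.insert(names, all[i])
    end
end
table.sort(names)
return names
`)

func sequenceOf(ctx context.Context, rdb *redis.Client, name string) (int, error) {
	res, err := aliveManagers.Run(ctx, rdb, []string{"lmstfy:timer_managers"}, time.Now().Unix()).StringSlice()
	if err != nil {
		return -1, err
	}
	for i, n := range res {
		if n == name {
			return i, nil
		}
	}
	return -1, errors.New("this timer manager is not registered")
}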

Test Design

  • Add and Remove should be discovered
  • the sequence number should be calculated properly
  • pumping should execute on time

Goroutine leak caused by go-redis

Describe the bug

When the sentinel nodes and Redis nodes run into problems, the following happens:

  • goroutines and memory grow rapidly
  • after the Redis cluster recovers, the leaked goroutines are not released

redis-cli -h 10.140.0.40 -p 26381  client list
id=2133 addr=10.140.0.71:52220 fd=2136 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2135 addr=10.140.0.71:52226 fd=2138 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2137 addr=10.140.0.71:52232 fd=2140 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2139 addr=10.140.0.71:52238 fd=2142 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1077 addr=10.140.0.71:48688 fd=1080 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1079 addr=10.140.0.71:48694 fd=1082 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1081 addr=10.140.0.71:48700 fd=1084 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1083 addr=10.140.0.71:48706 fd=1086 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2594 addr=10.140.0.40:58842 fd=8 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client
sunny@vm-test-redis-lmstfy-persist-01-asea1a:/etc/supervisor/conf.d$ redis-cli -h 10.140.0.40 -p 26381   client list  |wc -l
1117
  • OS: 4.14.138+ #1 SMP Tue Sep 3 02:58:08 PDT 2019 x86_64 GNU/Linux
  • Version: v1.0.3

To Reproduce
Steps to reproduce the behavior:

The Redis deployment is as follows:

root@vm-test-redis-lmstfy-persist-01-asea1a:~# supervisorctl status all
redis-sentinel-26379:lmstfy      RUNNING   pid 14591, uptime 0:11:59
redis-sentinel-26380:lmstfy      RUNNING   pid 14593, uptime 0:11:59
redis-sentinel-26381:lmstfy      RUNNING   pid 14584, uptime 0:13:22
redis-standalone-6379:lmstfy     RUNNING   pid 14592, uptime 0:11:59
redis-standalone-6380:lmstfy     RUNNING   pid 14590, uptime 0:11:59
  1. supervisorctl stop all
    1.2 wait for a moment
  2. supervisorctl start redis-sentinel-26381:lmstfy
    2.2 wait for a moment
  3. supervisorctl start redis-standalone-6379:lmstfy
  4. supervisorctl start all

Expected behavior

The connections should recover normally, without the memory and goroutine problems.

