bitleak / lmstfy
A task queue with REST API
License: MIT License
rate limit of the queue:
Currently, GOMAXPROCS is set from the physical instance (or node), which is too large since most lmstfy servers run on Kubernetes. We should set it automatically to match the container's CPU quota.
Import https://github.com/uber-go/automaxprocs to solve this issue.
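For reference, automaxprocs only needs a blank import for its side effect; a minimal sketch:

package main

import (
	"fmt"
	"runtime"

	// adjusts GOMAXPROCS to the container's CPU quota at startup
	_ "go.uber.org/automaxprocs"
)

func main() {
	fmt.Println("GOMAXPROCS =", runtime.GOMAXPROCS(0))
}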
Pretty nice. It builds a strong engineering culture when tech companies do more in-house development, extension, or wrapping of components like this.
The business side wants to be able to define job priorities, so that higher-weight jobs are consumed first.
At the same time, delay time should take precedence, with weight second.
Currently, lmstfy data is divided into four parts:
Implementing a priority queue requires the ready_queue to be ordered by weight.
#30 expressed the wish to remove tries from the members of timer_set/ready_queue, so that on ack we can check whether the job has been consumed, or remove the job from the zset directly.
To solve the above problems, the proposal is as follows:
Since the features used require Redis 5.0 or above, and both the data structures and the data content are breaking changes, the plan is to implement a new engine that a pool can opt into via configuration. By distinguishing keys it can also share the same Redis instance with existing pools, and data migration can be done via the migrateTo configuration.
local zset_key = KEYS[1]
local output_queue_prefix = KEYS[2]
local pool_prefix = KEYS[3]
local output_deadletter_prefix = KEYS[4]
local now = ARGV[1]
local limit = ARGV[2]

local expiredMembers = redis.call("ZRANGEBYSCORE", zset_key, 0, now, "LIMIT", 0, limit)
if #expiredMembers == 0 then
    return 0
end

for _, v in ipairs(expiredMembers) do
    local ns, q, job_id = struct.unpack("Hc0Hc0Hc0", v)
    -- HMGET returns the requested field values; they are false when the job hash has expired
    local metadata = redis.call("HMGET", table.concat({pool_prefix, ns, q, job_id}, "/"), "tries", "priority")
    -- only pump the job to the ready queue/dead letter if the job did not expire
    if metadata[1] then
        local tries = tonumber(metadata[1])
        local priority = tonumber(metadata[2])
        if tries == 0 then
            -- no more tries, move to dead letter
            local val = struct.pack("Hc0", #job_id, job_id)
            redis.call("PERSIST", table.concat({pool_prefix, ns, q, job_id}, "/")) -- remove ttl
            redis.call("LPUSH", table.concat({output_deadletter_prefix, ns, q}, "/"), val)
        else
            -- move to ready queue
            local val = struct.pack("Hc0", #job_id, job_id)
            redis.call("ZADD", table.concat({output_queue_prefix, ns, q}, "/"), priority, val)
        end
    end
end
redis.call("ZREM", zset_key, unpack(expiredMembers))
return #expiredMembers
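For reference, a hedged sketch of how a script like the one above could be invoked from Go via EVAL (go-redis v8 API assumed; the key names below are placeholders, not lmstfy's actual key layout):

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// pumpOnce runs the pump script once and returns how many members were pumped.
// pumpScript is assumed to hold the Lua source above.
func pumpOnce(ctx context.Context, rdb *redis.Client, pumpScript string) (int, error) {
	return rdb.Eval(ctx, pumpScript,
		[]string{
			"timer_set",   // KEYS[1]: zset_key
			"ready_queue", // KEYS[2]: output_queue_prefix
			"pool",        // KEYS[3]: pool_prefix
			"deadletter",  // KEYS[4]: output_deadletter_prefix
		},
		time.Now().Unix(), // ARGV[1]: now
		100,               // ARGV[2]: limit
	).Int()
}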
-- Publish:
hmset job metadata
if job.delay_time > 0
    timer.Add job
else
    queue.Add job
end

-- Consume:
if timeout > 0
    job_id = bzpopmax(ready_queue, timeout)
else
    job_id = zpopmax(ready_queue)
end
jobMetadata = hgetall(job_id)
if jobMetadata["tries"] <= 0
    log.error
else
    hincrby(job_id, "tries", -1)
    timer.Add job
end
return jobMetadata
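A rough Go rendering of the consume half of the pseudocode above, not lmstfy's actual code (go-redis v8 assumed; for simplicity the zset member is treated as the bare job ID and the timer re-add is left as a comment):

import (
	"context"
	"log"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

func consume(ctx context.Context, rdb *redis.Client, readyQueue string, timeout time.Duration) (map[string]string, error) {
	var jobID string
	if timeout > 0 {
		zk, err := rdb.BZPopMax(ctx, timeout, readyQueue).Result()
		if err != nil {
			return nil, err // redis.Nil means nothing became ready before the timeout
		}
		jobID = zk.Member.(string)
	} else {
		zs, err := rdb.ZPopMax(ctx, readyQueue, 1).Result()
		if err != nil || len(zs) == 0 {
			return nil, err
		}
		jobID = zs[0].Member.(string)
	}
	jobMetadata, err := rdb.HGetAll(ctx, jobID).Result()
	if err != nil {
		return nil, err
	}
	if tries, _ := strconv.Atoi(jobMetadata["tries"]); tries <= 0 {
		log.Printf("job %s has no tries left", jobID)
	} else {
		rdb.HIncrBy(ctx, jobID, "tries", -1)
		// timer.Add(job): re-add to the timer set so the job is retried if it is not ACKed in time
	}
	return jobMetadata, nil
}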
The dead letter can continue to be implemented with a List; on Respawn the job's metadata needs to be modified and its weight (priority) fetched.
Scenario:
We run compensating processing on jobs in the dead letter queue; after processing, we'd like to delete the job from the dead letter queue.
Using the lmstfy Golang client SDK, I couldn't find a method for deleting dead letter jobs. Could this be supported?
Is Redis Sentinel supported?
How can the monitoring metrics be viewed? Is there an HTTP API that exposes them, e.g. the number of backlogged jobs?
Describe the bug
The overall design guarantees data reliability through retries:
the consumer SDK does a pop plus a timer.Add when consuming.
But this operation is not atomic (it is not done in a Lua script), so if the process panics during the pop, the message is lost: it can neither be consumed by other processes nor enter the dead letter queue.
So it might be better to make the BRPop and timer.Add in the PollQueues function atomic with Lua. Although this is a low-probability event, data lost in this scenario is hard to trace and recover.
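One possible direction, sketched below (this is an illustration, not the current lmstfy implementation): combine the pop and the timer.Add into a single Lua script so that a consumer crash between the two steps cannot lose the job. Lua scripts cannot block, so the blocking BRPOP becomes a polled RPOP here; the key names, TTR score, and helper names are assumptions.

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// popAndRequeueScript atomically pops one job from the ready list and re-adds it
// to the timer zset with a new deadline, so the job is never "in flight" outside Redis.
var popAndRequeueScript = redis.NewScript(`
local job = redis.call("RPOP", KEYS[1])   -- KEYS[1]: ready queue (list)
if not job then
    return false
end
redis.call("ZADD", KEYS[2], ARGV[1], job) -- KEYS[2]: timer set, ARGV[1]: now + ttr
return job
`)

func popAndRequeue(ctx context.Context, rdb *redis.Client, readyQueue, timerSet string, ttr time.Duration) (string, error) {
	deadline := float64(time.Now().Add(ttr).Unix())
	val, err := popAndRequeueScript.Run(ctx, rdb, []string{readyQueue, timerSet}, deadline).Result()
	if err == redis.Nil {
		return "", nil // the ready queue was empty
	}
	if err != nil {
		return "", err
	}
	return val.(string), nil
}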
How do I confirm that the client has connected successfully?
Export the Redis maxmemory so that we can alert when memory usage is too high.
For example, a simple password scheme such as basic auth; right now it seems everything is accessible. If access is restricted to local IPs only, fetching monitoring metrics and performing admin operations become inconvenient, but opening it up to external access directly is rather dangerous.
I'd like to ask about the deletion strategy of the Delete API: deleting a job only removes it from the pool and does not remove the corresponding job handle from the timer zset. What is the reasoning behind this?
Refer to #26: in practice it seems the handle is removed automatically the next time it is consumed.
But looking at the code: https://github.com/bitleak/lmstfy/blob/master/engine/redis/queue.go#L224-L225 , PollQueues() seems to always decrement the retry count and put the job back into the timer zset when pulling from the ready queue, waiting to be scheduled to the dead letter queue or the ready queue again.
It does not handle the case where the job has already been deleted.
As I understand it, when a job that has already been ACKed/Deleted is scheduled to the ready queue again, its retries keep being decremented as it is consumed (since the job was deleted and is no longer in the pool, another job is consumed instead) and it is re-added to the timer zset; the job in the timer zset is then scheduled to the ready queue and consumed again, over and over, until retries reach 0 and it is moved from the timer zset to the dead letter queue.
Not sure whether I missed some key code. Thanks for clarifying!
Right now all jobs seem to be placed by default in a single zset whose key is timer_set. With many jobs that are not yet due, this zset becomes very large and the traversal/sorting latency increases. Won't that be a problem?
Is this part designed this way on purpose?
Is your feature request related to a problem? Please describe.
The current backlog metric counts everything that is backlogged, including jobs to be delivered a day later.
What I want is the jobs that should be delivered now but have not been delivered yet.
For example, I'd like to see
the distribution, per time bucket, of jobs in lmstfy that become due within the next three days,
for example:
today 3:00-4:00 pm: 10k jobs
today 5:00-6:00 pm: 100k jobs
A delayed job in lmstfy mainly passes through three kinds of data-structure storage.
There are several recent requirements:
All of the above requirements need to operate on the timer queue. Since the members of the timer queue contain the retry count (tries), and logic such as ack cannot obtain tries, the current design of the timer queue cannot satisfy them.
The operation that handles jobs in the timer queue is called pump. Pump decides, based on the value of tries, whether a job is moved to the dead letter queue or the ready queue, so obtaining tries is mandatory.
In the original design, tries is stored inside the member name in the timer queue. Each member of the zset is composed of:
{namespace len}{namespace}{queue len}{queue}{tries}{jobID len}{jobID}
This saves the extra work of fetching tries, but makes it impossible to operate on jobs inside the timer queue: doing so requires knowing the zset member, which contains tries, a value that is often unavailable and logically should not be needed. This probably needs to be reworked.
The current structure of the timer queue is:
ZADD test_queue timestamp member
member = {namespace len}{namespace}{queue len}{queue}{tries}{jobID len}{jobID}
timestamp = now + delaySecond (or ttr)
So the pump command for fetching due jobs is:
local expiredMembers = redis.call("ZRANGEBYSCORE", zset_key, 0, now, "LIMIT", 0, limit)
We need to remove tries from the member. This proposal moves tries out of the member and into the score, as the fractional part of the score. The change does not affect the consumption of old jobs, and jobs can still be pumped at the expected time.
The future structure of the timer queue will be:
ZADD test_queue timestamp member
member = {namespace len}{namespace}{queue len}{queue}{jobID len}{jobID}
timestamp = {now + delaySecond or ttr}.{printf("%05d", tries)}
The pump command for fetching due jobs becomes:
local expiredMembersAndScores = redis.call("ZRANGEBYSCORE", zset_key, 0, "(" .. (now + 1), "WITHSCORES", "LIMIT", 0, limit)
We also need to stay compatible with old-format members: whether a member contains tries can be determined from its layout, and if it does not, the retry count is read from the score instead. The retry count is fixed at 5 decimal digits (the original retry limit was 65535); to avoid floating-point precision issues, take six decimal digits and round up (ceil).
With this approach the transition is smooth: old data is still consumed on time, new data is stored in the new format, and some new requirements, such as fastack, can be supported.
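A rough sketch of how the score could be encoded and decoded under this scheme (the helper names are illustrative, not from lmstfy):

import "math"

// encodeScore packs the due timestamp and the retry count into one zset score:
// the integer part is the due time, the 5-digit fractional part is tries.
func encodeScore(dueAt int64, tries int) float64 {
	// a unix timestamp * 1e5 still fits comfortably in a float64's 52-bit mantissa
	return float64(dueAt) + float64(tries)/1e5
}

// decodeScore reverses encodeScore: truncate the fraction to six decimal digits,
// then round up (ceil) to recover the 5-digit retry count, as described above.
func decodeScore(score float64) (dueAt int64, tries int) {
	dueAt = int64(score)
	frac := score - float64(dueAt)
	tries = int(math.Ceil(math.Floor(frac*1e6) / 10))
	return dueAt, tries
}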
My Redis service does not support the SCRIPT command; could you provide a version that can be executed with EVAL?
Currently, the lmstfy client is hard to use since it implements the raw API interface directly, so users MUST understand all the APIs before using it. Maybe we should offer a friendlier, easier way for new users, which we call the high-level SDK; you can still go back to the old one if you want flexibility.
BTW, the Go client library lived in the same repo as the lmstfy server, which makes evolving the client more difficult, so we decided to move it into a separate repo: https://github.com/bitleak/go-lmstfy-client .
We are going to remove the push module in the next release, since it may cause a performance reduction.
With multiple instances, is the worker count configured the same on every machine? That is, does total workers = number of machines × configured workers per machine?
A manager of namespace and queue info. It uses Redis publish/subscribe to trigger a full update of the manager's cache.
According to the v2 design, we will allocate a timer_set to every queue, so we need to manage the namespace and queue metadata properly.
In lmstfy v2, a queue must be registered before use. Each lmstfy instance should discover changes and reload them for the pumper to use.
MetaManager contains existCache (map[namespace]map[queue]bool) and cache (sorted []meta).
existCache is used to check whether a namespace and queue exist.
cache is used by the pumper, which selects queues to pump from it.
MetaManager's public API:
Add: add the namespace and queue to a Redis set, then publish an update to notify all lmstfy instances to reload.
Remove: remove the queue from the namespace set, then publish an update to notify all lmstfy instances to reload.
Exist: check whether the namespace and queue have been registered.
Dump: dump the metadata manager's cached info.
Close: stop the subscription and the watch goroutine.
MetaManager's principle:
MetaManager subscribes to the update channel in Redis during initialization, then starts a watch goroutine to receive messages from that channel. When a message is received, it triggers a reload and applies the data to the meta manager, as sketched below.
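A simplified sketch of that flow (go-redis v8 assumed; the channel name, key layout, and reload body are illustrative):

import (
	"context"
	"sync"

	"github.com/go-redis/redis/v8"
)

type MetaManager struct {
	rdb        *redis.Client
	mu         sync.RWMutex
	existCache map[string]map[string]bool // namespace -> queue -> registered
}

// Add registers the queue in Redis and notifies every lmstfy instance to reload.
func (m *MetaManager) Add(ctx context.Context, ns, queue string) error {
	if err := m.rdb.SAdd(ctx, "lmstfy/meta/"+ns, queue).Err(); err != nil { // assumed key layout
		return err
	}
	return m.rdb.Publish(ctx, "lmstfy/meta/update", ns+"/"+queue).Err() // assumed channel name
}

// watch is the goroutine started at init: every published update triggers a full reload.
func (m *MetaManager) watch(ctx context.Context) {
	sub := m.rdb.Subscribe(ctx, "lmstfy/meta/update")
	defer sub.Close()
	for range sub.Channel() {
		m.reload(ctx)
	}
}

// reload rebuilds existCache (and the pumper's sorted cache) from Redis.
func (m *MetaManager) reload(ctx context.Context) {
	fresh := make(map[string]map[string]bool)
	// ... SMEMBERS the namespace/queue sets into fresh ...
	m.mu.Lock()
	m.existCache = fresh
	m.mu.Unlock()
}

// Exist checks whether the namespace and queue have been registered.
func (m *MetaManager) Exist(ns, queue string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.existCache[ns][queue]
}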
Is your feature request related to a problem? Please describe.
Is there a discussion group? I'd like to get a deeper understanding of lmstfy.
Hello, could you provide a Grafana dashboard template for the monitoring metrics?
The supported business and performance metrics are quite rich.
Currently, we are using Redis as backend storage but don't check its configuration,
which may cause data loss if it is configured wrongly. So I think we MUST check configuration
values like maxmemory_policy and aof_enabled, and refuse to start the server if they are not the expected values.
aof_enabled must be enabled (1) and maxmemory_policy must be noeviction.
Get the configuration through the INFO command and check the values.
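A hedged sketch of such a startup check (go-redis v8 assumed; it simply greps the raw INFO output for the two fields):

import (
	"context"
	"errors"
	"strings"

	"github.com/go-redis/redis/v8"
)

// checkRedisConfig refuses to let the server start if persistence/eviction are unsafe.
func checkRedisConfig(ctx context.Context, rdb *redis.Client) error {
	info, err := rdb.Info(ctx).Result()
	if err != nil {
		return err
	}
	if !strings.Contains(info, "maxmemory_policy:noeviction") {
		return errors.New("redis maxmemory_policy must be noeviction")
	}
	if !strings.Contains(info, "aof_enabled:1") {
		return errors.New("redis AOF (appendonly) must be enabled")
	}
	return nil
}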
The current go-redis version is too old for us to move forward, e.g. adding metrics with hooks.
What is [Pool.migrate] used for? In what scenario would another Redis instance be used? Thanks.
We may want to know whether this is the last try or not, and take some action to record the failed task for further recovery.
We're missing a metric to record failed authorization requests, so the server has no way to know when it is suffering from token issues.
Using a list to implement the queue part is quite nice 👍.
Looking forward to an implementation of "active" push.
Is your feature request related to a problem? Please describe.
There is no way to stop individual queues whose consumption has gone wrong.
Describe the solution you'd like
Allow pausing a queue through the admin API.
Currently, we only build and release the Docker image for amd64, so it can't run on the arm64 platform. We want to support arm64 as well to make arm64 users happy.
Use Travis CI to build multi-platform Docker images.
Why isn't batch pulling and publishing of jobs supported?
I have roughly 300 million crawler tasks per day; pulling them one at a time is too cumbersome.
A manager of timers. It manages a goroutine pool to pump specific queues, registers itself in Redis, and uses a Lua script to check that all timer managers are alive.
According to the v2 design, we will allocate a timer_set to every queue, so there are thousands of timer_sets to pump, and every timer_set must be pumped every second. We can't just repeat a single pumper for all of them; we need to design a timer manager that manages the pumper goroutines.
Lmstfy is a distributed system and every instance is the same. In lmstfy v1 there is only one timer_set to pump, so every instance can pump it every second without causing problems. But now thousands of timer_sets need to be pumped; if we deploy 10+ lmstfy instances, tens of thousands of Lua scripts would have to run in Redis within one second, which would be a disaster. So the timer manager exists to reduce duplicated pumping of the timer_sets.
TimerManager structure (a Go sketch follows this list):
sequence: int, this timer manager's sequence number, used to pick queues to pump.
pool: the pool of pumper goroutines; planned to use https://github.com/Jeffail/tunny/tree/1b37d6cb867a93f37f313022f6b7cb5bfe6cdc1b
name: this timer manager's name, generated from ip + uuid.
queues: the queueManager (metaManager), used to get the queues to pump.
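A sketch of that structure in Go (field types are assumptions based on the descriptions above):

import "github.com/Jeffail/tunny"

// TimerManager pumps the per-queue timer_sets; see the public API listed below.
type TimerManager struct {
	sequence int          // this instance's sequence number, used to pick queues to pump
	pool     *tunny.Pool  // pool of pumper goroutines (github.com/Jeffail/tunny)
	name     string       // generated from ip + uuid, used to register this instance in Redis
	queues   *MetaManager // the queueManager (metaManager), source of queues to pump
}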
TimerManager's public API:
Add: add the job_id to the timer_set.
Remove: remove the job_id from the timer_set.
Name: return the timer manager's name.
Size: calculate the timer_set's size.
Shutdown: stop the pump pool and the sequence calculation.
TimerManager's principle:
┌───────────────────┐
│ │
│ TimerManager │
│ │
└───────────────────┘
│
│
HSET key name now()+ttl
repeat every second
│
│
▼
┌───────────────────┐
│ │
│ Redis │
│ │
└───────────────────┘
┌───────────────────┐
│ │
│ TimerManager │
│ │
└───────────────────┘
│
│
│
┌──────────────┴───────────────┐
│run lua script(Pseudo code): │
│servers = {} │
│for k,v in HGETALL key { │
│ if v < now() { │
│ HDEL key k │
│ } else { │
│ server.insert(k) │
│ } │
│} │
│return table.sort(servers) │
└──────────────┬───────────────┘
│
▼
┌───────────────────┐
│ │
│ Redis │
│ │
└───────────────────┘
A timerManager's position in the resulting servers list is its temporary sequence number.
Sequence number 0 means pumping all queues; the other timerManagers divide the queues equally among themselves.
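A hedged sketch of that registration and liveness flow (go-redis v8 assumed; the heartbeat key, TTL handling, and field names build on the TimerManager sketch above and are illustrative):

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// aliveTimerManagers drops expired heartbeats and returns the sorted list of live
// managers, mirroring the pseudocode in the diagram above.
var aliveTimerManagers = redis.NewScript(`
local servers = {}
local all = redis.call("HGETALL", KEYS[1])
for i = 1, #all, 2 do
    if tonumber(all[i+1]) < tonumber(ARGV[1]) then
        redis.call("HDEL", KEYS[1], all[i])   -- heartbeat expired, drop the manager
    else
        table.insert(servers, all[i])
    end
end
table.sort(servers)
return servers
`)

// heartbeat registers this manager every second and recomputes its sequence number
// from its position in the sorted list of live managers.
func (t *TimerManager) heartbeat(ctx context.Context, rdb *redis.Client, key string, ttl time.Duration) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		rdb.HSet(ctx, key, t.name, time.Now().Add(ttl).Unix()) // HSET key name now()+ttl
		live, err := aliveTimerManagers.Run(ctx, rdb, []string{key}, time.Now().Unix()).StringSlice()
		if err != nil {
			continue
		}
		for i, name := range live {
			if name == t.name {
				t.sequence = i // temporary sequence number; 0 means pump all queues
			}
		}
	}
}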
A question: successfully consumed jobs are also added to the dead letter queue, so the jobs in the dead letter queue keep growing. Won't this have a significant impact on performance and memory?
Describe the bug
When the sentinel nodes and the Redis nodes have problems, the following scenario occurs:
redis-cli -h 10.140.0.40 -p 26381 client list
id=2133 addr=10.140.0.71:52220 fd=2136 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2135 addr=10.140.0.71:52226 fd=2138 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2137 addr=10.140.0.71:52232 fd=2140 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2139 addr=10.140.0.71:52238 fd=2142 name= age=603 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1077 addr=10.140.0.71:48688 fd=1080 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1079 addr=10.140.0.71:48694 fd=1082 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1081 addr=10.140.0.71:48700 fd=1084 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=1083 addr=10.140.0.71:48706 fd=1086 name= age=608 idle=2 flags=N db=0 sub=1 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=2594 addr=10.140.0.40:58842 fd=8 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client
sunny@vm-test-redis-lmstfy-persist-01-asea1a:/etc/supervisor/conf.d$ redis-cli -h 10.140.0.40 -p 26381 client list |wc -l
1117
The Redis deployment is as follows:
root@vm-test-redis-lmstfy-persist-01-asea1a:~# supervisorctl status all
redis-sentinel-26379:lmstfy RUNNING pid 14591, uptime 0:11:59
redis-sentinel-26380:lmstfy RUNNING pid 14593, uptime 0:11:59
redis-sentinel-26381:lmstfy RUNNING pid 14584, uptime 0:13:22
redis-standalone-6379:lmstfy RUNNING pid 14592, uptime 0:11:59
redis-standalone-6380:lmstfy RUNNING pid 14590, uptime 0:11:59
Expected behavior
The expectation is that connections work normally, without memory or goroutine problems.
If the token becomes invalid for some reason, the API may become inaccessible. Isn't that a fairly high risk?
Errors from initializing the meta manager during setup are unhandled, which may leave monitoring broken.
I have a task that needs to exist permanently and be invoked at a fixed interval, for example refreshing a cache every five minutes. I checked the API and found no parameter that schedules a job to run at an interval forever.