
dyno-queues's Introduction

DISCLAIMER: THIS PROJECT IS NO LONGER ACTIVELY MAINTAINED

Dyno Queues

Dev chat: https://gitter.im/Netflix/dynomite

Dyno Queues is a recipe that provides task queues utilizing Dynomite.

Dyno Queues Features

  • Time-based queues. Each queue element has an associated timestamp and is polled out only after that time.
  • Priority queues
  • No strict FIFO semantics. However, within a shard, elements are delivered in FIFO order (subject to priority).

dyno-queues's People

Contributors

davidwadden, ipapapa, jkschneider, kishorekasi, kowalczykbartek, michel-zededa, ricool06, robzienert, rpalcolea, rsrinivasannetflix, saurabhdev7933, smukil, v1r3n, vikramsingh2104

dyno-queues's Issues

RedisDynoQueue.pop() not returning HTTP tasks that are in the queue

We have HTTP tasks that are in SCHEDULED status but the pop() method of RedisDynoQueue is not returning them. If I call the peek method of the queue I can see them but the pop() method acts like they are not there. I have tried to debug around that class but RedisDynoQueue is in the dyno queues project and that project uses other classes from other projects so it is difficult.

I had a look at the code but without being able to log information at runtime it is tough to find out what is going on. There are also several other very similar HTTP tasks that get processed fine.

This also seems to happen when more than 5 tasks are pushed to the HTTP queue around the same time. Then some of them just stay in the queue, even when the activity dies down.

Is there any reason why the pop() would not return what the peek() method sees?

HTTP Task remains in scheduled state

The workflow created from the Conductor UI with the "New workflow definition" button is a basic workflow with a default HTTP task called get_population_data.
The workflow remains in the RUNNING state and the HTTP task stays SCHEDULED. I expect it to complete almost immediately, skipping the running state, but that is not the behaviour I see in 3.13.5.

Here is the JSON:

{
  "createTime": 1680633455822,
  "name": "Conduc_3.13.5",
  "description": "conductor workflow for 3.13.5",
  "version": 1,
  "tasks": [
    {
      "name": "get_population_data",
      "taskReferenceName": "get_population_data",
      "inputParameters": {
        "http_request": {
          "uri": "https://datausa.io/api/data?drilldowns=Nation&measures=Population",
          "method": "GET"
        }
      },
      "type": "HTTP",
      "startDelay": 0,
      "optional": false,
      "asyncComplete": false
    }
  ],
  "inputParameters": [],
  "outputParameters": {
    "data": "${get_population_data.output.response.body.data}",
    "source": "${get_population_data.output.response.body.source}"
  },
  "schemaVersion": 2,
  "restartable": true,
  "workflowStatusListenerEnabled": false,
  "ownerEmail": "[email protected]",
  "timeoutPolicy": "ALERT_ONLY",
  "timeoutSeconds": 0,
  "variables": {},
  "inputTemplate": {}
}

Steps to reproduce:

1. Connect to the Conductor UI.
2. Click on Definition.
3. Click the blue "New workflow definition" button.
4. Enter the workflow name, description, and other mandatory fields.
5. Save it.
6. Click on Workbench and select the newly created workflow.
7. Execute the workflow.
8. Navigate to the Executions screen and wait for the workflow to show up.
9. Verify that the workflow status is running and the task is scheduled.

I do not see the task in the HTTP task queue.
I do not have an invisible duration set.

We are using MySQL, not Postgres, for persistence. Is there an issue with dyno-queues not popping duplicate elements?
Any help or update would help us decide whether to proceed with this version or wait for a fix.

Operation: ( remove ) failed on key: [(a shard in) DECISION ]

Hello.

I have a lot of tasks running on Conductor. When my workers poll for tasks to execute, Conductor responds with this kind of error:

 {"code":"INTERNAL_ERROR","message":"INTERNAL_ERROR - Operation: ( remove ) failed on key: [(a shard in) DECISION ].","retryable":false,"instance":"18eb47ff475b"}

What is the meaning of this error?

Cannot remove taskId from the queue shard conductor.test.QUEUE.HTTP.a

Hi @apanicker-nflx @kishorebanala,
We are seeing a lot of these warnings in Conductor:

cannot add 2b5a87f2-c97f-4294-b883-57869f5fd65d from the queue shard conductor.test.QUEUE.HTTP.a
cannot remove b3ce517b-eeef-474f-94e9-3468e6b0927b from the queue shard conductor.test.QUEUE._deciderQueue.a
cannot add b3ce517b-eeef-474f-94e9-3468e6b0927b from the queue shard conductor.test.QUEUE._deciderQueue.a

We are running a Redis cluster setup. We also enabled workflow execution locking a few days ago, but that did not help much.
We also see the deciderQueue piling up, so we run the Sweeper. But running the Sweeper does not help much either, because the Sweeper also pops from Redis and these exceptions occur in pop itself.
Can you please help us with this?
Does it cause any harm to running workflows?
We also see the PENDING_WORKFLOWS set piling up, and we terminate all old workflows based on the workflow creation time.

processUnack not called after re-establishing connection to Redis DynoQueue

I have a Netflix Conductor server with a Dynomite DB. It runs a workflow in which the client does not respond, so the timeout kicks in via the sweeper: the task times out and is rescheduled.

But if I take the Dynomite server down temporarily and bring it back, task timeouts stop working. While the Dynomite server is unreachable I see logs and tracebacks about errors running the sweeper, which is expected. Once the connection is re-established those errors stop, but the sweeper doesn't pick up any workflows to sweep.

Based on debug logs and the Redis commands issued, it looks like the unack queue has some entries, but they are not processed.
I have tried dyno-queues release 2.0.13 and see this behavior.

Any ideas or comments on how to fix this?

Message priority is ignored

Message priority is not taken into account because of a bug in com.netflix.dyno.queues.redis.RedisDynoQueue.push:

instead of

double priority = message.getPriority() / 100;

should be

double priority = message.getPriority() / 100.0;
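
The difference between the two lines comes down to Java's integer division. The following is a minimal sketch, assuming getPriority() returns an int (the reported fix implies this). The class and method names are illustrative and not part of dyno-queues.

```java
public class PriorityDivision {
    // Integer division: both operands are ints, so the fraction is discarded
    // before the result is widened to double.
    public static double truncated(int priority) {
        return priority / 100;
    }

    // Floating-point division: the literal 100.0 promotes the int to double first.
    public static double fractional(int priority) {
        return priority / 100.0;
    }

    public static void main(String[] args) {
        for (int p : new int[] {0, 5, 50, 99}) {
            System.out.println(p + " -> " + truncated(p) + " vs " + fractional(p));
        }
    }
}
```

With the integer form, every priority from 0 to 99 contributes 0 to the score, so the priority cannot influence ordering.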

Unit test issue: com.netflix.dyno.queues.redis.v2.RedisDynoQueueTest > testTimeoutUpdate FAILED

One assertion in testTimeoutUpdate is flawed because its result is nondeterministic.

        boolean updated = rdq.setUnackTimeout(id, 500);
        assertTrue(updated);
        popped = rdq.pop(1, 1, TimeUnit.SECONDS);  // <-- RedisPipelineQueue.processUnacks() might be scheduled while waiting in pop operation
        assertNotNull(popped);
        assertEquals(0, popped.size());

RedisPipelineQueue.processUnacks() is triggered periodically via a scheduled thread pool. If it happens to run while waiting for pop to finish, pop may return some results and the test fails.

The full console output is as follows:

[INFO ] 2020-08-22 14:28:16,790 RedisPipelineQueue - com.netflix.dyno.queues.redis.v2.RedisPipelineQueue is ready to serve {test_queue.x}, shard=x
[DEBUG] 2020-08-22 14:28:18,801 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,810 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,825 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,846 RedisPipelineQueue - Adding 2 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,890 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,898 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,913 RedisPipelineQueue - Adding 2 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:18,947 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue
[DEBUG] 2020-08-22 14:28:19,237 RedisPipelineQueue - Cannot get the message payload some fake id
Message [id=0, payload=Hello World-0, timeout=0, priority=10]
Message [id=1, payload=Hello World-1, timeout=0, priority=9]
Message [id=2, payload=Hello World-2, timeout=0, priority=8]
Message [id=3, payload=Hello World-3, timeout=0, priority=7]
Message [id=4, payload=Hello World-4, timeout=0, priority=6]
Message [id=5, payload=Hello World-5, timeout=0, priority=5]
Message [id=6, payload=Hello World-6, timeout=0, priority=4]
Message [id=7, payload=Hello World-7, timeout=0, priority=3]
Message [id=8, payload=Hello World-8, timeout=0, priority=2]
Message [id=9, payload=Hello World-9, timeout=0, priority=1]
[DEBUG] 2020-08-22 14:28:23,895 RedisPipelineQueue - Adding 1 messages back to the queue for test_queue

expected:<0> but was:<1>
Expected :0
Actual   :1

java.lang.AssertionError: expected:<0> but was:<1>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:743)
        at com.netflix.dyno.queues.redis.v2.RedisDynoQueueTest.testTimeoutUpdate(RedisDynoQueueTest.java:119)

How does dyno-queues make sure that a message is consumed only once?

Hi, I was looking for a simple delay queue for my application and found dyno-queues.
I read your blog and code, and I think there may be a problem with how messages are consumed.
When popping, you use the peekIds() method to read IDs from Redis:

    private Set<String> peekIds(int offset, int count) {
        return execute(() -> {
            double now = Long.valueOf(System.currentTimeMillis() + 1).doubleValue();
            Set<String> scanned = quorumConn.zrangeByScore(myQueueShard, 0, now, offset, count);
            return scanned;
        });
    }

Then you remove them in the _pop() method:

    long removed = quorumConn.zrem(myQueueShard, msgId);
    if (removed == 0) {
        if (logger.isDebugEnabled()) {
            logger.debug("cannot remove from the queue shard " + msgId);
        }
        monitor.misses.increment();
        continue;
    }

My question: if I consume these messages with multiple consumers, how do you make sure that each message is consumed only once between peekIds() and _pop()? It seems that one message could be fetched by different consumers.
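
For context, Redis executes each command atomically, and ZREM returns the number of members it actually removed. When several consumers peek the same ID, only one of them can get a non-zero result from the removal. The following is a minimal in-memory sketch of that claim-by-removal idea, using a concurrent set as a stand-in for the sorted set. The class and method names are hypothetical and this is not dyno-queues code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimByRemoval {
    // Returns how many of `consumers` concurrent callers managed to claim the same id.
    public static int raceForOneMessage(int consumers) {
        Set<String> shard = ConcurrentHashMap.newKeySet(); // stand-in for the Redis sorted set
        shard.add("msg-1");

        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // at this point every consumer has "peeked" msg-1
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // remove() is atomic and returns true for one caller only,
                // analogous to ZREM returning 1 for a single client.
                if (shard.remove("msg-1")) {
                    winners.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("consumers that claimed msg-1: " + raceForOneMessage(8));
    }
}
```

This only shows that a single removal cannot succeed twice. Whether a message is processed exactly once also depends on what happens after the claim, for example an unack timeout putting the message back on the queue.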

RedisDynoQueue.pop() not returning HTTP tasks that are in the queue

When we use the Redis implementation (not MySql), we have HTTP tasks that are in SCHEDULED status but the pop() method of RedisDynoQueue is not returning them. The reason these SCHEDULED tasks are not popped is because the RedisMock class's zAdd method for these tasks for the HTTP UNACK queue returns 0. That causes the task to not be popped in the _pop method of RedisDynoQueue. These tasks become stranded and will never be returned on a poll.

Seems to be a bug with RedisSortedSetCache. It is trying to maintain two maps, the cache map and the scores map. The scores map value is a Map and the cache map value is a Set. When tasks are added to the UNACK.HTTP cache after they have already been added before, a duplicate entry is added to the cache map's Set but the scores entry is just overwritten (since the put does not allow dups). Then when the entry is removed, only one is removed from the cache, leaving one (or more) and the scores entry is deleted entirely.

The Comparator of the cache map is the culprit. It does comparisons using the scores map, not the cache map, so the contract of the SortedSet that says it won't allow duplicates is broken, at least for the way that Conductor is using that class. The contains() method uses the Comparator to determine if the entry already exists and because of the Comparator the contains() will sometimes return false even if there is an entry. Conductor uses this contains() method.
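
The general hazard described above can be reproduced with plain JDK collections. The following is a minimal sketch, not the RedisMock or RedisSortedSetCache code: the class, field, and method names are hypothetical stand-ins for a sorted set whose Comparator reads scores from a separate mutable map.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class ScoreOrderedSetDemo {
    // Hypothetical stand-in for the "scores map".
    private final Map<String, Double> scores = new HashMap<>();

    // Hypothetical stand-in for the sorted "cache" set: ordering depends on
    // the external scores map, which can change after an id was inserted.
    private final TreeSet<String> cache = new TreeSet<>(
            Comparator.comparingDouble((String id) -> scores.get(id))
                      .thenComparing(Comparator.<String>naturalOrder()));

    // Mimics a zadd-like call: overwrite the score, then add the id to the set.
    public boolean add(String id, double score) {
        scores.put(id, score);
        return cache.add(id);
    }

    public List<String> contents() {
        return new ArrayList<>(cache);
    }

    public static void main(String[] args) {
        ScoreOrderedSetDemo demo = new ScoreOrderedSetDemo();
        demo.add("a", 1);
        demo.add("b", 2);
        demo.add("c", 3);
        // Re-adding "a" with a new score: the lookup follows the new score,
        // misses the node stored under the old position, and inserts a duplicate.
        boolean addedAgain = demo.add("a", 10);
        System.out.println(addedAgain + " " + demo.contents()); // prints: true [a, b, c, a]
    }
}
```

The set now holds "a" twice while the scores map holds a single entry for it, which matches the shape of the inconsistency the issue describes.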

When the pop() method of DynoQueueDAO is called, it calls the pop() method of the “conductor_queues.test.UNACK.HTTP.c” RedisDynoQueue which calls its internal _pop() method. In its _pop() method, it does this for each task in the UNACK.HTTP queue:

    long added = quorumConn.zadd(unackQueueName, unackScore, msgId, zParams);
    if (added == 0) {
        monitor.misses.increment();
        continue;
    }

So if ‘added=0’, that task will not be added to the tasks to pop. In the zadd method of RedisMock, the contains() method of the cache map is used to see if the task exists in the cache. If it does, it returns 0. The contains() method uses the Comparator specified by RedisSortedSet.set() method which doesn’t just check the task id of the others in the set, it also will return a 0 (equals) if both scores for those tasks are null or if the score being added is greater than any other in the set, which they are at that point. So it returns 0 because either both scores are null or the to-be-added score is greater than the highest in the set and it is not popped.

Once there are duplicates, that causes issues that keep SCHEDULED tasks from being popped.

I am not sure where to go with this. Should Conductor use a different Redis class for its persistence of the tasks in the queue or is this something that should be changed in RedisMock?

Dyno Queue pop doesnt work as expected

If the consumer thread is started first and messages are pushed to the queue afterwards, the push works fine but the pop never happens. The timeout in pop is set to 1 day.
Here is the sample code we are using:

Thread 1 - pop

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Runnable runnable = () -> {
        while (true) {
            V1Queue.pop(1, 1, TimeUnit.DAYS).forEach(DynoQueueMain::processMessage);
        }
    };
    Runnable runnable1 = () -> {
        for (int i = 0; i < 5; i++) {
            Message m = new Message("id-" + i, "payload-extra-" + i);
            messages.add(m);
            V1Queue.push(messages);
        }
    };
    executorService1.submit(runnable1);

Dynomite version: 0.5.7

After restarting the application, the pop succeeds.
The code gets stuck in

    while (this.prefetchedIds.size() < messageCount && this.clock.millis() - start < waitFor) {
        Uninterruptibles.sleepUninterruptibly(200L, TimeUnit.MILLISECONDS);
        this.prefetchIds();
    }

even though the messages are pushed after the thread is started.

Steps:
1. Start the consumer thread.
2. Set the timeout to 1 day.
3. Later, start pushing the messages.
4. The messages get pushed.
5. The pop doesn't happen.

Prefetching ids multiple times leading to duplication.

If there are not enough IDs in the shard to fetch, prefetchIds() is executed multiple times in com.netflix.dyno.queues.redis.RedisDynoQueue#pop:

            prefetchIds();
            while (prefetchedIds.size() < messageCount && ((clock.millis() - start) < waitFor)) {
                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
                prefetchIds();
            }

This results in duplicated IDs and warning logs such as: cannot add 66883d7b-3929-471a-b3e9-893df395dd10 to the unack shard
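
To illustrate the failure mode: if every retry peeks the same shard range and appends the result to a buffer, the same ID can be buffered more than once. The following is a minimal sketch of one possible guard, a buffer that ignores IDs it already holds. The PrefetchBuffer class is hypothetical, and whether RedisDynoQueue's own buffer behaves like a plain list is an assumption, not something confirmed here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PrefetchBuffer {
    // Insertion-ordered set: re-adding an id that is already buffered is a no-op.
    private final Set<String> prefetched = new LinkedHashSet<>();

    // Called once per peek; returns how many ids were new to the buffer.
    public int addAll(List<String> peekedIds) {
        int added = 0;
        for (String id : peekedIds) {
            if (prefetched.add(id)) {
                added++;
            }
        }
        return added;
    }

    public int size() {
        return prefetched.size();
    }

    // Hands the buffered ids to the caller in arrival order and clears the buffer.
    public List<String> drain() {
        List<String> ids = new ArrayList<>(prefetched);
        prefetched.clear();
        return ids;
    }
}
```

With this shape, a second peek that returns the same IDs plus one new ID grows the buffer by one entry instead of duplicating the earlier ones.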

Testing workflow/task with priority

The latest Netflix Conductor code added support for workflow priority.
I need to understand how priority works with workflows and tasks when startWorkflow is invoked.

I tested with a simple workflow that just calls an HTTP task.
I submitted 3 workflows with different priorities, but the Conductor server executes them in order of submission (since the workflow is scheduled for some later time).

A few more details. I followed these steps to test priority:
1. Registered the task.
2. Registered the workflow (SIMPLE).
3. Created a worker and started polling the registered tasks.

Then I submitted 5 curl requests to start the workflow with different priorities (0-99), using the endpoint that accepts priority as a query parameter.
But the worker polls and receives the tasks in the order of submission.

What does the method public List push(final List messages) in RedisDynoQueue do with the priority? How does it apply the priority to the message here?

    double priority = message.getPriority() / 100.0;
    double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
    String shard = shardingStrategy.getNextShard(allShards, message);
    String queueShard = getQueueShardKey(queueName, shard);
    quorumConn.zadd(queueShard, score, message.getId());

Please help me understand and implement priority with workflows.

(Netflix/conductor#1224)
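
The score arithmetic quoted in this issue can be worked through in isolation. The following is a minimal sketch that mirrors only the two score lines from the snippet. The PriorityScore class is hypothetical, and it assumes priorities in the 0-99 range mentioned in the issue.

```java
public class PriorityScore {
    // Mirrors the quoted computation: due time in milliseconds plus priority / 100.0.
    public static double score(long nowMillis, long timeoutMillis, int priority) {
        double fraction = priority / 100.0; // always below 1 for priorities 0-99
        return Long.valueOf(nowMillis + timeoutMillis).doubleValue() + fraction;
    }

    public static void main(String[] args) {
        // Same millisecond: the priority fraction decides the order.
        System.out.println(score(1000, 0, 5) < score(1000, 0, 50));
        // One millisecond apart: the earlier message has the lower score,
        // even with the largest priority value against the smallest.
        System.out.println(score(1000, 0, 99) < score(1001, 0, 0));
    }
}
```

Because the priority adds less than 1 to a millisecond timestamp, it can only reorder messages that become due in the same millisecond. This may be why workflows submitted at different times appear to be delivered in submission order.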

Performance Issues while using dyno-queues over dynomite

I have been using Netflix Conductor for quite a while, but I am seeing a lot of performance issues, especially with polling. Netflix Conductor provides an in-memory configuration and a Dynomite-based configuration. When I execute workflows with both configurations, I see a difference of 4000 ms between in-memory and Dynomite. When workers poll, I see a lot of polling timeouts once the number of parallel workflows goes beyond 20. I see people claiming millions of workflows without any issues, but I am not able to scale beyond 20. The Netflix Conductor community redirected me here for possible configuration issues. I am using the default configuration provided for Netflix Conductor without any changes, so I would like to know whether I need to change any configuration for dyno-queues and Dynomite. Any help on this would be appreciated.

Misleading exception thrown when dynamo can't connect to any node in cluster

This happens when the nodes configured as dynamo cluster hosts are not available (see attached project).

Somewhere in the logs we see

[WARN] [com.netflix.dyno.connectionpool.impl.HostConnectionPoolImpl] Unable to make any successful connections to host Host [hostname=localhost, ipAddress=127.0.0.1, port=8102, rack: rack1, status: Up]

but the exception thrown when calling

 DynoJedisClient dynoClient = new DynoJedisClient.Builder()  //
                .withApplicationName("appname")   //
                .withHostSupplier(customHostSupplier)  //
                .withCPConfig(connectionPoolConfiguration)//
                .build();

is an unhelpful ArrayIndexOutOfBoundsException:

Caused by: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 0
	at com.netflix.dyno.jedis.DynoJedisClient$Builder.startConnectionPool(DynoJedisClient.java:3414)
	at com.netflix.dyno.jedis.DynoJedisClient$Builder.buildDynoJedisClient(DynoJedisClient.java:3383)
	at com.netflix.dyno.jedis.DynoJedisClient$Builder.build(DynoJedisClient.java:3302)
	at com.hybris.decay.client.DynoQueueClient.<init>(DynoQueueClient.java:62)
	at com.hybris.decay.client.DynoQueueClient.<init>(DynoQueueClient.java:38)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:147)
	... 77 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
	at com.netflix.dyno.connectionpool.impl.lb.HostSelectionWithFallback.calculateReplicationFactor(HostSelectionWithFallback.java:389)
	at com.netflix.dyno.connectionpool.impl.lb.HostSelectionWithFallback.initWithHosts(HostSelectionWithFallback.java:346)
	at com.netflix.dyno.connectionpool.impl.ConnectionPoolImpl.initSelectionStrategy(ConnectionPoolImpl.java:605)
	at com.netflix.dyno.connectionpool.impl.ConnectionPoolImpl.start(ConnectionPoolImpl.java:505)
	at com.netflix.dyno.jedis.DynoJedisClient$Builder.startConnectionPool(DynoJedisClient.java:3397)
	... 86 more
