Comments (32)

rclough commented on June 19, 2024

I've corresponded privately on this issue, but would like to note that this is a priority issue for me and my team, and I intend to work on the implementation.

Some thoughts:

  • Extending DAG Cycle detection:
    • Once you can reference other jobs, cycles become easier to sneak in
    • Can the underlying DAG data structure currently handle adding a DAG to a DAG? Using the above example:
    • Task 1 now needs to become the parent of all starting tasks for Job 2. Job 2's completing tasks need to be added as parents to whatever the "Job 1" reference is parent of.
    • It's certainly a realistic addition, just something to consider
  • How should job completion be detected? Should the child report to the parent? Or should the parent job poll the child for job status?
  • This raises another question which is, what if you have a Job 3, which only runs when Job 1 and Job 2 are complete?
  • Let me attempt to answer my own question with a proposal: this case can be handled by having a "meta-job" (nominally "Job 4") that basically creates a DAG of just job references.
  • In this case, perhaps any job start should have an optional parameter of whatever job/task it needs to report back to upon completion, allowing maximum flexibility.

Personally, I prefer notifications over polling, but I'm always open to ideas in the other direction.

rclough commented on June 19, 2024

Another edge case: Failed tasks. If a task fails, and that task was a job reference, then "Start jobs from failed tasks" needs to drill down into the child job, and trigger "start job from failed task" on the child job (theoretically many levels deep)

rclough commented on June 19, 2024

Another cool idea (non-essential): double-clicking a Job task to see its steps (and even their status), or at least double-clicking to open that job's page.

Which brings up another idea: instead of literally calling the job, potentially just inheriting its tasks and running them "natively" within the single job. Complexities I can think of are command logging and failed-task handling. Not sure if I'm into it, I'm just brainstorming.

thieman commented on June 19, 2024

DAG cycle detection sounds like a fun problem. Needs to be able to support the same Job being called multiple times within the main Job (as long as it doesn't create a cycle) but also needs to not get stuck in an infinite loop in DAG resolution. Certainly achievable.

I think the idea of inheriting the Tasks of each child job is definitely the simplest approach. This allows us to sidestep a lot of the hard problems with managing child Jobs as first-class citizens.

So, how about this for a simple version: when a Job is started from the beginning, it snapshots the current Task config of all its child Jobs. It inlines these tasks for purposes of its own DAG validation and execution. Changes to the child Job after this initial snapshot will not be propagated up to the parent Job until it is started from scratch again. All Job configuration like email settings comes only from the parent Job.
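
Roughly, the snapshot step could look something like this (purely hypothetical names, just to illustrate the idea, not dagobah's real API):

    import copy

    # Hypothetical sketch only: freeze each child Job's task config when the parent starts.
    # lookup_job and is_job_reference are assumed helpers; parent_job.tasks is assumed to
    # be an iterable of Task-like objects with a child_job_name attribute on Job references.
    def snapshot_child_jobs(parent_job):
        """Deep-copy the current tasks of every referenced child Job.

        The parent validates and runs against these copies, so edits made to a child
        Job after this point don't affect the already-started parent run.
        """
        return {ref.child_job_name: copy.deepcopy(lookup_job(ref.child_job_name).tasks)
                for ref in parent_job.tasks
                if is_job_reference(ref)}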

Some implementation details:

  • This eliminates the need for a Job to be either scheduled or dependent. We shouldn't need any special handling in the dependent job at all.
  • The UI should support expanding a child Job in the graph. You double-click on a child Job, it expands its nodes, you can double-click on one of those to collapse it again. I do not think this needs to expand to the table, since that is primarily used for changing things and we should not support changing the child inside the parent's view.
  • A failure inside the child Job can be resumed like anything else. Nothing has actually changed about the DAG other than the ability to expand/collapse the view of child Jobs.
  • We should not allow the user to expand the child Job and connect specific Tasks into the parent DAG. The child Job only has one entry point and one exit point.

I think this is a pretty simple approach and avoids a lot of intricate coupling between parents and children. The hard work will probably be mostly in the UI and in tweaking reporting (probably want to expand the child job if there's a failure but keep it condensed if everything worked). Thoughts?

rclough commented on June 19, 2024

Sounds like a fair idea for implementation to me.

General thoughts

  • UI-wise, we should decide on some distinguishing feature for Job references, e.g. have them be circles instead of squares, or maybe outlined in a different color.
  • Expansion of the job sounds awesome. Future idea of being able to visually see which expanded jobs all belong to a job reference would be nice, but is not important.
  • Auto expansion on failure is a great idea.
  • Logging: how do we provide access to logs in the UI for inner-tasks, especially for when a task of a job reference fails?

Implementation

When it comes to the overall runtime, does this sound correct?

  1. Start Job is pressed on "Job 1"
  2. Job 1 creates the DAG, generating snapshots of each job at that time.
  3. Job runs to completion (expansion on failure, but otherwise follows the normal job workflow)
  4. Snapshots are destroyed (so that by the next run, the latest version of the job may be used)

Some edge cases: a Job with job references fails, and is then scheduled to run again before the failure is resolved. How is this case currently handled? If it is to run again, do we re-generate the snapshots?

Cycle Detection

I was thinking about cycle detection, and realized that blind inheritance of job references could just lead to us adding tasks ad-infinitum, for example:

  • Job 1 references Job 2
  • Job 2 references Job 1
  • process goes:
    • Job 1 adds its tasks, and expands Job 2
    • In the process of expanding Job 2, Job 1 needs to be expanded
    • Continue until you run out of memory or something

To mitigate this, we need a cycle detection algo that does not require the graph to be "pre built".

My idea is you could do a simple cycle detection algorithm skipping anything that's not a job reference (Think of this as cycle detection on a meta-graph comprised of just job references).

Basically we modify something like DFS, where "get next node" or "get child nodes" just works by getting the nearest job references (or nothing, if there are none). I'd need to flesh this out a bit more, but I think it can be done without too much headache using existing algos. It could help if I came up with some examples too, to visualize it.
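
Very roughly, something like this (every name here is made up, just to show the shape of the check):

    # Made-up sketch: cycle detection over the "meta-graph" of job references only.
    # get_job_references(name) is a hypothetical helper returning the names of jobs
    # referenced by JobTasks inside the named job (skipping plain command tasks).
    def has_reference_cycle(job_name, inside=None):
        inside = set() if inside is None else inside
        if job_name in inside:
            return True                      # we're already "inside" this job: cycle
        inside.add(job_name)
        for referenced in get_job_references(job_name):
            if has_reference_cycle(referenced, inside):
                return True
        inside.discard(job_name)             # backing out of this branch of the meta-graph
        return False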

thieman commented on June 19, 2024

The logging question is legitimate but we can deal with it when we get there. The implementation you laid out here lines up with what I was thinking. The edge case should follow the current behavior for when a Job hits its scheduled start time:

If Job is waiting or failed => start Job from scratch. This would cause a new snapshot.
If Job is running => Do nothing

I've got an idea for cycle detection and will try to flesh it out and post back here soon. Might be similar to what you're thinking of.

thieman commented on June 19, 2024

Okay, this is how I think the new cycle detection could work.

You start with a Job which is a DAG composed of Tasks and other Jobs. You also begin with an empty set which I will call the context. The context is the set of all Jobs that you are "inside" at any given point. Trying to expand a Job that is already in the context indicates that you are in a cycle.

  1. Add the current Job's ID to the context. When you start out, this means the parent Job.
  2. Perform a topological sort on the current DAG without expanding any child Jobs. This verifies that the current Job is acyclic before expansion.
  3. Traverse the nodes in topological order. For each node that is a Job, do the following:
    1. If this Job is in the current context, fail.
    2. Run this algorithm again on this node, passing the current context.

I'll try to illustrate the beginning of this algorithm on a simple Job with two child Jobs here.

We have three Jobs. Job 1 will be the parent. Job 1 calls Jobs 2 and 3, Job 2 calls Job 3. These DAGs are all fine. Let's prove it.

We start with Job 1. First, we add Job 1 to the context. This means that anything that tries to call Job 1 inside of Job 1 will result in a cycle. We then topologically sort Job 1 to ensure that there are no cycles in the nodes themselves before expansion. This checks out, so we then start traversing the nodes. When we hit Job 2, we expand it and call the validation algorithm on the expanded nodes with the starting context of [Job 1].

We now run the algorithm again using the expansion of Job 2. We add Job 2 to the context, giving us [Job 1, Job 2]. Any calls to these Jobs inside this block will result in a cycle. We topologically sort without expanding Job 3 to convince ourselves that the nodes themselves are kosher. We then traverse the nodes again until we get to Job 3.

We repeat the same process here, and our context ends up being [Job 1, Job 2, Job 3]. If any of the three jobs are called in here, we have a cycle. We then sort and traverse. Since Job 3 calls no other Jobs, we end up exiting successfully and crawl back up the stack of the original DAG.

The next step is to expand Job 3 from the outermost scope. Just like when we called Job 2 from the outermost scope, the starting context we pass to that call of the algorithm is just [Job 1].

Did any of that make sense? I always feel I have to admit a degree of defeat when I am forced to take pictures to explain a point. 😄

thieman commented on June 19, 2024

Oh and of course you can come up with your own approach as long as it meets the same requirements, this is just a suggestion. The one thing that might be missing from your idea (as I was thinking of it, anyway) might be detection of non-Job-related cycles inside of child Jobs. The parent Job cannot trust that the child Jobs are acyclic by themselves and must verify it.

rclough commented on June 19, 2024

I think this is pretty much the way I was thinking of doing it, but verbalized a bit better (and visualized! 👍 ). I don't think pictures mean defeat, they're a great way to visualize the problem and see any potential issues 😄

I didn't include non-Job-related cycles because I assumed there was some sort of cycle detection in place that would prevent a cyclic job from being saved, in which case it would be safe to assume child jobs are acyclic. But if that's not the case, adding a cycle check on child jobs is a trivial addition.

The only thing that's not quite covered in this is when you have to explore the next branch, i.e. how do you know when to remove a job from the context because you're now exploring a different path of the graph? I assume there's some level of magic you can do with the topological sort to figure this out?

thieman commented on June 19, 2024

It is currently possible to save a Job with a cycle, but a cyclic Job will refuse to start. I originally did this so you wouldn't get errors while performing a bunch of add/delete edge operations to the DAG. This could be reconsidered.

Popping the context isn't actually necessary; you rely on the call stack to do this for you. The calls would look like this, where nodes contains all the nodes and their edges and context is a set of job IDs.

def verify_dag(nodes, context=set()):
    ...

# calls to this function would look like this in my example above
verify_dag(job_1)
    verify_dag(job_2, {job_1_id})
        verify_dag(job_3, {job_1_id, job_2_id}) # when this returns, go all the way back to outer scope
    verify_dag(job_3, {job_1_id})

The context is just keeping track of all of the parent Jobs of the DAG at any given point. It might be more clear to think of these non-sequentially. The calls above are really just doing this:

  1. Job 1 starts validation.
  2. Job 1 needs to call Job 2 and Job 3 at some point. When it does, those need to know that Job 1 was their parent.
  3. When Job 2 gets called, it calls its own version of Job 3. This version of Job 3 needs to know that Jobs 1 and 2 are its parents.
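
To make this concrete, here's a rough, untested sketch of the whole check (topological_sort, is_job_task, and get_child_job are placeholders, not real dagobah or py-dag calls):

    # Rough, untested sketch; helper names are placeholders, not real dagobah/py-dag calls.
    class CycleError(Exception):
        pass

    def verify_dag(job, context=None):
        """Validate a Job and its expanded children, tracking which Jobs we are 'inside'."""
        context = set() if context is None else context
        if job.name in context:
            raise CycleError("already inside %r, so this reference is a cycle" % job.name)
        context = context | {job.name}   # a new set per call, so the call stack "pops" for us

        order = topological_sort(job)    # placeholder: should raise if the un-expanded DAG is cyclic

        for node in order:
            if is_job_task(node):        # placeholder: is this node a reference to another Job?
                verify_dag(get_child_job(node), context)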

rclough commented on June 19, 2024

Ah ha. I see now. Good shit.

As long as a job with cycles doesn't start, that's the most important part, but being able to save a job with cycles may be misleading.

thieman commented on June 19, 2024

I agree that this becomes more confusing with Jobs-in-Jobs. Job 1 refusing to start because it depends on Job 2 which is in a bad state is shitty. I'll add an issue for it.

rclough commented on June 19, 2024

I'm going to start working on a branch for this. Is there any other prerequisite work that should be done before I start developing this?

thieman commented on June 19, 2024

I merged the fix for #77, so you should be good to go here.

thieman commented on June 19, 2024

Another note: feel free to submit small PRs for small components of this system so we can merge them as they come in. I'm fine with having not-quite-used-yet code in master if it helps us have shorter PR feedback cycles.

rclough commented on June 19, 2024

Dope, will do. I think I'm gonna work on having it available in the back end first, and then tackle wiring it up in the front end, so the back-end components could be a separate PR, if not a few different PRs.

thieman commented on June 19, 2024

Sounds like a plan.

rclough commented on June 19, 2024

My first step for adding jobs-in-jobs is going to be adjusting/defining the data model, so I was wondering what you thought might be the best way to support a "job" as a "task". I see a few options:

  1. Take the existing data model, and use the existing fields somehow to signify a Job reference. For example, prefixing the "command" with something to signify that it's referencing a job.
  2. Add a job_id column that defaults to null (or is non-existent, in the case of Mongo). When it is set, this both signifies that the task is a job and gives you the ID of that job.
  3. Have a completely new object that extends the Task class

Other considerations: setting start and end times, as well as things like task status, for tasks that are actually a job will require special logic, which may warrant a separate class? I'm not sure if the task handles this logic or if the job does it by polling, which might mean a separate class isn't necessary.

Thoughts?

hmmmm

rclough commented on June 19, 2024

Also things like stderr and stdout, etc.

thieman commented on June 19, 2024

I like adding a job_id column to signify that a Task runs a sub-Job. I also agree that you will probably end up wanting to sub-class the Task class to hold all the logic specific to this case.

rclough commented on June 19, 2024

I think I will make it a job_name column, because although jobs have IDs, they are not readily available from the dagobah object, since jobs are stored by name.
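
Something along these lines is what I'm picturing (sketch only; the Task constructor and the lookup/snapshot helpers are guesses, not the real API):

    # Sketch only: a Task subclass that references another Job by name instead of
    # running a shell command. The Task constructor and the lookup/snapshot helpers
    # here are guesses, not dagobah's real API.
    class JobTask(Task):
        def __init__(self, parent_job, name, target_job_name):
            super(JobTask, self).__init__(parent_job, command=None, name=name)
            self.target_job_name = target_job_name   # persisted as the job_name column

        def expand(self):
            """Return a copy of the referenced Job's DAG so the parent can inline it."""
            target = lookup_job_by_name(self.target_job_name)   # hypothetical lookup helper
            return target.snapshot_dag()                        # hypothetical snapshot helper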

rclough commented on June 19, 2024

so #160 is in, and I'm starting on "expanding" jobs into one giant graph (when the "snapshot" is initialized).

I'm thinking I'll need to add a predecessors(node) method to py-dag in order to implement this. This is because, when we expand a JobTask, we need all of the JobTask's ind_nodes to have edges coming from the JobTask's predecessors ("parents", if this were a tree).

Also, I'd want to add another downstream function, downstreams_final(node), to get only the nodes that have no further nodes downstream (I assume downstream_all gets you all the nodes in between the current node and the "final" downstreams as well).

I'm working on code now and it's actually fairly complex (for example, how does one merge the edge and node dictionaries properly?), so any thoughts are welcome.

rclough commented on June 19, 2024

let me revise that downstreams_final idea.

What's actually needed is all nodes in the DAG that have no downstreams. Not sure what a good name for this is, or if there's a canonical name.

rclough commented on June 19, 2024

Here's my first pass at coding it. This is untested, and will not run because I haven't implemented some of the aforementioned functions (I decided to call that last one dag_ends() for the "ends" of a DAG). But you get the gist:

    def expand(self, dag=None):
        """
        This method takes some DAG object, and expands all the underlying
        tasks recursively:
            * Starting with ind_nodes, traverse the graph
            * If the node is expandable:
                * Call expand on the JobTask, save expanded version "Expanded"
                * Connect all parents of this JobTask to the ind_nodes of
                  "Expanded"
                * Get a list of all downstream nodes in "Expanded" that don't
                  have a downstream node, and add edges to all downstream
                  nodes of the JobTask.
            * Delete JobTask from graph
            * Continue traversing from the downstreams of the deleted JobTask
        """
        dag = dag.deepcopy()
        traversal_queue = dag.ind_nodes()
        expanded = []
        while traversal_queue:
            task = traversal_queue.pop()
            if not self.implements_expandable(task) or task in expanded:
                continue

            expanded.append(task)
            expanded_dag = dag.tasks[task].expand()

            # Empty Job expansion
            if not expanded_dag:
                pred = dag.predecessors(task)
                children = dag.downstream(task)
                [dag.add_edge(p, c) for p in pred for c in children]
                dag.delete_task(task)
                continue

            # Prepend all expanded task names with "<jobname>_" to prevent
            # task name conflicts
            [expanded_dag.edit_task(t, {"name": "{0}_{1}".format(task, t)})
             for t in list(expanded_dag.tasks)]

            # Merge node and edge dictionaries (creating 2 unconnected DAGs,
            # in one graph)
            final_dict = defaultdict(list)  # needs: from collections import defaultdict
            for key, value in dag.graph.items():
                final_dict[key].extend(value)
            for key, value in expanded_dag.graph.items():
                final_dict[key].extend(value)
            dag.graph = final_dict

            # Add edges between predecessors and start nodes
            predecessors = dag.predecessors(task)
            start_nodes = expanded_dag.ind_nodes()
            [dag.add_edge(p, s) for p in predecessors for s in start_nodes]

            # Add edges between the final downstreams and the child nodes
            final_tasks = expanded_dag.dag_ends()
            children = dag.downstream(task)
            [dag.add_edge(f, c) for f in final_tasks for c in children]

            # add children to traversal queue and delete old reference
            traversal_queue.extend(children)
            dag.delete_node(task)

        return dag

EDIT: updated with some slightly cleaner code
EDIT 2: corrections

rclough commented on June 19, 2024

Also, this is probably obvious, but I did a breadth-first traversal. Not sure if there's any advantage over depth-first, it's just what my brain defaulted to.

rclough commented on June 19, 2024

Maybe it should be called end_nodes(), which goes nicely with ind_nodes(); they're basically opposite concepts (nodes with no incoming edges vs. nodes with no outgoing edges).

thieman commented on June 19, 2024

I'm thinking I'll need to add a predecessors(node) to py-dag in order to implement this. This is because, when we expand a JobTask, we need all of the JobTask's ind_nodes to have edges coming from the JobTask's predecessors ("parents", if this were a tree).

This makes sense to me; if I remember the py-dag structure, this should be pretty simple and hopefully O(n).

What's actually needed is all nodes in the DAG that have no downstreams. Not sure what a good name for this is, or if theres a canonical name.

I believe the standard term here is "leaf node." So maybe all_leaves?

Would review the actual code right now but my brain isn't having it, will take a stab at it tomorrow.

rclough commented on June 19, 2024

Yeah, since the edges are stored as a map of node -> list of downstream nodes, it can be done in O(n) simply by traversing the graph keys once.
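
Something like this, assuming the graph is stored as a plain dict of node name -> list of downstream node names (sketch, not the actual py-dag code):

    # Sketch, assuming graph == {node_name: [downstream_node_names, ...]}.
    def predecessors(graph, node):
        """All nodes with an edge pointing at `node` (its 'parents')."""
        return [upstream for upstream, downstreams in graph.items() if node in downstreams]

    def all_leaves(graph):
        """All nodes with no outgoing edges (the 'ends' of the DAG)."""
        return [node for node, downstreams in graph.items() if not downstreams]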

thieman commented on June 19, 2024

Thoughts after looking at the code:

  1. Does the "prepend expanded tasks with _" thing allow us to have multiple instances of the same JobTask within the Job? Or do we need to add some sort of counter component to allow that?
  2. Does edit_task do any database access or is it just an in-place object mutation?
  3. I don't really understand what the "merge node and edge dictionaries" step is doing or why we need it.
  4. It seems like this could traverse over the same JobTask multiple times if it has many parents, what is accounting for that?

rclough commented on June 19, 2024
  1. By restriction of dagobah, we cannot have 2 tasks with the same name, so 2 of the same JobTasks would have to have different task names. By prepending the child tasks with the task name, we ensure no name conflicts (mostly)
  2. Good question, I'll need to look into it, but this should be a copy of the dag in any case. Good eye though. I'm about to get off work so I'll check soon.
  3. This is adding all of the tasks from the underlying jobtask's job to the current job
  4. I think I need a bit more clarification on this?

rclough commented on June 19, 2024

Re #4, I think I get it. The expanded-tasks list prevents re-expanding a task that has already been expanded.

rclough commented on June 19, 2024

So I realized last week that both the underlying DAG (the job.graph, in practice) and the task list need to be synced if we want to allow in-place task expansion; in this implementation, they are combined. I'm working on an actual PR now that will manage both, by adding a "snapshot_tasks" list just like we do with the snapshot of the DAG.
