Giter VIP home page Giter VIP logo

fastflow's Introduction

Fastflow——基于golang的轻量级工作流框架

Go Report Card codecov

Fastflow 是什么?用一句话来定义它:一个 基于golang协程支持水平扩容的分布式高性能工作流框架。 它具有以下特点:

  • 易用性:工作流模型基于 DAG 来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能
  • 高性能:得益于 golang 的协程 与 channel 技术,fastflow 可以在单实例上并行执行数百、数千乃至数万个任务
  • 可观测性fastflow 基于 Prometheus 的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。
  • 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
  • 可扩展性fastflow 准备了部分开箱即用的任务操作,比如 http请求、执行脚本等,同时你也可以自行定义新的节点动作,同时你可以根据上下文来决定是否跳过节点(skip)
  • 轻量:它仅仅是一个基础框架,而不是一个完整的产品,这意味着你可以将其很低成本融入到遗留项目而无需部署、依赖另一个项目,这既是它的优点也是缺点——当你真的需要一个开箱即用的产品时(比如 airflow),你仍然需要少量的代码开发才能使用

为什么要开发 Fastflow

组内有很多项目都涉及复杂的任务流场景,比如离线任务,集群上下架,容器迁移等,这些场景都有几个共同的特点:

  1. 流程耗时且步骤复杂,比如创建一个 k8s 集群,需要几十步操作,其中包含脚本执行、接口调用等,且相互存在依赖关系。
  2. 任务量巨大,比如容器平台每天都会有几十万的离线任务需要调度执行、再比如我们管理数百个K8S集群,几乎每天会有集群需要上下节点、迁移容器等。

我们尝试过各种解法:

  • 硬编码实现:虽然工作量较小,但是只能满足某个场景下的特定工作流,没有可复用性。
  • airflow:我们最开始的离线任务引擎就是基于这个来实现的,不得不承认它的功能很全,也很方便,但是存在几个问题
    • 由 python 编写的,我们希望团队维护的项目能够统一语言,更有助于提升工作效率,虽然对一个有经验的程序员来说多语言并不是问题,但是频繁地在多个语言间来回切换其实是不利于高效工作的
    • airflow 的任务执行是以 进程 来运行的,虽然有更好的隔离性,但是显然因此而牺牲了性能和并发度。
  • 遗留的工作流平台:你可能想象不到一个内部一个经历了数年线上考证的运维用工作流平台,会脆弱到承受不了上百工作流的并发,第一次压测就直接让他们的服务瘫痪,进而影响到其他业务的运维任务。据团队反馈称是因为我们的工作流组成太复杂,一个流包含数十个任务节点才导致了这次意外的服务过载,随后半年这个团队重写了一个新的v2版本。

当然 Github 上也还有其他的任务流引擎,我们也都评估过,无法满足需求。比如 kubeflow 是基于 Pod 执行任务的,比起 进程 更为重量,还有一些项目,要么就是没有经过海量数据的考验,要么就是没有考虑可伸缩性,面对大量任务的执行无法水平扩容。

Concept

工作流模型

fastflow 的工作流模型基于 DAG(Directed acyclic graph),下图是一个简单的 DAG 示意图:

在这个图中,首先 A 节点所定义的任务会被执行,当 A 执行完毕后,B、C两个节点所定义的任务将同时被触发,而只有 B、C 两个节点都执行成功后,最后的 D 节点才会被触发,这就是 fastflow 的工作流模型。

工作流的要素

fastflow 执行任务的过程会涉及到几个概念:Dag, Task, Action, DagInstance

Dag

描述了一个完整流程,它的每个节点被称为 Task,它定义了各个 Task 的执行顺序和依赖关系,你可以通过编程 or yaml 来定义它

一个编程式定义的DAG

dag := &entity.Dag{
		BaseInfo: entity.BaseInfo{
			ID: "test-dag",
		},
		Name: "test",
		Tasks: []entity.Task{
			{ID: "task1", ActionName: "PrintAction"},
			{ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}},
			{ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}},
		},
	}

对应的yaml如下:

id: "test-dag"
name: "test"
tasks:
- id: "task1"
  actionName: "PrintAction"
- id: ["task2"]
  actionName: "PrintAction"
  dependOn: ["task1"]
- id: "task3"
  actionName: "PrintAction"
  dependOn: ["task2"]

同时 Dag 可以定义这个工作流所需要的参数,以便于在各个 Task 去消费它:

id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
  filePath:
    desc: "the file path"
    defaultValue: "/tmp/"
tasks:
- id: "task1"
  actionName: "PrintAction"
  params:
    writeName: "{{fileName}}"
    writePath: "{{filePath}}"

Task

它定义了这个节点的具体工作,比如是要发起一个 http 请求,或是执行一段脚本等,这些不同动作都通过选择不同的 Action 来实现,同时它也可以定义在何种条件下需要跳过 or 阻塞该节点。 下面这段yaml演示了 Task 如何根据某些条件来跳过运行该节点。

id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
    isIgnoreFiles:
      act: skip #you can set "skip" or "block"
      conditions:
      - source: vars # source could be "vars" or "share-data"
        key: "fileName"
        op: "in"
        values: ["warn.txt", "error.txt"]

Task 的状态有以下几个:

  • init: Task已经初始化完毕,等待执行
  • running: 正在运行中
  • ending: 当执行 Action 的 Run 所定义的内容后,会进入到该状态
  • retrying: 任务重试中
  • failed: 执行失败
  • success: 执行成功
  • blocked: 任务已阻塞,需要人工启动
  • skipped: 任务已跳过

Action

Action 是工作流的核心,定义了该节点将执行什么操作,fastflow携带了一些开箱即用的Action,但是一般你都需要根据具体的业务场景自行编写,它有几个关键属性:

  • Name: Required Action的名称,不可重复,它是与 Task 关联的核心
  • Run: Required 需要执行的动作,fastflow 将确保该动作仅会被执行 一次(ExactlyOnce)
  • RunBefore: Optional 在执行 Run 之前运行,如果有一些前置动作,可以在这里执行,RunBefore 有可能会被执行多次。
  • RunAfter: Optional 在执行 Run 之后运行,一些长时间执行的任务内容建议放在这里,只要 Task 尚未结束,节点发生故障重启时仍然会继续执行这部分内容,
  • RetryBefore:Optional 在重试失败的任务节点,可以提前执行一些清理的动作

自行开发的 Action 在使用前都必须先注册到 fastflow,如下所示:

type PrintParams struct {
  Key string
  Value string
}

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	cinput := params.(*ActionParam)

	fmt.Println("action start: ", time.Now())
	fmt.Println(fmt.Sprintf("params: key[%s] value[%s]", cinput.Key, cinput.Value))
	return nil
}

func (a *PrintAction) ParameterNew() interface{} {
	return &PrintParams{}
}

func main() {
  ...

	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

  ...
}

DagInstance

当你开始运行一个 Dag 后,则会为本次执行生成一个执行记录,它被称为 DagInstance,当它生成以后,会由 Leader 实例将其分发到一个健康的 Worker,再由其解析、执行。

实例类型与Module

首先 fastflow 是一个分布式的框架,意味着你可以部署多个实例来分担负载,而实例被分为两类角色:

  • Leader:此类实例在运行过程中只会存在一个,从 Worker 中进行选举而得出,它负责给 Worker 实例分发任务,也会监听长时间得不到执行的任务将其调度到其他节点等
  • Worker:此类实例会存在复数个,它们负责解析 DAG 工作流并以 协程 执行其中的任务

而不同节点能够承担不同的功能,其背后是不同的 模块 在各司其职,不同节点所运行的模块如下图所示:

NOTE

  • Leader 实例本质上是一个承担了 仲裁者 角色的 Worker,因此它也会分担工作负载。
  • 为了实现更均衡的负载,以及获得更好的可扩展性,fastflow 没有选择加锁竞争的方式来实现工作分发

从上面的图看,Leader 实例会比 Worker 实例多运行一些模块用于执行中仲裁者相关的任务,模块之间的协作关系如下图所示:

其中各个模块的职责如下:

  • Keeper: 每个节点都会运行 负责注册节点到存储中,保持心跳,同时也会周期性尝试竞选 Leader,防止上任 Leader 故障后阻塞系统,这个模块同时也提供了 分布式锁 功能,我们也可以实现不同存储的 Keeper 来满足特定的需求,比如 Etcd or Zookeepper,目前支持的 Keeper 实现只有 Mongo
  • Store: 每个节点都会运行 负责解耦 Worker 对底层存储的依赖,通过这个组件,我们可以实现利用 Mongo, Mysql 等来作为 fastflow 的后端存储,目前仅实现了 Mongo
  • ParserWorker 节点运行 负责监听分发到自己节点的任务,然后将其 DAG 结构重组为一颗 Task 树,并渲染好各个任务节点的输入,接下来通知 Executor 模块开始执行 Task
  • Commander每个节点都会运行 负责封装一些常见的指令,如停止、重试、继续等,下发到节点去运行
  • ExecutorWorker 节点运行 按照 Parser 解析好的 Task 树以 goroutine 运行单个的 Task
  • DispatcherLeader节点才会运行 负责监听等待执行的 DAG,并根据 Worker 的健康状况均匀地分发任务
  • WatchDogLeader节点才会运行 负责监听执行超时的 Task 将其更新为失败,同时也会重新调度那些一直得不到执行的 DagInstance 到其他 Worker

Tips

以上模块的分布机制仅仅只是 fastflow 的默认实现,你也可以自行决定实例运行的模块,比如在 Leader 上不再运行 Worker 的实例,让其专注于任务调度。

GetStart

更多例子请参考项目下面的 examples 目录

准备一个Mongo实例

如果已经你已经有了可测试的实例,可以直接替换为你的实例,如果没有的话,可以使用Docker容器在本地跑一个,指令如下:

docker run -d --rm --name fastflow-mongo -p 27017:27017 \
	-e MONGO_INITDB_ROOT_USERNAME=root \
	-e MONGO_INITDB_ROOT_PASSWORD=pwd \
	mongo

运行 fastflow

运行以下示例

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shiningrush/fastflow"
	mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
	"github.com/shiningrush/fastflow/pkg/entity/run"
	"github.com/shiningrush/fastflow/pkg/mod"
	mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}
func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("action start: ", time.Now())
	return nil
}

func main() {
	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

	// init keeper, it used to e
	keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
		Key:      "worker-1",
    // if your mongo does not set user/pwd, youshould remove it
		ConnStr:  "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := keeper.Init(); err != nil {
		log.Fatal(fmt.Errorf("init keeper failed: %w", err))
	}

	// init store
	st := mongoStore.NewStore(&mongoStore.StoreOption{
    // if your mongo does not set user/pwd, youshould remove it
		ConnStr:  "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := st.Init(); err != nil {
		log.Fatal(fmt.Errorf("init store failed: %w", err))
	}

	go createDagAndInstance()

	// start fastflow
	if err := fastflow.Start(&fastflow.InitialOption{
		Keeper: keeper,
		Store:  st,
		// use yaml to define dag
		ReadDagFromDir: "./",
	}); err != nil {
		panic(fmt.Sprintf("init fastflow failed: %s", err))
	}
}

func createDagAndInstance() {
	// wait fast start completed
	time.Sleep(time.Second)

	// run some dag instance
	for i := 0; i < 10; i++ {
		_, err := mod.GetCommander().RunDag("test-dag", nil)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second * 10)
	}
}

程序运行目录下的test-dag.yaml

id: "test-dag"
name: "test"
tasks:
- id: "task1"
  actionName: "PrintAction"
- id: "task2"
  actionName: "PrintAction"
  dependOn: ["task1"]
- id: "task3"
  actionName: "PrintAction"
  dependOn: ["task2"]

Basic

Action内的通信

Action的通信主要指 Action.RunBeforeAction.RunAction.RunAfter 之间的信息共享,目前有如下方式:

  • 基于 context: 由于同一个 Task 在执行期间共享同一个 context,因此你可以通过它来传递信息,但是注意这样弊端,当节点重启时,如果 Task 尚未执行完毕,那么这部分内容会丢失。
func (a *DemoAction) Run(ctx run.ExecuteContext, params interface{}) error {
	ctx.WithValue("key", "value")
	return nil
}

func (a *DemoAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	val := ctx.Context().Value("key")
	return nil
}
  • 使用 ShareData: 如果不想因为故障or升级而丢失你的更改,可以使用 ShareData 来进行通信,ShareData 是整个 在整个 DagInstance 的生命周期都会共享的一块数据空间,每次对它的写入都会通过 Store 组件持久化,以确保数据不会丢失,用法如下:
func (a *DemoAction) Run(ctx run.ExecuteContext, params interface{}) error {
	ctx.ShareData().Set("key", "value")
	return nil
}

func (a *DemoAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	val := ctx.ShareData().Get("key")
	return nil
}
  • 共享内存: 由于任务都是基于 goroutine 来执行,因此彼此的内存都是可访问的,意味着你完全可以基于内存共享来实现通信。不过不太推荐你使用这样的方式,除了内存的易失性之外,共享内存在可读性与线程同步等都需要作额外的考虑。
var (
	shareMap = map[string]interface{}
)

func (a *DemoAction) Run(ctx run.ExecuteContext, params interface{}) error {
	shareMap["key"] =  "value"
	return nil
}

func (a *DemoAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	val := shareMap["key"]
	return nil
}

Task与Task之间的通信

如果我们想要在不同Task之间传递数据我们有哪些方式呢

  • ShareData: 同Action内的方式相同
func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error {
	ctx.ShareData().Set("key", "value")
	return nil
}

func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error {
	val := ctx.ShareData().Get("key")
	return nil
}
  • 共享内存: 同Action内的方式相同
var (
shareMap = map[string]interface{}
)

func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error {
shareMap["key"] =  "value"
return nil
}

func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error {
val := shareMap["key"]
return nil
}

任务日志

fastflow 还提供了 Task 粒度的日志记录,这些日志都会通过 Store 组件持久化,用法如下:

func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error {
	ctx.Trace("some message")
	return nil
}

使用Dag变量

上面的文章中提到,我们可以在 Dag 中定义一些变量,在创建工作流时可以对这些变量进行赋值,比如以下的Dag,定义了一个名为 `fileName 的变量

id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"

随后我们可以使用 Commander 组件来启动一个具体的工作流:

	mod.GetCommander().RunDag("test-id", map[string]string{
		"fileName": "demo.txt",
	})

这样本次启动的工作流的变量则被赋值为 demo.txt,接下来我们有两种方式去消费它

  1. 带参数的Action
id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  action: "PrintAction"
  params:
    # using {{var}} to consume dag's variable 
    fileName: "{{fileName}}"

PrintAction.go:

type PrintParams struct {
  FileName string `json:"fileName"`
}

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	cinput := params.(*ActionParam)

	fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value))
	return nil
}

func (a *PrintAction) ParameterNew() interface{} {
	return &PrintParams{}
}
  1. 编程式读取 fastflow 也提供了相关函数来获取 Dag 变量
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error {
  // get variable by name
	ctx.GetVar("fileName")

  // iterate variables
  ctx.IterateVars(func(key, val string) (stop bool) {
		...
	})
	return nil
}

分布式锁

如前所述,你可以在直接使用 Keeper 模块提供的分布式锁,如下所示:

...
mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(),
		mod.LockTTL(time.Second),
		mod.Reentrant("worker-key1"))
...

其中:

  • LockTTL 表示你持有该锁的TTL,到期之后会自动释放,默认 30s
  • Reentrant 用于需要实现可重入的分布式锁的场景,作为持有场景的标识,默认为空,表示该锁不可重入

fastflow's People

Contributors

philhuan avatar shiningrush avatar yannan avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fastflow's Issues

DAG实例启动后,如何block DAG实例从而阻塞未执行的task

image
有这样一种场景,DAG实例运行过程中发现不符合预期或线上有case需要暂停流程、封禁动作的执行,目前来看貌似不支持此类命令,希望有能够同时阻塞所有dag 实例的运行的命令和阻塞单个实例运行的命令,通过continue或者放开全局锁来解除封禁操作~
辛苦看下当前是否有能够支持该场景的函数/建议如何自行实现~

question: DAG 任务支持取消与回滚吗?

如题,一个 DAG 内的任务,支持任务支持取消与回滚吗?
例如 DAG 执行到某个中间步骤,此时前端调用取消任务的 API,是否支持将正在执行中(Running)的任务取消,并回滚已经执行完成的(success)任务?

能支持流式执行任务吗

在平常写shell脚本过程中,管道通常是读取一行,然后管道符后面的程序就开始用这一行作为输入开始执行了

看了下readme,好像不支持这种流式执行的任务?

请问Task的父亲依赖节点DependOn的最大数量是多少?

RT,测试了某个dag中某个Task的DependOn的长度为10000,即前向依赖于10000个任务节点,每个任务节点的Action是简单的print,但是执行到该节点(timeout设置为3600)时就卡住了,该task的状态一直维持为init。

如何在Action的Run方法体内获取当前的Task Instance ID的元信息呢?

假设当前存在某种定制的跳过场景,外部想控制某个task进行跳过

设置sharedata中要跳过的taskinstance的skip字段为true,理论上在action的runafter方法体内轮询这个字段即可。但我应该如何在逻辑中确认当前正在运行的action属于具体的哪一个taskinstance呢?似乎并没有找到通过run.ExecuteContext获取当前运行时taskinstance信息的接口啊

对于示例中的preCheck

看到示例中的是这样写的

id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
  - act: skip #you can set "skip" or "block"
    conditions:
    - source: vars # source could be "vars" or "share-data"
      key: "fileName"
      op: "in"
      values: ["warn.txt", "error.txt"]

preCheck应该是一个列表,但实际运行中发现,报错,查看代码得知,preCheck定义是一个PreChecks map[string]*Check对象,修改成

    preCheck:
      act1:
        act: block
        conditions:
          - source: "vars"
            key: "fileName"
            op: "in"
            values: ["file.txt", "error.txt"]

如果是这样的话,不知道原意设计是有多个preCheck要处理,还是只能处理一个act的preCheck,然后再conditions里面设置多个条件判断呢?

按照我的理解,preCheck的定义应该是一个[]Check,能够根据多个条件来判断该节点是应该阻塞还是应该跳过

再次咨询,action not found问题

情况是这样的,fastflow部署了一共2台机器。运行的时候,经常报action not found
报这个错误的代码在这里:
execturor.go
func (e *DefExecutor) runAction(taskIns *entity.TaskInstance) error {
act := ActionMap[taskIns.ActionName]
if act == nil {
return fmt.Errorf("action not found: %s", taskIns.ActionName)
}

再翻看源代码,ActionMap 是定义的一个全局变量。
var (
ActionMap = map[string]run.Action{}

defExc Executor
defStore Store
defKeeper Keeper
defParser Parser
defCommander Commander
)

Action 的注册,是将 Action 放到这个 Map 中,相当于数据是存储在‹内存›中,并没有持久化。
在程序中,我是这么注册Action 的。

// 注册 action
fastflow.RegisterAction(actions)

我看到官方文档上是这样描述:
当你开始运行一个 Dag 后,则会为本次执行生成一个执行记录,它被称为 DagInstance,当它生成以后,会由 Leader 实例将其分发到一个健康的 Worker,再由其解析、执行。

假设fastflow.RegisterAction(actions)在2台机器中的A执行,那么是不有可能由另一台机器去执行,是不是因为这个造成的报action not found呢?

怎么获取DAG里的每个task的执行状态?

尝试获取Dag里task的状态,

dag, err := mod.GetStore().GetDag("test-dag")

if err != nil {
	panic(err)
}

for _, task := range dag.Tasks{
	println("============== ", task.ActionName, task.GetStatus())
}

但是看实现里 task.GetStatus()返回的是空字符串

  // GetStatus
  func (t *Task) GetStatus() TaskInstanceStatus {
      return ""
  }

看源码里有TaskInstance可以获取状态

  // GetStatus
  func (t *TaskInstance) GetStatus() TaskInstanceStatus {
      return t.Status
  }

这个TaskInstance怎么能获取到进而再获取其执行状态?

创建task有非常大的延迟

`if err := mod.GetStore().CreateDagIns(dagInstance); err != nil {
return nil, err
}

dagIns, err := mod.GetStore().GetDagInstance(dagInstance.ID)
if err != nil {
	return nil, err
}

dagIns.ShareData.Set("dagInstanceID", dagInstance.ID)
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
	return nil, err
}
dagIns.ShareData.Set("jobID", strconv.Itoa(jobID))
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
	return nil, err
}`

在执行上述create的操作后,我期待在后面通过一个for循环同步tasks的状态,理论上我最终通过listTaskInstance获取到的task数量应该和dag中声明的task数量相同,但这里存在非常大的延迟,for循环等待时间可能要数十秒或一致卡死,显示我目前db中不存在和当前dagInstance相关联的task,当我查看db时也是如此,dagInstance存在,而tasks却不存在,我想知道这是什么问题呢?有没有相关的排查思路?
for { time.Sleep(500 * time.Millisecond) tasks, err = mod.GetStore().ListTaskInstance(&mod.ListTaskInstanceInput{ DagInsID: dagInstance.ID, }) if err != nil { fmt.Printf("total_task_length: %d, taskcount: %d\n, err: %s", taskCount, len(tasks), err.Error()) continue } else if len(tasks) != taskCount { fmt.Printf("total_task_length: %d, taskcount: %d\n", taskCount, len(tasks)) continue } break }

而且在运行过程中出现过非常诡异的现象,dagInstance理论上会从init状态变为scheduled,但是流程中在没有通过外部调用的情况下,dagInstance又从scheduled变回了init状态,请问这是为什么呢?通过mongo连续两次查询相同的dagInstance状态可以观察此现象:
image

Event 可以使用了吗

代码里看到有相关的包,但是貌似还没有相关事件触发,是还未实装么。

const (
	KeyDagInstanceUpdated = "DagInstanceUpdated"
	KeyDagInstancePatched = "DagInstancePatched"

	KeyTaskCompleted = "TaskCompleted"
	KeyTaskBegin     = "TaskBegin"

	KeyLeaderChanged                = "LeaderChanged"
	KeyDispatchInitDagInsCompleted  = "DispatchInitDagInsCompleted"
	KeyParseScheduleDagInsCompleted = "ParseScheduleDagInsCompleted"
)

如何使处理任务的worker实例和执行RunDag的实例相同,即本机执行任务

假设有两个副本,这两个副本都有概率接收到创建dag实例(RunDag)的请求, 希望创建实例后在本地执行、处理请求,请问如何配置?
如 A副本创建了一个dag_ins, 这个实例流程任务都在A副本上运行。

辛苦帮忙看看,目前创建实例的api执行后,会在本地初始化一个管道,用于和流程中的init任务交互,实际执行节点和api节点不一致的情况下会导致通信失败~

内置的http请求操作怎么使用

可扩展性:fastflow 准备了部分开箱即用的任务操作,比如 http请求、执行脚本等,同时你也可以自行定义新的节点动作,同时你可以根据上下文来决定是否跳过节点(skip)

这是readme中描述的,但是看后面也没有介绍内置的http请求应该如何使用

对于DAG中的cron字段

我看到DAG中有一个cron字段,但源码里并未具体实现关于定时任务的相关处理,这部分是在外部处理吗?还是在内部处理,但是开源版本并未实现该功能,如果我想要自己在内部实现的话,能提供一个大概的思路吗?

我的思路是这样:在派发任务的Dispatcher组件中,去判断是否带有cron参数,如果带有cron参数,就启用goroutine并使用cron库来定时去派发任务到节点上去执行。

我是刚学了go语言,想做一个小的demo来熟练go在项目上的应用,对于上面的思路可能有思考的欠缺,如果有可能,希望能够得到你的指点

在RunBefore中如果有错误,则任务阻塞

我试图在RunBefore函数中获取TaskInstance相关参数,但返回结果taskIns1为空指针,然后程序就会卡在fmt.Println("taskIns1.2", taskIns1.TaskID),没有显示的报错,附完整代码

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shiningrush/fastflow"
	mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
	"github.com/shiningrush/fastflow/pkg/entity"
	"github.com/shiningrush/fastflow/pkg/entity/run"
	"github.com/shiningrush/fastflow/pkg/mod"
	mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}

func (a *PrintAction) RunBefore(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("-------- Run action before")
	ctx1 := ctx.Context()
	taskIns1, _ := ctx1.Value("running-task").(*entity.TaskInstance)
	fmt.Println("taskIns1.1", taskIns1)
	fmt.Println("taskIns1.2", taskIns1.TaskID)
	return nil
}

func (a *PrintAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action after")
	return nil
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action start: ", time.Now())
	taskIns, _ := entity.CtxRunningTaskIns(ctx.Context())
	fmt.Println(taskIns.TaskID)

	return nil
}

func main() {
	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

	// init keeper, it used to e
	keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
		Key: "worker-1",
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := keeper.Init(); err != nil {
		log.Fatal(fmt.Errorf("init keeper failed: %w", err))
	}

	// init store
	st := mongoStore.NewStore(&mongoStore.StoreOption{
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := st.Init(); err != nil {
		log.Fatal(fmt.Errorf("init store failed: %w", err))
	}

	go createDagAndInstance()

	// start fastflow
	if err := fastflow.Start(&fastflow.InitialOption{
		Keeper: keeper,
		Store:  st,
		// use yaml to define dag
		// 所有的yaml文件都会被执行
		ReadDagFromDir: "./",
	}); err != nil {
		panic(fmt.Sprintf("init fastflow failed: %s", err))
	}
}

func createDagAndInstance() {
	// wait fast start completed
	time.Sleep(time.Second)

	// run some dag instance
	// for i := 0; i < 10; i++ {
	_, err := mod.GetCommander().RunDag("test-dag", nil)
	if err != nil {
		log.Fatal(err)
	}
	// time.Sleep(time.Second * 10)
	// }
}
id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
    isIgnoreFiles:
      act: skip #you can set "skip" or "block"
      conditions:
      - source: vars # source could be "vars" or "share-data"
        key: "fileName"
        op: "in"
        values: ["warn.txt", "error.txt", "file1.txt"]
- id: "task2"
  actionName: "PrintAction"
  dependOn: ["task1"]
- id: "task3"
  actionName: "PrintAction"
  dependOn: ["task1"]

发现个问题,项目启动的时候register了结构体,并调用fastflow.init,然后在接口中构造dag,调用run方法,但我发现会出现actionName运行2次的情况,或者多次运行之后出现2023/10/09 22:52:18 error: parser get some error%!(EXTRA string=module, string=parser, string=err, *fmt.wrapError=worker do failed: dag instance[482065678661058561] does not found task tree)

使用的是项目启动的时候注册的结构体作为action,没有在每次调用的时候重复注册

如何跳过一些路线的任务

如下图,我想在执行任务B时,如果条件命中了,跳过任务B,此时我希望任务E也能够跳过,这时候任务的线路是A->C->D,有点类似if-else,这种能够实现吗。

image

在ctx中可以取到当前任务信息吗

在 Run(ctx run.ExecuteContext, params interface{}) 的 ctx 中可以取到当前任务相关信息吗

dagId
dagInsId
taskId : 节点ID

或者如何能获取到呢

多机执行时,报action not found:

你好,多机时,会报action not found:。
翻看了源代码,ActionMap 是定义的一个全局变量。
var (
ActionMap = map[string]run.Action{}

defExc       Executor
defStore     Store
defKeeper    Keeper
defParser    Parser
defCommander Commander

)

Action 的注册,是将 Action 放到这个 Map 中,相当于数据是存储在本机内存中。
在程序中,我是这么注册Action 的。

// 注册 action
fastflow.RegisterAction(actions)

那么,如果是多机执行,这个 Dag 被调度到另外一台机器时,另外一台机器的内在中(邓全局的ActionMap)显然就没有相关的 Dag 数据。
目前,在多机情况下,能够稳定复现action not found:。不知道是不是我自己还有别的点没有考虑到。

诚盼解惑。

About Leader and Workers

Let's say if I want to have three workers (included leader), how to run these on a server? Could you give an example? Thanks.

is fastflow support rollback?

in my case, i want to use goflow to create some resources on a public cloud. like creating a cloud vm with public ip.

  1. create a vm using cloud openapi
  2. wait the status of vm is running
  3. create a public ip using cloud openapi
  4. wait the status of public ip is available
  5. bind the public ip with vm
  6. other steps

i want

  1. if I cancel the workflow(like run flow.cancel()), workflow can free the created resources(vm,public ip etc)
  2. if one step failed, workflow can free the created resources(vm,public ip etc)

RFC: support pass unique key to Keeper

Hi, first of all thank you for the library!

In the process of trying to use it, we found that because the worker's unique id is achieved through sonyflake, the worker requires an integer between 0 and 255 to achieve the unique id when it is started

func CheckWorkerKey(key string) (int, error) {

In practice, this restriction would lead to reliance on external storage to generate the unique integer in multi-process situations, so I propose that the global unique key can be passed directly from the caller and the keeper component no longer generates the unique key based on number

There are two advantages to this change

  • the logic of the keeper component is lighter, no need to generate its own unique keys, no need to introduce sonyflake
  • the limit of less than 255 workers is lifted

If you agree to this change, I will provide PR in my free time

thanks

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.