

Durable Task Framework for Go


The Durable Task Framework is a lightweight, embeddable engine for writing durable, fault-tolerant business logic (orchestrations) as ordinary code. The engine itself is written in Go and intended to be embedded into other Go-based processes. It exposes a gRPC endpoint to support writing durable flows in any language. There are currently SDKs that consume this gRPC endpoint for .NET and Java, with more to come. It's also possible to write orchestrations directly in Go and run them in the local process.

This project is largely a Go clone of the .NET-based Durable Task Framework, which is used by various cloud service teams at Microsoft for building reliable control planes and managing infrastructure. It also takes inspiration from the Go Workflows project, which itself borrows heavily from both the Durable Task Framework and Temporal. The main difference is that the Durable Task engine is designed to be used in sidecar architectures.

The Durable Task engine is also intended to be used as the basis for the Dapr embedded workflow engine.

This project is a work-in-progress and should not be used for production workloads. The public API surface is also not yet stable. The project is still in its very early stages and is missing some of the basics, such as contribution guidelines.

Storage providers

This project includes a sqlite storage provider for persisting app state to disk.

// Persists state to a file named test.sqlite3. Use "" for in-memory storage.
options := sqlite.NewSqliteOptions("test.sqlite3")
be := sqlite.NewSqliteBackend(options, backend.DefaultLogger())

Additional storage providers can be created by implementing the Backend interface.

Creating the standalone gRPC sidecar

See the main.go file for an example of how to create a standalone gRPC sidecar that embeds the Durable Task engine. In short, you must create a Backend (for storage), an Executor (for executing user code), and host them as a TaskHubWorker.

The following code creates a TaskHub worker with the sqlite Backend and gRPC Executor implementations.

// Use the default logger or provide your own
logger := backend.DefaultLogger()

// Configure the sqlite backend that will store the runtime state
sqliteOptions := sqlite.NewSqliteOptions(sqliteFilePath)
be := sqlite.NewSqliteBackend(sqliteOptions, logger)

// Create a gRPC server that the language SDKs will connect to
grpcServer := grpc.NewServer()
executor := backend.NewGrpcExecutor(grpcServer, be, logger)

// Construct and start the task hub worker object, which polls the backend for new work
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)
taskHubWorker.Start(context.Background())

// Start listening.
lis, _ := net.Listen("tcp", "localhost:4001")
fmt.Printf("server listening at %v\n", lis.Addr())
grpcServer.Serve(lis)
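
Not shown above is shutdown handling. The following is a minimal sketch, assuming the standard os, os/signal, and syscall packages; the signal wiring is an illustration and not part of the repo's main.go. Register it before the blocking Serve call so the server can unwind cleanly.

// Stop on SIGINT/SIGTERM (sketch; register before calling grpcServer.Serve above)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
	<-sigCh
	taskHubWorker.Shutdown(context.Background()) // stop polling and let in-flight work items finish
	grpcServer.Stop()                            // closes open streams and unblocks Serve
}()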

Note that the Durable Task gRPC service implementation is designed to serve one client at a time, just like with any sidecar architecture. Scale out is achieved by adding new pod replicas that contain both the app process and the sidecar (connected to a common database).

Language SDKs for gRPC

The Durable Task Framework for Go currently supports writing orchestrations in the following languages:

Language/Stack | Package       | Project Home | Samples
.NET           | NuGet         | GitHub       | Samples
Java           | Maven Central | GitHub       | Samples
Python         | PyPI          | GitHub       | Samples

More language SDKs are planned for the future, in particular JavaScript/TypeScript. Anyone can theoretically create an SDK using a language that supports gRPC. However, there is not yet a guide for how to do this, so developers would need to use the existing SDK code as a reference. Starting with the Java implementation is recommended. The gRPC API is defined here.

Embedded orchestrations

It's also possible to create orchestrations in Go and run them in the local process. The full set of Durable Task features is not yet available as part of the Go SDK, but will be added over time.

You can find code samples in the samples directory.
To run one, navigate into its folder and run go run .

Activity sequence example

Activity sequences like the following are the simplest and most common pattern used in the Durable Task Framework.

// ActivitySequenceOrchestrator makes three activity calls in sequence and returns the results
// as an array.
func ActivitySequenceOrchestrator(ctx *task.OrchestrationContext) (any, error) {
	var helloTokyo string
	if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("Tokyo")).Await(&helloTokyo); err != nil {
		return nil, err
	}
	var helloLondon string
	if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("London")).Await(&helloLondon); err != nil {
		return nil, err
	}
	var helloSeattle string
	if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("Seattle")).Await(&helloSeattle); err != nil {
		return nil, err
	}
	return []string{helloTokyo, helloLondon, helloSeattle}, nil
}

// SayHelloActivity can be called by an orchestrator function and will return a friendly greeting.
func SayHelloActivity(ctx task.ActivityContext) (any, error) {
	var input string
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	return fmt.Sprintf("Hello, %s!", input), nil
}

You can find the full sample here.

Fan-out / fan-in execution example

The next most common pattern is "fan-out / fan-in" where multiple activities are run in parallel, as shown in the snippet below (note that the GetDevicesToUpdate and UpdateDevice activity definitions are left out of the snippet below for brevity):

// UpdateDevicesOrchestrator is an orchestrator that runs activities in parallel
func UpdateDevicesOrchestrator(ctx *task.OrchestrationContext) (any, error) {
	// Get a dynamic list of devices to perform updates on
	var devices []string
	if err := ctx.CallActivity(GetDevicesToUpdate).Await(&devices); err != nil {
		return nil, err
	}

	// Start a dynamic number of tasks in parallel, not waiting for any to complete (yet)
	tasks := make([]task.Task, len(devices))
	for i, id := range devices {
		tasks[i] = ctx.CallActivity(UpdateDevice, task.WithActivityInput(id))
	}

	// Now that all are started, wait for them to complete and then return the success rate
	successCount := 0
	for _, task := range tasks {
		var succeeded bool
		if err := task.Await(&succeeded); err == nil && succeeded {
			successCount++
		}
	}

	return float32(successCount) / float32(len(devices)), nil
}
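
The omitted GetDevicesToUpdate and UpdateDevice activities might look like the following sketch; the hardcoded device list and the always-successful update are placeholders, not code from the sample.

// GetDevicesToUpdate returns the list of device IDs that need updating.
// A real implementation would query an inventory service; this placeholder returns a fixed list.
func GetDevicesToUpdate(ctx task.ActivityContext) (any, error) {
	return []string{"device-1", "device-2", "device-3"}, nil
}

// UpdateDevice applies an update to a single device and reports whether it succeeded.
// The real update logic is omitted; this placeholder always reports success.
func UpdateDevice(ctx task.ActivityContext) (any, error) {
	var deviceID string
	if err := ctx.GetInput(&deviceID); err != nil {
		return nil, err
	}
	fmt.Printf("updating device: %s\n", deviceID)
	return true, nil
}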

The full sample can be found here.

External orchestration inputs (events) example

Sometimes orchestrations need asynchronous input from external systems. For example, an approval workflow may require a manual approval signal from an authorized user. Or perhaps an orchestration pauses and waits for a command from an operator. The WaitForSingleEvent method can be used in an orchestrator function to pause execution and wait for such inputs. You can even specify a timeout value indicating how long to wait for the input before resuming execution (use -1 to indicate an infinite timeout).

// ExternalEventOrchestrator is an orchestrator function that blocks for 30 seconds or
// until a "Name" event is sent to it.
func ExternalEventOrchestrator(ctx *task.OrchestrationContext) (any, error) {
	var nameInput string
	if err := ctx.WaitForSingleEvent("Name", 30*time.Second).Await(&nameInput); err != nil {
		// Timeout expired
		return nil, err
	}

	return fmt.Sprintf("Hello, %s!", nameInput), nil
}

Sending an event to a waiting orchestration can be done using the RaiseEvent method of the task hub client. These events are durably buffered in the orchestration state and are consumed as soon as the target orchestration calls WaitForSingleEvent with a matching event name. The following code shows how to use the RaiseEvent method to send an event with a payload to a running orchestration. See Managing local orchestrations for more information on how to interact with local orchestrations in Go.

id, _ := client.ScheduleNewOrchestration(ctx, ExternalEventOrchestrator)

// Prompt the user for their name and send that to the orchestrator
go func() {
	fmt.Println("Enter your first name: ")
	var nameInput string
	fmt.Scanln(&nameInput)
	
	client.RaiseEvent(ctx, id, "Name", api.WithEventPayload(nameInput))
}()

The full sample can be found here.

Managing local orchestrations

The following code snippet provides an example of how you can configure and run orchestrations. The TaskRegistry type allows you to register orchestrator and activity functions, and the TaskHubClient allows you to start, query, terminate, suspend, resume, and wait for orchestrations to complete.

The code snippet below demonstrates how to register and start a new instance of the ActivitySequenceOrchestrator orchestrator and wait for it to complete. The initialization of the client and worker are left out for brevity.

r := task.NewTaskRegistry()
r.AddOrchestrator(ActivitySequenceOrchestrator)
r.AddActivity(SayHelloActivity)

ctx := context.Background()
client, worker := Init(ctx, r)
defer worker.Shutdown(ctx)

id, err := client.ScheduleNewOrchestration(ctx, ActivitySequenceOrchestrator)
if err != nil {
  panic(err)
}

metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
if err != nil {
  panic(err)
}

fmt.Printf("orchestration completed: %v\n", metadata)

Each sample linked above has a full implementation you can use as a reference.

Distributed tracing support

The Durable Task Framework for Go supports publishing distributed traces to any configured OpenTelemetry-compatible exporter. Simply use otel.SetTracerProvider(tp) to register a global TracerProvider as part of your application startup, and the task hub worker will automatically use it to emit trace spans.

The following example code shows how you can configure distributed trace collection with Zipkin, a popular open source distributed tracing system. The example assumes Zipkin is running locally, as shown in the code.

func ConfigureZipkinTracing() (*trace.TracerProvider, error) {
	// Inspired by this sample: https://github.com/open-telemetry/opentelemetry-go/blob/main/example/zipkin/main.go
	exp, err := zipkin.New("http://localhost:9411/api/v2/spans")
	if err != nil {
		return nil, err
	}

	// NOTE: The simple span processor is not recommended for production.
	//       Instead, the batch span processor should be used for production.
	processor := trace.NewSimpleSpanProcessor(exp)
	// processor := trace.NewBatchSpanProcessor(exp)

	tp := trace.NewTracerProvider(
		trace.WithSpanProcessor(processor),
		trace.WithSampler(trace.AlwaysSample()),
		trace.WithResource(resource.NewWithAttributes(
			"durabletask.io",
			attribute.KeyValue{Key: "service.name", Value: attribute.StringValue("sample-app")},
		)),
	)
	otel.SetTracerProvider(tp)
	return tp, nil
}
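
A short usage sketch: call the function during startup and flush any buffered spans before the process exits. The error handling here is illustrative.

tp, err := ConfigureZipkinTracing()
if err != nil {
	log.Fatalf("failed to configure tracing: %v", err)
}
// Flush any remaining spans before the process exits
defer tp.Shutdown(context.Background())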

You can find this code in the distributedtracing sample. The following is a screenshot showing the trace for the sample's orchestration, which calls an activity, creates a 2-second durable timer, and uses another activity to make an HTTP request to bing.com:

[Screenshot: Zipkin trace for the sample orchestration]

Note that each orchestration is represented as a single span with activities, timers, and sub-orchestrations as child spans. The generated spans contain a variety of attributes that include information such as orchestration instance IDs, task names, task IDs, etc.

Cloning this repository

This repository contains submodules. Be sure to clone it with the option to include submodules. Otherwise you will not be able to generate the protobuf code.

git clone --recurse-submodules https://github.com/microsoft/durabletask-go 

Building the project

This project requires go v1.19.x or greater. You can build a standalone executable by simply running go build at the project root.

Generating protobuf

Use the following command to regenerate the protobuf from the submodule. Use this whenever updating the submodule reference.

# NOTE: assumes the .proto file defines: option go_package = "/internal/protos"
protoc --go_out=. --go-grpc_out=. -I submodules/durabletask-protobuf/protos orchestrator_service.proto

Generating mocks for testing

Test mocks were generated using mockery. Use the following command at the project root to regenerate the mocks.

mockery --dir ./backend --name="^Backend|^Executor|^TaskWorker" --output ./tests/mocks --with-expecter

Running tests

All automated tests are under ./tests. A separate test package hierarchy was chosen intentionally to prioritize black box testing. This strategy also makes it easier to catch accidental breaking API changes.

Run tests with the following command.

go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers

Running integration tests

You can run pre-built container images to run full integration tests against the durable task host over gRPC.

.NET Durable Task client SDK tests

Use the following docker command to run tests against a running worker.

docker run -e GRPC_HOST="host.docker.internal" cgillum/durabletask-dotnet-tester:0.5.0-beta

Note that the test assumes the gRPC server can be reached over localhost on port 4001 on the host machine. These values can be overridden with the following environment variables:

  • GRPC_HOST: Use this to change from the default 127.0.0.1 to some other value, for example host.docker.internal.
  • GRPC_PORT: Set this environment variable to change the default port from 4001 to something else.

If successful, you should see output that looks like the following:

Test run for /root/out/bin/Debug/Microsoft.DurableTask.Tests/net6.0/Microsoft.DurableTask.Tests.dll (.NETCoreApp,Version=v6.0)
Microsoft (R) Test Execution Command Line Tool Version 17.3.1 (x64)
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...
A total of 1 test files matched the specified pattern.
[xUnit.net 00:00:00.00] xUnit.net VSTest Adapter v2.4.3+1b45f5407b (64-bit .NET 6.0.10)
[xUnit.net 00:00:00.82]   Discovering: Microsoft.DurableTask.Tests
[xUnit.net 00:00:00.90]   Discovered:  Microsoft.DurableTask.Tests
[xUnit.net 00:00:00.90]   Starting:    Microsoft.DurableTask.Tests
  Passed Microsoft.DurableTask.Tests.OrchestrationPatterns.ExternalEvents(eventCount: 100) [6 s]
  Passed Microsoft.DurableTask.Tests.OrchestrationPatterns.ExternalEvents(eventCount: 1) [309 ms]
  Passed Microsoft.DurableTask.Tests.OrchestrationPatterns.LongTimer [8 s]
  Passed Microsoft.DurableTask.Tests.OrchestrationPatterns.SubOrchestration [1 s]
  ...
  Passed Microsoft.DurableTask.Tests.OrchestrationPatterns.ActivityFanOut [914 ms]
[xUnit.net 00:01:01.04]   Finished:    Microsoft.DurableTask.Tests
  Passed Microsoft.DurableTask.Tests.OrchestrationPatterns.SingleActivity_Async [365 ms]

Test Run Successful.
Total tests: 33
     Passed: 33
 Total time: 1.0290 Minutes

Running locally

You can run the engine locally by pressing F5 in Visual Studio Code (the recommended editor). You can also simply run go run main.go to start a local Durable Task gRPC server that listens on port 4001.

go run main.go --port 4001 --db ./test.sqlite3

The following is the expected output:

2022/09/14 17:26:50 backend started: sqlite::./test.sqlite3
2022/09/14 17:26:50 server listening at 127.0.0.1:4001
2022/09/14 17:26:50 orchestration-processor: waiting for new work items...
2022/09/14 17:26:50 activity-processor: waiting for new work items...

At this point you can use one of the language SDKs mentioned earlier in a separate process to implement and execute durable orchestrations. Those SDKs will connect to port 4001 by default to interact with the Durable Task engine.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos is subject to those third parties' policies.



durabletask-go's Issues

Improvements for cascade terminate

This is in reference to the cascade terminate feature introduced in #24.

There are some potential problems with the current implementation:

  • Each language SDK must implement the cascade terminate feature
  • A failed sub-orchestration in the chain might block termination of its own downstream sub-orchestrations

We should consider an alternate model that improves on both of these points.

One potential solution is to implement the cascade behavior entirely in the durabletask-go execution engine (the backend package) in a way that doesn't require any work for language SDKs. This would also allow us to bypass any orchestration status checks which might otherwise quietly discard termination messages for orchestrations that are already complete. More work is needed to know whether this approach can be effective and whether it can be done without introducing any breaking changes.

Support chunking of actions submitted to backend

Some backend types may not be able to handle saving an unbounded number of actions to storage in a single go. In such cases, it's necessary to break a large batch of actions into smaller chunks to remain within certain limits of the store. For example, Azure Cosmos DB doesn't allow saving more than 100 documents at a time in a single batch transaction, and that transaction must not exceed 2 MB in size (source). This is an issue in the Dapr Workflow project, as noted here: dapr/dapr#6544.

Even for stores which support saving unbounded numbers of records in a single transaction, it may be desirable to break those transactions into smaller chunks. One reason could be that large transactions could occupy too many database resources. Another is that large transactions could take a long time, increase the chance of failures, and cause work to need to be redone more often. In degenerate cases, this could cause workflows to get stuck, continuously consume huge amounts of resources, and continuously schedule the same work over and over.

Rather than making each backend implementation do its own chunking, the durabletask-go engine should support this directly. Depending on configuration, the orchestration engine can submit multiple calls to the backend, one for each logical chunk. The configuration could include, for example, MaxNewHistoryEventCount and MaxNewHistoryEventBytes settings. When the payload of an orchestration result is close to exceeding either of these limits, Backend.CompleteOrchestrationWorkItem is called to save the current chunk. The engine then continues building the next payload until a final call to Backend.CompleteOrchestrationWorkItem is made with the final set of updates.
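
To illustrate the idea, the hypothetical helper below splits a batch of serialized history events into chunks that respect both limits; each chunk would then be saved with its own CompleteOrchestrationWorkItem call. The types and names here are illustrative only and are not part of the engine.

// chunkEvents splits serialized history events into chunks that stay under
// both the maximum event count and the maximum byte size.
func chunkEvents(events [][]byte, maxCount int, maxBytes int) [][][]byte {
	var chunks [][][]byte
	var current [][]byte
	currentBytes := 0
	for _, e := range events {
		tooMany := len(current)+1 > maxCount
		tooBig := currentBytes+len(e) > maxBytes
		if len(current) > 0 && (tooMany || tooBig) {
			chunks = append(chunks, current)
			current, currentBytes = nil, 0
		}
		current = append(current, e)
		currentBytes += len(e)
	}
	if len(current) > 0 {
		chunks = append(chunks, current)
	}
	return chunks
}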

Issue with distributed traces and multiple instances of the worker

I noticed a strange behavior when I run multiple instances of the worker (say 3) all pointing to the same database. I'm currently using the postgres implementation with some changes.

Here's the screenshot
[Screenshot: strange-traces]

You can see that it contains several orchestration:SimpleOrchestration spans. This also happens with activities.

I also see several of these logs.

{"time":"2024-01-13T14:55:54.491837674+08:00","level":"ERROR","msg":"orchestration-processor: failed to complete work item: instance 'db1659b0-1528-4042-a500-0cb3822f2cad' no longer exists or was locked by a different worker"}
{"time":"2024-01-13T14:55:54.497473338+08:00","level":"ERROR","msg":"orchestration-processor: failed to abandon work item: lock on work-item was lost"}

I think this happens when several workers are processing the same work item while one of them has already transitioned or completed it.

frequent `unknown instance ID` error when running multiple backend instances

We have found a scenario where a transient error happens frequently:

failed to complete orchestration task: rpc error: code = Unknown desc = unknown instance ID: 5f7b2345-897d-4471-af96-6c8e590a29bf

The unknown instance ID error could be considered transient, because after the server returns it to the client, retries eventually stop producing it, but IMO it points to a more fundamental problem with the server-side implementation.

In our scenario we run multiple instances of the server, so there are multiple gRPC servers behind a load balancer. It can therefore happen that a request to CompleteOrchestratorTask lands on a server where there is no "pending orchestrator" to serve that request.

Here is the series of steps I went through to come to that conclusion:

  • first you schedule a new orchestration
  • on the server side, the orchestration worker is eventually triggered and calls ExecuteOrchestrator from ProcessWorkItem
  • continuing on the server side, ExecuteOrchestrator (https://github.com/microsoft/durabletask-go/blob/main/backend/executor.go#L100) adds the instance ID to a pendingOrchestrators map, puts a work item into the work queue, and then waits for the execution to complete by expecting a signal on a channel attached to the entry stored in the pendingOrchestrators map
  • on the client side, because of the work item added to the work queue, the client eventually receives the work item, executes the orchestrator, and then calls CompleteOrchestratorTask (https://github.com/microsoft/durabletask-go/blob/main/backend/executor.go#L230)
  • on the server side, if the call to CompleteOrchestratorTask is received by a different server instance than the one that originally put the instance ID into the pendingOrchestrators map, the unknown instance ID error occurs

work item listener does not recover from client disconnections

The client work item listener does not have any retry logic to recover from errors when the stream closes or when the client loses the connection to the server:
https://github.com/microsoft/durabletask-go/blob/main/client/worker_grpc.go#L36

The EOF error typically requires creating a brand new gRPC connection, which is something that can't currently be done inside the StartWorkItemListener function. Additionally, it doesn't expose any means for users to detect that the streamed connection stopped so that they could handle reconnection or retries themselves...

I can't propose a solution, but IMO this definitely needs some refactoring. I'm happy to contribute the improvements once a decision on how to address it is made.

Grpc server stuck for certain tests

I encountered this issue while trying to set up durabletask-go as the testing image for durabletask-java integration tests. I can reproduce this issue in Java, Go, and JavaScript.

Issue Description:
For a test suite containing two tests:

func Test_Grpc_WaitForInstanceStart_Timeout(t *testing.T) {
	r := task.NewTaskRegistry()
	r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) {
		// sleep 5 seconds
		time.Sleep(5 * time.Second)
		return 20, nil
	})

	cancelListener := startGrpcListener(t, r)
	defer cancelListener()

	id, err := grpcClient.ScheduleNewOrchestration(ctx, "WaitForInstanceStartThrowsException", api.WithInput("世界"))
	require.NoError(t, err)
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, time.Second)
	defer cancelTimeout()
	_, err = grpcClient.WaitForOrchestrationStart(timeoutCtx, id, api.WithFetchPayloads(true))
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "context deadline exceeded")
	}
	time.Sleep(1 * time.Second)
}

func Test_Grpc_HelloOrchestration(t *testing.T) {
	r := task.NewTaskRegistry()
	r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
		var input string
		if err := ctx.GetInput(&input); err != nil {
			return nil, err
		}
		var output string
		err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
		return output, err
	})
	r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
		var name string
		if err := ctx.GetInput(&name); err != nil {
			return nil, err
		}
		return fmt.Sprintf("Hello, %s!", name), nil
	})

	cancelListener := startGrpcListener(t, r)
	defer cancelListener()

	id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"))
	require.NoError(t, err)
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
	defer cancelTimeout()
	metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
	require.NoError(t, err)
	assert.Equal(t, true, metadata.IsComplete())
	assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
	time.Sleep(1 * time.Second)
}

If they are run in order, the second test will time out, although when run separately, both tests succeed. It turns out that after running the first test, all subsequent tests time out. The executor goroutine appears to be parked forever (it stops fetching and processing work items).

Steps to reproduce the issue

  1. checkout branch kaibocai/server-stuck-issue
  2. run go test tests/grpc/grpc_test.go

You will see the second test time out.

I tried to figure this out but without much luck. I'd appreciate it if you could help take a look @cgillum, @ItalyPaleAle, @shivamkm07.

Issue with Sqlite backend initialization in Dapr Workflow

Issue Description

When utilizing Sqlite in-memory as a backend for Dapr Workflow, an issue arises when a workflow is initiated immediately after the application starts. This results in the following error:

Unhandled exception. Dapr.DaprException: Start Workflow operation failed: the Dapr endpoint indicated a failure. 
See InnerException for details.
 ---> Grpc.Core.RpcException: Status(StatusCode="Internal", Detail="error starting workflow 'OrderProcessingWorkflow': 
unable to start workflow: failed to start orchestration: backend not initialized")

Upon investigation, it was identified that the error is caused by a specific line in the Durable Task for Go library, where the initialization of the Sqlite database takes time. Consequently, when the application starts executing the workflow, it encounters an error due to Sqlite not being ready. Given some time (e.g., 10-15 seconds), the application is able to run the workflow successfully with the Sqlite backend.

Proposed Solution

To address this issue, it is suggested to implement a mechanism similar to the one found in the Actor backend. Specifically, exposing a method in the Backend interface to check whether the backend is ready or not. The proposed addition to the Backend interface is as follows:

type Backend interface {
    ...
    ...
    WaitForBackendReady(context.Context)
}

This method will allow applications to wait until the Sqlite backend is fully initialized before attempting to execute workflows, preventing the encountered error.
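
As an illustration, a backend could satisfy the proposed method by blocking on an internal readiness signal. This is a hypothetical sketch, not the actual sqlite backend code.

type sqliteBackend struct {
	ready chan struct{} // closed once the database schema has been created
	// ... existing fields ...
}

// WaitForBackendReady blocks until the backend finishes initializing or the context is cancelled.
func (b *sqliteBackend) WaitForBackendReady(ctx context.Context) {
	select {
	case <-b.ready:
	case <-ctx.Done():
	}
}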

Steps to Reproduce

  1. Set up Dapr Workflow with Sqlite in-memory as the backend.
  2. Initiate a workflow immediately after starting the application.
  3. Observe the error mentioned above.

Support for activity and orchestrator retries

I've been looking for the Go SDK to provide APIs for retries like the other SDKs do (Java, .NET, ...).

Is this something that is planned?

If anyone has pointers on how to implement this, I'll be more than happy to learn. Thanks.
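
For reference, the other SDKs express this as a retry policy attached to each activity or sub-orchestration call. A hypothetical Go equivalent might look like the sketch below; the RetryPolicy type and WithActivityRetryPolicy option are assumptions, not confirmed parts of the Go SDK.

// Hypothetical retry policy attached to an activity call
policy := task.RetryPolicy{
	MaxAttempts:          3,
	InitialRetryInterval: 2 * time.Second,
	BackoffCoefficient:   2.0,
}

var output string
err := ctx.CallActivity(SayHelloActivity,
	task.WithActivityInput("Tokyo"),
	task.WithActivityRetryPolicy(policy), // hypothetical option
).Await(&output)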

Support for libsql backend

Hi, would it be interesting to support libsql / sqld / libsql-server?

Recently I tried using sqld with S3 replication, and it works by just changing the DB URL instead of a file path.

FYI, libsql is an open-contribution fork of SQLite.

Support heartbeats from app code for work-item renewal

Problem

Each of the supported backends currently has a lock timeout which is used to detect when a remote app worker may have crashed or otherwise become unresponsive. However, the simple timeout mechanism doesn't distinguish between an app that has gone away and a task that is simply taking a long time to complete.

For example, if the lock expiration timeout is 1 minute, but a particular activity task takes 5 minutes to complete, then the lock on that work-item will expire before the activity completes and the activity may be rescheduled unnecessarily.

Proposal - heartbeats

To solve this problem, we propose adding a "heartbeat" callback that activity implementations can use to signal that they're still actively processing a particular work-item. This would be a gRPC API that SDKs can call periodically to renew the lock expiration time for an activity work-item.

As a secondary feature, the heartbeat could be used to get the status of the parent orchestration. If the parent orchestration has been terminated, the activity could then choose to cooperatively terminate itself (details TBD on how this would work for each language SDK).
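
A rough sketch of how activity code might use such a callback once it exists; ctx.Heartbeat is purely hypothetical and not part of the current ActivityContext API.

func LongRunningActivity(ctx task.ActivityContext) (any, error) {
	stop := make(chan struct{})
	defer close(stop)

	// Periodically signal that the activity is still alive so the work-item lock is renewed.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ctx.Heartbeat() // hypothetical API: renews the work-item lock
			case <-stop:
				return
			}
		}
	}()

	// ... long-running work goes here ...
	return "done", nil
}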

Orchestration ID reuse policies

This idea comes from dapr/dapr#7101, as it relates to reusing existing workflow IDs.

When scheduling the creation of a new orchestration, the following options should be available:

  • TerminateIfExists - Terminates any existing orchestrations with the same instance ID and then schedules a new instance as one atomic action, similar to on-demand ContinueAsNew.
  • SkipIfExists - If there is an existing orchestration already scheduled, then the scheduler does nothing.
  • ThrowIfExists - If there is an existing orchestration then the scheduler throws an exception (this is the current behavior).

As an optional extension of this, we can consider exposing an "Execution ID" or "Generation ID" property to the orchestration which can help distinguish different instantiations of the given instance ID.

Support for injection of otel spans into tasks for enhancement

Dapr Workflow looks like it uses Durable Task. In order to support this request: dapr/dapr#6906, it looks like we would need to support the ability to inject spans into tasks so that clients can add onto spans and attach attributes to existing spans.

First, I wanted to confirm that what is being requested isn't currently supported, and also confirm that you would accept a PR for this enhancement.

My initial thought was to put the span into https://github.com/microsoft/durabletask-python/blob/4046191d28a6f0a7779d829e5aec88a06493f3d8/durabletask/task.py#L339 to allow clients to enhance spans...

Maybe this is not the right repo to start this process. If so can somebody point me in the right direction?

Flakey test: SuspendResumeOrchestration

The integration test SuspendResumeOrchestration failed in a recent PR: https://github.com/microsoft/durabletask-go/actions/runs/5028545973/jobs/9019359366.

A re-run of the test passed, but integration and unit tests should pass 100% of the time.

Output logs for this particular test execution.

DEBUG: 2023/05/19 21:48:35 orchestration-processor: waiting for one of 1 in-flight execution(s) to complete
DEBUG: 2023/05/19 21:48:35 orchestration-processor: processing work item: 062003b7-9540-4a80-9188-724e83e9aee3 (3 event(s))
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: received work item with 3 new event(s): [ExecutionStarted, ExecutionSuspended, EventRaised]
DEBUG: 2023/05/19 21:48:35 activity-processor: waiting for new work items...
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: got orchestration runtime state: name=SuspendResumeOrchestration, status=PENDING, events=0, age=(new)
INFO: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: starting new 'SuspendResumeOrchestration' instance with ID = '062003b7-9540-4a80-9188-724e83e9aee3'.
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: invoking orchestrator
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: orchestrator returned 1 action(s): [CreateTimer#0]
DEBUG: 2023/05/19 21:48:35 orchestration-processor: work item processed successfully
DEBUG: 2023/05/19 21:48:35 orchestration-processor: waiting for one of 1 in-flight execution(s) to complete
DEBUG: 2023/05/19 21:48:35 orchestration-processor: processing work item: 062003b7-9540-4a80-9188-724e83e9aee3 (4 event(s))
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: received work item with 4 new event(s): [EventRaised, EventRaised, EventRaised, EventRaised]
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: got orchestration runtime state: name=SuspendResumeOrchestration, status=SUSPENDED, events=5, age=0s
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: invoking orchestrator
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: orchestrator returned 1 action(s): [CreateTimer#0]
DEBUG: 2023/05/19 21:48:35 orchestration-processor: work item processed successfully
DEBUG: 2023/05/19 21:48:35 orchestration-processor: waiting for one of 1 in-flight execution(s) to complete
DEBUG: 2023/05/19 21:48:35 orchestration-processor: processing work item: 062003b7-9540-4a80-9188-724e83e9aee3 (3 event(s))
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: received work item with 3 new event(s): [EventRaised, EventRaised, EventRaised]
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: got orchestration runtime state: name=SuspendResumeOrchestration, status=SUSPENDED, events=11, age=0s
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: invoking orchestrator
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: orchestrator returned 1 action(s): [CreateTimer#0]
INFO: 2023/05/19 21:48:35 orchestration-processor: received cancellation signal
INFO: 2023/05/19 21:48:35 orchestration-processor: stopped listening for new work items
DEBUG: 2023/05/19 21:48:35 orchestration-processor: work item processed successfully
DEBUG: 2023/05/19 21:48:35 orchestration-processor: waiting for one of 1 in-flight execution(s) to complete
DEBUG: 2023/05/19 21:48:35 orchestration-processor: processing work item: 062003b7-9540-4a80-9188-724e83e9aee3 (2 event(s))
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: received work item with 2 new event(s): [EventRaised, EventRaised]
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: got orchestration runtime state: name=SuspendResumeOrchestration, status=SUSPENDED, events=16, age=0s
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: invoking orchestrator
DEBUG: 2023/05/19 21:48:35 062003b7-9540-4a80-9188-724e83e9aee3: orchestrator returned 1 action(s): [CreateTimer#0]
DEBUG: 2023/05/19 21:48:35 orchestration-processor: work item processed successfully
DEBUG: 2023/05/19 21:48:35 orchestration-processor: waiting for new work items...
INFO: 2023/05/19 21:48:35 activity-processor: received cancellation signal
INFO: 2023/05/19 21:48:35 activity-processor: stopped listening for new work items
DEBUG: 2023/05/19 21:48:38 orchestration-processor: waiting for one of 1 in-flight execution(s) to complete
DEBUG: 2023/05/19 21:48:38 orchestration-processor: processing work item: 062003b7-9540-4a80-9188-724e83e9aee3 (1 event(s))
DEBUG: 2023/05/19 21:48:38 062003b7-9540-4a80-9188-724e83e9aee3: received work item with 1 new event(s): [ExecutionResumed]
DEBUG: 2023/05/19 21:48:38 062003b7-9540-4a80-9188-724e83e9aee3: got orchestration runtime state: name=SuspendResumeOrchestration, status=SUSPENDED, events=20, age=3s
DEBUG: 2023/05/19 21:48:38 062003b7-9540-4a80-9188-724e83e9aee3: invoking orchestrator
panic: a previous execution called CreateTimer with sequence number 0, but the current execution doesn't have this action with this sequence number [recovered]
	panic: a previous execution called CreateTimer with sequence number 0, but the current execution doesn't have this action with this sequence number

goroutine 1820 [running]:
github.com/microsoft/durabletask-go/task.(*OrchestrationContext).start.func1()
	/home/runner/work/durabletask-go/durabletask-go/task/orchestrator.go:83 +0xd8
panic({0xb927c0, 0xc0002fe2d0})
	/opt/hostedtoolcache/go/1.19.9/x64/src/runtime/panic.go:884 +0x212
github.com/microsoft/durabletask-go/task.(*completableTask).Await(0xc0002fd300, {0xb57420, 0xc0003c6280})
	/home/runner/work/durabletask-go/durabletask-go/task/task.go:67 +0x225
github.com/microsoft/durabletask-go/tests.Test_SuspendResumeOrchestration.func1(0xba0340?)
	/home/runner/work/durabletask-go/durabletask-go/tests/orchestrations_test.go:534 +0x82
github.com/microsoft/durabletask-go/task.(*OrchestrationContext).onExecutionStarted(0xc0003f0100, 0xc0001acf50)
	/home/runner/work/durabletask-go/durabletask-go/task/orchestrator.go:284 +0x133
github.com/microsoft/durabletask-go/task.(*OrchestrationContext).processEvent(0xc0003f0100, 0x7fdc72de85b8?)
	/home/runner/work/durabletask-go/durabletask-go/task/orchestrator.go:144 +0x525
github.com/microsoft/durabletask-go/task.(*OrchestrationContext).processNextEvent(0x7fdc72de85b8?)
	/home/runner/work/durabletask-go/durabletask-go/task/orchestrator.go:106 +0x47
github.com/microsoft/durabletask-go/task.(*OrchestrationContext).start(0xc0003f0100)
	/home/runner/work/durabletask-go/durabletask-go/task/orchestrator.go:88 +0x125
github.com/microsoft/durabletask-go/task.(*taskExecutor).ExecuteOrchestrator(0xc0002fe2a0?, {0xc00011a010?, 0xc0001d4140?}, {0xc000317440, 0x24}, {0xc0003ee000, 0x14, 0x14}, {0xc0003ec2d0, 0x2, ...})
	/home/runner/work/durabletask-go/durabletask-go/task/executor.go:82 +0x18f
github.com/microsoft/durabletask-go/backend.(*orchestratorProcessor).ProcessWorkItem(0xc0002bac30, {0xd89178, 0xc0002c01c0}, {0xd838c0?, 0xc0003097c0})
	/home/runner/work/durabletask-go/durabletask-go/backend/orchestration.go:83 +0x830
github.com/microsoft/durabletask-go/backend.(*worker).processWorkItem(0xc000287ec0, {0xd89178, 0xc0002c01c0}, {0xd838c0, 0xc0003097c0})
	/home/runner/work/durabletask-go/durabletask-go/backend/worker.go:193 +0x243
created by github.com/microsoft/durabletask-go/backend.(*worker).ProcessNext
	/home/runner/work/durabletask-go/durabletask-go/backend/worker.go:172 +0x565
FAIL	github.com/microsoft/durabletask-go/tests	12.423s
FAIL

SQLITE_BUSY failures when persisting to disk

This seems to be a new problem in the latest, unreleased bits. I don't recall seeing this prior to #17. I definitely missed it since most of my testing has been with an in-memory database.

INFO: 2023/07/12 22:17:35 worker started with backend sqlite::taskhub.sqlite3
server listening at 127.0.0.1:4001
DEBUG: 2023/07/12 22:17:35 orchestration-processor: waiting for new work items...
DEBUG: 2023/07/12 22:17:35 activity-processor: waiting for new work items...
ERROR: 2023/07/12 22:17:35 activity-processor: failed to fetch work item: failed to query for activity work-items: database is locked (5) (SQLITE_BUSY)
ERROR: 2023/07/12 22:17:35 unexpected worker error: failed to query for activity work-items: database is locked (5) (SQLITE_BUSY). Adding 5 extra seconds of backoff.

The problem seems persistent as well. My assumption is that it doesn't like multiple threads accessing the database concurrently, but this seems strange since I was under the impression that SQLite supported multithreading natively.

FYI @ItalyPaleAle in case you're familiar with this SQLite error.

Suspended orchestrations reschedule old actions continuously (v0.2.4)

This issue was discovered accidentally while debugging an unrelated problem.

Observed
When an orchestration is suspended, it continues to schedule actions for events in its past history. For example, if an orchestration starts 5 activities and is then suspended, those activities will keep getting scheduled in an infinite loop until the orchestration is resumed.

Expected
Suspended orchestrations should never schedule any actions at all. Furthermore, no orchestration should ever schedule the same actions over and over again.

Scheduling of workflows

While creating a new orchestration, the user can schedule it to run at a later time by setting the ScheduledStartTimestamp field here:

ScheduledStartTimestamp *timestamp.Timestamp `protobuf:"bytes,5,opt,name=scheduledStartTimestamp,proto3" json:"scheduledStartTimestamp,omitempty"`

However, it gets ignored when sending the request to the backend:

if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {

It needs to be passed to the backend so that the backend can schedule workflows accordingly.
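
Once wired through, a client could request a delayed start roughly as follows; WithStartTime is an assumed option name and MyOrchestrator is a placeholder orchestrator.

// Schedule the orchestration to start one hour from now (option name is an assumption)
startAt := time.Now().Add(1 * time.Hour)
id, err := client.ScheduleNewOrchestration(ctx, MyOrchestrator, api.WithStartTime(startAt))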
