tochemey / goakt

[Go] A fast, distributed actor framework for Go that uses protocol buffers as messages.

License: MIT License

Earthly 0.45% Go 99.53% Makefile 0.01%
actor-framework actor-model actor-system actors actorsystem cloud-computing concurrent-programming distributed distributed-system distributed-systems erlang-otp event-sourcing framework go go-actor golang golang-library high-performance proto protocol-buffers

goakt's People

Contributors

alexandertar, chenxyzl, renovate[bot], tochemey


goakt's Issues

reimplement the cluster engine using a dedicated raft library

The current implementation relies on the embedded etcd server. We need to rethink this approach before adding cluster sharding and related features: etcd's replication cannot scale horizontally because it lacks data sharding.

Metrics integration

  • Implement the various metrics to track for an Actor using OpenTelemetry
  • Implement the various metrics to track for an Actor system using OpenTelemetry
  • Add a default Prometheus server to scrape the metrics
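
A minimal sketch of what the OpenTelemetry wiring could look like; the instrument name below is an illustration, not the final metric set:

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// newReceivedCounter registers a hypothetical per-actor counter.
func newReceivedCounter() (metric.Int64Counter, error) {
	// the global meter provider is assumed to be configured elsewhere
	meter := otel.Meter("goakt")
	return meter.Int64Counter(
		"actor_received_messages_total", // illustrative instrument name
		metric.WithDescription("Total number of messages received by the actor"),
	)
}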

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

  • fix(deps): update minor go modules (go.opentelemetry.io/otel, go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc, go.opentelemetry.io/otel/exporters/prometheus, go.opentelemetry.io/otel/exporters/stdout/stdouttrace, go.opentelemetry.io/otel/metric, go.opentelemetry.io/otel/sdk, go.opentelemetry.io/otel/sdk/metric, go.opentelemetry.io/otel/trace, google.golang.org/protobuf, k8s.io/api, k8s.io/apimachinery, k8s.io/client-go)

Ignored or Blocked

These are blocked by an existing closed PR and will not be recreated unless you click a checkbox below.

Detected dependencies

github-actions
.github/workflows/build.yml
  • actions/checkout v4
  • docker/login-action v3
  • earthly/actions v1
  • codecov/codecov-action v4
.github/workflows/pr.yml
  • actions/checkout v4
  • docker/login-action v3
  • earthly/actions v1
  • codecov/codecov-action v4
gomod
go.mod
  • go 1.21
  • connectrpc.com/connect v1.16.1
  • connectrpc.com/otelconnect v0.7.0
  • github.com/buraksezer/olric v0.5.6-0.20240205222928-c5efb0d4b5ea@c5efb0d4b5ea
  • github.com/caarlos0/env/v11 v11.0.0
  • github.com/cespare/xxhash/v2 v2.3.0
  • github.com/deckarep/golang-set/v2 v2.6.0
  • github.com/flowchartsman/retry v1.2.0
  • github.com/google/go-cmp v0.6.0
  • github.com/google/uuid v1.6.0
  • github.com/grandcat/zeroconf v1.0.0
  • github.com/joho/godotenv v1.5.1
  • github.com/nats-io/nats-server/v2 v2.10.14
  • github.com/nats-io/nats.go v1.34.1
  • github.com/pkg/errors v0.9.1
  • github.com/prometheus/client_golang v1.19.0
  • github.com/redis/go-redis/v9 v9.5.1
  • github.com/reugn/go-quartz v0.11.2
  • github.com/spf13/cobra v1.8.0
  • github.com/stretchr/testify v1.9.0
  • github.com/travisjeffery/go-dynaport v1.0.0
  • go.opentelemetry.io/otel v1.25.0
  • go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.25.0
  • go.opentelemetry.io/otel/exporters/prometheus v0.47.0
  • go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.25.0
  • go.opentelemetry.io/otel/metric v1.25.0
  • go.opentelemetry.io/otel/sdk v1.25.0
  • go.opentelemetry.io/otel/sdk/metric v1.25.0
  • go.opentelemetry.io/otel/trace v1.25.0
  • go.uber.org/atomic v1.11.0
  • go.uber.org/goleak v1.3.0
  • go.uber.org/multierr v1.11.0
  • go.uber.org/zap v1.27.0
  • golang.org/x/net v0.24.0
  • golang.org/x/sync v0.7.0
  • google.golang.org/protobuf v1.33.0
  • k8s.io/api v0.29.4
  • k8s.io/apimachinery v0.29.4
  • k8s.io/client-go v0.29.4
  • k8s.io/utils v0.0.0-20240423183400-0849a56e8f22@0849a56e8f22
regex
Earthfile
  • tochemey/docker-go 1.21.0-1.0.0
.github/workflows/build.yml
  • earthly/earthly v0.8.9
.github/workflows/pr.yml
  • earthly/earthly v0.8.9

  • Check this box to trigger a request for Renovate to run again on this repository

[feat] Implement Routers

The first iteration of the implementation will focus on implementing a PoolRouter that will send a specific type of message to a bunch of routees to process in parallel. There will be two types of routees:

  • synchronous routee: their job is to process the message and ignore the outcome.
  • asynchronous routee: their job is to process the message and return the outcome back to the router.

Router features:

  • Remove a stopped routee from the set of routees
  • Shutdown when the last routee is stopped
  • Resize pool
  • Handle idle routees (when passivated)
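
A minimal sketch of what the pool router could look like, assuming a round-robin strategy; all identifiers here (PoolRouter, Route, the Tell signature) are illustrative, not the final API:

import (
	"context"
	"errors"

	"google.golang.org/protobuf/proto"
)

// PoolRouter fans messages out to a fixed set of routees (hypothetical type).
type PoolRouter struct {
	routees []*PID // the routee actors backing the pool
	next    int    // round-robin cursor
}

// Route forwards the message to the next routee in round-robin order.
func (r *PoolRouter) Route(ctx context.Context, msg proto.Message) error {
	if len(r.routees) == 0 {
		return errors.New("pool router has no routees")
	}
	routee := r.routees[r.next%len(r.routees)]
	r.next++
	// synchronous routee: fire-and-forget; an asynchronous routee would
	// use Ask and return the outcome to the router instead
	return routee.Tell(ctx, msg)
}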

Cluster actor passivation awareness

At the moment, actors created in cluster mode can be passivated on the node where they were created. The passivated actor is removed from that node's actors map. However, this information is not propagated across the cluster, which means the actor still has a reference in the cluster, and that can cause issues.

Add clustering

The cluster feature will help actor system nodes form a cluster when that flag is turned on.

How will it work?

  • Define and implement a discovery mechanism to discover nodes based upon their IP and additional metadata. The discovery plugin should be extensible and pluggable. This is the proposed interface for the discovery plugin:
// Discovery helps discover other running actor systems in a cloud environment
type Discovery interface {
	// Start the discovery engine
	Start(ctx context.Context, meta Meta) error
	// Nodes returns the list of Nodes at a given time
	Nodes(ctx context.Context) ([]*Node, error)
	// Watch returns event based upon node lifecycle
	Watch(ctx context.Context) (<-chan Event, error)
	// EarliestNode returns the earliest node
	EarliestNode(ctx context.Context) (*Node, error)
	// Stop shuts down the discovery provider
	Stop() error
}

With this interface we can implement mDNS, Kubernetes, and other discovery modes.
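
For illustration, a trivial static provider (hypothetical, built against the interface above) could look like this:

import (
	"context"
	"errors"
)

// staticDiscovery is a hypothetical provider that serves a fixed node list.
type staticDiscovery struct {
	nodes []*Node
}

func (s *staticDiscovery) Start(ctx context.Context, meta Meta) error { return nil }

func (s *staticDiscovery) Nodes(ctx context.Context) ([]*Node, error) {
	return s.nodes, nil
}

func (s *staticDiscovery) Watch(ctx context.Context) (<-chan Event, error) {
	// a static list never changes, so no lifecycle events are ever emitted
	return make(chan Event), nil
}

func (s *staticDiscovery) EarliestNode(ctx context.Context) (*Node, error) {
	if len(s.nodes) == 0 {
		return nil, errors.New("no nodes discovered")
	}
	return s.nodes[0], nil
}

func (s *staticDiscovery) Stop() error { return nil }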

  • Define a cluster interface:
// Cluster defines the cluster contract
type Cluster interface {
	// Start starts the cluster node
	Start(ctx context.Context) error
	// Stop stops the cluster node
	Stop(ctx context.Context) error
	// GetPeers fetches all the peers of a given node
	GetPeers(ctx context.Context) ([]*Peer, error)
	// PutActor adds an actor meta to the cluster
	PutActor(ctx context.Context, actor *goaktpb.WireActor) error
	// GetActor reads an actor meta from the cluster
	GetActor(ctx context.Context, actorName string) (*goaktpb.WireActor, error)
}
  • Implement a raft node.

  • On start, each node will make use of the discovery plugin to try to join an existing cluster using the underlying raft implementation.

Feature: Add message scheduler

This feature lets an actor stash some messages that it can process at a later time. It will be an extension of the stash mechanism with a simple scheduler. The API definition will be as follows:

  • ScheduleOnce: to schedule a message to be processed one time
  • ScheduleTimes(count int): to schedule a message to be processed count times

This feature should be available both on the PID and the receiver context.
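
A usage sketch, assuming both methods also take a delay/interval (the signatures are illustrative, not the final API):

// schedule a message to be processed once, 30 seconds from now
pid.ScheduleOnce(ctx, message, 30*time.Second)

// schedule a message to be processed 5 times, once every 10 seconds
pid.ScheduleTimes(ctx, message, 5, 10*time.Second)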

[feat] reimplement tracing

  1. Make tracing configurable

  2. Add span tracing to the following methods on PID and ActorSystem:

    • Tell
    • Ask
    • BatchTell
    • BatchAsk
    • init
    • Start (for actor system)
    • Stop (for actor system)
    • Shutdown
    • Spawn (for actor system)
    • ReSpawn (for actor system)
    • Kill
    • Forward
    • etc...
  3. Record errors in the span
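
A sketch of what the span wrapping could look like for Tell, using the OpenTelemetry API already in the dependency list (the traceTell helper and the PID.Tell signature are assumptions):

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/codes"
	"google.golang.org/protobuf/proto"
)

// traceTell wraps a Tell call in a span and records any error on it.
func traceTell(ctx context.Context, pid *PID, msg proto.Message) error {
	ctx, span := otel.Tracer("goakt").Start(ctx, "Tell")
	defer span.End()

	err := pid.Tell(ctx, msg) // hypothetical signature
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
	return err
}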

Events Stream for eventsourcing

We need to provide an event stream to publish events persisted in the event store, so that downstream applications can subscribe to those events and build read models.

  • The event stream should be a singleton: once instantiated, it should keep running until the actor system is shut down
  • Actors can publish messages to the stream
  • Actors can subscribe to messages
  • External services can subscribe to messages (TBD)

Alternatives

  • Make use of gRPC streaming
  • Use a golang standard streaming library
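
Whichever transport is chosen, the stream contract could look like this hypothetical sketch (names are illustrative, not the final Go-Akt API):

import "google.golang.org/protobuf/proto"

// EventStream is a hypothetical singleton pub/sub contract.
type EventStream interface {
	// Publish pushes a persisted event to every subscriber of the topic
	Publish(topic string, event proto.Message)
	// Subscribe returns a channel that delivers events for the topic
	Subscribe(topic string) (<-chan proto.Message, error)
	// Shutdown stops the stream when the actor system shuts down
	Shutdown()
}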

Feature: Add a deadletter buffer

Messages that cannot be delivered or handled will be pushed to a deadletter box. The purpose of this feature is to help debugging when there is some inconsistency in message processing.

This feature will require:

  • deadletter queue(memory/durable storage)
  • event stream(pub/sub)
  • deadletter logging mechanism
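
A sketch of the record such a buffer could hold (all field names are assumptions):

import (
	"time"

	"google.golang.org/protobuf/proto"
)

// Deadletter is a hypothetical entry in the deadletter buffer.
type Deadletter struct {
	Sender   string        // path of the sending actor, when known
	Receiver string        // path of the intended receiver
	Message  proto.Message // the message that could not be delivered or handled
	Reason   error         // why delivery or handling failed
	At       time.Time     // when the dead letter was captured
}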

[feat] Better discovery config passing

NewDiscovery() should accept a typed config (type discoConfig struct) rather than using the generic discovery.Config{}.

disco := dnssd.NewDiscovery(dnssd.Config{
    Domain: "xxx",
    IPv6:   false,
})

[feat] RemoteSpawn method

The RemoteSpawn method helps start an actor on a remote machine.

  • Register the actor type with the actor system
  • Call RemoteSpawn from the API or the actor PID.
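
A hypothetical usage sketch; the registration call and the RemoteSpawn signature are assumptions, not the final API:

// register the actor type so the remote node can instantiate it by name
actorSystem.Register(ctx, &UserActor{})

// ask the node listening on remote-host:9000 to spawn the actor
err := actorSystem.RemoteSpawn(ctx, "remote-host", 9000, "user-1", "UserActor")
if err != nil {
	log.Fatal(err)
}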

preparation for 1.0.0

  • Enhance performance
  • Cleanup code base
  • Better documentation
  • Enhance observability (not needed)
  • Possible additional features (dead letters)

[feat]: Add Initialized system message

  • Push that message to the actor mailbox after the actor has successfully started.
  • The message can be handled in the receive loop.

message definition:

// PostStart is sent when an actor has successfully started
message PostStart {
}
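
Handling it in the receive loop could then look like this sketch (the ReceiveContext usage is illustrative):

func (a *UserActor) Receive(ctx ReceiveContext) {
	switch ctx.Message().(type) {
	case *goaktpb.PostStart:
		// the actor has fully started: warm caches, open connections, etc.
	default:
		ctx.Unhandled()
	}
}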

[feat] Add an ID and Equal methods to PID

The current implementation does not allow the comparison of two PIDs, and there is no proper way to get the ID of an actor.

Current solution

The current solution is to use the actor path.
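
Since the actor path already identifies an actor uniquely, the new methods could be thin wrappers over it; a sketch (the method names and the ActorPath accessor are proposals):

// ID returns a unique identifier for the actor, derived from its path.
func (p *PID) ID() string {
	return p.ActorPath().String() // hypothetical accessor on PID
}

// Equals reports whether two PIDs reference the same actor.
func (p *PID) Equals(other *PID) bool {
	return other != nil && p.ID() == other.ID()
}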

[feat] discovery.HostNode() should accept configs rather than read env directly

Reading env vars and merging them into configs should be handled by the user, using modules like viper or koanf.

In my opinion, env vars are best avoided in favor of putting all configs in local config files. I have come across many cases where a mysterious env var controls a key feature and no one knows about it. It is a nightmare to debug.

Message stashing during behavior changes

  • Ability to stash incoming messages between behavior changes
  • Ability to unstash messages

Stashing enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior.

A typical example when this is useful is if the actor has to load some initial state or initialize some resources before it can accept the first real message. Another example is when the actor is waiting for something to complete before processing the next message.
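
A sketch of that initialization pattern (the Stash/UnstashAll names follow the mechanism described above; the message types are hypothetical):

func (a *AccountActor) Receive(ctx ReceiveContext) {
	switch ctx.Message().(type) {
	case *pb.StateLoaded:
		a.initialized = true
		ctx.UnstashAll() // replay everything buffered while loading
	default:
		if !a.initialized {
			ctx.Stash() // buffer until the initial state is loaded
			return
		}
		// normal processing with the current behavior
	}
}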

[fix] Cluster not exiting when olric fails to start

{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"enabling clustering..."}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"starting cluster engine..."}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"Starting GoAkt cluster Node service on (DESKTOP-1T48526:9000)....πŸ€”"}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"Olric 0.6.0-alpha.1 on linux/amd64 go1.22.2 => olric.go:319"}
{"level":"error","ts":"2024-04-19T13:59:51+08:00","msg":"Failed to run the routing table subsystem: mDNS domain is not provided => olric.go:347"}
{"level":"error","ts":"2024-04-19T13:59:51+08:00","msg":"failed to start the cluster Node=(thingcross-0).πŸ’₯: mDNS domain is not provided"}
{"level":"info","ts":"2024-04-19T13:59:51+08:00","msg":"127.0.1.1:9000 is gone => olric.go:448"}

go func() {
	if err = n.server.Start(); err != nil {
		logger.Error(errors.Wrapf(err, "failed to start the cluster Node=(%s).💥", n.name))
		if e := n.server.Shutdown(ctx); e != nil {
			logger.Panic(e)
		}
	}
}()
<-startCtx.Done()

The context is never cancelled.
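
One possible fix, sketched under the assumption that the caller blocks on startCtx: cancel that context when the server fails to start, so the wait can return:

startCtx, startCancel := context.WithCancel(ctx)
go func() {
	if err := n.server.Start(); err != nil {
		logger.Error(errors.Wrapf(err, "failed to start the cluster Node=(%s)", n.name))
		if e := n.server.Shutdown(ctx); e != nil {
			logger.Error(e)
		}
		startCancel() // unblock the caller waiting on startCtx.Done()
	}
}()
<-startCtx.Done() // now also returns when startup fails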

[refactor] Refactor the implementation of the deadletter stream

The current implementation covers only a few dead-letter scenarios. We need to extend it to other parts of the message delivery system to capture more data.
This change will not alter the API definition, which means developers don't need to do anything.

[feat] Reimplement the actor metrics

This ticket/issue outlines the various metrics to implement for both the actor system and a given actor.

Actor System Metrics

  • Total Number of Actors
  • Dead-letters count

Actor Metrics

  • Number of children
  • Number of messages stashed
  • Number of Restarts
  • Last message received processing duration
  • Number of Instances created for the same type of Actor

Behavior as FSM

Brief Overview

An actor should be able to behave like an FSM. An FSM, as described in the Erlang design principles, is a set of relations of the form:

State(S) x Event(E) -> Actions (A), State(S’).

If we are in state S and the event E occurs, we perform the actions A and transition to state S'. In other words, an actor in state S processes each incoming message with the behavior attached to S; when it transitions to another state, every new message is processed with the new state's behavior. This feature lets an actor wear many coats.

Possible Action Items

  • Implement a behavior stack to hold the actor behaviors
  • Implement a mechanism to allow an actor to transition from one behavior to another and back
  • Implement the internal FSM that helps the actor process messages in a particular state
  • Allow an actor to switch back to its default behavior

Use case

Consider, for instance, building an authentication/authorization system with Go-Akt. The session actor can move through the following state transitions:
Authenticating -> Authenticated -> Authorizing -> Authorized.

Of course, this is just one way of building such a system using a single actor.
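
A sketch of how the session actor could switch behaviors (the Become/UnBecome names follow the behavior-stack idea above; the message types are hypothetical):

func (s *SessionActor) Authenticating(ctx ReceiveContext) {
	switch ctx.Message().(type) {
	case *pb.Authenticated:
		ctx.Become(s.Authorizing) // Authenticating -> Authorizing
	default:
		ctx.Unhandled()
	}
}

func (s *SessionActor) Authorizing(ctx ReceiveContext) {
	switch ctx.Message().(type) {
	case *pb.Authorized:
		ctx.UnBecome() // back to the default behavior once authorized
	default:
		ctx.Unhandled()
	}
}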

prepare for 1.4.0

  • Add cluster events to events streams.
    • NodeJoined
    • NodeLeft
  • Cluster metrics
    • Number of Members
  • Metrics #170
  • Complete #210

[feat] Actor redeployment when cluster mode is enabled

The cluster implementation simply relies on a distributed, sharded key/value store engine, which handles the rebalancing and sharding mechanisms.

  • A single instance of an actor is created when cluster mode is enabled. The actor identifier is propagated across the cluster.
  • Each node in the cluster has a copy of the created actor identifier and can easily refer to the location of the actor.
  • When the node where a given actor was created dies, the other nodes in the cluster are aware of the event. However, at the moment Go-Akt does not do actor redeployment, which is the issue we need to address.

It is an issue because the actor cannot be found once its node dies.

[feat]Reimplement cluster engine

Implementation details: a simple distributed in-memory key/value store

  • Allow leader/coordinator re-election when the cluster topology changes
  • Handle network partitions efficiently
  • Shard the data across the cluster
  • Move a departed node's data to another node efficiently

Add a partitioner interface for the cluster

The current implementation does not allow users to specify their own partitioner function using a different algorithm. This can be an issue when an integration requires a different hash algorithm.

  • Add an interface to make the partitioner function customisable
  • Add a default partitioner
  • Add an option to set the custom partitioner in the actor system
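
A sketch of the interface, with a default built on the xxhash dependency already in go.mod (the names are proposals):

import "github.com/cespare/xxhash/v2"

// Partitioner maps an actor name to a partition in [0, partitions).
type Partitioner interface {
	Partition(actorName string, partitions int) int
}

// xxHashPartitioner is an illustrative default implementation.
type xxHashPartitioner struct{}

func (xxHashPartitioner) Partition(actorName string, partitions int) int {
	return int(xxhash.Sum64String(actorName) % uint64(partitions))
}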

[feat] Implement Batch(Tell/Ask)

  • BatchTell - helps send a batch of messages to a given actor to process in a fire-and-forget manner.
  • BatchAsk - helps send a batch of messages to a given actor to process, expecting responses. The responses will be sent back in the same order as the messages.

The messages will be processed one after the other in the order they are sent.
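
A usage sketch (the exact signatures are assumptions):

messages := []proto.Message{msg1, msg2, msg3}

// fire-and-forget: the actor processes the batch in order
if err := pid.BatchTell(ctx, messages...); err != nil {
	log.Fatal(err)
}

// request/response: replies come back in the same order as the messages
replies, err := pid.BatchAsk(ctx, messages...)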

[refact] refactor the code to enable gRPC interceptors

This is necessary in order to:

  1. Handle the breaking change introduced by the new version of otelconnect
  2. Enable the gRPC interceptors only when metrics or traces are enabled on the actor system
  3. Enhance the API implementation to optionally toggle observability for remote calls
