Opus

A framework for pluggable business logic components.

Installation

The package can be installed by adding opus to your list of dependencies in mix.exs:

def deps do
  [{:opus, "~> 0.8"}]
end

Documentation

Conventions

  • Each Opus pipeline module has a single entry point and returns tagged tuples {:ok, value} | {:error, error}
  • A pipeline is a composition of stateless stages
  • A stage returning {:error, _} halts the pipeline
  • A stage may be skipped based on a condition function (:if and :unless options)
  • Exceptions are converted to {:error, error} tuples by default
  • An exception may be left to raise using the :raise option
  • Each stage of the pipeline is instrumented. Metrics are captured automatically (but can be disabled).
  • Errors are meaningful and predictable

Usage

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step  :add_one,         with: &(&1 + 1)
  check :even?,           with: &(rem(&1, 2) == 0), error_message: :expected_an_even
  tee   :publish_number,  if: &Publisher.publishable?/1, raise: [ExternalError]
  step  :double,          if: :lucky_number?
  step  :divide,          unless: :lucky_number?
  step  :randomize,       with: &(&1 * :rand.uniform)
  link  JSONPipeline

  def double(n), do: n * 2
  def divide(n), do: n / 2
  def lucky_number?(n) when n in 42..1337, do: true
  def lucky_number?(_), do: false
end

ArithmeticPipeline.call(41)
# {:ok, 84.13436750126804}

Read this blogpost to get started.

Pipeline

The core aspect of this library is defining pipeline modules. As in the example above you need to add use Opus.Pipeline to turn a module into a pipeline. A pipeline module is a composition of stages executed in sequence.

Stages

There are a few different types of stages for different use-cases. All stage functions expect a single argument, which is either the argument of the initial call/1 of the pipeline module or the return value of the previous stage.

An error value is either :error or {:error, any} and anything else is considered a success value.

Step

This stage processes the input value. With a success value, the next stage is called with that value. With an error value, the pipeline is halted and an {:error, any} tuple is returned.
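A minimal sketch of this behaviour, written as a standalone script. It assumes Elixir 1.12+ so that Mix.install/1 can fetch Opus; the names HalvePipeline, ensure_even and halve are made up for illustration.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

defmodule HalvePipeline do
  use Opus.Pipeline

  step :ensure_even
  step :halve, with: &div(&1, 2)

  # Returns the input unchanged (a success value) or an error tuple.
  def ensure_even(n) when rem(n, 2) == 0, do: n
  def ensure_even(_n), do: {:error, :odd_input}
end

# Both stages run: 10 passes :ensure_even and is then halved.
even_result = HalvePipeline.call(10)
IO.inspect(even_result)

# :ensure_even returns an error value, so :halve never runs and
# call/1 returns an {:error, _} tuple.
odd_result = HalvePipeline.call(7)
IO.inspect(odd_result)
```

The second call is only matched against {:error, _} here, since the exact shape of the wrapped error term is up to Opus.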

Check

This stage is intended for validations.

This stage calls the stage function and unless it returns true it halts the pipeline.

Example:

defmodule CreateUserPipeline do
  use Opus.Pipeline

  check :valid_params?, with: &match?(%{email: email} when is_bitstring(email), &1)
  # other stages to actually create the user
end

Tee

This stage is intended for side effects, such as a notification or a call to an external system where the return value is not meaningful. It never halts the pipeline.
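As a rough illustration, the sketch below puts a tee between two steps. It assumes Elixir 1.12+ for Mix.install/1; AuditedPipeline and log_value are hypothetical names.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

defmodule AuditedPipeline do
  use Opus.Pipeline

  step :double, with: &(&1 * 2)
  tee :log_value
  step :add_one, with: &(&1 + 1)

  # Side effect only: whatever this returns is not passed on.
  def log_value(n) do
    IO.puts("value so far: #{n}")
    :this_return_value_is_discarded
  end
end

# :add_one receives the output of :double, not the return value of the tee.
result = AuditedPipeline.call(3)
IO.inspect(result)
```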

Link

This stage is to link with another Opus.Pipeline module. It calls call/1 for the provided module. If the module is not an Opus.Pipeline it is ignored.
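A small sketch of linking, assuming Elixir 1.12+ for Mix.install/1. NormalizePipeline, SignupPipeline and wrap_email are illustrative names, not part of Opus.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

# A reusable pipeline that cleans up a string.
defmodule NormalizePipeline do
  use Opus.Pipeline

  step :trim, with: &String.trim/1
  step :downcase, with: &String.downcase/1
end

# The caller pipeline delegates to NormalizePipeline, then continues
# with the value the linked pipeline produced.
defmodule SignupPipeline do
  use Opus.Pipeline

  link NormalizePipeline
  step :wrap_email

  def wrap_email(email), do: %{email: email}
end

result = SignupPipeline.call("  Jo@Example.COM ")
IO.inspect(result)
```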

Skip

The skip macro can be used for linked pipelines. A linked pipeline may act as a true bypass, based on a condition, expressed as either :if or :unless. When skipped, none of the stages are executed and it returns the input, to be used by any next stages of the caller pipeline. A very common use-case is illustrated in the following example:

defmodule RetrieveCustomerInformation do
  use Opus.Pipeline

  check :valid_query?
  link FetchFromCache,    if: :cacheable?
  link FetchFromDatabase, if: :db_backed?
  step :serialize
end

With skip it can be written as:

defmodule RetrieveCustomerInformation do
  use Opus.Pipeline

  check :valid_query?
  link FetchFromCache
  link FetchFromDatabase
  step :serialize
end

A linked pipeline becomes:

defmodule FetchFromCache do
  use Opus.Pipeline

  skip :assert_suitable, if: :cacheable?
  step :retrieve_from_cache
end

Available options

The behaviour of each stage can be configured with any of the available options:

  • :with: The function to call to fulfill this stage. It can be an Atom referring to a public function of the module, an anonymous function or a function reference.
  • :if: Makes a stage conditional, it can be either an Atom referring to a public function of the module, an anonymous function or a function reference. For the stage to be executed, the condition must return true. When the stage is skipped, the input is forwarded to the next step if there's one.
  • :unless: The opposite of the :if option, executes the step only when the callback function returns false.
  • :raise: A list of exceptions to not rescue. Defaults to false which converts all exceptions to {:error, %Opus.PipelineError{}} values halting the pipeline.
  • :error_message: An error message to replace the original error when a stage fails. It can be a String or Atom, which will be used directly in place of the original message, or an anonymous function, which receives the input of the failed stage and must return the error message to be used.
  • :retry_times: How many times to retry a failing stage, before halting the pipeline.
  • :retry_backoff: A backoff function to provide delay values for retries. It can be an Atom referring to a public function in the module, an anonymous function or a function reference. It must return an Enumerable.t yielding at least as many numbers as the retry_times.
  • :instrument?: A boolean which defaults to true. Set to false to skip instrumentation for a stage.
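The sketch below combines a few of these options: :with, :error_message given as a function, :if and :unless. It assumes Elixir 1.12+ for Mix.install/1, and every name in it (CheckoutPipeline, the stage names, the order map) is hypothetical.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

defmodule CheckoutPipeline do
  use Opus.Pipeline

  # The error message function receives the input of the failed stage.
  check :in_stock?,
    with: &(&1.quantity > 0),
    error_message: fn order -> "#{order.item} is out of stock" end

  # Exactly one of these two steps runs, depending on member?/1.
  step :apply_discount, if: :member?
  step :add_shipping, unless: :member?

  def member?(order), do: order.member
  def apply_discount(order), do: %{order | total: order.total - 10}
  def add_shipping(order), do: %{order | total: order.total + 5}
end

order = %{item: "book", quantity: 1, member: true, total: 100}

member_result = CheckoutPipeline.call(order)
guest_result = CheckoutPipeline.call(%{order | member: false})
# Fails the check, so the pipeline halts with an {:error, _} tuple.
sold_out_result = CheckoutPipeline.call(%{order | quantity: 0})

IO.inspect({member_result, guest_result, sold_out_result})
```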

Retries

defmodule ExternalApiPipeline do
  use Opus.Pipeline

  step :http_request, retry_times: 8, retry_backoff: fn -> linear_backoff(10, 30) |> cap(100) end

  def http_request(_input) do
    # code for the actual request
  end
end

In the above module, the http_request stage will be retried up to 8 times, each time applying a delay taken from the next value of the Stream returned by the retry_backoff function.

All the functions from the :retry package will be available to be used in retry_backoff.
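Since retry_backoff only has to return an Enumerable of numbers, a plain list can stand in for the :retry helpers. The sketch below assumes Elixir 1.12+ for Mix.install/1 and assumes the delays are in milliseconds, as in the :retry package. FlakyPipeline, fetch and the :attempt_counter Agent are hypothetical; the Agent only exists to make the failures deterministic.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

# Counts attempts so the stage can fail a fixed number of times.
{:ok, _pid} = Agent.start_link(fn -> 0 end, name: :attempt_counter)

defmodule FlakyPipeline do
  use Opus.Pipeline

  # One delay per retry (assumed to be milliseconds).
  step :fetch, retry_times: 3, retry_backoff: fn -> [10, 20, 40] end

  # Fails on the first two attempts, succeeds on the third.
  def fetch(id) do
    attempt = Agent.get_and_update(:attempt_counter, fn n -> {n + 1, n + 1} end)
    if attempt < 3, do: {:error, :unavailable}, else: "payload for #{id}"
  end
end

result = FlakyPipeline.call(42)
attempts = Agent.get(:attempt_counter, & &1)
IO.inspect({result, attempts})
```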

Stage Filtering

You can select the stages of a pipeline to run using call/2 with the :except and :only options.

Example:

# Runs only the stage with the :validate_params name
CreateUserPipeline.call(params, only: [:validate_params])

# Runs all the stages except the selected ones
CreateUserPipeline.call(params, except: :send_notification)
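For a self-contained illustration of the same idea, here is a sketch with a hypothetical two-stage NumberPipeline. It assumes Elixir 1.12+ for Mix.install/1 and passes the stage names as lists.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

defmodule NumberPipeline do
  use Opus.Pipeline

  step :double, with: &(&1 * 2)
  step :add_ten, with: &(&1 + 10)
end

# All stages: (3 * 2) + 10
all_stages = NumberPipeline.call(3)
# Only :double runs: 3 * 2
only_double = NumberPipeline.call(3, only: [:double])
# :double is left out: 3 + 10
without_double = NumberPipeline.call(3, except: [:double])

IO.inspect({all_stages, only_double, without_double})
```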

Instrumentation

Instrumentation hooks which can be defined:

  • :pipeline_started: Called before a pipeline module is called
  • :before_stage: Called before each stage
  • :stage_skipped: Called when a conditional stage was skipped
  • :stage_completed: Called after each stage
  • :pipeline_completed: Called after pipeline module has returned

You can disable all instrumentation callbacks for a stage using instrument?: false.

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :double, instrument?: false
end

You can define module specific instrumentation callbacks using:

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :double, with: &(&1 * 2)
  step :triple, with: &(&1 * 3)

  instrument :before_stage, fn %{input: input} ->
    IO.inspect input
  end

  # Will be called only for the matching stage
  instrument :stage_completed, %{stage: %{name: :triple}}, fn %{time: time} ->
    # send to the monitoring tool of your choice
  end
end

You can define a default instrumentation module for all your pipelines by adding in your config/*.exs:

config :opus, :instrumentation, YourModule

# but you may choose to provide a list of modules
config :opus, :instrumentation, [YourModuleA, YourModuleB]

An instrumentation module has to export instrument/3 functions like:

defmodule CustomInstrumentation do
  def instrument(:pipeline_started, %{pipeline: ArithmeticPipeline}, %{input: input}) do
    # publish the metrics to specific backend
  end

  def instrument(:before_stage, %{stage: %{pipeline: pipeline}}, %{input: input}) do
    # publish the metrics to specific backend
  end

  def instrument(:stage_completed, %{stage: %{pipeline: ArithmeticPipeline}}, %{time: time}) do
    # publish the metrics to specific backend
  end

  def instrument(:pipeline_completed, %{pipeline: ArithmeticPipeline}, %{result: result, time: total_time}) do
    # publish the metrics to specific backend
  end

  def instrument(_, _, _), do: nil
end

Telemetry

Opus includes an instrumentation module which emits events using the :telemetry library.
To enable it, change your config/config.exs with:

config :opus, :instrumentation, [Opus.Telemetry]

Browse the available events here.

For instructions to integrate Opus Telemetry metrics in your Phoenix application, read this post.

Module-Global Options

You may choose to provide some common options to all the stages of a pipeline.

  • :raise: A list of exceptions to not rescue. When set to true, Opus does not handle any exceptions. Defaults to false which converts all exceptions to {:error, %Opus.PipelineError{}} values halting the pipeline.
  • :instrument?: A boolean which defaults to true. Set to false to skip instrumentation for a module.

defmodule ArithmeticPipeline do
  use Opus.Pipeline, instrument?: false, raise: true
  # The pipeline opts will disable instrumentation for this module
  # and will not rescue exceptions from any of the stages

  step :double, with: &(&1 * 2)
  step :triple, with: &(&1 * 3)
end
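To see the effect of the module-level :raise option, the sketch below runs the same failing stage in two hypothetical pipelines, LenientParser and StrictParser. It assumes Elixir 1.12+ for Mix.install/1.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

# Default behaviour: exceptions become {:error, _} tuples.
defmodule LenientParser do
  use Opus.Pipeline

  step :to_integer, with: &String.to_integer/1
end

# raise: true applies to every stage of the module.
defmodule StrictParser do
  use Opus.Pipeline, raise: true

  step :to_integer, with: &String.to_integer/1
end

# String.to_integer/1 raises ArgumentError for "abc".
lenient = LenientParser.call("abc")

strict =
  try do
    StrictParser.call("abc")
  rescue
    error in ArgumentError -> {:raised, error.__struct__}
  end

IO.inspect({lenient, strict})
```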

Graph

You may visualise your pipelines using Opus.Graph:

Opus.Graph.generate(:your_app)
# => {:ok, "Graph file has been written to your_app_opus_graph.png"}

โ— This feature requires the opus_graph package to be installed, add it in your mix.exs.

defp deps do
  [{:opus_graph, "~> 0.1", only: [:dev]}]
end

Setup

First make sure to add graphvix to your dependencies:

# in mix.exs

defp deps do
  [
    {:opus, "~> 0.5"},
    {:graphvix, "~> 0.5", only: [:dev]}
  ]
end

This feature uses graphviz, so make sure to have it installed. To install it:

# MacOS
brew install graphviz

# Debian / Ubuntu
apt-get install graphviz

Opus.Graph is in fact a pipeline and its visualisation is:

[Figure: visualisation of the Opus.Graph pipeline]

You can customise the visualisation:

Opus.Graph.generate(:your_app, %{filetype: :svg})
# => {:ok, "Graph file has been written to your_app_opus_graph.svg"}

Read the available visualisation options here.

Using Opus in your company / project?
Let us know by submitting an issue describing how you use it.

License

Copyright (c) 2018 Dimitris Zorbas, MIT License. See LICENSE.txt for further details.

opus's Issues

Feature: add `.matching/2` sugar

defmodule Workflow do
  use Opus.Pipeline, strict: true

  # Here `strict: true` would require adherence to the type specs below
  @type strict_input :: map()
  @type strict_result ::
          :ok
          | {:ok, map}
          | {:error, any}
          | {:error, step :: atom, error :: any(), results_so_far :: map()}

  step :lookup_member 
  check :is_member_ready?

  matching %{member: %Member{status: "NO_GOOD"} = member} do
    tee(:send_email, with: fn -> Email.build(member, "not_ready.html") end)
  end

  matching %{member: %Member{status: "GOOD"} = member, payload: %{code: code}} do
    step :give_vip_status
    tee :expire_partner_code, with: fn -> Partner.expire_code(code) end

    matching %{member: %Member{status: "GREAT"} = member} do
      tee(:send_email, with: fn -> Email.build(member, "very_ready.html") end)
    end
  end

  defp lookup_member(%{member_id: id}) do
    {:ok, %{member: %Member{id: id}}}
  end

  defp give_vip_status(%{member: %Member{status: "Good"} = member}) do
    {:ok, %{member | status: "Great"}}
  end
end

Explanation:

Thread a Kernel.match?/2 call through the :if clauses of the stages nested in the :do block. This would avoid the need to name the eval step.

Design Concerns:

We would need a strict mode to ensure only patching the pipeline with maps from ok tuples

If there were some sort of strict mode, would it be better to use:

  1. An Ecto.Multi style pipeline where each step puts its key and {:ok, result} into the map
  2. An ExUnit.Callbacks.setup/1 or setup/2 style pipeline, which is just a folding map merge
  3. A Plug.Conn style pipeline struct with nested :assigns and :results maps

PS

Could also name this bind, bind_match, with_match or otherwise.

Parameterize steps

Many times, the general behavior of a step is quite repetitive, so it would be nice to be able to reuse a single function of arity >1, and pass the extra arg(s) to the step itself.

If you think this could be interesting, I can provide a pull request.

This is an example:

defmodule Mapper do
  @moduledoc """
  Converts a flat structure from CSV into deeply nested fields of MyStruct.
  Empty fields are skipped.
  """

  use Opus.Pipeline

  step :parse
  step :nest, with: &%{source: &1}
  step :create_output, with: &Map.put(&1, :output, %MyStruct{})
  step :copy, args: ["some_col", [:some, :nested, :path]]
  step :copy, args: ["some_col_2", [:other, :nested, :path]]
  step :copy, args: ["some_col_3", [:again, :nested, :path]]
  # possibly many more lines like this
  step :extract_output, with: &get_in(&1, [:output])

  def parse(text) do
    [headers | rows] = NimbleCSV.RFC4180.parse_string(text, skip_headers: false)
    Enum.map(rows, fn row -> headers |> Enum.zip(row) |> Map.new() end)
  end

  def copy(input, [from, to]) do
    case get_in(input, [:source | List.wrap(from)]) do
      nil -> input
      "" -> input
      value -> put_in(input, [:output | List.wrap(to)], value)
    end
  end
end

What's the plan on Elixir version support?

On Elixir 1.8:

==> opus
Compiling 13 files (.ex)
warning: System.stacktrace/0 outside of rescue/catch clauses is deprecated. If you want to support only Elixir v1.7+, you must access __STACKTRACE__ inside a rescue/catch. If you want to support earlier Elixir versions, move System.stacktrace/0 inside a rescue/catch
  lib/opus/safe.ex:31

warning: System.stacktrace/0 outside of rescue/catch clauses is deprecated. If you want to support only Elixir v1.7+, you must access __STACKTRACE__ inside a rescue/catch. If you want to support earlier Elixir versions, move System.stacktrace/0 inside a rescue/catch
  lib/opus/safe.ex:37

warning: System.stacktrace/0 outside of rescue/catch clauses is deprecated. If you want to support only Elixir v1.7+, you must access __STACKTRACE__ inside a rescue/catch. If you want to support earlier Elixir versions, move System.stacktrace/0 inside a rescue/catch
  lib/opus/safe.ex:46

Generated opus app

I see CI uses 1.4/1.5. Do you plan to raise the minimum required version to at least 1.7 anytime soon?
Thanks

Decouple Graphvix

Environment

  • opus version: 0.5.1

  • Elixir / Hex version (mix hex.info):

    • Hex: 0.18.2
    • Elixir: 1.7.4
    • OTP: 21.1.1
  • Operating system: MacOS 10.14.1

Current behavior

This is both a bug report and feature request. I think it's better to decouple Graphvix from Opus because it doesn't add anything for production. I'm also getting the below error on version 0.5 (version 1.0 is released)

[error] Graphvix.Graph Graphvix.Graph received unexpected message in handle_info/2: :save_state

Expected behavior

No error and decoupling

Bad instrumentation for skip stage

When trying to instrument the skip stage, we get a name that probably doesn't make too much sense (?).

Also, when the pipeline is skipped (skip true), Opus treats it as an error.

Environment

  • opus version: 0.6.0

Current behavior

defmodule Test do
    use Opus.Pipeline
    
    skip if: :skip?

    def skip?(_), do: true

    def instrument(:stage_completed, params, params2) do
        require Logger
        Logger.warn("#{inspect params} >>> #{inspect params2}")
    end
end

Output:

%{stage: %{name: [if: :skip?], pipeline: Test}} >>> %{input: 1, result: {:error, :skipped}, stage: [if: :skip?], time: 31450000}

Expected behavior

  1. Not sure what would be the expected behaviour in that case, since it's the only stage without a name?

  2. Skip should not be an error

Add an accessor pattern

I often find that I'm just folding a map, and most of the code ends up getting a key out of the map and passing it to some validation/execution function.

Current behavior

defmodule MyOpus do
  use Opus.Pipeline
  
  check :valid_email?

  def valid_email?(%{email: email}) when is_binary(email) do
    MyValidator.valid_email?(email)
  end
end

Useful/Dry behavior

defmodule MyOpus do
  use Opus.Pipeline
  import MyValidator, only: [valid_email?: 1]
  
  # some accessor/getter/lens syntax
  check :valid_email?, in: :email
  # or
  check :valid_email?, get_in: [:email]
  # or
  check {:email, :valid_email?}
end

Conditional results in pipeline?

Is there a way for an if/unless step to modify the pipeline data?

I've found myself using pipelines similar to a with statement sometimes, and I would like some results to flow all the way to the end - even the conditional checks.
But I can't pass results of the conditional checks, as they only return a true/false, they don't mutate the pipeline.

An example would be a pipeline that must always complete successfully and never fully error out. I need to know which steps were skipped, and why. I've looked at instrumentation, but the callbacks are asynchronous; I need to include all the skipped conditional stages in the direct response.

Any thoughts on this?

Add a :unless param for stages

Proposal

Add the ability to pass a :unless parameter for all the stages where it is possible to pass a :if one.

The idea is that you don't need to name your functions like not_* in order to add a new stage to the pipeline.

Context

Imagine we can perform a step only if the user is allowed.

defmodule Pipeline do
  use Opus.Pipeline

  step :do_this, if: :user_allowed
end

Now, if we want to perform another step in case the user is not allowed, we would end up with something like:

defmodule Pipeline do
  use Opus.Pipeline

  step :do_this, if: :user_allowed
  step :do_another, if: :user_not_allowed
end

Which makes us implement two different functions (user_not_allowed and user_allowed) returning the opposite of the other. This would be simplified if we could do:

defmodule Pipeline do
  use Opus.Pipeline

  step :do_this, if: :user_allowed
  step :do_another, unless: :user_allowed
end

Conditional link

Is it good practice to pipe a pipeline into another from within a stage? Some sort of conditional link.

My current workaround pipes the result of one pipeline into another and checks a condition in the second pipeline's first step, which then either returns early or continues with the remaining steps.

Disable importing instrumentation

I would like to use another module for handling instrumentation, the only issue is that it also implements instrument/3 and that ambiguity creates a problem, how can I use Opus without that function being in the scope I am in?

Accept a function in :error_message

Current behavior

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step  :add_one,         with: &(&1 + 1)
  check :even?,           with: &(rem(&1, 2) == 0), error_message: :expected_an_even
  step  :do_something_else
end

The error message should accept a function which will be called with the return value of the previous stage so that the error message can be parameterised.

Halting the pipeline without raising an error

Let's suppose we have a pipeline to create a user.

defmodule CreateUserPipeline do
  use Opus.Pipeline

  check :valid_params?
  step :fetch_avatar
  step :encrypt_password
  step :persist_user
  step :notify_clients

  # ...

Now, imagine that I don't want to run over all this pipeline if the user already exists. I can imagine two options to solve that:

  1. Creating a call?/1 function in my module to be called before .call/1. This might work, but it forces the client of the module to know about it. In my opinion, that sometimes makes sense and sometimes does not (sometimes the pipeline itself should take care of it as a defense).

  2. Should I add a check at the beginning of the pipeline to halt the pipeline before all these steps? It can work, but then Opus will return an {:error, _} tuple. This might make sense most of the time, but sometimes it is not an error. It's just that all the params are correct but the pipeline should skip the whole process of creating the user and return something like {:ok, "User already exists"}.

So, in my opinion, it would be awesome to have a way to add a check at the beginning of the pipeline and add an option to return a custom tuple in case of false, like:

check :user_exists, on_false: {:ok, "User already exists"}

Makes sense?

Add "transform" to Link

I'm trying to reuse as much logic as possible in my pipelines.
As such, I'm having some problems when trying to work with them.

Let's imagine I have a message with several different kinds of data, for example: the same message has a device position (lat, lng), a state ("is the light turned on?") and a numeric value ("my current voltage is...")

Since the device sends all of those together, I'd need to parse and validate that message in a single pipeline.
And, since that data is already in the pipeline, I should be able to send each part of the message to a specific Pipeline to handle those.

For the position: StorePositionPipeline which stores the location and updates the "this is the current position of the device".

For the state: StoreStatePipeline which takes a state and a state name, and stores it.

For the numeric value: StoreNumericValuePipeline and VerifyVoltagePipeline. The first one is the same as StoreStatePipeline, but keeps track of numbers instead of enums. The second one has to do several things: check if the voltage is in the safe zone, if not maybe create an alert if there wasn't an alert already open for that device, ...

I am currently using a step and then calling the pipeline's call/1, since each one of those Pipelines expects its input in a different format.
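The workaround described above could look roughly like the sketch below. It is only an illustration: it assumes Elixir 1.12+ for Mix.install/1, and the contents of StorePositionPipeline, the HandleMessagePipeline module and the message shape are all made up.

```elixir
# Standalone script; Mix.install/1 needs Elixir 1.12+ and network access.
Mix.install([{:opus, "~> 0.8"}])

# Stand-in for the real pipeline; it only validates its input here.
defmodule StorePositionPipeline do
  use Opus.Pipeline

  check :valid_position?, with: &match?(%{lat: _, lng: _}, &1)
end

defmodule HandleMessagePipeline do
  use Opus.Pipeline

  step :store_position

  # Reshapes the message, calls the other pipeline, then passes the
  # original message on so later stages can use its other fields.
  def store_position(message) do
    case StorePositionPipeline.call(extract_latlng(message)) do
      {:ok, _position} -> message
      {:error, _} = error -> error
    end
  end

  defp extract_latlng(%{lat: lat, lng: lng}), do: %{lat: lat, lng: lng}
end

message = %{lat: 37.98, lng: 23.72, light_on: true, voltage: 3.3}
result = HandleMessagePipeline.call(message)
IO.inspect(result)
```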

By adding a transform to Link, I'd be able to write my pipeline as:

link StorePositionPipeline, transform: &extract_latlng/1
link StoreStatePipeline, transform: &extract_light_state/1
...

And that'd be much more readable IMHO than messing with steps just to transform the data and do a call.
