API reference | #eio Matrix chat | Dev meetings

Eio — Effects-Based Parallel IO for OCaml

Eio provides an effects-based direct-style IO stack for OCaml 5. For example, you can use Eio to read and write files, make network connections, or perform CPU-intensive calculations, running multiple operations at the same time. It aims to be easy to use, secure, well documented, and fast. A generic cross-platform API is implemented by optimised backends for different platforms. Eio replaces existing concurrency libraries such as Lwt (Eio and Lwt libraries can also be used together).

Motivation

The Unix library provided with OCaml uses blocking IO operations, and is not well suited to concurrent programs such as network services or interactive applications. For many years, the solution was to use libraries such as Lwt and Async, which provide a monadic interface. These libraries allow writing code as if there were multiple threads of execution, each with their own stack, but the stacks are simulated using the heap.

OCaml 5 added support for "effects", removing the need for monadic code here. Using effects brings several advantages:

  1. It's faster, because no heap allocations are needed to simulate a stack.
  2. Concurrent code can be written in the same style as plain non-concurrent code.
  3. Because a real stack is used, backtraces from exceptions work as expected.
  4. Other features of the language (such as try ... with ...) can be used in concurrent code.

Additionally, modern operating systems provide high-performance alternatives to the old Unix select call. For example, Linux's io_uring system has applications write the operations they want to perform to a ring buffer, which Linux handles asynchronously, and Eio can take advantage of this.

Please try porting your programs to use Eio and submit PRs or open issues when you find problems. Remember that you can always fall back to using Lwt libraries to provide missing features if necessary. See Awesome Multicore OCaml for links to work migrating other projects to Eio.

Eio packages

  • Eio provides concurrency primitives (promises, etc.) and a high-level, cross-platform OS API.
  • Eio_posix provides a cross-platform backend for these APIs for POSIX-type systems.
  • Eio_linux provides a Linux io_uring backend for these APIs.
  • Eio_windows is for use on Windows (incomplete - help wanted).
  • Eio_main selects an appropriate backend (e.g. eio_linux or eio_posix), depending on your platform.
  • Eio_js allows Eio code to run in the browser, using js_of_ocaml.

Getting OCaml 5.1

You'll need OCaml 5.1.0 or later. You can either install it yourself or build the included Dockerfile.

To install it yourself:

  1. Make sure you have opam 2.1 or later (run opam --version to check).

  2. Use opam to install OCaml:

    opam switch create 5.1.1
    

Getting Eio

Install eio_main (and utop if you want to try it interactively):

opam install eio_main utop

If you want to install the latest unreleased development version of Eio, see HACKING.md.

Running Eio

Try out the examples interactively by running utop in the shell.

First require the eio_main library. It's also convenient to open the Eio.Std module, as follows. (The leftmost # shown below is the utop prompt, so type the text after the prompt and press Return after each line.)

# #require "eio_main";;
# open Eio.Std;;

This function writes a greeting to out using Eio.Flow:

let main out =
  Eio.Flow.copy_string "Hello, world!\n" out

We use Eio_main.run to run the event loop and call main from there:

# Eio_main.run @@ fun env ->
  main (Eio.Stdenv.stdout env);;
Hello, world!
- : unit = ()

Note that:

  • The env argument represents the standard environment of a Unix process, allowing it to interact with the outside world. A program will typically start by extracting from env whatever things the program will need and then calling main with them.

  • The type of the main function here tells us that this program only interacts via the out flow.

  • Eio_main.run automatically calls the appropriate run function for your platform. For example, on Linux this will call Eio_linux.run. For non-portable code you can use the platform-specific library directly.

This example can also be built using dune; see examples/hello.
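As a rough sketch of a standalone version, assuming a dune project, the file name main.ml and the stanza in the comment below are our own choices and may differ from what examples/hello actually uses:

```ocaml
(* main.ml: standalone version of the hello example.
   Assumed dune stanza in the same directory:
     (executable (name main) (libraries eio_main)) *)

(* Only needs a sink to write to, not the whole environment. *)
let main out =
  Eio.Flow.copy_string "Hello, world!\n" out

let () =
  Eio_main.run @@ fun env ->
  main (Eio.Stdenv.stdout env)
```

With that stanza in place, `dune exec ./main.exe` should build and run it.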

Testing with Mocks

Because external resources are provided to main as arguments, we can easily replace them with mocks for testing. For example, instead of giving main the real standard output, we can have it write to a buffer:

# Eio_main.run @@ fun _env ->
  let buffer = Buffer.create 20 in
  main (Eio.Flow.buffer_sink buffer);
  traceln "Main would print %S" (Buffer.contents buffer);;
+Main would print "Hello, world!\n"
- : unit = ()

Eio.traceln provides convenient printf-style debugging, without requiring you to plumb stderr through your code. It uses the Format module, so you can use the extended formatting directives here too.
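For instance, a Format printer can be passed with %a, just as with Format.printf. This is a small sketch; pp_ints is a hypothetical helper, not part of Eio:

```ocaml
open Eio.Std

(* Hypothetical printer: prints an int list as "1, 2, 3". *)
let pp_ints =
  Format.pp_print_list
    ~pp_sep:(fun f () -> Format.pp_print_string f ", ")
    Format.pp_print_int

let () =
  Eio_main.run @@ fun _env ->
  (* traceln accepts the same %a directive as Format.printf. *)
  traceln "values = [%a]" pp_ints [1; 2; 3]
```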

The Eio_mock library provides some convenient pre-built mocks:

# #require "eio.mock";;
# Eio_main.run @@ fun _env ->
  main (Eio_mock.Flow.make "mock-stdout");;
+mock-stdout: wrote "Hello, world!\n"
- : unit = ()

Fibers

Here's an example running two threads of execution concurrently using Eio.Fiber:

let main _env =
  Fiber.both
    (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fiber.yield () done)
    (fun () -> for y = 1 to 3 do traceln "y = %d" y; Fiber.yield () done);;
# Eio_main.run main;;
+x = 1
+y = 1
+x = 2
+y = 2
+x = 3
+y = 3
- : unit = ()

The two fibers run on a single core, so only one can be running at a time. Calling an operation that performs an effect (such as yield) can switch to a different thread.
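To see that the interleaving comes from yield, here is a sketch of the same two loops without Fiber.yield. With the current scheduler (a forked fiber starts running immediately), we would expect the first loop to finish before the second one starts. run_without_yield is a hypothetical name:

```ocaml
open Eio.Std

(* Same two counting loops, but neither fiber ever yields.
   Returns the lines in the order they were recorded. *)
let run_without_yield () =
  let log = ref [] in
  let record fmt = Printf.ksprintf (fun s -> log := s :: !log) fmt in
  Eio_main.run (fun _env ->
      Fiber.both
        (fun () -> for x = 1 to 3 do record "x = %d" x done)
        (fun () -> for y = 1 to 3 do record "y = %d" y done));
  List.rev !log
```

Since nothing switches fibers, we'd expect all the x lines first, then all the y lines.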

Tracing

When OCaml's tracing is turned on, Eio writes events about many actions, such as creating fibers or resolving promises.

You can use eio-trace to capture a trace and display it in a window. For example, to trace the counting example above:

dune build ./examples
eio-trace run -- ./_build/default/examples/both/main.exe

The upper horizontal bar is the initial fiber, and the brackets show Fiber.both creating a second fiber. The green segments show when each fiber is running. Note that the output from traceln appears in the trace as well as on the console. In the eio-trace window, scrolling with the mouse or touchpad will zoom in or out of the diagram.

There are various third-party tools that can also consume this data (but may currently require patches to support the new system):

  • Meio (Monitoring for Eio) provides an interactive console-based UI for exploring running fibers.
  • Olly can save Perfetto traces and report statistics.

examples/trace shows how to consume the events manually.

Cancellation

Every fiber has a cancellation context. If one of the Fiber.both fibers fails, the other is cancelled:

# Eio_main.run @@ fun _env ->
  Fiber.both
    (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fiber.yield () done)
    (fun () -> failwith "Simulated error");;
+x = 1
Exception: Failure "Simulated error".

What happened here was:

  1. Fiber.both created a new cancellation context for the child fibers.
  2. The first fiber (the lower one in the diagram) ran, printed x = 1 and yielded.
  3. The second fiber raised an exception.
  4. Fiber.both caught the exception and cancelled the context.
  5. The first thread's yield raised a Cancelled exception there.
  6. Once both threads had finished, Fiber.both re-raised the original exception.

There is a tree of cancellation contexts for each domain, and every fiber is in one context. When an exception is raised, it propagates towards the root until handled, cancelling the other branches as it goes. You should assume that any operation that can switch fibers can also raise a Cancelled exception if an uncaught exception reaches one of its ancestor cancellation contexts.

If you want to make an operation non-cancellable, wrap it with Cancel.protect (this creates a new context that isn't cancelled with its parent).
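As a sketch, here is the failing Fiber.both example again, with the first fiber's work wrapped in Eio.Cancel.protect. The helper name and the finished flag are illustrative only:

```ocaml
open Eio.Std

(* Returns true if the protected section ran to completion even though
   its sibling fiber failed and the shared context was cancelled. *)
let protected_cleanup_completes () =
  let finished = ref false in
  Eio_main.run (fun _env ->
      try
        Fiber.both
          (fun () ->
             Eio.Cancel.protect (fun () ->
                 (* Without protect, this yield would raise Cancelled. *)
                 Fiber.yield ();
                 finished := true))
          (fun () -> failwith "Simulated error")
      with Failure _ -> ());
  !finished
```

Fiber.both still re-raises the original Failure; protect only shields the wrapped section from the cancellation.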

Racing

Fiber.first returns the result of the first fiber to finish, cancelling the other one:

# Eio_main.run @@ fun _env ->
  let x =
    Fiber.first
      (fun () ->
        traceln "first fiber delayed...";
        Fiber.yield ();
        traceln "delay over";
        "a"
      )
      (fun () -> "b")
  in
  traceln "x = %S" x;;
+first fiber delayed...
+x = "b"
- : unit = ()

Note: using Fiber.first to ensure that exactly one of two actions is performed is not reliable. There is usually a possibility that both actions succeed at the same time (and one result is thrown away). For example, if you ask Eio to read from two sockets with io_uring then the kernel may have already performed both reads by the time it tells Eio about the first one.

Switches

A switch is used to group fibers together, so they can be waited on together. This is a form of structured concurrency. For example:

# Eio_main.run @@ fun _env ->
  Switch.run (fun sw ->
    Fiber.fork ~sw
      (fun () -> for i = 1 to 3 do traceln "i = %d" i; Fiber.yield () done);
    traceln "First thread forked";
    Fiber.fork ~sw
      (fun () -> for j = 1 to 3 do traceln "j = %d" j; Fiber.yield () done);
    traceln "Second thread forked; top-level code is finished"
  );
  traceln "Switch is finished";;
+i = 1
+First thread forked
+j = 1
+Second thread forked; top-level code is finished
+i = 2
+j = 2
+i = 3
+j = 3
+Switch is finished
- : unit = ()

Switch.run fn creates a new switch sw and runs fn sw. fn may spawn new fibers and attach them to the switch. It may also attach other resources such as open file handles. Switch.run waits until fn and all other attached fibers have finished, and then releases any attached resources (e.g. closing all attached file handles).
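The ordering described above can be observed with Switch.on_release, which registers a cleanup function on the switch. This is a minimal sketch; the event names are made up for illustration:

```ocaml
open Eio.Std

(* Records the order of events inside a switch. *)
let switch_events () =
  let log = ref [] in
  let record s = log := s :: !log in
  Eio_main.run (fun _env ->
      Switch.run (fun sw ->
          (* Runs only once the switch releases its resources. *)
          Switch.on_release sw (fun () -> record "released");
          Fiber.fork ~sw (fun () -> Fiber.yield (); record "fiber done");
          record "body done"));
  List.rev !log
```

We'd expect ["body done"; "fiber done"; "released"]: Switch.run waits for the forked fiber before releasing anything.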

If you call a function without giving it access to a switch, then when the function returns you can be sure that any fibers it spawned have finished, and any files it opened have been closed. This works because Eio does not provide e.g. a way to open a file without attaching it to a switch. If a function doesn't have a switch and wants to open a file, it must use Switch.run to create one. But then the function can't return until Switch.run does, at which point the file is closed.

So, a Switch.run puts a bound on the lifetime of things created within it, leading to clearer code and avoiding resource leaks. The Fiber.fork call above creates a new fiber that continues running after fork returns, so it needs to take a switch argument.

Every switch also creates a new cancellation context. You can use Switch.fail to mark the switch as failed and cancel all fibers within it. The exception (or exceptions) passed to fail will be raised by run when the fibers have exited.
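A small sketch of Switch.fail: a forked fiber that would otherwise loop forever is cancelled, and Switch.run then raises the exception we passed in. fail_switch is a hypothetical name:

```ocaml
open Eio.Std

(* Returns the message of the exception raised by Switch.run. *)
let fail_switch () =
  Eio_main.run (fun _env ->
      match
        Switch.run (fun sw ->
            (* Would never finish on its own. *)
            Fiber.fork ~sw (fun () -> while true do Fiber.yield () done);
            Switch.fail sw (Failure "stop everything"))
      with
      | () -> "switch finished normally"
      | exception Failure msg -> msg)
```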

Networking

Eio provides an API for networking. Here is a server connection handler that handles an incoming connection by sending the client a message:

let handle_client flow _addr =
  traceln "Server: got connection from client";
  Eio.Flow.copy_string "Hello from server" flow

We can test it using a mock flow:

# Eio_mock.Backend.run @@ fun () ->
  let flow = Eio_mock.Flow.make "flow" in
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 37568) in
  handle_client flow addr;;
+Server: got connection from client
+flow: wrote "Hello from server"
- : unit = ()

Note: Eio_mock.Backend.run can be used instead of Eio_main.run for tests that don't access the outside environment at all. It doesn't support multiple domains, but this allows it to detect deadlocks automatically (a multi-domain loop has to assume it might get an event from another domain, and so must keep waiting).
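For example, awaiting a promise that nothing will ever resolve should be reported rather than hanging. This sketch assumes the mock backend signals this with its Deadlock_detected exception and needs the eio.mock library:

```ocaml
open Eio.Std

(* Awaits a promise that is never resolved. Under the mock backend
   this should be detected instead of blocking forever. *)
let detects_deadlock () =
  match
    Eio_mock.Backend.run (fun () ->
        let promise, _resolver = Promise.create () in
        Promise.await promise)
  with
  | () -> false
  | exception Eio_mock.Backend.Deadlock_detected -> true
```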

Here is a client that connects to address addr using network net and reads a message:

let run_client ~net ~addr =
  Switch.run ~name:"client" @@ fun sw ->
  traceln "Client: connecting to server";
  let flow = Eio.Net.connect ~sw net addr in
  (* Read all data until end-of-stream (shutdown): *)
  traceln "Client: received %S" (Eio.Flow.read_all flow)

Note: the flow is attached to sw and will be closed automatically when the switch finishes. We also named the switch here; this will appear in the trace output (see below).

This can also be tested on its own using a mock network:

# Eio_mock.Backend.run @@ fun () ->
  let net = Eio_mock.Net.make "mocknet" in
  let flow = Eio_mock.Flow.make "flow" in
  Eio_mock.Net.on_connect net [`Return flow];
  Eio_mock.Flow.on_read flow [
    `Return "(packet 1)";
    `Yield_then (`Return "(packet 2)");
    `Raise End_of_file;
  ];
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080) in
  run_client ~net ~addr;;
+Client: connecting to server
+mocknet: connect to tcp:127.0.0.1:8080
+flow: read "(packet 1)"
+flow: read "(packet 2)"
+Client: received "(packet 1)(packet 2)"
+flow: closed
- : unit = ()

Eio.Net.run_server runs a loop accepting clients and handling them (concurrently):

let run_server socket =
  Eio.Net.run_server socket handle_client
    ~on_error:(traceln "Error handling connection: %a" Fmt.exn)

Note: when handle_client finishes, run_server closes the flow automatically.

We can now run the client and server together using the real network (in a single process):

let main ~net ~addr =
  Switch.run ~name:"main" @@ fun sw ->
  let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
  Fiber.fork_daemon ~sw (fun () -> run_server server);
  run_client ~net ~addr

Fiber.fork_daemon creates a new fiber and then cancels it when the switch finishes. We need that here because otherwise the server would keep waiting for new connections and the test would never finish.

# Eio_main.run @@ fun env ->
  main
    ~net:(Eio.Stdenv.net env)
    ~addr:(`Tcp (Eio.Net.Ipaddr.V4.loopback, 8080));;
+Client: connecting to server
+Server: got connection from client
+Client: received "Hello from server"
- : unit = ()

See examples/net for a more complete example.
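The fork_daemon behaviour used in main can also be seen in isolation. In this sketch (daemon_ticks is a hypothetical name), the daemon loops forever, yet Switch.run still returns once the non-daemon work is done:

```ocaml
open Eio.Std

(* Returns how many times the daemon fiber got to run before the
   switch finished and cancelled it. *)
let daemon_ticks () =
  let ticks = ref 0 in
  Eio_main.run (fun _env ->
      Switch.run (fun sw ->
          Fiber.fork_daemon ~sw (fun () ->
              while true do
                incr ticks;
                Fiber.yield ()
              done;
              `Stop_daemon);
          (* The only non-daemon work: yield a few times, then finish. *)
          for _ = 1 to 3 do Fiber.yield () done));
  !ticks
```

The exact tick count depends on scheduling details, so we only rely on it being at least 1.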

Design Note: Capabilities

Eio follows the principles of capability-based security. The key idea here is that the lambda calculus already contains a perfectly good security system: a function can only access things that are in its scope. If we can avoid breaking this model (for example, by adding global variables to our language) then we can reason about the security properties of code quite easily.

Consider the network example in the previous section. Imagine this is a large program and we want to know:

  1. Does this program modify the filesystem?
  2. Does this program send telemetry data over the network?

In a capability-safe language, we don't have to read the entire code-base to find the answers:

  • All authority starts at the (privileged) Eio_main.run function with the env parameter, so we must check this code.

  • Only env's network access is used, so we know this program doesn't access the filesystem, answering question 1 immediately.

  • To check whether telemetry is sent, we need to follow the net authority as it is passed to main.

  • main uses net to open a listening socket on the loopback interface, which it passes to run_server. run_server does not get the full net access, so we probably don't need to read that code; however, we might want to check whether we granted other parties access to this port on our loopback network.

  • run_client does get net, so we do need to read that. We could make that code easier to audit by passing it a function such as (fun ~sw -> Eio.Net.connect ~sw net addr) instead of net. Then we could see that run_client could only connect to our loopback address.
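Here is a sketch of that narrower interface. run_client_restricted is a hypothetical variant of run_client. Because Eio.Net.connect needs a switch, the function it receives takes ~sw:

```ocaml
open Eio.Std

(* Hypothetical variant of run_client: it can only call [connect],
   which is fixed to one address, so it cannot reach anything else. *)
let run_client_restricted ~connect =
  Switch.run @@ fun sw ->
  let flow = connect ~sw in
  Eio.Flow.read_all flow
```

A caller would pass something like `~connect:(fun ~sw -> Eio.Net.connect ~sw net addr)`, keeping net itself out of run_client_restricted's scope.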

Since OCaml is not a capability language, code can ignore Eio and use the non-capability APIs directly. However, following the capability style still makes non-malicious code easier to understand and test, and may allow for an extension to the language in the future.

The Lambda Capabilities blog post provides a more detailed introduction to capabilities, written for functional programmers.

Buffered Reading and Parsing

Reading from an Eio flow directly may give you more or less data than you wanted. For example, if you want to read a line of text from a TCP stream, the flow will tend to give you the data in packet-sized chunks, not lines. To solve this, you can wrap the flow with a buffer and read from that.

Here's a simple command-line interface that reads stdin one line at a time:

let cli ~stdin ~stdout =
  let buf = Eio.Buf_read.of_flow stdin ~initial_size:100 ~max_size:1_000_000 in
  while true do
    let line = Eio.Buf_read.line buf in
    traceln "> %s" line;
    match line with
    | "h" | "help" -> Eio.Flow.copy_string "It's just an example\n" stdout
    | x -> Eio.Flow.copy_string (Fmt.str "Unknown command %S\n" x) stdout
  done

Let's try it with some test data (you could use the real stdin if you prefer):

# Eio_main.run @@ fun env ->
  cli
    ~stdin:(Eio.Flow.string_source "help\nexit\nquit\nbye\nstop\n")
    ~stdout:(Eio.Stdenv.stdout env);;
+> help
It's just an example
+> exit
Unknown command "exit"
+> quit
Unknown command "quit"
+> bye
Unknown command "bye"
+> stop
Unknown command "stop"
Exception: End_of_file.

Buf_read.of_flow allocates an internal buffer (with the given initial_size). When you try to read a line from it, it will take a whole line from the buffer if possible. If not, it will ask the underlying flow for the next chunk of data, until it has enough.

For high performance applications, you should use a larger initial buffer so that fewer reads on the underlying flow are needed.

If the user enters a line that doesn't fit in the buffer then the buffer will be enlarged as needed. However, it will raise an exception if the buffer would need to grow above max_size. This is useful when handling untrusted input, since otherwise when you try to read one line an attacker could just keep sending e.g. 'x' characters until your service ran out of memory and crashed.
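A minimal sketch of that limit in action, using a deliberately tiny max_size of 16 bytes and a 100-character line (both numbers are arbitrary):

```ocaml
(* Reads one line that is far longer than max_size allows. *)
let line_too_long () =
  Eio_main.run @@ fun _env ->
  let source = Eio.Flow.string_source (String.make 100 'x' ^ "\n") in
  let buf = Eio.Buf_read.of_flow source ~max_size:16 in
  match Eio.Buf_read.line buf with
  | line -> Ok line
  | exception Eio.Buf_read.Buffer_limit_exceeded -> Error `Too_long
```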

As well as calling individual parsers (like line) directly, you can also build larger parsers from smaller ones. For example:

open Eio.Buf_read.Syntax

type message = { src : string; body : string }

let message =
  let+ src = Eio.Buf_read.(string "FROM:" *> line)
  and+ body = Eio.Buf_read.take_all in
  { src; body }
# Eio_main.run @@ fun _ ->
  let flow = Eio.Flow.string_source "FROM:Alice\nHello!\n" in
  match Eio.Buf_read.parse message flow ~max_size:1024 with
  | Ok { src; body } -> traceln "%s sent %S" src body
  | Error (`Msg err) -> traceln "Parse failed: %s" err;;
+Alice sent "Hello!\n"
- : unit = ()
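The same combinators can build other small parsers. For example, here is a sketch of a parser for one "Name: value" header line. The header format is only an illustration, and the check uses Buf_read.parse_string to run it directly on a string:

```ocaml
open Eio.Buf_read.Syntax

(* Hypothetical parser for a single "Name: value" line. *)
let header =
  let+ name = Eio.Buf_read.take_while (fun c -> c <> ':')
  and+ value = Eio.Buf_read.(char ':' *> char ' ' *> line) in
  (name, value)
```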

Buffered Writing

For performance, it's often useful to batch up writes and send them all in one go. For example, consider sending an HTTP response without buffering:

let send_response socket =
  Eio.Flow.copy_string "HTTP/1.1 200 OK\r\n" socket;
  Eio.Flow.copy_string "\r\n" socket;
  Fiber.yield ();       (* Simulate delayed generation of body *)
  Eio.Flow.copy_string "Body data" socket
# Eio_main.run @@ fun _ ->
  send_response (Eio_mock.Flow.make "socket");;
+socket: wrote "HTTP/1.1 200 OK\r\n"
+socket: wrote "\r\n"
+socket: wrote "Body data"
- : unit = ()

The socket received three writes, perhaps sending three separate packets over the network. We can wrap a flow with Eio.Buf_write to avoid this:

module Write = Eio.Buf_write

let send_response socket =
  Write.with_flow socket @@ fun w ->
  Write.string w "HTTP/1.1 200 OK\r\n";
  Write.string w "\r\n";
  Fiber.yield ();       (* Simulate delayed generation of body *)
  Write.string w "Body data"
# Eio_main.run @@ fun _ ->
  send_response (Eio_mock.Flow.make "socket");;
+socket: wrote "HTTP/1.1 200 OK\r\n"
+              "\r\n"
+socket: wrote "Body data"
- : unit = ()

Now the first two writes were combined and sent together.
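Buffered writers also work with a buffer sink, which makes the final bytes easy to check in tests. A sketch follows; write_response and the Content-Length header are illustrative, not part of Eio:

```ocaml
module Write = Eio.Buf_write

(* Writes a complete response through a buffered writer;
   with_flow flushes everything to [sink] before returning. *)
let write_response sink ~body =
  Write.with_flow sink @@ fun w ->
  Write.string w "HTTP/1.1 200 OK\r\n";
  Write.string w (Printf.sprintf "Content-Length: %d\r\n" (String.length body));
  Write.string w "\r\n";
  Write.string w body
```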

Error Handling

Errors interacting with the outside world are indicated by the Eio.Io (err, context) exception. This is roughly equivalent to the Unix.Unix_error exception from the OCaml standard library.

The err field describes the error using nested error codes, allowing you to match on either specific errors or whole classes of errors at once. For example:

let test r =
  try Eio.Buf_read.line r
  with
  | Eio.Io (Eio.Net.E Connection_reset Eio_unix.Unix_error _, _) -> "Unix connection reset"
  | Eio.Io (Eio.Net.E Connection_reset _, _) -> "Connection reset"
  | Eio.Io (Eio.Net.E _, _) -> "Some network error"
  | Eio.Io _ -> "Some I/O error"

For portable code, you will want to avoid matching backend-specific errors, so you would avoid the first case. The error type carried by Eio.Io (Eio.Exn.err) is extensible, so libraries can also add their own top-level error types if needed.
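The same nesting works for filesystem errors. Here is a sketch of a hypothetical helper that treats only a missing file as recoverable and lets every other I/O error propagate:

```ocaml
(* Hypothetical helper: returns [default] only when the file does not
   exist; any other Eio.Io error is re-raised unchanged. *)
let load_or_default path ~default =
  try Eio.Path.load path
  with Eio.Io (Eio.Fs.E (Eio.Fs.Not_found _), _) -> default
```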

Io errors also allow adding extra context information to the error. For example, this HTTP GET function adds the URL to any IO error:

let get ~net ~host ~path =
  try
    Eio.Net.with_tcp_connect net ~host ~service:"http" @@ fun _flow ->
    "..."
  with Eio.Io _ as ex ->
    let bt = Printexc.get_raw_backtrace () in
    Eio.Exn.reraise_with_context ex bt "fetching http://%s/%s" host path;;

If we test it using a mock network that returns a timeout, we get a useful error message telling us the IP address and port of the failed attempt, extended with the hostname we used to get that, and then extended again by our get function with the full URL:

# Eio_mock.Backend.run @@ fun () ->
  let net = Eio_mock.Net.make "mocknet" in
  Eio_mock.Net.on_getaddrinfo net [`Return [`Tcp (Eio.Net.Ipaddr.V4.loopback, 80)]];
  Eio_mock.Net.on_connect net [`Raise (Eio.Net.err (Connection_failure Timeout))];
  get ~net ~host:"example.com" ~path:"index.html";;
+mocknet: getaddrinfo ~service:http example.com
+mocknet: connect to tcp:127.0.0.1:80
Exception:
Eio.Io Net Connection_failure Timeout,
  connecting to tcp:127.0.0.1:80,
  connecting to "example.com":http,
  fetching http://example.com/index.html

To get more detailed information, you can enable backtraces by setting OCAMLRUNPARAM=b or by calling Printexc.record_backtrace true, as usual.

When writing MDX tests that depend on getting the exact error output, it can be annoying to have the full backend-specific error displayed:

# Eio_main.run @@ fun env ->
  let net = Eio.Stdenv.net env in
  Switch.run @@ fun sw ->
  Eio.Net.connect ~sw net (`Tcp (Eio.Net.Ipaddr.V4.loopback, 1234));;
Exception:
Eio.Io Net Connection_failure Refused Unix_error (Connection refused, "connect", ""),
  connecting to tcp:127.0.0.1:1234

If we ran this using another backend, the Unix_error part might change. To avoid this problem, you can use Eio.Exn.Backend.show to hide the backend-specific part of errors:

# Eio.Exn.Backend.show := false;;
- : unit = ()

# Eio_main.run @@ fun env ->
  let net = Eio.Stdenv.net env in
  Switch.run @@ fun sw ->
  Eio.Net.connect ~sw net (`Tcp (Eio.Net.Ipaddr.V4.loopback, 1234));;
Exception:
Eio.Io Net Connection_failure Refused _,
  connecting to tcp:127.0.0.1:1234

We'll leave it like that for the rest of this file, so the examples can be tested automatically by MDX.

Filesystem Access

Access to the filesystem is performed using Eio.Path. An 'a Path.t is a pair of a capability to a base directory (of type 'a) and a string path relative to that. To append to the string part, it's convenient to use the / operator:

let ( / ) = Eio.Path.( / )

env provides two initial paths:

  • cwd restricts access to files beneath the current working directory.
  • fs provides full access (just like OCaml's stdlib).

You can save a whole file using Path.save:

# Eio_main.run @@ fun env ->
  let path = Eio.Stdenv.cwd env / "test.txt" in
  traceln "Saving to %a" Eio.Path.pp path;
  Eio.Path.save ~create:(`Exclusive 0o600) path "line one\nline two\n";;
+Saving to <cwd:test.txt>
- : unit = ()

For more control, use Path.open_out (or with_open_out) to get a flow.
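A sketch of with_open_out follows. It writes through a flow, reads the result back with Path.load, and then removes the file. The file name and the 0o600 permissions are arbitrary choices:

```ocaml
let ( / ) = Eio.Path.( / )

(* Writes two lines via a flow, reads them back, then deletes the file. *)
let write_then_read dir =
  let path = dir / "example-output.txt" in
  Eio.Path.with_open_out ~create:(`Or_truncate 0o600) path (fun flow ->
      Eio.Flow.copy_string "first line\n" flow;
      Eio.Flow.copy_string "second line\n" flow);
  let contents = Eio.Path.load path in
  Eio.Path.unlink path;
  contents
```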

To load a file, you can use load to read the whole thing into a string, Path.open_in (or with_open_in) to get a flow, or Path.with_lines to stream the lines (a convenience function that uses Buf_read.lines):

# Eio_main.run @@ fun env ->
  let path = Eio.Stdenv.cwd env / "test.txt" in
  Eio.Path.with_lines path (fun lines ->
     Seq.iter (traceln "Processing %S") lines
  );;
+Processing "line one"
+Processing "line two"
- : unit = ()

Access to cwd only grants access to that sub-tree:

let try_save path data =
  match Eio.Path.save ~create:(`Exclusive 0o600) path data with
  | () -> traceln "save %a : ok" Eio.Path.pp path
  | exception ex -> traceln "%a" Eio.Exn.pp ex

let try_mkdir path =
  match Eio.Path.mkdir path ~perm:0o700 with
  | () -> traceln "mkdir %a : ok" Eio.Path.pp path
  | exception ex -> traceln "%a" Eio.Exn.pp ex
# Eio_main.run @@ fun env ->
  let cwd = Eio.Stdenv.cwd env in
  try_mkdir (cwd / "dir1");
  try_mkdir (cwd / "../dir2");
  try_mkdir (cwd / "/tmp/dir3");;
+mkdir <cwd:dir1> : ok
+Eio.Io Fs Permission_denied _, creating directory <cwd:../dir2>
+Eio.Io Fs Permission_denied _, creating directory <cwd:/tmp/dir3>
- : unit = ()

The checks also apply to following symlinks:

# Unix.symlink "dir1" "link-to-dir1";
  Unix.symlink (Filename.get_temp_dir_name ()) "link-to-tmp";;
- : unit = ()

# Eio_main.run @@ fun env ->
  let cwd = Eio.Stdenv.cwd env in
  try_save (cwd / "dir1/file1") "A";
  try_save (cwd / "link-to-dir1/file2") "B";
  try_save (cwd / "link-to-tmp/file3") "C";;
+save <cwd:dir1/file1> : ok
+save <cwd:link-to-dir1/file2> : ok
+Eio.Io Fs Permission_denied _, opening <cwd:link-to-tmp/file3>
- : unit = ()

You can use open_dir (or with_open_dir) to create a restricted capability to a subdirectory:

# Eio_main.run @@ fun env ->
  let cwd = Eio.Stdenv.cwd env in
  Eio.Path.with_open_dir (cwd / "dir1") @@ fun dir1 ->
  try_save (dir1 / "file4") "D";
  try_save (dir1 / "../file5") "E";;
+save <dir1:file4> : ok
+Eio.Io Fs Permission_denied _, opening <dir1:../file5>
- : unit = ()

You only need to use open_dir if you want to create a new sandboxed environment. You can use a single base directory object to access all paths beneath it, and this allows following symlinks within that subtree.

A program that operates on the current directory will probably want to use cwd, whereas a program that accepts a path from the user will probably want to use fs, perhaps with open_dir to constrain all access to be within that directory.
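A sketch of that pattern follows. The helper names are hypothetical. The user's path is resolved through fs, and everything else only sees the confined directory capability:

```ocaml
let ( / ) = Eio.Path.( / )

(* Hypothetical helper: open the directory the user named via [fs] and
   give [fn] a capability confined to that directory. *)
let with_user_dir ~fs user_path fn =
  Eio.Path.with_open_dir (fs / user_path) fn

(* True if reading [rel] inside [dir] is refused by the sandbox. *)
let refused dir rel =
  match Eio.Path.load (dir / rel) with
  | _ -> false
  | exception Eio.Io (Eio.Fs.E (Eio.Fs.Permission_denied _), _) -> true
```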

On systems that provide the cap_enter system call, you can ask the OS to reject accesses that don't use capabilities. examples/capsicum/ contains an example that restricts itself to using a directory passed on the command-line, and then tries reading /etc/passwd via the stdlib. Running on FreeBSD, you should see:

mkdir /tmp/cap
dune exec -- ./examples/capsicum/main.exe /tmp/cap
+Opened directory <fs:/tmp/cap>        
+Capsicum mode enabled
+Using the file-system via the directory resource works:
+Writing <cap:capsicum-test.txt>...
+Read: "A test file"
+Bypassing Eio and accessing other resources should fail in Capsicum mode:
Fatal error: exception Sys_error("/etc/passwd: Not permitted in capability mode")

Running processes

Spawning a child process can be done using the Eio.Process module:

# Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  Eio.Process.run proc_mgr ["echo"; "hello"];;
hello
- : unit = ()

There are various optional arguments for setting the process's current directory or connecting up the standard streams. For example, we can use tr to convert some text to upper-case:

# Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  Eio.Process.run proc_mgr ["tr"; "a-z"; "A-Z"]
    ~stdin:(Eio.Flow.string_source "One two three\n");;
ONE TWO THREE
- : unit = ()

If you want to capture the output of a process, you can provide a suitable Eio.Flow.sink as the stdout argument, or use the parse_out convenience wrapper:

# Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  Eio.Process.parse_out proc_mgr Eio.Buf_read.line ["echo"; "hello"];;
- : string = "hello"
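The other option mentioned above, passing your own sink as stdout, can be sketched with a buffer sink. capture_output is a hypothetical name:

```ocaml
(* Runs a command and returns everything it wrote to stdout. *)
let capture_output proc_mgr args =
  let buf = Buffer.create 64 in
  Eio.Process.run proc_mgr ~stdout:(Eio.Flow.buffer_sink buf) args;
  Buffer.contents buf
```

Unlike parse_out with Buf_read.line, this keeps the trailing newline.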

All process functions either return the exit status or check that it was zero (success):

# Eio_main.run @@ fun env ->
  let proc_mgr = Eio.Stdenv.process_mgr env in
  Eio.Process.parse_out proc_mgr Eio.Buf_read.take_all ["sh"; "-c"; "exit 3"];;
Exception:
Eio.Io Process Child_error Exited (code 3),
  running command: sh -c "exit 3"

Process.spawn and Process.await give more control over the process's lifetime and exit status, and Eio_unix.Process gives more control over passing file descriptors (on systems that support them).
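For example, here is a sketch that uses spawn and await to inspect a non-zero exit code instead of getting an exception. The helper name and the choice to map signals to None are our own:

```ocaml
open Eio.Std

(* Returns Some code for a normal exit, None if killed by a signal. *)
let exit_code proc_mgr args =
  Switch.run @@ fun sw ->
  let child = Eio.Process.spawn ~sw proc_mgr args in
  match Eio.Process.await child with
  | `Exited code -> Some code
  | `Signaled _ -> None
```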

Time

The standard environment provides a clock with the usual POSIX time:

# Eio_main.run @@ fun env ->
  let clock = Eio.Stdenv.clock env in
  traceln "The time is now %f" (Eio.Time.now clock);;
+The time is now 1623940778.270336
- : unit = ()

The mock backend provides a mock clock that advances automatically when there is nothing left to do:

# Eio_mock.Backend.run_full @@ fun env ->
  let clock = Eio.Stdenv.clock env in
  traceln "Sleeping for five seconds...";
  Eio.Time.sleep clock 5.0;
  traceln "Resumed";;
+Sleeping for five seconds...
+mock time is now 5
+Resumed
- : unit = ()

Note: You could also just use Eio_unix.sleep 5.0 if you don't want to pass a clock around. This is especially useful if you need to insert a delay for some quick debugging.
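Clocks are also used to bound how long an operation may take. Here is a sketch using Eio.Time.with_timeout, where a 10-second sleep is cut off after 0.05 seconds (both durations are arbitrary):

```ocaml
(* with_timeout returns Error `Timeout if the function does not
   finish within the given number of seconds. *)
let slow_job_result clock =
  Eio.Time.with_timeout clock 0.05 (fun () ->
      Eio.Time.sleep clock 10.0;
      Ok "finished")
```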

Multicore Support

OCaml allows a program to create multiple domains in which to run code, allowing multiple CPUs to be used at once. Fibers are scheduled cooperatively within a single domain, but fibers in different domains run in parallel. This is useful to perform CPU-intensive operations quickly (though extra care needs to be taken when using multiple cores; see the Multicore Guide for details).

Domain Manager

Eio.Domain_manager provides a basic API for spawning domains. For example, let's say we have a CPU intensive task:

let sum_to n =
  traceln "Starting CPU-intensive task...";
  let total = ref 0 in
  for i = 1 to n do
    total := !total + i
  done;
  traceln "Finished";
  !total

We can use the domain manager to run this in a separate domain:

let main ~domain_mgr =
  let test n =
    traceln "sum 1..%d = %d" n
      (Eio.Domain_manager.run domain_mgr
        (fun () -> sum_to n))
  in
  Fiber.both
    (fun () -> test 100000)
    (fun () -> test 50000)
# Eio_main.run @@ fun env ->
  main ~domain_mgr:(Eio.Stdenv.domain_mgr env);;
+Starting CPU-intensive task...
+Starting CPU-intensive task...
+Finished
+sum 1..50000 = 1250025000
+Finished
+sum 1..100000 = 5000050000
- : unit = ()

Notes:

  • traceln can be used safely from multiple domains. It takes a mutex, so that trace lines are output atomically.
  • The exact traceln output of this example is non-deterministic, because the OS is free to schedule domains as it likes.
  • You must ensure that the function passed to run doesn't have access to any non-threadsafe values. The type system does not check this.
  • Domain_manager.run waits for the domain to finish, but it allows other fibers to run while waiting. This is why we use Fiber.both to create multiple fibers.
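As an illustration of the thread-safety point, here is a sketch where two domains update one shared counter. It uses the stdlib Atomic module. With a plain ref, concurrent increments could be lost. parallel_count is a hypothetical name:

```ocaml
open Eio.Std

(* Each of two domains increments the shared counter n times. *)
let parallel_count ~domain_mgr n =
  let counter = Atomic.make 0 in
  let work () = for _ = 1 to n do Atomic.incr counter done in
  Fiber.both
    (fun () -> Eio.Domain_manager.run domain_mgr work)
    (fun () -> Eio.Domain_manager.run domain_mgr work);
  Atomic.get counter
```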

Executor Pool

An Eio.Executor_pool distributes jobs among a pool of domain workers. Domains are reused and can execute multiple jobs concurrently.

Each domain worker starts new jobs until the total ~weight of its running jobs reaches 1.0. The ~weight represents the expected proportion of a CPU core that the job will take up. Jobs are queued up if they cannot be started immediately due to all domain workers being busy (>= 1.0).

This is the recommended way of leveraging OCaml 5's multicore capabilities.

Usually you will only want one pool for an entire application, so the pool is typically created when the application starts:

let () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  let pool =
    Eio.Executor_pool.create
      ~sw (Eio.Stdenv.domain_mgr env)
      ~domain_count:4
  in
  main ~pool

The pool starts its domain workers immediately upon creation.

The pool will not block our switch sw from completing; when the switch finishes, all domain workers and running jobs are cancelled.

~domain_count is the number of domain workers to create. The total number of domains should not exceed Domain.recommended_domain_count or the number of cores on your system.

We can run the previous example using an Executor Pool like this:

let main ~domain_mgr =
  Switch.run @@ fun sw ->
  let pool =
    Eio.Executor_pool.create ~sw domain_mgr ~domain_count:4
  in
  let test n =
    traceln "sum 1..%d = %d" n
      (Eio.Executor_pool.submit_exn pool ~weight:1.0
        (fun () -> sum_to n))
  in
  Fiber.both
    (fun () -> test 100000)
    (fun () -> test 50000)
# Eio_main.run @@ fun env ->
  main ~domain_mgr:(Eio.Stdenv.domain_mgr env);;
+Starting CPU-intensive task...
+Starting CPU-intensive task...
+Finished
+sum 1..50000 = 1250025000
+Finished
+sum 1..100000 = 5000050000
- : unit = ()

~weight is the anticipated proportion of a CPU core used by the job: in other words, the fraction of time it spends actively executing OCaml code, rather than waiting for I/O or system calls. In the above code snippet we use ~weight:1.0 because the job is entirely CPU-bound: it never waits for I/O or other syscalls. ~weight must be >= 0.0 and <= 1.0. For example, given an IO-bound job that averages 2% of one CPU core, pass ~weight:0.02.

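To illustrate a small ~weight, the sketch below submits several mostly-waiting jobs with ~weight:0.02, so a single worker may accept many of them at once. io_bound_job is hypothetical, and its repeated Fiber.yield calls merely stand in for time spent waiting on I/O:

```ocaml
open Eio.Std

(* Hypothetical job: mostly "waiting" (simulated by yielding), little CPU. *)
let io_bound_job i =
  for _ = 1 to 10 do Fiber.yield () done;
  i * i

let run_jobs () =
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  let pool =
    Eio.Executor_pool.create ~sw (Eio.Stdenv.domain_mgr env) ~domain_count:2
  in
  (* Each job claims about 2% of a core, so a worker keeps accepting new
     jobs until the weights of its running jobs add up to 1.0. *)
  Fiber.List.map
    (fun i ->
       Eio.Executor_pool.submit_exn pool ~weight:0.02 (fun () -> io_bound_job i))
    [1; 2; 3; 4; 5]
```

Fiber.List.map submits all five jobs concurrently and returns the results in input order.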

Synchronisation Tools

Eio provides several sub-modules for communicating between fibers, and these work even when the fibers are running in different domains.

Promises

Promises are a simple and reliable way to communicate between fibers. One fiber can wait for a promise and another can resolve it:

# Eio_main.run @@ fun _ ->
  let promise, resolver = Promise.create () in
  Fiber.both
    (fun () ->
      traceln "Waiting for promise...";
      let x = Promise.await promise in
      traceln "x = %d" x
    )
    (fun () ->
      traceln "Resolving promise";
      Promise.resolve resolver 42
    );;
+Waiting for promise...
+Resolving promise
+x = 42
- : unit = ()

A promise is initially "unresolved", and can only be resolved once. Awaiting a promise that is already resolved immediately returns the resolved value.

Promises are one of the easiest tools to use safely: it doesn't matter whether you wait on a promise before or after it is resolved, and multiple fibers can wait for the same promise and will get the same result. Promises are thread-safe; you can wait for a promise in one domain and resolve it in another.

Promises are also useful for integrating with callback-based libraries. For example:

let wrap fn x =
  let promise, resolver = Promise.create () in
  fn x
    ~on_success:(Promise.resolve_ok resolver)
    ~on_error:(Promise.resolve_error resolver);
  Promise.await_exn promise
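For instance, with a hypothetical callback-style lookup_user (not a real library function), wrap turns each call into a direct-style one whose errors arrive as ordinary exceptions. The block repeats wrap so that it is self-contained:

```ocaml
open Eio.Std

(* The same wrap as above. *)
let wrap fn x =
  let promise, resolver = Promise.create () in
  fn x
    ~on_success:(Promise.resolve_ok resolver)
    ~on_error:(Promise.resolve_error resolver);
  Promise.await_exn promise

(* Hypothetical callback-style API that calls exactly one of its callbacks. *)
let lookup_user id ~on_success ~on_error =
  if id = 1 then on_success "alice" else on_error Not_found

let demo () =
  Eio_main.run @@ fun _ ->
  let name = wrap lookup_user 1 in
  let missing =
    match wrap lookup_user 2 with
    | _ -> false
    | exception Not_found -> true   (* the on_error value is re-raised here *)
  in
  (name, missing)
```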

Example: Concurrent Cache

Here's an example using promises to cache lookups, with the twist that another user might ask the cache for the value while it's still adding it. We don't want to start a second fetch in that case, so instead we just store promises in the cache:

let make_cache fn =
  let tbl = Hashtbl.create 10 in
  fun key ->
    match Hashtbl.find_opt tbl key with
    | Some p -> Promise.await_exn p
    | None ->
      let p, r = Promise.create () in
      Hashtbl.add tbl key p;
      match fn key with
      | v -> Promise.resolve_ok r v; v
      | exception ex -> Promise.resolve_error r ex; raise ex

Notice that we store the new promise in the cache immediately, without doing anything that might switch to another fiber.

We can use it like this:

# let fetch url =
    traceln "Fetching %S..." url;
    Fiber.yield ();             (* Simulate work... *)
    traceln "Got response for %S" url;
    if url = "http://example.com" then "<h1>Example.com</h1>"
    else failwith "404 Not Found";;
val fetch : string -> string = <fun>

# Eio_main.run @@ fun _ ->
  let c = make_cache fetch in
  let test url =
    traceln "Requesting %s..." url;
    match c url with
    | page -> traceln "%s -> %s" url page
    | exception ex -> traceln "%s -> %a" url Fmt.exn ex
  in
  Fiber.List.iter test [
    "http://example.com";
    "http://example.com";
    "http://bad.com";
    "http://bad.com";
  ];;
+Requesting http://example.com...
+Fetching "http://example.com"...
+Requesting http://example.com...
+Requesting http://bad.com...
+Fetching "http://bad.com"...
+Requesting http://bad.com...
+Got response for "http://example.com"
+http://example.com -> <h1>Example.com</h1>
+Got response for "http://bad.com"
+http://bad.com -> Failure("404 Not Found")
+http://example.com -> <h1>Example.com</h1>
+http://bad.com -> Failure("404 Not Found")
- : unit = ()

Fiber.List.iter is like List.iter but doesn't wait for each job to finish before starting the next. Notice that we made four requests, but only started two download operations.

This version of the cache remembers failed lookups too. You could modify it to remove the entry on failure, so that all clients currently waiting still fail, but any future client asking for the failed resource will trigger a new download.
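A sketch of that modification, under the hypothetical name make_retrying_cache: the only change from make_cache is the Hashtbl.remove in the failure branch. The usage below relies on an illustrative flaky fetch that fails only on its first call:

```ocaml
open Eio.Std

let make_retrying_cache fn =
  let tbl = Hashtbl.create 10 in
  fun key ->
    match Hashtbl.find_opt tbl key with
    | Some p -> Promise.await_exn p
    | None ->
      let p, r = Promise.create () in
      Hashtbl.add tbl key p;
      match fn key with
      | v -> Promise.resolve_ok r v; v
      | exception ex ->
        Hashtbl.remove tbl key;       (* the next request will fetch again *)
        Promise.resolve_error r ex;   (* fibers already waiting still fail *)
        raise ex

let demo () =
  Eio_main.run @@ fun _ ->
  let calls = ref 0 in
  let flaky key =
    incr calls;
    if !calls = 1 then failwith "temporary failure"
    else String.uppercase_ascii key
  in
  let c = make_retrying_cache flaky in
  let first = match c "a" with _ -> "unexpected" | exception Failure msg -> msg in
  let second = c "a" in   (* retried, because the failure was forgotten *)
  let third = c "a" in    (* served from the cache: no new fetch *)
  (first, second, third, !calls)
```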

This cache is not thread-safe. You will need to add a mutex if you want to share it between domains.
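One way to do that is sketched below. It assumes the standard library's Mutex is acceptable here because the lock is held only around the table lookup and insertion (which never switch fibers), never while fetching or awaiting. make_shared_cache is a hypothetical name:

```ocaml
open Eio.Std

let make_shared_cache fn =
  let tbl = Hashtbl.create 10 in
  let lock = Stdlib.Mutex.create () in
  fun key ->
    Stdlib.Mutex.lock lock;
    match Hashtbl.find_opt tbl key with
    | Some p ->
      Stdlib.Mutex.unlock lock;
      Promise.await_exn p             (* wait without holding the lock *)
    | None ->
      let p, r = Promise.create () in
      Hashtbl.add tbl key p;
      Stdlib.Mutex.unlock lock;       (* release before the (slow) fetch *)
      match fn key with
      | v -> Promise.resolve_ok r v; v
      | exception ex -> Promise.resolve_error r ex; raise ex

(* Usage: two domains ask for the same key, but the fetch runs only once. *)
let demo () =
  Eio_main.run @@ fun env ->
  let dm = Eio.Stdenv.domain_mgr env in
  let fetches = Atomic.make 0 in
  let c = make_shared_cache (fun key -> Atomic.incr fetches; key ^ "!") in
  let a, b =
    Fiber.pair
      (fun () -> Eio.Domain_manager.run dm (fun () -> c "k"))
      (fun () -> Eio.Domain_manager.run dm (fun () -> c "k"))
  in
  (a, b, Atomic.get fetches)
```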

Streams

A stream is a bounded queue. Reading from an empty stream waits until an item is available. Writing to a full stream waits for space.

# Eio_main.run @@ fun _ ->
  let stream = Eio.Stream.create 2 in
  Fiber.both
    (fun () ->
       for i = 1 to 5 do
         traceln "Adding %d..." i;
         Eio.Stream.add stream i
       done
    )
    (fun () ->
       for i = 1 to 5 do
         let x = Eio.Stream.take stream in
         traceln "Got %d" x;
         Fiber.yield ()
       done
    );;
+Adding 1...
+Adding 2...
+Adding 3...
+Got 1
+Adding 4...
+Got 2
+Adding 5...
+Got 3
+Got 4
+Got 5
- : unit = ()

Here, we create a stream with a maximum size of 2 items. The first fiber added 1 and 2 to the stream, but had to wait before it could insert 3.

A stream with a capacity of 1 acts like a mailbox. A stream with a capacity of 0 will wait until both the sender and receiver are ready.
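The sketch below (the log strings are illustrative) records the order of events around a capacity-0 stream. Whatever the exact interleaving, the sender cannot log "handed over" until the receiver has started its take:

```ocaml
open Eio.Std

let rendezvous_log () =
  let log = ref [] in
  let note s = log := s :: !log in
  Eio_main.run (fun _ ->
    let stream = Eio.Stream.create 0 in
    Fiber.both
      (fun () ->
         note "sender: adding";
         Eio.Stream.add stream "hello";   (* waits until a receiver takes it *)
         note "sender: handed over")
      (fun () ->
         note "receiver: taking";
         let msg = Eio.Stream.take stream in
         note ("receiver: got " ^ msg)));
  List.rev !log
```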

Streams are thread-safe and can be used to communicate between domains.

Example: Worker Pool

A useful pattern is a pool of workers reading from a stream of work items. Client fibers submit items to a stream and workers process the items:

let handle_job request =
  Fiber.yield ();       (* (simulated work) *)
  Printf.sprintf "Processed:%d" request

let rec run_worker id stream =
  let request, reply = Eio.Stream.take stream in
  traceln "Worker %s processing request %d" id request;
  Promise.resolve reply (handle_job request);
  run_worker id stream

let submit stream request =
  let reply, resolve_reply = Promise.create () in
  Eio.Stream.add stream (request, resolve_reply);
  Promise.await reply

Each item in the stream is a request payload and a resolver for the reply promise.

# Eio_main.run @@ fun env ->
  let domain_mgr = Eio.Stdenv.domain_mgr env in
  Switch.run @@ fun sw ->
  let stream = Eio.Stream.create 0 in
  let spawn_worker name =
    Fiber.fork_daemon ~sw (fun () ->
       Eio.Domain_manager.run domain_mgr (fun () ->
          traceln "Worker %s ready" name;
          run_worker name stream
       )
    )
  in
  spawn_worker "A";
  spawn_worker "B";
  Switch.run (fun sw ->
     for i = 1 to 3 do
       Fiber.fork ~sw (fun () ->
         traceln "Client %d submitting job..." i;
         traceln "Client %d got %s" i (submit stream i)
       );
       Fiber.yield ()
     done
  );;
+Worker A ready
+Worker B ready
+Client 1 submitting job...
+Worker A processing request 1
+Client 2 submitting job...
+Worker B processing request 2
+Client 3 submitting job...
+Client 1 got Processed:1
+Worker A processing request 3
+Client 2 got Processed:2
+Client 3 got Processed:3
- : unit = ()

We use a zero-capacity stream here, which means that the Stream.add doesn't succeed until a worker accepts the job. This is a good choice for a worker pool because it means that if the client fiber gets cancelled while waiting for a worker then the job will never be run. It's also more efficient, as 0-capacity streams use a lock-free algorithm that is faster when there are multiple domains. Note that, while the stream itself is 0-capacity, clients still queue up waiting to use it.

In the code above, any exception raised while processing a job will exit the whole program. We might prefer to handle exceptions by sending them back to the client and continuing:

let rec run_worker id stream =
  let request, reply = Eio.Stream.take stream in
  traceln "Worker %s processing request %d" id request;
  begin match handle_job request with
    | result -> Promise.resolve_ok reply result
    | exception ex -> Promise.resolve_error reply ex; Fiber.check ()
  end;
  run_worker id stream

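The Fiber.check () checks whether the worker itself has been cancelled, and exits the loop if so. It's not actually necessary in this case, because if we continue instead then the following Stream.take will perform the check anyway. Because the reply promise now holds a result, submit should wait for it with Promise.await_exn instead of Promise.await: that returns the value on success and re-raises the worker's exception in the client fiber.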
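Putting the pieces together, here is a self-contained sketch of the error-handling worker. handle_job is a hypothetical variant that rejects negative requests, and, to keep it short, a single worker runs in the main domain via Fiber.fork_daemon rather than through a Domain_manager:

```ocaml
open Eio.Std

(* Hypothetical job handler that fails on negative input. *)
let handle_job request =
  Fiber.yield ();                     (* (simulated work) *)
  if request < 0 then invalid_arg "negative request";
  Printf.sprintf "Processed:%d" request

let rec run_worker id stream =
  let request, reply = Eio.Stream.take stream in
  traceln "Worker %s processing request %d" id request;
  begin match handle_job request with
    | result -> Promise.resolve_ok reply result
    | exception ex -> Promise.resolve_error reply ex; Fiber.check ()
  end;
  run_worker id stream

(* The reply is a result, so await_exn re-raises the worker's exception. *)
let submit stream request =
  let reply, resolve_reply = Promise.create () in
  Eio.Stream.add stream (request, resolve_reply);
  Promise.await_exn reply

let demo () =
  Eio_main.run @@ fun _env ->
  Switch.run @@ fun sw ->
  let stream = Eio.Stream.create 0 in
  Fiber.fork_daemon ~sw (fun () -> run_worker "A" stream);
  let ok = submit stream 1 in
  let failed =
    match submit stream (-1) with
    | _ -> false
    | exception Invalid_argument _ -> true
  in
  (ok, failed)
```

The worker keeps running after the failed request; it is only stopped when the switch finishes and cancels the daemon fiber.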

Note: in a real system, you would probably use Eio.Executor_pool for this rather than making your own pool.

Mutexes and Semaphores

Eio also provides Mutex and Semaphore sub-modules. Each of these corresponds to the module with the same name in the OCaml standard library, but allows other fibers to run while waiting instead of blocking the whole domain. Both are safe to use in parallel from multiple domains.

  • Eio.Mutex provides mutual exclusion, so that only one fiber can access a resource at a time.
  • Eio.Semaphore generalises this to allow up to n fibers to access a resource at once.
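As a minimal sketch of the semaphore case, the function below lets at most limit fibers into a guarded section at once. The active/peak bookkeeping is purely illustrative, added so the limit can be observed:

```ocaml
open Eio.Std

(* Runs [jobs] fibers concurrently, but lets at most [limit] of them inside
   the guarded section at once. Returns the highest concurrency observed. *)
let max_concurrency ~limit ~jobs =
  Eio_main.run @@ fun _ ->
  let sem = Eio.Semaphore.make limit in
  let active = ref 0 and peak = ref 0 in
  Fiber.List.iter
    (fun _job ->
       Eio.Semaphore.acquire sem;
       Fun.protect ~finally:(fun () -> Eio.Semaphore.release sem) @@ fun () ->
       incr active;
       peak := max !peak !active;
       Fiber.yield ();              (* simulated work that lets others run *)
       decr active)
    (List.init jobs Fun.id);
  !peak
```

With Eio.Semaphore.make 1, this behaves like a mutex.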

For example, if we allow loading and saving data in a file there could be a problem if we try to load the data while a save is in progress. Protecting the file with a mutex will prevent that:

module Atomic_file = struct
  type 'a t = {
    path : 'a Eio.Path.t;
    mutex : Eio.Mutex.t;
  }

  let of_path path =
    { path; mutex = Eio.Mutex.create () }

  let save t data =
    Eio.Mutex.use_rw t.mutex ~protect:true (fun () ->
       Eio.Path.save t.path data ~create:(`Or_truncate 0o644)
    )

  let load t =
    Eio.Mutex.use_ro t.mutex (fun () ->
       Eio.Path.load t.path
    )
end

The ~protect:true in save makes the critical section non-cancellable, so that if a cancel happens during a save then we will finish writing the data first. It can be used like this:

# Eio_main.run @@ fun env ->
  let dir = Eio.Stdenv.cwd env in
  let t = Atomic_file.of_path (dir / "data") in
  Fiber.both
    (fun () -> Atomic_file.save t "some data")
    (fun () ->
      let data = Atomic_file.load t in
      traceln "Loaded: %S" data
    );;
+Loaded: "some data"
- : unit = ()

Note: In practice, a better way to make file writes atomic is to write the data to a temporary file and then atomically rename it over the old data. That will work even if the whole computer crashes, and does not delay cancellation.

If the operation being performed is very fast (such as updating some in-memory counters), then it is fine to use the standard library's Mutex instead.
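For instance, a hit counter shared by two domains. This minimal sketch uses plain OCaml 5 domains (no Eio) to keep it short; hits and record_hit are illustrative names. Writing Stdlib.Mutex explicitly avoids confusion with Eio.Mutex if Eio has been opened:

```ocaml
(* The critical section is tiny and never switches fibers, so the standard
   library's Mutex (not Eio.Mutex) is enough here. *)
let hits = ref 0
let hits_lock = Stdlib.Mutex.create ()

let record_hit () =
  Stdlib.Mutex.lock hits_lock;
  Fun.protect ~finally:(fun () -> Stdlib.Mutex.unlock hits_lock)
    (fun () -> incr hits)

let () =
  let worker () = for _ = 1 to 1000 do record_hit () done in
  let d1 = Domain.spawn worker and d2 = Domain.spawn worker in
  Domain.join d1;
  Domain.join d2;
  Printf.printf "hits = %d\n" !hits
```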

If the operation does not switch fibers and the resource is only accessed from one domain, then no mutex is needed at all. For example:

(* No mutex needed if only used from a single domain: *)

let in_use = ref 10
let free = ref 0

let release () =
  incr free;
  decr in_use

Conditions

Eio.Condition allows a fiber to wait until some condition is true. For example:

module X = struct
  (* Note: this version is not safe to share across domains! *)

  type t = {
    mutable x : int;
    changed : Eio.Condition.t;
  }

  let make x = { x; changed = Eio.Condition.create () }

  let await_zero t =
    while t.x <> 0 do Eio.Condition.await_no_mutex t.changed done;
    traceln "x is now zero"

  let set t x =
    t.x <- x;
    Eio.Condition.broadcast t.changed;
    traceln "x set to %d" x
end

# Eio_mock.Backend.run @@ fun () ->
  let x = X.make 5 in
  Fiber.both
    (fun () ->
       traceln "Waiting for x to be 0";
       X.await_zero x
    )
    (fun () -> X.set x 0);;
+Waiting for x to be 0
+x set to 0
+x is now zero
- : unit = ()

Note that we need a loop in await_zero. This is needed because it's possible that another fiber might set it to zero and then set it to something else before the waiting fiber resumes.

The above version is not safe to share across domains, because await_zero relies on the value of x not changing after x is read but before await_no_mutex registers itself with the condition. Here's a domain-safe version:

module Y = struct
  (* Safe to share between domains. *)

  type t = {
    mutable y : int;
    mutex : Eio.Mutex.t;
    changed : Eio.Condition.t;
  }

  let make y = {
    y;
    mutex = Eio.Mutex.create ();
    changed = Eio.Condition.create ();
  }

  let await_zero t =
    Eio.Mutex.use_ro t.mutex (fun () ->
      while t.y <> 0 do Eio.Condition.await t.changed t.mutex done;
      traceln "y is now zero (at least until we release the mutex)"
    )

  let set t y =
    Eio.Mutex.use_rw t.mutex ~protect:true (fun () ->
       t.y <- y;
       Eio.Condition.broadcast t.changed;
       traceln "y set to %d" y
    )
end

Here, Eio.Condition.await registers itself with changed and only then releases the mutex, allowing other fibers (possibly in other domains) to change y. When it gets woken, it re-acquires the mutex.

# Eio_mock.Backend.run @@ fun () ->
  let y = Y.make 5 in
  Fiber.both
    (fun () ->
       traceln "Waiting for y to be 0";
       Y.await_zero y
    )
    (fun () -> Y.set y 0);;
+Waiting for y to be 0
+y set to 0
+y is now zero (at least until we release the mutex)
- : unit = ()

Conditions are more difficult to use correctly than e.g. promises or streams. In particular, it is easy to miss a notification due to broadcast getting called before await. However, they can be useful if used carefully.
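One way to avoid a missed notification, sketched here as a hypothetical one-shot Latch (not an Eio module), is to keep the state being waited on under the mutex and re-check it before each await, as the Y module does. A broadcast that happens before anyone waits is then harmless:

```ocaml
open Eio.Std

module Latch = struct
  type t = {
    mutable is_set : bool;
    mutex : Eio.Mutex.t;
    cond : Eio.Condition.t;
  }

  let make () =
    { is_set = false; mutex = Eio.Mutex.create (); cond = Eio.Condition.create () }

  let set t =
    Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
      t.is_set <- true;
      Eio.Condition.broadcast t.cond)

  (* Checks the flag before sleeping, so an earlier [set] is never missed. *)
  let wait t =
    Eio.Mutex.use_ro t.mutex (fun () ->
      while not t.is_set do Eio.Condition.await t.cond t.mutex done)
end

let demo () =
  Eio_main.run @@ fun _ ->
  (* Case 1: set before wait. The broadcast reaches nobody, but wait
     still returns immediately because the flag is already true. *)
  let early = Latch.make () in
  Latch.set early;
  Latch.wait early;
  (* Case 2: wait first, then set from another fiber. *)
  let late = Latch.make () in
  let woke = ref false in
  Fiber.both
    (fun () -> Latch.wait late; woke := true)
    (fun () -> Latch.set late);
  !woke
```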

Example: Signal handlers

On Unix-type systems, processes can react to signals. For example, pressing Ctrl-C will send the SIGINT (interrupt) signal.

Here is an example function that allows itself to be interrupted:

let run_op ~interrupted =
  Fiber.first
    (fun () ->
       Eio.Condition.await_no_mutex interrupted;
       traceln "Cancelled at user's request."
    )
    (fun () ->
       traceln "Running operation (Ctrl-C to cancel)...";
       Fiber.await_cancel ()       (* Simulated work *)
    )

Note that we don't need a mutex here. We're just waiting for the number of interrupts received to change, and, since that increases monotonically, once we get woken we always want to continue. Also, we don't care about missing interrupts from before this operation started.

The code here is quite subtle. We rely on the fact that the first branch of the Fiber.first runs first, and only starts running the second branch once await_no_mutex has finished registering. Thus, we never display the message telling the user to press Ctrl-C before we're ready to receive it. This isn't likely to matter if a human is responding to the message, but if the response is automated then the delay could matter.

To run this function, we need to install a signal handler. There are very few things that you can do safely in a signal handler. For example, you can't take a mutex in a signal handler because the signal might have interrupted a fiber that had already locked it. However, you can safely call Eio.Condition.broadcast:

# Eio_main.run @@ fun _env ->
  let interrupted = Eio.Condition.create () in
  let handle_signal (_signum : int) =
    (* Warning: we're in a signal handler now.
       Most operations are unsafe here, except for Eio.Condition.broadcast! *)
    Eio.Condition.broadcast interrupted
  in
  Sys.set_signal Sys.sigint (Signal_handle handle_signal);
  run_op ~interrupted;;
+Running operation (Ctrl-C to cancel)...
[ user presses Ctrl-C here ]
+Cancelled at user's request.
- : unit = ()

Another common pattern when using signals is using SIGHUP to tell an application to reload its configuration file:

let main ~config_changed =
  Eio.Condition.loop_no_mutex config_changed (fun () ->
      traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
      load_config ();
      traceln "Finished reading configuration";
      None      (* Keep waiting for further changes *)
    )

See the examples/signals directory for the full code.

Design Note: Determinism

Within a domain, fibers are scheduled deterministically. Programs using only the Eio APIs can only behave non-deterministically if given a capability to do so from somewhere else.

For example, Fiber.both f g always starts running f first, and only switches to g when f finishes or performs an effect that can switch fibers.

Performing IO with external objects (e.g., stdout, files, or network sockets) will introduce non-determinism, as will using multiple domains.

Note that traceln is unusual. Although it writes (by default) to stderr, it will not switch fibers. Instead, if the OS is not ready to receive trace output, the whole domain is paused until it is ready. This means that adding traceln to deterministic code will not affect its scheduling.

In particular, if you test your code by providing (deterministic) mocks then the tests will be deterministic. An easy way to write tests is by having the mocks call traceln and then comparing the trace output with the expected output. See Eio's own tests for examples, e.g., tests/switch.md.
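As a small illustration, assuming the eio.mock library is linked: the function below logs how two fibers interleave at Fiber.yield under Eio_mock.Backend.run. Because scheduling within a domain is deterministic, calling it twice should produce the same log, which is exactly what a trace-comparison test relies on:

```ocaml
open Eio.Std

(* Two fibers that interleave at Fiber.yield; the log records the order. *)
let interleaving () =
  let log = ref [] in
  let step name i = log := Printf.sprintf "%s%d" name i :: !log in
  Eio_mock.Backend.run (fun () ->
    Fiber.both
      (fun () -> step "f" 1; Fiber.yield (); step "f" 2)
      (fun () -> step "g" 1; Fiber.yield (); step "g" 2));
  List.rev !log
```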

Note: this only applies to the high-level APIs in the Eio module. Programs can behave non-deterministically when using Eio_unix or the various Low_level APIs provided by the backends.

Provider Interfaces

Eio applications use resources by calling functions (such as Eio.Flow.write). These functions are actually wrappers that look up the implementing module and call the appropriate function on that. This allows you to define your own resources.

Here's a flow that produces an endless stream of zeros (like "/dev/zero"):

module Zero = struct
  type t = unit

  let single_read () buf = 
    Cstruct.memset buf 0;
    Cstruct.length buf

  let read_methods = []         (* Optional optimisations *)
end

let ops = Eio.Flow.Pi.source (module Zero)

let zero = Eio.Resource.T ((), ops)

It can then be used like any other Eio flow:

# Eio_main.run @@ fun _ ->
  let r = Eio.Buf_read.of_flow zero ~max_size:100 in
  traceln "Got: %S" (Eio.Buf_read.take 4 r);;
+Got: "\000\000\000\000"
- : unit = ()
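To show how little changes for a different resource, here is a hypothetical Repeat flow that, like Zero, never ends but fills each buffer with a chosen byte, so the resource state is a char instead of unit. It assumes the cstruct library (which Eio itself depends on); the result of repeat is annotated as a plain Eio.Flow.source_ty resource:

```ocaml
(* Like Zero above, but the resource value is the byte to repeat. *)
module Repeat = struct
  type t = char

  let single_read c buf =
    Cstruct.memset buf (Char.code c);
    Cstruct.length buf

  let read_methods = []         (* no optional optimisations *)
end

let repeat (c : char) : Eio.Flow.source_ty Eio.Resource.t =
  Eio.Resource.T (c, Eio.Flow.Pi.source (module Repeat))

let demo () =
  Eio_main.run @@ fun _ ->
  let r = Eio.Buf_read.of_flow (repeat 'x') ~max_size:100 in
  Eio.Buf_read.take 4 r
```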

Example Applications

Integrations

Eio can be used with several other IO libraries.

Async

Async_eio has experimental support for running Async and Eio code together in a single domain.

Lwt

You can use Lwt_eio to run Lwt threads and Eio fibers together in a single domain, and to convert between Lwt and Eio promises. This may be useful during the process of porting existing code to Eio.

Unix and System Threads

The Eio_unix module provides features for using Eio with OCaml's Unix module. In particular, Eio_unix.run_in_systhread can be used to run a blocking operation in a separate systhread, allowing it to be used within Eio without blocking the whole domain.
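A minimal sketch, assuming the unix library is available: slow_double is a hypothetical blocking function, and Unix.sleepf stands in for any blocking call that has no Eio equivalent. While the systhread is blocked, the other fiber in the Fiber.pair can still run:

```ocaml
open Eio.Std

(* Hypothetical blocking call: it blocks whichever thread calls it. *)
let slow_double x =
  Unix.sleepf 0.05;
  x * 2

let demo () =
  Eio_main.run @@ fun _ ->
  Fiber.pair
    (fun () -> Eio_unix.run_in_systhread (fun () -> slow_double 21))
    (fun () ->
       (* This fiber keeps running while the systhread is blocked. *)
       traceln "still responsive";
       "ok")
```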

Domainslib

For certain compute-intensive tasks it may be useful to send work to a pool of Domainslib worker domains. You can resolve an Eio promise from non-Eio domains (or systhreads), which provides an easy way to retrieve the result. For example:

open Eio.Std

let pool = Domainslib.Task.setup_pool ~num_domains:2 ()

let fib n = ... (* Some Domainslib function *)

let run_in_pool fn x =
  let result, set_result = Promise.create () in
  let _ : unit Domainslib.Task.promise = Domainslib.Task.async pool (fun () ->
      Promise.resolve set_result @@
      match fn x with
      | r -> Ok r
      | exception ex -> Error ex
    )
  in
  Promise.await_exn result

let () =
  Eio_main.run @@ fun _ ->
  Fiber.both
    (fun () -> traceln "fib 30 = %d" (run_in_pool fib 30))
    (fun () -> traceln "fib 10 = %d" (run_in_pool fib 10))

Note that most Domainslib functions can only be called from code running in the Domainslib pool, while most Eio functions can only be used from Eio domains. The bridge function run_in_pool makes use of the fact that Domainslib.Task.async is able to run from an Eio domain, and Eio.Promise.resolve is able to run from a Domainslib one.

kcas

Eio provides the support that kcas needs to implement blocking in its lock-free software transactional memory (STM) implementation. This means that you can use any of the composable lock-free data structures and communication and synchronisation primitives built with kcas to communicate and synchronise between Eio fibers, raw domains, and any other schedulers that provide the domain-local-await mechanism.

To demonstrate kcas, we first load it:

# #require "kcas"
# open Kcas

Next, let's create a couple of shared memory locations

let x = Loc.make 0
let y = Loc.make 0

and spawn a domain

# let foreign_domain = Domain.spawn @@ fun () ->
    let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
    Loc.set y 22;
    x
val foreign_domain : int Domain.t = <abstr>

that first waits for one of the locations to change value and then writes to the other location.

Then we run an Eio program

# let y = Eio_main.run @@ fun _env ->
    Loc.set x 20;
    Loc.get_as (fun y -> Retry.unless (y <> 0); y) y
val y : int = 22

that first writes to the location the other domain is waiting on and then waits for the other domain to write to the other location.

Joining with the other domain,

# y + Domain.join foreign_domain
- : int = 42

we arrive at the answer.

Best Practices

This section contains some recommendations for designing library APIs for use with Eio.

Switches

A function should not take a switch argument if it could create one internally instead.

Taking a switch indicates that a function creates resources that outlive the function call, and users seeing a switch argument will naturally wonder what these resources may be and what lifetime to give them, which is confusing if this is not needed.

Creating the switch inside your function ensures that all resources are released promptly.

(* BAD - switch should be created internally instead *)
let load_config ~sw path =
  parse_config (Eio.Path.open_in ~sw path)

(* GOOD - less confusing and closes file promptly *)
let load_config path =
  Switch.run @@ fun sw ->
  parse_config (Eio.Path.open_in ~sw path)

Of course, you could use with_open_in in this case to simplify it further.
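For example, load_config could be written with Eio.Path.with_open_in, which opens the file, passes the flow to the callback, and closes it when the callback returns or raises. The parse_config here is a hypothetical stand-in that just splits the file into non-empty lines, and the demo writes a scratch file (example-config.txt, an arbitrary name) in the current directory:

```ocaml
(* Hypothetical parser: one non-empty line per setting. *)
let parse_config flow =
  Eio.Flow.read_all flow
  |> String.split_on_char '\n'
  |> List.filter (fun line -> line <> "")

(* No switch needed: with_open_in closes the file for us. *)
let load_config path = Eio.Path.with_open_in path parse_config

let demo () =
  Eio_main.run @@ fun env ->
  let path = Eio.Path.(Eio.Stdenv.cwd env / "example-config.txt") in
  Eio.Path.save ~create:(`Or_truncate 0o644) path "a=1\nb=2\n";
  let settings = load_config path in
  Eio.Path.unlink path;
  settings
```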

Casting

Unlike many languages, OCaml does not automatically cast to super-types as needed. Remember to keep the type polymorphic in your interface so users don't need to do this manually.

For example, if you need an Eio.Flow.source then users should be able to use a Flow.two_way without having to cast it first:

(* BAD - user must cast to use function: *)
module Message : sig
  type t
  val read : Eio.Flow.source_ty r -> t
end

(* GOOD - a Flow.two_way can be used without casting: *)
module Message : sig
  type t
  val read : _ Eio.Flow.source -> t
end

If you want to store the argument, this may require you to cast internally:

module Foo : sig
  type t
  val of_source : _ Eio.Flow.source -> t
end = struct
  type t = {
    src : Eio.Flow.source_ty r;
  }

  let of_source x = {
    src = (x :> Eio.Flow.source_ty r);
  }
end

Passing env

The env value you get from Eio_main.run is a powerful capability, and programs are easier to understand when it's not passed around too much.

In many cases, it's clearer (if a little more verbose) to take the resources you need as separate arguments, e.g.

module Status : sig
  val check :
    clock:_ Eio.Time.clock ->
    net:_ Eio.Net.t ->
    bool
end

You can also provide a convenience function that takes an env too. Doing this is most appropriate if many resources are needed and your library is likely to be initialised right at the start of the user's application.

In that case, be sure to request only the resources you need, rather than the full set. This makes it clearer what your library does, makes it easier to test, and allows it to be used on platforms without the full set of OS resources. If you define the type explicitly, you can describe why you need each resource there:

module Status : sig
  type 'a env = 'a constraint 'a = <
    net : _ Eio.Net.t;             (** To connect to the servers *)
    clock : _ Eio.Time.clock;      (** Needed for timeouts *)
    ..
  > as 'a

  val check : _ env -> bool
end
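A hedged sketch of the same idea for a plain function: time_it is a hypothetical helper whose parameter type mentions only clock, so it accepts the full env from Eio_main.run (thanks to the .. row) while documenting that a clock is all it uses:

```ocaml
open Eio.Std

(* Hypothetical helper: needs only a clock, so that is all it asks for. *)
let time_it (env : < clock : _ Eio.Time.clock; .. >) f =
  let clock = Eio.Stdenv.clock env in
  let start = Eio.Time.now clock in
  let result = f () in
  (result, Eio.Time.now clock -. start)

let demo () =
  Eio_main.run @@ fun env ->
  let result, elapsed = time_it env (fun () -> Fiber.yield (); "done") in
  traceln "%s after %.6f seconds" result elapsed;
  result
```

In a test, you could pass any object with a suitable clock method instead of the real env.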

Further Reading

Some background about the effects system can be found in:

eio's People

Contributors

adatario, anmonteiro, avsm, bikallem, bord-o, c-cube, christinerose, cjen1, dra27, haesbaert, jebrosen, kayceesrk, kit-ty-kate, lucperkins, mefyl, nojb, patricoferris, polytypic, prgbln, raphael-proust, ryangibb, sgrondin, sidkshatriya, smondet, squiddev, sudha247, talex5, thelortex, zenfey

eio's Issues

Can't use unbound UDP sockets

Currently we force a bind in Eio.Net.datagram_socket by obliging the user to pass a sockaddr. This should be optional: an unbound UDP socket binds on the first sendto, and the kernel chooses an ephemeral port.

Consider renaming Stream

I think the Stream module in Eio is not a stream in a traditional functional sense.

In particular, it looks more similar to abstractions provided by some other libraries/runtimes under different names:

"Streams" in functional languages tend to be related to one-directional element-wise "stream processing".

My suggestion would be to rename the Eio.Stream module to Eio.Pipe as this seems to be a more used name in the OCaml ecosystem.

[Discussion] Object Capabilities / API

I was looking at the object capabilities & API in Eio.

I've been experimenting with something similar in a couple of projects, so here are a few things worth discussing while it is still early on (even if just to show us all & document that those are actually bad ideas ;) ):

  • the goal is to help the reader so I'd use full words and expressions instead of acronyms: network, file_system, etc.
  • instead of multiplying the ~network ~file_system ~clock arguments, I just pack them all in the first argument of every function as an open object that describes what the function needs:

    val foo: < network: Network.t; console: Console.t ; .. > -> bar:int -> …
    • type inference does a lot more of the job for us, and the capabilities get in the way only when we actually want to look at them
    • it is very composable so user code can extend it to any other kind of context or state that one may want to pass around.
    • merlin makes writing mlis easier with insert-merlin-type :)
    • when a function does not need something any more it is easier to remove a method from an object type in signatures than unthreading an argument through a whole call stack (but it's also a bit easier to leave unused fields in signatures)
    • it does have the drawback of making every signature a bit heavier
    • In gemini-eio/src/main.ml:74, there is a regular record that is packing things like that (incl the Switch.t!) but it is done in a non-reusable way (because the record is not structural).
  • the methods of those objects should not be functions with complicated types because error messages quickly become insane → better have a < network: Network.t ; .. > and put the functions in the Network module (those also would take the object as first argument for composability, and it makes much better documentation anyway)

Is IO prioritisation possible?

This may be a bit of a weird request, but I've been working on some consensus systems recently, and being able to prioritise internal communications over external ones is quite important.

Specifically, in these systems you have multiple nodes communicating with each other (keepalives etc.) while also taking client requests. The problem is that if there are too many client requests and no prioritisation, the internal communication can end up delayed behind a batch of client requests. This results in other nodes believing that the leader is dead and hence calling an election.

The way I've fixed this previously was to separate the scheduling spaces (ensuring fairness) via either processes or system threads, one for internal communication and one for external.

One hope I had with effects is that it may be possible to do this prioritisation by using an effect handler to intercept the IO effects, and then assign them to the correct priority level.

This could look something like the following:

let client_rpc_handler = ...
let internal_rpc_handler = ...

let intercept f ~priority =
  match f () with
  | x -> x
  | Effect e -> perform_with_priority e ~priority

let main () =
  intercept ~priority:2 client_rpc_handler;
  intercept ~priority:1 internal_rpc_handler;
  ()

Ideally, this would mean that if packets are waiting on both the client sockets and the internal sockets, it would read from the internal sockets first (generalised over all IO operations).

I understand that this may be impossible to do, but it would be much simpler to do this in the concurrency scheduler. I'm also not sure whether this is a wrong-headed approach and another solution would work better.

[Questions] Using eio to build an application

I'm looking forward to the prospect of writing monad-less effects in OCaml. So this library is exciting to me. Thanks for your work on it!

Right now I'm looking to make a single-domain application, possibly with Eio. I am aware that Eio is very much a work-in-progress and experimental.

I recently noticed ocaml-multicore/ocaml-multicore#770. I didn't go deep into the issue, but it seems to be related to domains + effects, and since I'm considering only a single-domain application it does not impact my use case. Still, it's heartening to know that a fix for this was found and promptly applied to the 5.00 multicore branch.

But the questions I have are:

  • As you find bugs in multicore while working on Eio, the fixes will only be on the ocaml-multicore 5.00 branch (which is confusingly called 4.14+domains in the opam file). However, Eio is currently built to run on 4.12+domains. How will that work out? Is there a plan to have an Eio for 4.14+domains?
  • The tooling support in 4.14 in general, and 4.14+domains in particular, is/will be quite poor in the immediate term, so even if you did shift to using Eio on 4.14+domains the development experience would be quite difficult. For instance, merlin would not work on 4.14 / 4.14+domains for quite some time. Also, 5.00 (aka 4.14+domains) seems to fail to compile many more packages than 4.12+domains (according to http://check.ocamllabs.io:8082/ )

I know all these questions relate to this transitional period and things will sort themselves out eventually. But this transition is likely to take many months so I'm wondering if you have any recommendations on what might be the best strategy if I want to use Eio right now.

(Thanks for your time!)

Provide Eio_unix API

Eio needs to work in browsers and unikernels, so Eio itself must not depend on Unix.

Provide an Eio_unix module to extend the basic API with Unix-specific features, such as extracting a Unix.file_descr from a flow, waiting for an FD to become readable, etc.

At the moment the Eio_linux and Eio_luv backends provide separate APIs for this, but there should be a shared API both can implement.

eio fails to install due to vendor conflicts

Howdy, I'm running into an issue when installing eio. It appears that there is a conflict with the vendored uring library. Running git submodule deinit resolves the issue.

[ERROR] The compilation of eio_main.~dev failed at "dune build -p eio_main -j 3
        --promote-install-files=false @install".

#=== ERROR while compiling eio_main.~dev ======================================#
# context     2.1.0 | linux/x86_64 | ocaml-variants.4.12.0+domains | pinned(git+file:///home/matt/usr/src/misc/eio#main#6e739586)
# path        ~/usr/opam/4.12.0+domains/.opam-switch/build/eio_main.~dev
# command     ~/usr/opam/opam-init/hooks/sandbox.sh build dune build -p eio_main -j 3 --promote-install-files=false @install
# exit-code   1
# env-file    ~/usr/opam/log/eio_main-1151034-395744.env
# output-file ~/usr/opam/log/eio_main-1151034-395744.out
### output ###
# Error: Conflict between the following libraries:
# - "uring" in _build/default/ocaml-uring/lib/uring
# - "uring" in /home/matt/usr/opam/4.12.0+domains/lib/uring
#   -> required by library "eio_linux" in
#      /home/matt/usr/opam/4.12.0+domains/lib/eio_linux
~  opam --version
2.1.0
~  dune --version
2.9.1

Signals don't get processed while Fibers are scheduled?

I couldn't get eio to handle my signals while another fiber is running (even if sleeping). Take this pathological case:

open Eio.Std

let () =
  Sys.(
    set_signal
      sigint
      (Signal_handle
         (fun i ->
           Format.eprintf "handle signal: %d@." i;
           exit 0)));
  Eio_main.run (fun env ->
      Switch.run (fun sw ->
          Fiber.fork ~sw (fun () -> Eio.Time.sleep (Eio.Stdenv.clock env) 5.)))

If you press Ctrl+C while the program is running, the handler doesn't run until after 5 seconds (when the Eio fiber ends).

Tested on macOS (with eio_luv)

Expose `unix_fd` in `Eio.Net.accept{,_fork}`

I've found myself needing to access the underlying file descriptor for a server that calls Eio.Net.accept{,_fork}, but the socket is only exposed as a Flow.two_way.

My use case is the sendfile syscall.

Can't use connected UDP sockets

Usually you can "connect" a UDP socket and then use normal reads and writes.
Historically, a sendto on UDP was effectively a connect+write; at the very least, connecting should allow the kernel to cache the route lookup, etc.
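For comparison, this is what a connected UDP socket looks like with OCaml's standard Unix library (not Eio). It is a minimal sketch over loopback: after Unix.connect, a plain Unix.write sends to the fixed peer without naming the address each time. The helper name udp_roundtrip is made up for illustration.

```ocaml
(* Connected UDP with the plain Unix library (not Eio); link with the unix library. *)
let udp_roundtrip (msg : string) : string =
  (* Receiver: bind to an ephemeral port on 127.0.0.1. *)
  let rx = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  let tx = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  Fun.protect
    ~finally:(fun () -> Unix.close tx; Unix.close rx)
    (fun () ->
      Unix.bind rx (Unix.ADDR_INET (Unix.inet_addr_loopback, 0));
      let rx_addr = Unix.getsockname rx in
      (* connect() fixes the default peer, so write() needs no address. *)
      Unix.connect tx rx_addr;
      let out = Bytes.of_string msg in
      ignore (Unix.write tx out 0 (Bytes.length out) : int);
      (* Each read on a datagram socket returns one whole datagram. *)
      let buf = Bytes.create 1500 in
      let n = Unix.read rx buf 0 (Bytes.length buf) in
      Bytes.sub_string buf 0 n)

let () = print_endline (udp_roundtrip "ping")
```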

Eio.Flow.read hangs on closed flow

Not sure if I'm doing something wrong, but I've run into the following issue (on the Luv backend).

If one fiber is waiting in Eio.Flow.read when another fiber closes the flow, the reader hangs forever, at least for a client flow created with Eio.Net.connect.

I've managed to trim it down to the following case:

reproduce.ml
open Eio.Std

let main env =
  let run_client ~net ~addr =
    traceln "client on";
    Switch.run (fun sw ->
        let flow = Eio.Net.connect ~sw net addr in
        traceln "client <-> server";
        let read_loop () =
          let b = Cstruct.create 4 in
          traceln "client will be stuck after this";
          while true do
            Eio.Flow.read flow b |> ignore;
            traceln "unreachable"
          done
        in
        let d = "Hello from client" in
        traceln "client -> %S" d;
        Eio.Flow.copy_string d flow;
        Fiber.fork ~sw read_loop;
        Eio.Flow.close flow);
    traceln "client off"
  in
  let run_server socket =
    traceln "server on";
    Switch.run (fun sw ->
        Eio.Net.accept_sub socket ~sw
          (fun ~sw:_ flow _addr ->
            traceln "server <-> client";
            let b = Buffer.create 100 in
            Eio.Flow.copy flow (Eio.Flow.buffer_sink b);
            traceln "server <[EOF]- %S" (Buffer.contents b))
          ~on_error:(traceln "Error handling connection: %a" Fmt.exn));
    traceln "server off"
  in

  let net = Eio.Stdenv.net env in
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 9999) in
  Switch.run @@ fun sw ->
  let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
  Fiber.both (fun () -> run_server server) (fun () -> run_client ~net ~addr)

let () = Eio_main.run main

The output is:

+server on
+client on
+client <-> server
+client -> "Hello from client"
+server <-> client
+client will be stuck after this
+server <[EOF]- "Hello from client"
+server off
*hangs forever*

Decide on cstruct vs bytes

Performing IO operations with the kernel requires giving the kernel the address of a buffer that the GC won't move during the operation. To ensure that, we use Cstruct.t everywhere. However, there are rumours (mirage/ocaml-cohttp#819 (comment)) that regular strings in OCaml 5 are sure to stay put, at least once they're in the major heap.

  • Using regular strings/bytes is likely to be slightly faster than a cstruct (which wraps a bigarray).
  • It would integrate better with other OCaml APIs.
  • We would have to ensure that a buffer is on the major heap (and move it if not) when doing IO. This would involve extra copying if not careful.
  • There are severe length restrictions on strings on 32-bit platforms, as the top bits of the length field are used for other things.
  • Only cstructs work with memory-mapped data.

Tasks:

  • Confirm that OCaml 5 guarantees that strings will not be moved.
  • Measure performance difference of strings vs cstructs.

/cc @avsm @kayceesrk

Check EINTR is handled correctly

System calls need to be retried if they return EINTR (unless this was due to them being cancelled; you don't always get ECANCELED in that case).

There is some handling already, e.g. here's eio_linux handling EINTR for rw ops:

begin match Fiber_context.get_error action.fiber with
| Some e -> Suspended.discontinue action e (* If cancelled, report that instead. *)
| None ->
  if errno_is_retry e then (
    submit_rw_req st req;
    schedule st
  ) else (
    Suspended.continue action e
  )
end

Check if we need this in more places, and also check the eio_luv backend.

@hannesm notes in mirage/mirage-crypto#155 (comment) that getrandom needs this too.
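For plain blocking calls made through OCaml's Unix library, the retry loop is usually written as below. This is only a sketch: retry_on_eintr and read_retrying are hypothetical helpers, and they deliberately ignore the cancellation case mentioned above, which a backend has to check first (as the eio_linux snippet does).

```ocaml
(* Re-run [f x] for as long as it fails with EINTR (interrupted system call).
   Any other error propagates unchanged. *)
let rec retry_on_eintr f x =
  try f x with Unix.Unix_error (Unix.EINTR, _, _) -> retry_on_eintr f x

(* Example: a blocking read that is restarted if a signal interrupts it. *)
let read_retrying fd buf =
  retry_on_eintr (fun () -> Unix.read fd buf 0 (Bytes.length buf)) ()
```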

Consider renaming `Fibre.fork`

As mentioned on Discuss, this name isn't very helpful:

  • It's unrelated to Unix.fork, which can be confusing.
  • Since the switch argument was added, it doesn't actually copy the calling environment at all.

It should probably be renamed to create, start, spawn, or something like that.

Suggestions about README.md

An observation about README.md, which is great, by the way! (I'm not sure I'm supposed to create an issue for this, but where else should it go?)

As an eio newbie, this paragraph is not clear to me:
If you call a function **without** giving it access to a switch, then when the function returns **you can** be sure that any fibres it spawned have finished, and any files it opened have been closed. So, a Switch.run puts a bound on the lifetime of things created within it, leading to clearer code and avoiding resource leaks.

I understand it as: "if you call any function anywhere in your code, you can be sure that any files it opened have been closed", which seems weird to me. Should it be "you cannot be sure that..."? Or should it be "files or resources opened using a specific part of the Eio API"?

Maybe these two cases could be explained:

  • Inside a switch: files & resources attached to the switch are automatically closed when the switch is off.
  • Not inside a switch: ... ?
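A small sketch of the point the README is making, assuming a recent Eio (Eio.Switch, Eio.Fiber, Eio_main): helper (a made-up name) is not given a switch, so any fiber it forks must belong to a switch it creates itself, and Switch.run does not return until that fiber has finished. Resources work the same way: anything opened with that inner ~sw is released when the switch finishes.

```ocaml
(* [helper] takes no switch, so it cannot attach anything to a longer-lived one. *)
let helper (log : string list ref) =
  Eio.Switch.run (fun sw ->
      Eio.Fiber.fork ~sw (fun () ->
          Eio.Fiber.yield ();  (* give control back to the parent before finishing *)
          log := "child finished" :: !log));
  (* Switch.run only returns once every fiber attached to [sw] has finished. *)
  log := "helper returned" :: !log

let demo () =
  let log = ref [] in
  Eio_main.run (fun _env -> helper log);
  List.rev !log

let () = List.iter print_endline (demo ())
```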

datagram_socket t#recv does not return on empty packet

problem

⚠️ mediocre code ahead (ref: https://github.com/cdaringe/protohacks/blob/c4e00f6f55b991d780d46f012a460bb8a528c505/lib/server/server.ml#L33-L44)

      let client_addr, r = recv socket buf in
      traceln "ack";

ack is never emitted when empty packets are sent. (Yes, "ack" is objectively the wrong debug word in this scenario 😆)


I'm using Linux in Docker, so perhaps it's something with the libuv bindings? In the Wireshark capture you can see the service retrying, hoping I'll send them a response, but sure enough, the first time they send me an empty packet, I'm unable to proceed past recv.
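For reference, an empty UDP datagram is a real message: at the Unix level, recvfrom returns it as a zero-length read rather than skipping it. The sketch below checks this with OCaml's standard Unix library on loopback (no Eio involved); empty_datagram_length is a made-up name.

```ocaml
(* Send a zero-length UDP datagram over loopback and return the received length. *)
let empty_datagram_length () =
  let rx = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  let tx = Unix.socket Unix.PF_INET Unix.SOCK_DGRAM 0 in
  Fun.protect
    ~finally:(fun () -> Unix.close tx; Unix.close rx)
    (fun () ->
      Unix.bind rx (Unix.ADDR_INET (Unix.inet_addr_loopback, 0));
      let dest = Unix.getsockname rx in
      (* Send an empty datagram: 0 bytes taken from an empty buffer. *)
      ignore (Unix.sendto tx Bytes.empty 0 0 [] dest : int);
      let buf = Bytes.create 1500 in
      let n, _sender = Unix.recvfrom rx buf 0 (Bytes.length buf) [] in
      n)

let () = Printf.printf "received %d bytes\n" (empty_datagram_length ())
```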

Stuck on networking echo service

I'm trying to extend the Networking example into a client-server echo service, as in any introductory network programming course.

And here's my code:

open Eio.Std

let run_client ~net ~addr =
  traceln "Connecting to server...";
  Switch.run @@ fun sw ->
  let flow = Eio.Net.connect ~sw net addr in
  Eio.Flow.copy_string "Hello from client" flow;
  let b = Buffer.create 100 in
  Eio.Flow.copy flow (Eio.Flow.buffer_sink b);
  traceln "Client received : %S from server" (Buffer.contents b);
  ;;  

let run_server socket =
  Switch.run @@ fun sw ->
  Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr ->
    traceln "Server accepted connection from client";
    let b = Buffer.create 100 in
    Eio.Flow.copy flow (Eio.Flow.buffer_sink b);
    traceln "Server received: %S" (Buffer.contents b);
    Eio.Flow.copy_string (Buffer.contents b) flow;
    traceln "Server echo: %S to client" (Buffer.contents b);
  ) ~on_error:(traceln "Error handling connection: %a" Fmt.exn);
  traceln "(normally we'd loop and accept more connections here)"
  ;;

let main ~net ~addr =
  Switch.run @@ fun sw ->
  let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
  traceln "Server ready...";
  Fiber.both
    (fun () -> run_server server)
    (fun () -> run_client ~net ~addr)
;;

However, it got stuck and never finished.


Is there anything I've missed?
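The most likely cause is that neither side ever sees end-of-file: the server's Eio.Flow.copy reads until the client stops sending, while the client is itself blocked in Eio.Flow.copy waiting for the reply. Below is a sketch of one way to fix it, assuming a recent Eio release: the client half-closes its sending side with Eio.Flow.shutdown, and the server handles a single connection with Eio.Net.accept and closes it when its switch finishes. The function names and port 8081 are arbitrary choices.

```ocaml
open Eio.Std

(* Client: send a request, then half-close so the server sees end-of-file. *)
let run_client ~net ~addr =
  Switch.run @@ fun sw ->
  let flow = Eio.Net.connect ~sw net addr in
  Eio.Flow.copy_string "Hello from client" flow;
  Eio.Flow.shutdown flow `Send;  (* without this, the server's copy never returns *)
  let b = Buffer.create 100 in
  Eio.Flow.copy flow (Eio.Flow.buffer_sink b);  (* reads until the server closes *)
  Buffer.contents b

(* Server: handle exactly one connection, echoing everything back. *)
let run_server socket =
  Switch.run @@ fun sw ->
  let flow, _addr = Eio.Net.accept ~sw socket in
  let b = Buffer.create 100 in
  Eio.Flow.copy flow (Eio.Flow.buffer_sink b);  (* returns after the client's shutdown *)
  Eio.Flow.copy_string (Buffer.contents b) flow
  (* leaving the switch closes [flow], giving the client its end-of-file *)

let echo_once ~net ~addr =
  Switch.run @@ fun sw ->
  let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
  let (), reply =
    Fiber.pair (fun () -> run_server server) (fun () -> run_client ~net ~addr)
  in
  reply

let () =
  Eio_main.run @@ fun env ->
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8081) in
  let reply = echo_once ~net:(Eio.Stdenv.net env) ~addr in
  traceln "Client received: %S" reply
```

To keep accepting connections you would loop around the accept (or use accept_fork), but the shutdown on the client side is the part that unblocks the exchange.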

Provide API for buffered input

At the moment, there is no easy way to e.g. read one line from a flow.

You can use the Eio branch of Angstrom (https://github.com/talex5/angstrom/tree/effects) for this, but it would be nice not to have to depend on a separate library for something so basic.

This could be as simple as moving the core Angstrom buffer type (https://github.com/talex5/angstrom/blob/effects/lib/parser.ml) into Eio, so that Angstrom itself just adds a load of combinators on top of this, but you can also do simple things yourself.

One design decision is whether a buffered-reader should be an object sub-type of Flow.read, or a separate module. Using a module might be better, as the case of reading from the buffer is often performance critical (whereas occasionally refilling the buffer from the underlying flow is not).
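Eio later gained a buffered reader, Eio.Buf_read, as a separate module rather than a flow sub-type. Assuming a release that includes it, reading lines looks roughly like this; first_two_lines is a made-up helper, and string_source just stands in for a real network or file flow.

```ocaml
(* Read two newline-terminated lines from any Eio source flow. *)
let first_two_lines source =
  (* [max_size] bounds how much the reader may buffer before failing. *)
  let r = Eio.Buf_read.of_flow ~max_size:4096 source in
  let first = Eio.Buf_read.line r in
  let second = Eio.Buf_read.line r in
  (first, second)

let () =
  Eio_main.run @@ fun _env ->
  let a, b = first_two_lines (Eio.Flow.string_source "hello\nworld\n") in
  Printf.printf "%s / %s\n" a b
```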

exception when trying to install eio 0.5

On a fresh switch with 5.0.0+trunk, I tried to install eio with opam install eio and got this:

[ERROR] The compilation of eio.0.5 failed at "dune build -p eio -j 19 --promote-install-files=false @install".
[…]
# File "lib_eio/core/debug.ml", line 32, characters 16-25:
# 32 |     | exception Unhandled -> default_traceln
#                      ^^^^^^^^^
# Error: This variant pattern is expected to have type exn
#        There is no constructor Unhandled within type exn

Getaddrinfo raises error on some backends

It seems that luv raises an exception instead of returning an empty list.

open Eio

let lookup net hostname =
  match Net.getaddrinfo_stream net hostname with
  | [] -> ()
  | _ -> assert false

let () =
  let hostname = "blahblahblah.tarides.com" in
  (assert (Unix.getaddrinfo hostname "" [] = []));
  Eio_main.run (fun env -> lookup env#net hostname)

This fails with:

Fatal error: exception Eio_luv.Luv_error(EAI_NONAME) (* unknown node or service *)

I can't currently run this properly on Linux with Uring but I'm assuming it returns [] because Unix.getaddrinfo does.

docs: install instructions borked

Problem

Per the README.md, the following should set up a switch for OCaml 5, but it errors out as shown:

$ opam --version
2.1.1
$ opam switch create 5.0.0~alpha1 --repo=default,alpha=git+https://github.com/kit-ty-kate/opam-alpha-repository.git
[ERROR] No compiler matching `5.0.0~alpha1' found, use `opam switch list-available' to see what is available, or use `--packages'
        to select packages explicitly.

Is kit-ty-kate/opam-alpha-repository still the correct place to get an OCaml 5 switch? Or should we be using a standard opam variant now that OCaml 5 has been merged?

close: file descriptor used after calling close!

In Eio 0.3, accept_fork always closes the socket when the function returns. However, the function is supposed to be allowed to close it itself first if it wants to. This results in errors such as:

Error handling connection: Cancelled: Invalid_argument("close: file descriptor used after calling close!")

A work-around is to avoid manually closing the socket (there's no very good reason to continue handling a connection after closing it).

Hang in eio_linux tests

@haesbaert wrote:

it seems to hang on lib_eio_linux/tests/test.ml on the very first test

We should try to find out what's causing this. I wrote:

What happens if you just run ./_build/default/lib_eio_linux/tests/test.exe in a loop? It doesn't hang for me.

ENOMEM with readme example

Hi! This project looks excellent!

I was trying out the example in the README:

#require "eio_main";;
open Eio.Std;;

let main ~stdout = Eio.Flow.copy_string "hello World" stdout;;
Eio_main.run @@ fun env -> main ~stdout:(Eio.Stdenv.stdout env)
;;

This is failing with Unix.Unix_error(Unix.ENOMEM, "io_uring_register_buffers", "").

There is sufficient free RAM available, so I expect that this is due to some system limit but have no idea how to debug it and can't find any documentation on this error either.

Decide how to represent pathnames

At the moment, paths in Eio are strings, which are used in the context of some base directory. e.g.

Eio.Dir.with_open_in cwd "foo/input.dat" (fun flow -> ...)

This should work well on most systems, but we need to think about how to support Windows too.

Possibilities include:

  • Using Unix path strings everywhere and providing a way to import and export Windows paths.
  • Using an abstract type for paths.

The Fpath library encodes lots of knowledge about Windows paths. It probably can't be used directly because its paths behave differently depending on the host platform (preventing e.g. a program running on Linux from creating paths that are to be used on Windows). The path rules should probably be per-filesystem instead.
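To make the first option concrete, here is a deliberately naive sketch of importing and exporting relative Windows paths by swapping separators. The function names are hypothetical, and real code would also need to handle drive letters, UNC prefixes, reserved names and so on, which is the kind of knowledge the Fpath library encodes.

```ocaml
(* Hypothetical helpers: convert relative paths between Windows and Unix separators.
   Drive letters, UNC paths and reserved names are NOT handled. *)
let of_windows_relative (p : string) : string =
  String.map (fun c -> if c = '\\' then '/' else c) p

let to_windows_relative (p : string) : string =
  String.map (fun c -> if c = '/' then '\\' else c) p

let () =
  print_endline (of_windows_relative "foo\\input.dat");
  print_endline (to_windows_relative "foo/input.dat")
```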

Switches and background tasks

I'm currently fiddling with building a DBus library on top of libsystemd and eio. Really enjoying using eio, though I did have some questions about how best to design the API to fit within eio's model.

The main problem at hand is that one needs to run a background task which waits for the underlying socket to become readable/writable and then gets libsystemd to process its message queue[^1]. I originally had something like this:

let rec process bus = 
  await_event bus;
  sd_bus_process bus;
  process bus

let create ~sw () =
  let bus = (* ... *) in
  Fibre.fork ~sw (fun () -> process bus);
  Switch.on_release sw (fun () -> sd_bus_close bus);
  bus

This obviously doesn't quite do what you want though, as the process fibre keeps the switch open forever.

I've had a quick look at how other structured concurrency libraries handle the problem of background tasks which shouldn't keep the switch open, but haven't had much luck (I realise in most cases it is pretty undesirable). Is there anything obvious in eio I'm missing, or is my best option just to provide an explicit close method which shuts down this task?
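Later Eio releases added Fiber.fork_daemon for this pattern (the module is now spelled Fiber): the switch does not wait for daemon fibers, and cancels them once its other fibers have finished. A minimal sketch, assuming such a release; the never-resolved promise stands in for the process bus loop, and demo is a made-up name.

```ocaml
open Eio.Std

let demo () =
  let started = ref false and stopped = ref false in
  Eio_main.run (fun _env ->
      Switch.run (fun sw ->
          (* Stand-in for [process bus]: blocks until it is cancelled. *)
          Fiber.fork_daemon ~sw (fun () ->
              started := true;
              let p, _never_resolved = Promise.create () in
              Fun.protect ~finally:(fun () -> stopped := true)
                (fun () -> (Promise.await p : unit));
              `Stop_daemon);
          (* Cleanup registered on the switch still runs, like sd_bus_close above. *)
          Switch.on_release sw (fun () -> traceln "releasing bus");
          traceln "main work done"));
  (!started, !stopped)
```

Here Switch.run returns as soon as the main work is done, instead of waiting forever for the background loop.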

WSL: Unix.Unix_error on "Hello, World" example

I tried running the "Hello, world" example from the readme on WSL and got the following exception:

Eio_main.run @@ fun env -> main ~stdout:(Eio.Stdenv.stdout env);;
Exception: Unix.Unix_error(Unix.ENOSYS, "io_uring_queue_init", "")

Here's some more info about my system:

  • WSL version: 2
  • Windows version: Windows 11 / Version 10.0.22000 Build 22000
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.3 LTS
Release:        20.04
Codename:       focal

$ opam list --roots
# Packages matching: root
# Name         # Installed    # Synopsis
eio_main       0.1            Effect-based direct-style IO mainloop for OCaml
ocaml-variants 4.12.0+domains OCaml 4.12.0, with support for multicore domains
utop           2.9.0          Universal toplevel for OCaml

API SDK design questions: Should an HTTP API Eio SDK delegate Eio_main.run to the caller?

I'm rewriting https://github.com/lessp/fetch as an OCaml 5 cohttp-eio client, but I need design help around Eio_main calls.

  1. Is it reasonable to have each HTTP request call Eio_main.run itself? Or should each fetch call expect an env object? piaf passes both env and an Eio switch to each request, and it makes sense to pass these two as arguments to maximize flexibility.
  2. If our implementation requires env and sw, how should I best update this fetch signature: link?

One option is to make env and sw optional and generic, so that functor implementations (Lwt, Async) that don't use Eio can ignore them.

  val fetch :
    ?env:'a ->
    ?sw:'b ->
    ?body:string ->
    ?headers:Headers.t list ->
    ?meth:Method.t ->
    string ->
    (Response.t, string) result promise

Any plans on supporting js_of_ocaml?

Is the Eio library meant to be limited to native OCaml applications? Assuming JSOO gets support for effects [1], will it be possible to compile Eio to the JS backend? For context, we have a proprietary project that is based on Lwt and relies on the fact that Lwt can run both natively and in JS. For JS we use Node.js as the target platform. We're thinking about a possible migration path for OCaml 5.

[1] https://discuss.ocaml.org/t/ocaml-multicore-effects-and-js-of-ocaml/8502

Need Unix.socketpair_datagram

Unix.socketpair takes a Unix.socket_type argument, allowing you to create a datagram socket. However, it still returns a stream socket. Likewise, as_socket always returns a stream socket.

In particular, a zero-length read for a datagram socket does not mean end-of-file (spotted while trying to change the datagram test in network.md to reproduce #340)
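For comparison, a datagram socket pair from OCaml's standard Unix.socketpair keeps message boundaries: each read returns exactly one datagram, and a zero-length datagram reads as 0 bytes without the peer having closed anything. Code that treats the socket as a stream would misread both. A small sketch on a Unix-domain pair; datagram_reads is a made-up name.

```ocaml
(* Unix-domain datagram pair: each recv returns exactly one datagram. *)
let datagram_reads () =
  let a, b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_DGRAM 0 in
  Fun.protect
    ~finally:(fun () -> Unix.close a; Unix.close b)
    (fun () ->
      let send s = ignore (Unix.send_substring a s 0 (String.length s) [] : int) in
      send "ab";
      send "";  (* an empty datagram, not end-of-file *)
      send "cd";
      let buf = Bytes.create 64 in
      let recv () = Bytes.sub_string buf 0 (Unix.recv b buf 0 (Bytes.length buf) []) in
      let first = recv () in
      let second = recv () in
      let third = recv () in
      [ first; second; third ])
```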

Monotonic Support

Could Eio provide a monotonic clock in Stdenv.t, as it does for the system clock?

Integrate Eio's CTF tracing with OCaml's tracing

Eio's Ctf module provides a ring-buffer for storing trace events. Currently, this only works for a single domain. OCaml has its own CTF-based eventlog system (https://ocaml.org/manual/instrumented-runtime.html) and we should probably just use that somehow. It might not currently support user-defined events. There's also a proposal to replace it with eventring (ocaml-multicore/ocaml-multicore#793), which says:

This implementation is designed to be open for extension to user tracing events in the future, as well as integration with statmemprof (e.g continuous allocation profiling).

/cc @sadiqj

Windows support

Eio needs to support Windows. It was somewhat working with the libuv backend, but the other platforms have stopped using that now (see #434).

The current plan is:

  • Create an eio_windows backend based on eio_posix (#497).
  • Get eio_main working with Windows. Get the CI to run one of the examples.
  • Set up CI to run the main set of tests on Windows.
  • Add initial FS support (#509).
  • Finish FS support (see #509 (comment)).
  • Go through the backend and implement all the TODOs.
  • Improve timer resolution (currently 1 second).

Later, we could convert the backend to use IOCP, but that's not ready yet.

Signals abstraction

As discussed in #301 we might need to abstract Signals.

I've got a working prototype that works on uring and libuv but we have to define some things before I go forward.

1. Do we want to export more signals than Sys, fewer, or the same set?

Sys is a bit conservative and I think we should export more; for example, it lacks SIGWINCH and SIGINFO (Linux doesn't have SIGINFO, but it's common on other Unix systems).

2. Do we want to restrict what we export?

For example, a variant like type signum = Sigint | Sigwhatever, or do we just keep accepting an int, so that users can pass any other signal their system has?
I think we should keep taking an int, but do some discovery ourselves (for the value of SIGWINCH, for example) and export that as:
val sigwinch : int
Worth noting that the default ocaml signal interface accepts arbitrary integers so that would be no issue, also not an issue with Luv.

3. What do we do with signals that are not supported?

Do we want to fail hard, fail silently, or provide a "button" to control the behavior? This is likely very relevant for Windows, which has only a small set of signals.
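One way to combine options 1 and 2 is a variant with an escape hatch for raw numbers. This is a hypothetical sketch: signum and to_sys are made-up names, and it relies on OCaml's Sys signal constants being OCaml-specific negative codes, while other integers are passed through to the OS as raw signal numbers.

```ocaml
(* Hypothetical signal type: common signals by name, anything else as a raw OS number. *)
type signum =
  | Sigint
  | Sigterm
  | Sighup
  | Raw of int  (* platform-specific number, e.g. a discovered SIGWINCH value *)

(* Map to the int expected by Sys.set_signal and friends. *)
let to_sys = function
  | Sigint -> Sys.sigint
  | Sigterm -> Sys.sigterm
  | Sighup -> Sys.sighup
  | Raw n -> n
```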

Network tests fail on libuv + macOS

Running the tests on macOS fails with:

❯ dune test
File "tests/network.md", line 1, characters 0-0:
diff --git a/_build/default/tests/network.md b/_build/default/tests/.mdx/network.md.corrected
index d2a08c4c69..df05279a8d 100644
--- a/_build/default/tests/network.md
+++ b/_build/default/tests/.mdx/network.md.corrected
@@ -451,8 +451,7 @@ ECONNRESET:
     ignore (Eio.Flow.read a (Cstruct.create 1) : int);
     assert false
   with Eio.Net.Connection_reset _ -> traceln "Connection failed (good)";;
-+Connection failed (good)
-- : unit = ()
+Exception: End_of_file.
 ```

 EPIPE:

Ctf tracing: extend the `Fiber` API to allow labelling fibers

Just as Promise.create has an optional label argument, we could have a similar feature for labelling fibers spawned by functions in the Fiber module.

For fork, the signature could be val fork : ?label:string -> sw:Switch.t -> (unit -> unit) -> unit.

For combinators we can choose between:

  • val any : ?label:string -> (unit -> 'a) list -> 'a: label all tasks with the same name
  • val any_labelled : (string * (unit -> 'a)) list -> 'a : new function to label each task separately

I have a small implementation of this, because it's very useful when tracing complex Eio programs.

Mutex API ergonomics

Can we draw inspiration from Rust's Mutex to make Eio.Mutex a bit more ergonomic? E.g. usage adapted from https://github.com/ocaml-multicore/eio#the-rest-mutex-semaphore-and-condition:

open Eio

let of_path = Mutex.create

let save m data =
  Mutex.use_rw m ~protect:true @@ fun path ->
  Path.save path data ~create:(`Or_truncate 0o644)

let load m = Mutex.use_ro m Path.load

We would need signatures like this:

module Mutex : sig
  type _ t

  val create : 'a -> 'a t
  (** Create in an unlocked state holding the data. *)

  val use_rw : protect:bool -> 'a t -> ('a -> 'b) -> 'b
  (** Lock for read-write access. *)

  val use_ro : 'a t -> ('a -> 'b) -> 'b
  (** Lock for read access. *)
end

Having said that, I realize we don't have borrow/move checking in OCaml so we wouldn't get the same level of safety. But it should make it a bit more ergonomic as the key idea is that a mutex is always paired with its data.
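The proposed interface can be prototyped on top of the existing Eio.Mutex, which already provides use_rw ~protect and use_ro. A minimal sketch, assuming a recent Eio; Data_mutex is a hypothetical name, and, as noted above, nothing stops the callback from leaking the data out of the critical section.

```ocaml
(* A mutex that is always paired with the data it protects. *)
module Data_mutex : sig
  type 'a t
  val create : 'a -> 'a t
  val use_rw : protect:bool -> 'a t -> ('a -> 'b) -> 'b
  val use_ro : 'a t -> ('a -> 'b) -> 'b
end = struct
  type 'a t = { mutex : Eio.Mutex.t; data : 'a }

  let create data = { mutex = Eio.Mutex.create (); data }

  (* Read-write access: Eio.Mutex poisons the lock if [f] raises. *)
  let use_rw ~protect t f = Eio.Mutex.use_rw ~protect t.mutex (fun () -> f t.data)

  (* Read-only access: the lock is released normally even if [f] raises. *)
  let use_ro t f = Eio.Mutex.use_ro t.mutex (fun () -> f t.data)
end
```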

Rename Fibre to Fiber?

Rahul on discuss pointed out that upstream OCaml spells this as Fiber. We should probably rename it to be consistent with that.

https://grammarist.com/spelling/fiber-fibre/ says:

There is no difference in meaning between fiber and fibre. Fiber is the preferred spelling in American English, and fibre is preferred in all the other main varieties of English.

Support for socket options

Right now both Eio.Net.{connect,accept} return a socket, but I didn't find a way to e.g. set TCP_NODELAY on the stream socket. Did I miss an API?
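At the Unix level, setting the option is a single Unix.setsockopt call, shown below on a plain Unix socket. Getting at the underlying Unix.file_descr of an Eio socket is the missing piece; how to do that depends on the backend and Eio version, so this sketch (with the made-up name enable_nodelay) stops at the Unix side.

```ocaml
(* Disable Nagle's algorithm on a TCP socket and read the option back. *)
let enable_nodelay (fd : Unix.file_descr) : bool =
  Unix.setsockopt fd Unix.TCP_NODELAY true;
  Unix.getsockopt fd Unix.TCP_NODELAY

let () =
  let fd = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  Fun.protect ~finally:(fun () -> Unix.close fd) (fun () ->
      Printf.printf "TCP_NODELAY = %b\n" (enable_nodelay fd))
```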

Implement signature that expects covariant type parameter

Most signatures that abstract over Lwt / Async currently define an interface to be implemented by the I/O runtime.

Part of that interface is usually:

type +'a t

I took a stab at implementing one of these (Caqti, in particular) for my own use, and I was trying to use an Eio.Promise.t (which is itself defined as type !'a t). The OCaml compiler doesn't seem to like it:

Error: In this definition, expected parameter variances are not satisfied.
       The 1st type parameter was expected to be covariant,
       but it is injective invariant.

Do you have a recommended solution? Is my only hope to implement a wrapper of sorts, or alternatively can the type parameter for Eio.Promise.t be made covariant?
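One workaround is a thin wrapper whose type parameter is covariant, for example a suspended computation that awaits the promise when forced. This is a sketch only: Future is a hypothetical module, and the compiler accepts the + annotation because unit -> 'a is covariant in 'a.

```ocaml
(* A covariant wrapper around Eio promises; [unit -> 'a] is covariant in ['a]. *)
module Future : sig
  type +'a t
  val of_promise : 'a Eio.Promise.t -> 'a t
  val return : 'a -> 'a t
  val map : ('a -> 'b) -> 'a t -> 'b t
  val await : 'a t -> 'a
end = struct
  type +'a t = unit -> 'a
  let of_promise p () = Eio.Promise.await p  (* blocks the calling fiber until resolved *)
  let return x () = x
  let map f t () = f (t ())                  (* [f] runs each time the result is awaited *)
  let await t = t ()
end
```

Because each await re-runs the thunk, side effects inside map are repeated; a real wrapper might cache the result.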

Skip network tests when network is unavailable

The network tests can't be run in some cases:

  • Docker containers don't have IPv6 loopback by default.
  • opam-repo-ci's sandbox on macos prevents using any loopback devices.

This is probably best done by extending MDX. We could perhaps have an exception Mdx_skip_block to indicate that the current block's output should be ignored, or a skip-if=TEST label at the top of the block.

From #274.

API for spawning sub-processes

Eio needs to provide a way to create and manage sub-processes (like Lwt_process).

  • Must work in programs using multiple domains. Note that Unix.fork cannot be used there.
  • API should try to prevent bugs with argument splitting (e.g. avoid error-prone Unix.system style API by default). But also needs to work on Windows, where I think you can't provide an argv.
  • Child processes should be attached to a switch by default to avoid leaking them, but need a way to spawn detached sub-processes too.
  • Must allow passing pipes, etc, to child processes.
  • Must prevent accidentally leaking FDs to child processes. All FDs in Eio are already opened with close-on-exec set for this reason.
  • Should provide a simple way to check or report the process's exit status.
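For comparison, the existing Unix library already avoids shell word-splitting when you pass an argv array directly instead of using Unix.system. A small blocking sketch (run_capture is a made-up name, and it does not address the multi-domain or switch requirements above): the argument "a b" reaches the child as one argument, and both pipe ends are opened close-on-exec so the child does not inherit the read end.

```ocaml
(* Run [prog] with an explicit argv (no shell) and capture its standard output. *)
let run_capture prog args =
  let r, w = Unix.pipe ~cloexec:true () in
  (* [prog] is looked up in PATH; [w] becomes the child's stdout. *)
  let pid =
    Unix.create_process prog (Array.of_list (prog :: args)) Unix.stdin w Unix.stderr
  in
  Unix.close w;  (* so we see end-of-file once the child exits *)
  let ic = Unix.in_channel_of_descr r in
  let output = In_channel.input_all ic in
  close_in ic;
  let _, status = Unix.waitpid [] pid in
  (output, status)

let () =
  let out, _status = run_capture "printf" [ "<%s>\n"; "a b"; "c" ] in
  print_string out
```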

Possible sources of inspiration include:

Current status:

file descriptor used after calling close!

  • #244 seemed like it had a different scope (specific to accept_fork), so I opened a new one.

  • I'm hitting a case where I sometimes end up calling Eio.Flow.close a second time, and that blows up my application

  • in cases where it's hard to guarantee that I only call close once, would it be reasonable to implement either one or both of the following?

    1. make Eio.Flow.close not throw (but perhaps leave the other operations throwing?)
    2. (at least) make the exception raised by Eio_luv.Handle.get easier to pattern match against. Right now the pattern needs to be Invalid_argument "close: handle used after calling close!" which feels a bit brittle.
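Until the library behaviour changes, one application-side workaround is to make closing idempotent. A minimal sketch with a hypothetical helper close_once, which wraps any close function (for example fun () -> Eio.Flow.close flow) so that only the first call reaches it; using Atomic keeps it correct even if two domains race to close.

```ocaml
(* Wrap [close] so that only the first call actually closes; later calls are no-ops. *)
let close_once (close : unit -> unit) : unit -> unit =
  let closed = Atomic.make false in
  fun () -> if Atomic.compare_and_set closed false true then close ()
```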

Fibre-local storage

Many concurrency libraries, both in OCaml[^1] and in other languages, have some form of fibre-local storage, allowing you to pass implicit context around async code. It would be nice to have similar functionality in Eio.

Motivation

I'm currently working on an eio-based project where many tasks are run in parallel. Each task produces a lot of log output, and so it would be useful to track which log messages are associated with each task.

One way to achieve this would be to pass an explicit context around, and use that when logging (such as with logs's ~tag parameter). However, this obviously doesn't allow you to track log messages in external libraries, limiting its usefulness.

One common solution (at least in other languages) is to have some global (thread/fibre-local) variable which stores the current task. This is then used when processing logs to associate them with the current task.

Other notes

I realise this API may not be desirable for eio, as much of the API is designed around avoiding implicit state. Thought it was worth opening an issue and getting some thoughts before spending too much time thinking about how a potential API would behave.

Would be happy to implement this if it would be useful/helpful.
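Later Eio releases added fiber-local keys along these lines (Fiber.create_key, Fiber.with_binding, Fiber.get). Assuming such a release, tagging log lines with the current task might look like this; task_key, tag and demo are made-up names.

```ocaml
open Eio.Std

(* Hypothetical fiber-local key holding the current task's name. *)
let task_key : string Fiber.key = Fiber.create_key ()

(* Prefix a message with the task bound in the current fiber, if any. *)
let tag msg =
  match Fiber.get task_key with
  | Some task -> Printf.sprintf "[%s] %s" task msg
  | None -> msg

(* Two fibers with different bindings; the yield shows bindings survive interleaving. *)
let demo () =
  Eio_main.run @@ fun _env ->
  Fiber.pair
    (fun () -> Fiber.with_binding task_key "task-a" (fun () -> Fiber.yield (); tag "done"))
    (fun () -> Fiber.with_binding task_key "task-b" (fun () -> tag "done"))
```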

Footnotes

  1. lwt's Lwt.new_key and async's Async.Execution_context. While lwt's API is deprecated, its (sometimes) confusing behaviour could hopefully be avoided in eio due to the separation of promises and fibres.
