thousand_island's Introduction

Thousand Island

Thousand Island is a modern, pure Elixir socket server, inspired heavily by ranch. It aims to be easy to understand and reason about, while also being at least as stable and performant as alternatives. Informal tests place ranch and Thousand Island at roughly the same level of performance and overhead; short of synthetic scenarios on the busiest of servers, they perform equally for all intents and purposes.

Thousand Island is written entirely in Elixir, and is nearly dependency-free (the only library used is telemetry). The application strongly embraces OTP design principles, and emphasizes readable, simple code. The hope is that as much as Thousand Island is capable of backing the most demanding of services, it is also useful as a simple and approachable reference for idiomatic OTP design patterns.

Usage

Thousand Island is implemented as a supervision tree which is intended to be embedded within a host application, often indirectly as a dependency of a higher-level protocol library such as Bandit. Aside from supervising the Thousand Island process tree, applications interact with Thousand Island primarily via the ThousandIsland.Handler behaviour.

Handlers

The ThousandIsland.Handler behaviour defines the interface that Thousand Island uses to pass ThousandIsland.Sockets up to the application level; together they form the primary interface that most applications will have with Thousand Island. Thousand Island comes with a few simple protocol handlers to serve as examples; these can be found in the examples folder of this project. A simple implementation would look like this:

defmodule Echo do
  use ThousandIsland.Handler

  @impl ThousandIsland.Handler
  def handle_data(data, socket, state) do
    ThousandIsland.Socket.send(socket, data)
    {:continue, state}
  end
end

{:ok, pid} = ThousandIsland.start_link(port: 1234, handler_module: Echo)

For more information, please consult the ThousandIsland.Handler documentation.
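Handlers can also carry per-connection state. As a brief sketch (handler_options is the documented way to seed handler state, and handle_connection is a documented callback; the particulars here are illustrative):

defmodule Greeter do
  use ThousandIsland.Handler

  # Initial state comes from the handler_options server option
  @impl ThousandIsland.Handler
  def handle_connection(socket, state) do
    ThousandIsland.Socket.send(socket, "hello\n")
    {:continue, Map.put(state, :lines, 0)}
  end

  @impl ThousandIsland.Handler
  def handle_data(data, socket, state) do
    ThousandIsland.Socket.send(socket, data)
    {:continue, %{state | lines: state.lines + 1}}
  end
end

{:ok, _pid} =
  ThousandIsland.start_link(port: 1234, handler_module: Greeter, handler_options: %{})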

Starting a Thousand Island Server

Thousand Island servers exist as a supervision tree, and are started by a call to ThousandIsland.start_link/1. There are a number of options supported; for a complete description, consult the Thousand Island docs.
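For illustration, a sketch of a more fully configured server (these option names appear in ThousandIsland.ServerConfig; the values are arbitrary and the transport option is just a standard :gen_tcp option):

{:ok, pid} =
  ThousandIsland.start_link(
    port: 4040,
    handler_module: Echo,
    num_acceptors: 100,
    read_timeout: 15_000,
    transport_module: ThousandIsland.Transports.TCP,
    transport_options: [keepalive: true]
  )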

Connection Draining & Shutdown

The ThousandIsland.Server process is just a standard Supervisor, so all the usual rules regarding shutdown and shutdown timeouts apply. Immediately upon beginning the shutdown sequence, the ThousandIsland.ShutdownListener will cause the listening socket to shut down, which in turn will cause all of the Acceptor processes to shut down as well. At this point all that is left in the supervision tree are several layers of Supervisors and whatever Handler processes were in progress when shutdown was initiated. Standard Supervisor shutdown timeout semantics then give existing connections a chance to finish up. Handler processes trap exits, so they continue running beyond shutdown until they either complete or are forcibly killed (:brutal_kill) after their shutdown timeout expires.

The shutdown_timeout configuration option allows fine-grained control of the shutdown timeout value. It defaults to 15,000 ms.
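For example, reusing the Echo handler from above and giving in-flight connections up to 30 seconds to finish:

{:ok, pid} =
  ThousandIsland.start_link(port: 1234, handler_module: Echo, shutdown_timeout: 30_000)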

Logging & Telemetry

As a low-level library, Thousand Island purposely does not do any inline logging of any kind. The ThousandIsland.Logger module defines a number of functions to aid in tracing connections at various log levels, and such logging can be dynamically enabled and disabled against an already running server. This logging is backed by telemetry events internally.

Thousand Island emits a rich set of telemetry events including spans for each server, acceptor process, and individual client connection. These telemetry events are documented in the ThousandIsland.Telemetry module.
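As a minimal sketch of consuming these events (the exact event name here is an assumption based on the :connection span naming visible in ThousandIsland.Telemetry structs; consult that module's docs for the authoritative list):

# Assumed event name: [:thousand_island, :connection, :stop]
:telemetry.attach(
  "log-connection-stop",
  [:thousand_island, :connection, :stop],
  fn _event, measurements, metadata, _config ->
    IO.inspect({measurements, metadata}, label: "connection closed")
  end,
  nil
)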

Implementation Notes

At the top level, a Server coordinates the processes involved in responding to connections on a socket. A Server manages two top-level processes: a Listener, which is responsible for actually binding to the port and managing the resultant listener socket, and an AcceptorPoolSupervisor, which is responsible for managing a pool of AcceptorSupervisor processes.

Each AcceptorSupervisor process (there are 100 by default) manages two processes: an Acceptor which accepts connections made to the server's listener socket, and a DynamicSupervisor which supervises the processes backing individual client connections. Every time a client connects to the server's port, one of the Acceptors receives the connection in the form of a socket. It then creates a new process based on the configured handler to manage this connection, and immediately waits for another connection. It is worth noting that Acceptor processes are long-lived, and normally live for the entire period that the Server is running.

A handler process is tied to the lifecycle of a client connection, and is only started when a client connects. The length of its lifetime beyond that of the underlying connection is dependent on the behaviour of the configured Handler module. In typical cases its lifetime is directly related to that of the underlying connection.

This hierarchical approach reduces the time connections spend waiting to be accepted, and also reduces contention for DynamicSupervisor access when creating new Handler processes. Each AcceptorSupervisor subtree functions nearly autonomously, improving scalability and crash resiliency.

Graphically, this shakes out like so:

graph TD;
  Server(Server: supervisor, rest_for_one)-->Listener;
  Server-->AcceptorPoolSupervisor(AcceptorPoolSupervisor: dynamic supervisor);
  AcceptorPoolSupervisor--1...n-->AcceptorSupervisor(AcceptorSupervisor: supervisor, rest_for_one)
  AcceptorSupervisor-->DynamicSupervisor
  AcceptorSupervisor-->Acceptor(Acceptor: task)
  DynamicSupervisor--1...n-->Handler(Handler: gen_server)
  Server-->ShutdownListener;

Thousand Island does not use named processes or other 'global' state internally (other than telemetry event names). It is completely supported for a single node to host any number of Server processes each listening on a different port.
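For example, two servers under one application supervisor (EchoA and EchoB are hypothetical handler modules; Supervisor.child_spec/2 gives each child a unique :id, per standard OTP practice):

# Inside a Supervisor's init/1 callback
children = [
  Supervisor.child_spec({ThousandIsland, port: 4040, handler_module: EchoA}, id: :echo_a),
  Supervisor.child_spec({ThousandIsland, port: 4041, handler_module: EchoB}, id: :echo_b)
]

Supervisor.init(children, strategy: :one_for_one)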

Contributing

Contributions to Thousand Island are very much welcome! Before undertaking any substantial work, please open an issue on the project to discuss ideas and planned approaches so we can ensure we keep progress moving in the same direction.

All contributors must agree and adhere to the project's Code of Conduct.

Security disclosures should be handled per Thousand Island's published security policy.

Installation

Thousand Island is available in Hex. The package can be installed by adding thousand_island to your list of dependencies in mix.exs:

def deps do
  [
    {:thousand_island, "~> 1.0"}
  ]
end

Documentation can be found at https://hexdocs.pm/thousand_island.

License

MIT

thousand_island's Issues

Question: would `controlling_process/2` be a potential bottleneck?

Hi @mtrudel,
while reading the source code, I ran into this call to controlling_process/2 in Connection.start/3:

# Since this process owns the socket at this point, it needs to be the
# one to make this call. connection_pid is sitting and waiting for the
# word from us to start processing, in order to ensure that we've made
# the following call. Note that we purposefully do not match on the
# return from this function; if there's an error the connection process
# will see it, but it's no longer our problem if that's the case
server_config.transport_module.controlling_process(raw_socket, pid)

In the docs for :gen_tcp.controlling_process/2, it says:

If the socket is set in active mode, this function will transfer any messages in the mailbox of the caller to the new controlling process.
If any other process is interacting with the socket while the transfer is happening, the transfer may not work correctly and messages may remain in the caller's mailbox. For instance changing the sockets active mode before the transfer is complete may cause this.

While the Acceptor process won't interact with the socket in any way, we still need to transfer the messages it receives to the Handler process.

I'm wondering if this could become a bottleneck when an acceptor accepts many connections at the same time and they all start sending a lot of messages. Have you seen similar issues before? (I guess it won't be an issue for HTTP servers.)


I'm also thinking about an alternative architecture:

In the docs for :gen_tcp.accept/2, it says:

The accept call does not have to be issued from the socket owner process. Using version 5.5.3 and higher of the emulator, multiple simultaneous accept calls can be issued from different processes, which allows for a pool of acceptor processes handling incoming connections.

So can we merge Acceptor and Handler into one process?

  1. this process accepts connections
  2. when a connection comes in, it tells the DynamicSupervisor to start a new process (to accept new connections)
  3. then it handles the connection
  4. when the connection closes, this process dies (so the overall number would hover around the num_acceptors option)

It would save the message-transfer cost, but would add some cost before we can accept the next connection.
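A rough, hypothetical sketch of what I mean (module and function names are made up, not Thousand Island's actual design, and the socket is assumed to be in passive mode):

defmodule MergedAcceptor do
  # Accept one connection, spawn a replacement acceptor, then handle the
  # connection in this same process until the peer closes it
  def run(listener_socket, dynamic_sup) do
    case :gen_tcp.accept(listener_socket) do
      {:ok, socket} ->
        # Replace ourselves in the pool before doing any work, so accept
        # capacity recovers while we're busy with this connection
        DynamicSupervisor.start_child(
          dynamic_sup,
          {Task, fn -> run(listener_socket, dynamic_sup) end}
        )

        echo_loop(socket)

      {:error, :closed} ->
        # Listener shut down; just exit
        :ok
    end
  end

  defp echo_loop(socket) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} ->
        :gen_tcp.send(socket, data)
        echo_loop(socket)

      {:error, :closed} ->
        :ok
    end
  end
end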

GenServer `:tls_alert` error on handshake failure

ThousandIsland.Handler shuts down with an unexpected reason when there's a TLS handshake error, and reports this to the error logger:

17:01:55.000 [error] GenServer #PID<0.24370.1> terminating
** (stop) {:tls_alert, {:handshake_failure, 'TLS server: In state wait_finished received CLIENT ALERT: Fatal - Handshake Failure\n'}}
Last message: {:thousand_island_ready, %ThousandIsland.Socket{socket: {:sslsocket, {:gen_tcp, #Port<0.581>, :tls_connection, [option_tracker: #PID<0.24058.1>, session_tickets_tracker: :disabled, session_id_tracker: #PID<0.24059.1>]}, [#PID<0.24369.1>, #PID<0.24368.1>]}, transport_module: ThousandIsland.Transports.SSL, read_timeout: 60000, span: %ThousandIsland.Telemetry{span_name: :connection, telemetry_span_context: #Reference<0.4197377238.932970501.69923>, start_time: -576460732548559697, start_metadata: %{parent_telemetry_span_context: #Reference<0.4197377238.932970498.70998>, remote_address: {127, 0, 0, 1}, remote_port: 56792, telemetry_span_context: #Reference<0.4197377238.932970501.69923>}}}}

I think a TLS handshake failure should be expected and is normal behavior in client-server negotiation, and I'm not sure it makes sense to shut down this way. Also, :ssl already reports to the OTP logger. In my case I'm testing by accessing the server on a different hostname than what the cert is tied to, and I wouldn't expect anything in Bandit/Thousand Island to report abnormal behavior (I already know what went wrong client side).

I was thinking of adding a conditional in ThousandIsland.Handler:

      def handle_info({:thousand_island_ready, socket}, {nil, state}) do
        ThousandIsland.Telemetry.span_event(socket.span, :ready)

        case ThousandIsland.Socket.handshake(socket) do
          {:ok, socket} -> {:noreply, {socket, state}, {:continue, :handle_connection}}
          {:error, {:tls_alert, _alert} = reason} -> {:stop, {:shutdown, reason}, {socket, state}}
          # Or, to match only handshake failures:
          # {:error, {:tls_alert, {:handshake_failure, _}} = reason} -> {:stop, {:shutdown, reason}, {socket, state}}
          {:error, reason} -> {:stop, reason, {socket, state}}
        end
      end

But maybe this is exposing too much of the underlying transport logic and it should be in ThousandIsland.Transports.SSL:

def handshake(socket) do
  case :ssl.handshake(socket) do
    {:ok, socket} -> {:ok, socket}
    {:error, {:tls_alert, _error} = reason} -> {:error, {:shutdown, reason}}
    {:error, error} -> {:error, error}
  end
end

@mtrudel what do you think would be correct behavior here? I'm erring on the side of the GenServer closing down normally for any failed TLS negotiation, because the client would be aware of it in any case.

Process isolation between requests

Hi @mtrudel, I was working today and I noticed a weird thing. I was subscribing to a Phoenix.PubSub in my LiveView mount function. But on the first message (only), I was seeing an error message in the console, that Bandit.HTTP1.Handler was dying because it didn't implement a handle_info/2 callback that handled my message.

So I did some digging, turned out I forgot to do if connected?(socket), do: subscribe(), so that fixed the issue for me.

But then I got to thinking: what is really happening here? Are my controller actions being run in the same process as the socket connection handling? So I ran a test, and put the following code at the top of a controller action:

val = Process.get(:testing, 0) |> IO.inspect(label: "Requests served by this process")
Process.put(:testing, val + 1)

And sure enough, in my logs, Requests served by this process: 6 appeared pretty quickly.

I ran the same test using cowboy (different version of phoenix though, admittedly), and got the expected 0 every time.

So I guess I am wondering if this is an issue. Is there a class of bugs that will be hard to find because of this approach? Will it encourage users to use the Process dict, by making it persist between requests?

And what are the performance implications of encapsulating single requests in their own process?

Or is the answer simply "don't do anything weird and everything will be ok"?

edit: these processes might end up doing a lot more GC than normal. The advantage of encapsulating requests in processes is that the BEAM can just toss everything when the process ends. If we do it this way, re-using processes, then we'll be forcing the BEAM to GC anything we are doing in our processes. Not sure how this shakes out in benchmarks.

handling non-TI messages within same GenServer fails after PR#96

PR #96 adds guard rails to ensure message types match the thousand_island format. However, it's possible to have additional messages, from outside thousand_island, destined for the same GenServer.

I use this functionality as a TCP proxy, sharing the same GenServer state. This avoids the message-passing overhead that would be introduced between the handler and a further GenServer if they were separate processes.

  • is this a reasonable intended use case for normal TI handlers, or
    should I go for the full custom implementation?

  • the guard rails seem currently to prevent this, or am I holding it wrong?

I assume the macro expansions catch the additional handler messages before my custom ones do, and I'm not clear whether this can still be accommodated.

Example cribbed from messenger.ex and handler docs

#!/usr/bin/env elixir
Mix.install([{:thousand_island, github: "mtrudel/thousand_island"}])

defmodule Ecco do
  require Logger
  use ThousandIsland.Handler
  alias ThousandIsland.Socket

  @host "httpbin.org"
  @port 80
  @opts [:binary, active: true, nodelay: true]

  @impl ThousandIsland.Handler
  def handle_connection(%Socket{socket: client}, []) do
    {:ok, ip} = String.to_charlist(@host) |> :inet.getaddr(:inet)

    case :gen_tcp.connect(ip, @port, @opts) do
      {:ok, upstream} ->
        Logger.debug("connected to #{@host}:#{@port} via #{inspect(upstream)}")
        {:continue, %{upstream: upstream, client: client}}

      {:error, reason} ->
        Logger.debug("failed to connect to #{@host}:#{@port} because #{reason}")
        {:close, :upstream_connect_failed}
    end
  end

  @impl ThousandIsland.Handler
  def handle_data(data, _socket, state = %{upstream: upstream}) do
    # Send the data to the upstream service
    :gen_tcp.send(upstream, data)
    # Upstream response arrives asynchronously
    {:continue, state}
  end

  @impl GenServer
  def handle_info({:tcp, _port, _msg}, {socket, state}) do
    # just test the shape of the pattern matching in & out
    # ThousandIsland.Socket.send(socket, msg)
    # {:noreply, {socket, state}} also doesn't work
    {:noreply, {socket, state}, socket.read_timeout}
  end
end

# running the server
require Logger

{:ok, _pid} =
  ThousandIsland.start_link(
    port: 1234,
    handler_module: Ecco
  )

Logger.debug("listening on port 1234, curl me please")

unless IEx.started?() do
  Process.sleep(:infinity)
end

You may need to remove ~/.cache/mix/installs///deps/thousand_island/ to ensure you get the latest main commit as a dependency.

Then running curl --trace-ascii - http://localhost:1234 now fails:

16:22:59.843 [debug] listening on port 1234, curl me please

16:23:02.387 [debug] connected to httpbin.org:80 via #Port<0.9>

16:23:02.800 [error] GenServer #PID<0.499.0> terminating
** (RuntimeError)   The callback's `state` doesn't match the expected `{socket, state}` form.
  Please ensure that you are returning a `{socket, state}` tuple from any
  `GenServer.handle_*` callbacks you have implemented

    /home/dch/.cache/mix/installs/elixir-1.15.7-erts-13.2.2.4/3e8cc75637dec96c7c8ea27f725bf2fe/deps/thousand_island/lib/thousand_island/handler.ex:400: Ecco.handle_info/2
    (stdlib 4.3.1.3) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.3.1.3) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.3.1.3) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

Last message: {:tcp, #Port<0.9>, "HTTP/1.1 200 OK\r\n..."}

State: {%ThousandIsland.Socket{socket: #Port<0.7>,
  transport_module: ThousandIsland.Transports.TCP,
  read_timeout: 60000,
  silent_terminate_on_error: false,
  span: %ThousandIsland.Telemetry{...}},
  %{client: #Port<0.7>, upstream: #Port<0.9>}}

The :packet option of gen_tcp is ignored?

Hello, I'm starting ThousandIsland like this from my application:

{ThousandIsland, port: 4041, handler_module: Tcp.ConnectionHandler, transport_options: [packet: 1]}. As you can see, I'm passing packet: 1, i.e. I'd like to read packets whose length is given by their first byte.

However, handle_data is called immediately after any data is received, without waiting for the packet to be complete. I also tried packet: :line without luck.

Here's my ConnectionHandler for reference:

defmodule Tcp.ConnectionHandler do
  use ThousandIsland.Handler

  @impl ThousandIsland.Handler
  def handle_connection(socket, state) do
    ThousandIsland.Socket.send(socket, "Hello, World")
    IO.puts("Connected!")
    {:continue, state}
  end

  @impl ThousandIsland.Handler
  def handle_data(msg, _socket, state) do
    IO.puts(msg)
    {:continue, state}
  end

  def handle_info({:send, msg}, {socket, state}) do
    ThousandIsland.Socket.send(socket, msg)
    {:noreply, {socket, state}}
  end
end

Cannot start multiple ThousandIsland children under a Supervisor

Hi, thanks for the awesome library. The ranch docs really leave me scratching my head after reading them so it's nice to have an Elixir interface with typespecs that I can actually understand. I thought it would be fun to port some Protohackers problems from ranch to ThousandIsland and I ran into a small issue when starting multiple ThousandIsland children in a supervision tree.

I defined a supervisor in my application as follows:

defmodule Protohackers.Interface do
  use Supervisor

  alias Protohackers.Interface

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    children = [
      {ThousandIsland,
       handler_module: Interface.ThousandIsland.SmokeTest, port: 5555},
      {ThousandIsland,
       handler_module: Interface.ThousandIsland.PrimeTime, port: 5556},
      {Protohackers.RanchListener, protocol: Interface.Ranch.MeansToEnd, port: 5557}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

I was getting the following error:

** (Mix) Could not start application protohackers: Protohackers.Application.start(:normal, []) returned an error: shutdown: failed to start child: Protohackers.Interface
    ** (EXIT) bad child specification, more than one child specification has the id: ThousandIsland.
If using maps as child specifications, make sure the :id keys are unique.
If using a module or {module, arg} as child, use Supervisor.child_spec/2 to change the :id, for example:

    children = [
      Supervisor.child_spec({MyWorker, arg}, id: :my_worker_1),
      Supervisor.child_spec({MyWorker, arg}, id: :my_worker_2)
    ]

What I noticed is that the top-level ThousandIsland supervisor hardcodes the Supervisor's :id to __MODULE__ (https://github.com/mtrudel/thousand_island/blob/main/lib/thousand_island.ex#L144):

  @doc false
  @spec child_spec(options()) :: Supervisor.child_spec()
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor,
      restart: :permanent
    }
  end

So multiple children cannot be started under a supervision tree like they can be by calling start_link directly. I forked your project and solved this by essentially taking in the :id from opts. I'm happy to submit a PR, however, I have some questions:

  1. Should this option bubble down past child_spec? I don't think it makes sense to pass it to start_link and try to shove it into the ServerConfig since this option really has nothing to do with the server, just the parent Supervisor.

  2. What should be done about the typespec on child_spec/1? It shares the input type of options() with start_link, however, the :id is only really required for child_spec/1

Thanks!

configurable acceptor pools

Moved from mtrudel/bandit#199

New plan:

  • figure out #74 and redo the benchmarks (with a higher number of connections)
  • more tcp-level benchmarks (e.g. accept rate)
  • identify places in ThousandIsland where the acceptor pool can be configured, and what exactly acceptor pools need to implement for ThousandIsland+Bandit to work (e.g. suspend / resume currently seem optional)
  • check with the community if any of that configurability / alternative acceptor pools are actually desired

SSL error handling

Hi, I'm trying the code with #10 included. Deliberately using an invalid SSL cert path is ... ugly. Only at the point something connects to us do we get a crash such as:

[error] GenServer #PID<0.1164.0> terminating
** (stop) {:options, {:certfile, 'apps2/my_app/priv/cert/selfsigned.pem', {:error, :enoent}}}
Last message: {:thousand_island_ready, %ThousandIsland.Socket{acceptor_id: "0045505C6B54", connection_id: "0E05FC524E9B", socket: {:sslsocket, {:gen_tcp, #Port<0.20>, :tls_connection, [option_tracker: #PID<0.999.0>, session_tickets_tracker: :disabled, session_id_tracker: #PID<0.1000.0>]}, [#PID<0.1163.0>, #PID<0.1162.0>]}, transport_module: ThousandIsland.Transports.SSL}}
State: {%ThousandIsland.Socket{acceptor_id: "0045505C6B54", connection_id: "0E05FC524E9B", socket: {:sslsocket, {:gen_tcp, #Port<0.20>, :tls_connection, [option_tracker: #PID<0.999.0>, session_tickets_tracker: :disabled, session_id_tracker: #PID<0.1000.0>]}, [#PID<0.1163.0>, #PID<0.1162.0>]}, transport_module: ThousandIsland.Transports.SSL}, []}

I wonder if we should check that the path exists in the ThousandIsland.Transports.SSL.listen() function?

Are there disadvantages to this? Perhaps some corner case where the user wants to rotate SSL certs at startup, and technically all will be in place by the time the socket receives a real connection? Any other reasons for it to be valid to start SSL pointing at missing certs? Worst case, we could log a warning here and leave the crash at execution time.

I then fix the path to the SSL cert and attempt to connect to my self-signed cert using an Elixir lib which demands non-self-signed certs. The resulting error is a bit of a mess, but looks something like the following (note that the initial errors are from my client-side code):

[notice] TLS :client: In state :wait_cert_cr at ssl_handshake.erl:2013 generated CLIENT ALERT: Fatal - Bad Certificate

[notice] TLS :server: In state :wait_finished received CLIENT ALERT: Fatal - Bad Certificate

[info] my_app failed to connect to: "localhost":7778 - {:tls_alert, {:bad_certificate, 'TLS client: In state wait_cert_cr at ssl_handshake.erl:2013 generated CLIENT ALERT: Fatal - Bad Certificate\n'}}

[error] GenServer #PID<0.879.0> terminating
** (stop) {:tls_alert, {:bad_certificate, 'TLS server: In state wait_finished received CLIENT ALERT: Fatal - Bad Certificate\n'}}
Last message: {:thousand_island_ready, %ThousandIsland.Socket{acceptor_id: "A0ABC6B51E5C", connection_id: "96AE1BDD528B", socket: {:sslsocket, {:gen_tcp, #Port<0.21>, :tls_connection, [option_tracker: #PID<0.707.0>, session_tickets_tracker: :disabled, session_id_tracker: #PID<0.708.0>]}, [#PID<0.877.0>, #PID<0.875.0>]}, transport_module: ThousandIsland.Transports.SSL}}
State: {%ThousandIsland.Socket{acceptor_id: "A0ABC6B51E5C", connection_id: "96AE1BDD528B", socket: {:sslsocket, {:gen_tcp, #Port<0.21>, :tls_connection, [option_tracker: #PID<0.707.0>, session_tickets_tracker: :disabled, session_id_tracker: #PID<0.708.0>]}, [#PID<0.877.0>, #PID<0.875.0>]}, transport_module: ThousandIsland.Transports.SSL}, []}

I guess we aren't handling some state, but I haven't poked deeper at it yet.

Improve support for read timeouts

Currently, Thousand Island supports the notion of setting a timeout as part of the return value from handle_connection and handle_data calls. If this timeout is reached without more data being sent from the client, handle_timeout is called to allow the handler a mechanism to shut down the connection (or attempt to 'wake up' the client, etc). This mechanism is entirely tied to the async handle_data callback and doesn't have any effect on imperative Socket.recv calls (they have their own timeouts that map directly to the underlying :gen_tcp call).

Moreover, at the point of use this ends up with handler implementations having to carry around configured timeout values themselves, since the only place they can communicate them to Thousand Island is via callbacks. We can do better than this, especially considering that timeout values are usually a 'one-time' configuration point, and don't change throughout the lifetime of a connection.

It would be preferable to have a single harmonized way for handlers to specify timeouts. I'm thinking:

  1. Add a read_timeout option to the list of ThousandIsland.start_link options (alongside eg. num_acceptors)
  2. Plumb this value through to be stored in ThousandIsland.Socket structs
  3. Extend the ThousandIsland.Socket.recv function to use this value in its call to the transport module's recv call if the caller doesn't pass in a more specific value (the current implementation just defaults to :infinity in this case)
  4. Do something similar with the timeout code in the handler's return value parsing at https://github.com/mtrudel/thousand_island/blob/main/lib/thousand_island/handler.ex#L378 (if an explicit timeout is returned by handle_connection or handle_data call then use it, otherwise fall back to the socket default).

If not specified, we should use a default value of :infinity to match current behaviour (and the semantics of the underlying socket).
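Sketched out, the proposed usage would look something like this (hypothetical until implemented, though read_timeout does already appear in ThousandIsland.ServerConfig; the Echo handler is the one from the README):

{:ok, pid} =
  ThousandIsland.start_link(
    port: 1234,
    handler_module: Echo,
    # would apply to Socket.recv and to async handle_data waits unless overridden
    read_timeout: 60_000
  )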

Once merged, the updated version of mtrudel/bandit#8 can be addressed and a bunch of awkward bookkeeping code pulled out of Bandit's HTTP/2 handler.

A way to align custom and library `GenServer` responses

Hello there 👋

Thank you for such a great library!

There's a tiny itch that I've encountered while implementing a ThousandIsland.Handler. I want to discuss whether it makes sense to solve it the way I did, and whether it makes sense to solve it at the library level.

So, I created a couple of GenServer.handle_info/cast/call callbacks while implementing a ThousandIsland.Handler. Everything works superbly. What's more, those handlers share their logic with ThousandIsland.Handler.handle_data aside from one thing: return values.

I ended up with a function that casts ThousandIsland.Handler.handler_result() to what GenServer handlers expect. Such a function already exists in the library - ThousandIsland.Handler.handle_continuation/2 - but it's private.

I wonder whether it would make sense to make the function public? This way, library clients can tap into GenServer logic, but align their responses to what ThousandIsland.Handler does under the hood with its callbacks. E.g., share the same shutdown methods, or use socket.read_timeout as a default value for GenServer responses, and such.
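A hypothetical sketch of what this would enable (assuming handle_continuation/2 takes the handler-style result and the socket, and returns the corresponding GenServer reply):

@impl GenServer
def handle_info({:push, data}, {socket, state}) do
  ThousandIsland.Socket.send(socket, data)

  # Reuse the library's own mapping from {:continue, state} and friends
  # to {:noreply, {socket, state}, timeout} (hypothetical public API)
  ThousandIsland.Handler.handle_continuation({:continue, state}, socket)
end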

Edit: a tiny PR that demonstrates the proposition #95

No way for handler to receive messages

Hi @mtrudel,

Thanks for working on this, it's pretty cool and has better ergonomics than ranch when developing in Elixir rather than Erlang.

I'm running into something that should be easy to do, but currently isn't easy with TI. I need to receive messages from processes other than Thousand Island over handle_info in my handler process.

Do you think it's possible to have TI handle only the messages it is actually interested in and delegate the others to the handler?
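Ideally I'd like to be able to write something like this (a sketch; the {socket, state} shape matches the RuntimeError quoted in the proxy issue above):

@impl GenServer
def handle_info({:push, data}, {socket, state}) do
  ThousandIsland.Socket.send(socket, data)
  # Keep the {socket, state} shape and re-arm the read timeout
  {:noreply, {socket, state}, socket.read_timeout}
end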

problems with tcpkali

👋

It's a bit off-topic, but I've been benchmarking alternative acceptor pool implementations with https://github.com/satori-com/tcpkali and failed to get it working with ThousandIsland. I wonder if you'd see any problems with my setup:

Mix.install [{:thousand_island, "~> 1.0.0-pre"}]
# ...
# thousand_island 1.0.0-pre.6
# ...

defmodule Echo do
  use ThousandIsland.Handler 
  @impl ThousandIsland.Handler
  def handle_data(data, socket, state) do
    ThousandIsland.Socket.send(socket, data)
    {:continue, state}
  end
end

{:ok, _pid} = ThousandIsland.start_link(handler_module: Echo, port: 8080)
$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8080
Destination: [127.0.0.1]:8080
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8080
Could not create 100 connections in allotted time (15s)

The number of connections doesn't seem to go far above ~5 and I'm not seeing any errors in the logs.

Support for UDP transport

Hi,
I'm interested in using thousand_island for writing a UDP server.
Is this a planned feature, and/or would you mind some help writing it?

Inconsistent callback invoked when data received

Please correct me if this is by design.

The nature of my complaint is: I defined a supervisor like this:

defmodule CPayFirewall.Supervisor do
  use Supervisor

  def start_link(args) do
    Supervisor.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do

    {:listening_port, listening_port} = List.keyfind(args, :listening_port, 0)
    {:connect_host, connect_host} = List.keyfind(args, :connect_host, 0)
    {:connect_port, connect_port} = List.keyfind(args, :connect_port, 0)
    {:timeout, _timeout} = List.keyfind(args, :timeout, 0)

    header_config = Application.fetch_env!(:cpay_tester, :template_of_ascii_message_header_and_bitmap_config)
    fields_formats = Application.fetch_env!(:cpay_tester, :iso_ascii_field_format)

    children = [
        {ThousandIsland, port: listening_port, handler_module: Mudahnyyyyya},
        {Registry, keys: :duplicate, name: :cpayfirewall_registry},
        {CPayFirewall.Client, host: connect_host, port: connect_port, timeout: 500000, opts: [active: true, mode: :binary], id: :cpayfirewall_client},
    ]

    # ThousandIsland.start_link(port: 1234, handler_module: Echo, handler_options: %{prev_message: <<>>})

    Supervisor.init(children, strategy: :one_for_one)
  end
end

Then I defined the handler module like this:

defmodule Mudahnyyyyya do
  use ThousandIsland.Handler
  #import MessageAccumulator

  @impl ThousandIsland.Handler
  def handle_data(data, socket, state) do

    IO.inspect "what actually happened......"

    {:continue, state}
  end

  def terminate(socket, state) do
    {:continue, state}
  end

  @impl ThousandIsland.Handler
  def handle_connection(socket, state) do

    IO.inspect "client is connecting to x"

    disable_existing_connection("dummy")

    {:continue, state}
  end

  def handle_info({:forward_to_outside, msg}, {socket, state}) do

    IO.inspect "thousand island forwarding to ..."
    ThousandIsland.Socket.send(socket, msg)
    {:noreply, state}
  end

  def handle_info({:tcp, port, msg}, state) do

    IO.inspect "mudahnya"

    default_handle_info_tcp(msg, :tcp, nil, state)

    {:noreply, state}
  end

  def handle_info(a, {socket, state}) do
    {:noreply, state}
  end

  def default_handle_info_tcp(raw_message, _transport, _socket, state) do

    IO.inspect "forwarding to inside..."
    send(:firewall_internal, {:forward_to_inside, raw_message, self()})
    state

  end

  def disable_existing_connection(key) do

    registry_name = :cpayfirewall_registry

    case Registry.lookup(registry_name, key) do
      []  -> nil
      _   ->
        Registry.match(registry_name, "dummy", :enabled)
        |> Enum.each(fn {pid, _val} -> send(pid, :disable) end)

    end

    {:ok, _} = Registry.register(registry_name, key, :enabled)
  end

end

Also, I defined a TCP client module:

defmodule CPayFirewall.Client do

  use Connection
  import MessageAccumulator

  def child_spec(opts) do

    {:id, id} = List.keyfind(opts, :id, 0)

    %{
      id: id,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  def start_link(args) do

    {:host, host} = List.keyfind(args, :host, 0)
    {:port, port} = List.keyfind(args, :port, 0)
    {:timeout, timeout} = List.keyfind(args, :timeout, 0)
    {:opts, opts} = List.keyfind(args, :opts, 0)

    Connection.start_link(__MODULE__, {host, port, opts, timeout}, name: :firewall_internal)
  end

  def init({host, port, opts, timeout}) do

    state = %{host: host, port: port, opts: opts, timeout: timeout, sock: nil, queue: [], prev_message: <<>>, txn_count: 0}

    {:connect, :init, state}
  end


  def connect(_condition, %{host: host, port: port, opts: opts, timeout: timeout} = state) do

    case :gen_tcp.connect(host, port, opts, timeout) do
      {:ok, sock} ->
        
        IO.inspect "connected to port " <> Integer.to_string(port)
        {:ok, %{state | sock: sock}}
      {:error, _} -> {:backoff, 1000, state}
    end
  end

  def handle_info({:tcp_closed, _socket}, state) do
    IO.inspect "socket closed"
    IO.inspect "attempt to reconnect"

    {:connect, :init, state}
  end


  def handle_info({:tcp, socket, msg_in}, %{prev_message: prev_message} = state) do

    {list_of_messages, extra_data} = get_message(prev_message <> msg_in, [])

    state = Enum.reduce(list_of_messages, state, fn msg, acc -> default_handle_info_tcp(msg, :gen_tcp, socket, acc) end)

    {:noreply, %{state | prev_message: extra_data}}
  end


  def handle_info({:forward_to_inside, raw_msg, _pid}, %{sock: sock, queue: queue, txn_count: txn_count} = state) do

    # introduce sleep in case server cannot keep up
    :timer.sleep(1)

    txn_count = txn_count + 1

    :gen_tcp.send(sock, raw_msg)

    IO.inspect "firewall request txn_count ^^^ : #{txn_count}"

    {:noreply, %{state | queue: queue, txn_count: txn_count}}
  end

  def default_handle_info_tcp(raw_message, _transport, _socket, %{queue: _queue} = state) do

    IO.inspect Registry.count(:cpayfirewall_registry)

    # find which connection is enabled then forward to that process
    Registry.match(:cpayfirewall_registry, "dummy", :enabled)
    |> Enum.each(fn {pid, _val} -> send(pid, {:forward_to_outside, raw_message}) end)

    state

  end


end

The purpose of the 3 modules is to act as intermediaries between a TCP server and a TCP client, as in my drawing below:

{A very important application} <-- TCP -- {CPay firewall client} <-- erlang message -- {CPay firewall server (thousand island)} <-- TCP -- {some client}

What I found out is: in my current setup, Thousand Island always calls the handle_info({:tcp, port, msg}, state) callback. However, if I open a command prompt and telnet to the port Thousand Island is listening on, the handle_data(data, socket, state) callback is called instead; I also tried writing a separate TCP client that connects to the Thousand Island server, and handle_data(data, socket, state) is called there too.

I hope my explanation is clear.

Unexpected error in accept: :emfile

We're getting a lot of these errors:

RuntimeError: Unexpected error in accept: :emfile
  File "lib/thousand_island/acceptor.ex", line 27, in ThousandIsland.Acceptor.accept/5
  File "lib/task/supervised.ex", line 89, in Task.Supervised.invoke_mfa/2
  File "proc_lib.erl", line 240, in :proc_lib.init_p_do_apply/3

Paired with:

** (exit) exited in: GenServer.call(:undefined, :acceptor_info, 5000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
  File "lib/gen_server.ex", line 1027, in GenServer.call/3
  File "lib/thousand_island/acceptor.ex", line 10, in ThousandIsland.Acceptor.run/1
  File "lib/task/supervised.ex", line 89, in Task.Supervised.invoke_mfa/2
  File "proc_lib.erl", line 240, in :proc_lib.init_p_do_apply/3

This happens in a Phoenix application with thousand_island 0.6.5 and bandit 0.7.4.

Any indication what could cause this?

Upgrading the transport mid-connection

We have a (partial) implementation of the PostgreSQL server protocol that I recently ported to ThousandIsland. The PostgreSQL protocol has a quirk: it always starts as plain TCP; then, as the first couple of bytes, the client may send a request to upgrade to one of the supported encrypted protocols - one of them being SSL. The server is then expected to run an SSL handshake and proceed over the socket under the SSL wrapping.

I understand that this is possible to achieve by hand-rolling a handler instead of use ThousandIsland.Handler; however, it may be a useful feature to allow either returning something akin to the :switch tuple you have in Bandit, or just returning an updated socket. Since all functions on ThousandIsland.Socket just use the transport_module field, it would be enough to allow something like:

def handle_data("magic message", socket, state) do
  socket = ThousandIsland.Transport.Ssl.handshake(socket, additional_ssl_options)
  socket = %{socket | transport_module: ThousandIsland.Transport.Ssl}
  {:continue, {socket, state}} # or {:switch, socket, state, _timeout} to be less conflicting
end

Currently I don't see a way of upgrading the connection within handle_connection, and I think it would be a good feature.

I can probably offer up a PR if there is a consensus on the best return value for this

No way to set the `inet6` option for the TCP transport (and by extension, the SSL transport)

I am trying to run Bandit with an FD instead of specifying a port. The passed FD is an IPv6 socket opened by systemd (I am testing the systemd library with Bandit), and my :options in Bandit are set to:

[
  port: 0,
  transport_options: [
    :inet6,
    fd: fd,
    exit_on_close: false
  ]
]

But it fails: ThousandIsland uses the Keyword module, which does not accept "raw" options; I cannot use inet6: true, as it is not accepted by :gen_tcp (which does not use the proplists module); and I cannot use Ranch's approach of net: :inet6, as that is not supported at all.

Slow accepting

Hi,

This is not a bug, more of a question; however, I don't know where else to ask.

I created an application that listens on a port.

defmodule MyApp.ConnectionHandler do

  use ThousandIsland.Handler

  @impl ThousandIsland.Handler
  def handle_connection(socket, state) do

    IO.inspect "new client connected"

    {:close, state}
  end

  @impl ThousandIsland.Handler
  def handle_data(data, socket, state) do
    {:continue, state}
  end
end

Notice I disconnect immediately, to enable me to measure the TCP accept timespan.

defmodule Test1.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {ThousandIsland, port: 11234, handler_module: MyApp.ConnectionHandler},
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Test1.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Then I wrote a simple bat file:

ECHO:| TIME >> measure_time__.txt
telnet localhost 11234
ECHO:| TIME >> measure_time__.txt

Then I run the bat file.

measure_connect.bat

Then I got this result

The current time is: 12:28:11.24
Enter the new time: 
The current time is: 12:28:13.51
Enter the new time: 

The result shows that it takes about 2 seconds to just accept and then disconnect; is that normal?
Is there anything I can do to make it accept the connection faster?
Is this happening on Windows only?

Thank you,

-- Haimi

Feature request: Let the process outlive the connection

I notice in the code that when a tcp_error or tcp_closed is received, the connection process is terminated.

It would be beneficial if, instead of ignoring the return values of the handle_close/handle_error callbacks, we could depend on them to determine whether to keep the process running or to terminate it.

This feature is particularly useful in implementing protocols like MQTT, where we aim to make the session resilient to connection interruptions. Let's say when a connection drops, we allow the process to remain alive for a reasonable amount of time. If the client reconnects, we can then assign the controlling process of the socket to the already existing process, saving us a significant amount of time and effort in persisting/initializing the state.

Changing the behavior to handle the return value of close/error callbacks would be a breaking change. However, I believe it is worth adding this feature for the version 2 milestone.
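To make the proposal concrete, a hypothetical sketch (the :keep return value does not exist today; it just illustrates the requested behaviour):

@impl ThousandIsland.Handler
def handle_close(_socket, state) do
  # Hypothetical: keep the session process alive for five minutes so a
  # reconnecting MQTT client can be re-attached to it
  {:keep, state, :timer.minutes(5)}
end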

Acceptor pool race condition

With TestServer I often see these errors:

03:40:44.098 [error] Task #PID<0.15608.0> started from #PID<0.15606.0> terminating
** (RuntimeError) Unexpected error in accept: :einval
    (thousand_island 0.5.17) lib/thousand_island/acceptor.ex:25: ThousandIsland.Acceptor.accept/3
    (elixir 1.14.2) lib/task/supervised.ex:89: Task.Supervised.invoke_mfa/2
    (stdlib 4.1.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: &ThousandIsland.Acceptor.run/1
    Args: [{#PID<0.15360.0>, #PID<0.15606.0>, %ThousandIsland.ServerConfig{port: 58147, transport_module: ThousandIsland.Transports.TCP, transport_opts: [:inet], handler_module: Bandit.DelegatingHandler, handler_opts: %{handler_module: Bandit.InitialHandler, plug: {TestServer.Plug, {TestServer.HTTPServer.Bandit, [], #PID<0.15359.0>}}}, genserver_opts: [], num_acceptors: 100, read_timeout: 15000}}]

I suspect it has something to do with how I spin up/down Bandit ad hoc. A common use case would be to start the server, run a quick test, then shut it down in the on_exit handler. TestServer.start calls Bandit.start_link/1 and TestServer.stop calls ThousandIsland.stop/1 in the above.

I wonder if it's the ThousandIsland.ShutdownListener that breaks the supervision tree in some edge cases, since it seems that it can terminate the listener earlier than expected, making it so the Acceptor is still starting up while the Listener process is killed:

  def init(config) do
    children = [
      Supervisor.child_spec({ThousandIsland.Listener, config}, id: :listener),
      Supervisor.child_spec({ThousandIsland.AcceptorPoolSupervisor, {self(), config}},
        id: :acceptor_pool_supervisor
      ),
      {ThousandIsland.ShutdownListener, self()}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

Moving the shutdown listener up does resolve the issue, but that probably breaks the early draining logic? I haven't been able to isolate the problem and make a specific test for it. It happens with certain test seeds, so it might be reproducible. I'll try to dig deeper to find out what exactly needs to happen for the :einval errors to be thrown, but maybe you already have a good idea what this could be, @mtrudel? The port is not being reused, btw.

FunctionClauseError randomly happens

We've been recently trying to use Bandit and thousand_island for our services and have received some FunctionClauseError issues on Sentry. I've attached a screenshot with the stacktrace here.

[screenshot of the stacktrace, 2023-08-30]

Using the ThousandIsland.Handler "escape hatch"

I am using :thousand_island from hex as {:thousand_island, "~>1.3.5"}.
According to mix.lock I do have 1.3.5.

I am trying to use ThousandIsland to handle a proprietary TCP protocol. I am trying to replicate some existing code (written for another environment) which reads the header for a message (the header includes a payload length), then reads the body of the message using that length. Most of the time those should come in as a single TCP transmission (and would therefore be a single call to ThousandIsland.Handler.handle_data I assume.)

But (based on this existing code) that also might not be true in all cases. I believe that might lead to a single message which would span multiple calls to handle_data.

I thought I would solve the problem by using the info under "When Handler Isn't Enough" in the documentation, and read the data using two recv calls, directly on the socket. To that end I have created a GenServer that handles the :thousand_island_ready message according to the documentation:

  defmodule MyApp.ProtocolHandler do

  ...

  @impl GenServer
  def handle_info(
        {:thousand_island_ready, socket, _server_config, _acceptor_span, _start_time},
        %{ handler: packet_handler }
      ) do

    {:ok, socket} = ThousandIsland.Socket.handshake(socket)

  ...

The problem is that the "socket" parameter I am receiving as part of handling that message is #Port<0.4> and not a ThousandIsland.Socket struct. Consequently, the call to handshake fails with:

** (FunctionClauseError) no function clause matching in ThousandIsland.Socket.handshake/1

So it appears that the process sending the :thousand_island_ready tuple is sending the raw socket instead of a ThousandIsland.Socket to wrap it. I suspect this is a bug, but it could also be a problem with the documentation.

(P.S. Just as a point of interest, I had this code working though :ranch in a similar fashion. I am trying to adopt ThousandIsland primarily as a learning experience).
