My model of Rust's execution model is minimal at best. Any help in mapping this into Rust conventions is appreciated.
In the current code we spawn a background task, `process_discv5_requests`, that handles incoming requests:

```rust
tokio::spawn(events.process_discv5_requests());
```
`process_discv5_requests` enters an infinite loop, picking up one inbound packet at a time, checking that it is a `TalkRequest`, and then handling it in `process_one_request`, which returns a response message that is dispatched back to the original sender. The handling is implemented as a `match` over the different message types, constructing the response packet and returning it.
This architecture is unlikely to facilitate a clean and robust client, for multiple reasons.
Doing work after handling a request
We need the ability to do work after dispatching the response packet. An example:
The OFFER/ACCEPT message pair is used to initiate a uTP stream over which the actual data will be transferred. For this to work, the node receiving the OFFER message will send back an ACCEPT message with a connection-id for the uTP stream. After sending the ACCEPT back, the receiving node will need to listen for an incoming uTP stream with that connection-id. There is likely additional validation needed after receiving the whole payload. Timeouts will also need to be handled during this sequence of messages.
If the handling of the message happens within the main message handling loop, then we will end up blocking the processing of messages until this sequence has completed.
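As a sketch of what "work after dispatching the response" looks like, here is a minimal asyncio example. The names `handle_offer` and `receive_utp_stream`, the message shapes, and the hard-coded connection-id are all hypothetical stand-ins for the real client's types; the point is only that the handler runs as its own task and keeps working after the ACCEPT is sent, without blocking the main loop.

```python
import asyncio

async def receive_utp_stream(connection_id):
    # Stand-in for the real uTP listener; a real implementation would
    # accept an inbound stream matching the connection-id.
    await asyncio.sleep(0)
    return b"payload-bytes"

async def handle_offer(request, send_response):
    connection_id = 1234  # a real node would generate this randomly
    await send_response({"type": "ACCEPT", "connection_id": connection_id})
    # The response has been dispatched, but this task keeps running:
    # wait for the uTP stream, with a timeout, then validate the payload.
    try:
        payload = await asyncio.wait_for(
            receive_utp_stream(connection_id), timeout=0.1
        )
    except asyncio.TimeoutError:
        return None
    return payload

async def main():
    responses = []

    async def send_response(msg):
        responses.append(msg)

    # Spawning the handler as a task is what keeps the main message
    # loop free to process other inbound packets in the meantime.
    task = asyncio.create_task(handle_offer({"type": "OFFER"}, send_response))
    payload = await task
    return responses, payload
```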
Doing maintenance work
Every PONG message contains an ENR sequence number. Every time we receive a PONG, we should check whether the sequence number is newer than our local record. In the event of a higher sequence number we need to request an updated ENR record from the node.
PING messages are going to be used in a number of places. One specific use is validating liveness, which happens in a few different contexts. We do not want to copy/paste this sequence number checking code across these places. We also do not want to have to remember to trigger this process each time we use the PING message.
The only place PONG messages can be handled in the current architecture is at the call site where the PING was sent, which is very limiting.
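The check itself is small. A minimal sketch, with hypothetical names throughout (`enr_cache` standing in for the routing table's view of each node's ENR, `pending_enr_requests` for whatever mechanism triggers the follow-up ENR request):

```python
def needs_enr_update(local_enr_seq, pong_enr_seq):
    # A higher sequence number in the PONG means the remote node has a
    # newer ENR than the one we have cached.
    return pong_enr_seq > local_enr_seq

def process_pong(node_id, pong_enr_seq, enr_cache, pending_enr_requests):
    # enr_cache maps node_id -> last known ENR sequence number (0 if unknown).
    if needs_enr_update(enr_cache.get(node_id, 0), pong_enr_seq):
        pending_enr_requests.add(node_id)
```

The value of the subscription approach is that this logic lives in exactly one place, rather than at every call site that sends a PING.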
The solution to this is to implement an API that allows subscriptions to specific message types. This allows us to spawn a background process which subscribes and is given a chance to process every inbound PONG message.
This same subscription API can also easily be used to implement standard handling of request messages. Here is the handling of inbound PING requests in the python implementation of this:
https://github.com/ethereum/ddht/blob/142911d134ff839f3f79ff8fe9e45d3fe5a58cd0/ddht/v5_1/alexandria/network.py#L290-L308
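A background handler built on such a subscription might look like the following sketch, using an `asyncio.Queue` as a stand-in for the subscription and a `None` sentinel to stop the demo loop (names and message shapes are illustrative, not the real API):

```python
import asyncio

async def pong_when_pinged(subscription, send_pong, local_enr_seq):
    # Long-running background task: consume every inbound PING from the
    # subscription queue and answer with a PONG carrying our ENR sequence.
    while True:
        request = await subscription.get()
        if request is None:  # sentinel used only to end the demo loop
            break
        send_pong(request["node_id"], {"type": "PONG", "enr_seq": local_enr_seq})

async def demo():
    subscription = asyncio.Queue()
    sent = []

    def send_pong(node_id, message):
        sent.append((node_id, message))

    task = asyncio.create_task(
        pong_when_pinged(subscription, send_pong, local_enr_seq=7)
    )
    await subscription.put({"type": "PING", "node_id": "node-a"})
    await subscription.put(None)
    await task
    return sent
```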
The suggested architecture
Starting with the main stream of TALKREQ and TALKRESP messages:

```python
def process_requests(stream_of_messages, subscription_manager):
    for message in stream_of_messages:
        subscription_manager.handle_message(message)
```
The `subscription_manager` in this case is doing something like this:

```python
class SubscriptionManager:
    def handle_message(self, message):
        subscriptions = self.get_subscriptions_for_message_type(type(message))
        for subscription in subscriptions:
            # the message just gets put in a queue to be consumed elsewhere
            subscription.append(message)
```
What this facilitates is the ability to implement message handling as long running background processes, instead of doing it in the main handling loop. This ensures that the main handling loop isn't blocked by messages that take a long time to process.
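Adding a `subscribe` method makes the pattern concrete. Here is a runnable sketch; the message types and internals are illustrative only, and a real implementation would hand out async channels rather than plain lists:

```python
from collections import defaultdict

class Ping:
    pass

class Pong:
    pass

class SubscriptionManager:
    """Subscribers register per message type; each inbound message is
    fanned out to every matching subscriber's queue."""

    def __init__(self):
        self._subscriptions = defaultdict(list)

    def subscribe(self, message_type):
        queue = []  # a real implementation would use an async channel
        self._subscriptions[message_type].append(queue)
        return queue

    def handle_message(self, message):
        # Delivery is just an append; the main loop never waits on the
        # consumers, so slow handlers cannot block message processing.
        for queue in self._subscriptions[type(message)]:
            queue.append(message)
```

Each background process then owns its own queue and drains it at its own pace, which is what keeps the main handling loop non-blocking.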