c-cube / lwt-pipe Goto Github PK
View Code? Open in Web Editor NEW[beta] A multi-consumer, multi-producers blocking queue and stream for Lwt
Home Page: https://c-cube.github.io/lwt-pipe/
License: BSD 2-Clause "Simplified" License
[beta] A multi-consumer, multi-producers blocking queue and stream for Lwt
Home Page: https://c-cube.github.io/lwt-pipe/
License: BSD 2-Clause "Simplified" License
This is more of a question about the intended behavior when creating pipes from Lwt's channels.
One usecase that I had in mind was to use connect
to shuttle content between an input and output channel. A simplified version can be seen in the following snippet for an echo server.
module Pipe = Lwt_pipe
Lwt_io.establish_server_with_client_address listen_address (fun _ (ic, oc) ->
let reader = Pipe.IO.read ic in
let writer = Pipe.IO.write oc in
Lwt_pipe.connect ~ownership:`OutOwnsIn reader writer;
Pipe.wait writer)
When using connect it is clear that using ownsership
' we can control which pipes owns the other, and that when the owner is closed, the ownee will also be closed. But when creating pipes via a channel its not clear to me when to close the owner pipe. connect
returns unit so i can't wait on a promise that resolves when all the content has been transferred from owner to ownee. Without that if i can't close the reader pipe, then the writer pipe will also remain open and in my example above the promise in the server handle will never be resolved.
I'm not very familiar with Lwt's channels yet, but should the reader pipe created from an input_channel automatically close when all content from the channel has been consumed? (whenever the channel encounters EOF)
I'm trying to use this library for creating communication channel between Lwt "threads", as it's recommended in Lwt documentation, but can't figure it out (disclaimer: ocaml newbie here).
I'm trying to create an [r|w]
pipe, use it in producer and consumer. According to the docs it should be possible, but Lwt_pipe.Reader.iter
expects [r]
permissions and when I try to pass [r|w]
pipe into Lwt_pipe.Reader.iter
I get the following error:
# Error: This expression has type (item, [ `r | `w ]) Lwt_pipe.t
# but an expression was expected of type
# item Lwt_pipe.Reader.t = (item, [ `r ]) Lwt_pipe.t
# The second variant type does not allow tag(s) `w
I tried fiddling with coercion but no luck. Usage example in readme creates Lwt_pipe.Reader.t
using of_list
which is not exactly what I'm trying to do. Any help is greatly appreciated!
Maybe I'm wrong but it seems that lwt-pipe
is not published on OPAM.
This is the result of my experiments.
TL;DR; I WIN! I IMPLEMENTED A SUPER USEFUL FUNCTION! ๐
Longer and more serious version.
First I stole got inspired by the values_avalaible
function present in Async. What I need is a way to read from a pipe, blocking if nothing is available like a normal read
, but not forever. If you think about it it's more or less a "high level" form of Unix select
.
The code was convoluted and @c-cube asked me a revision, and he was right.
So I re-implemented it in another branch using a mutable list of "waiters". This somewhat works but there are at least two problems here:
Not good at all.
So I decided to give up with this funny function and implement a read_with_timeout
in the read-with-timeout branch. The patch is this one: pdonadeo/lwt-pipe@071e12e
The idea here is to store in the readers Queue a bool ref
with Lwt.u. This bool becomes true when the timeout is reached. In the normal read
it's stored as false
and never changed. In the "read with timeout" it's stored false
at the beginning but another thread starts concurrently and sets it to true
.
The write
: when a reader is available from the Queue it's popped as usual. If the timeout expired that reader is thrown away and the write_step
is called again recursively.
This function has a clear semantics, I believe, with no ambiguity and only one queue of readers.
What do you think?
I want to discuss here before pushing a request.
I'm opening this "issue" only to ask... why aren't you publishing lwt-pipe? In my opinion it's a very useful library, with also a mention in the official documentation of Lwt.
It definitely deserve to be a first class citizen in the OCaml ecosystem.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.