distributed-process's People

Contributors

acfoltzer, alios, bgamari, davidsd, dcoutts, edsko, ericbmerritt, facundominguez, fuuzetsu, hyperthunk, jepst, joelmccracken, jonathanlorimer, kowey, laurentrdc, ldobrek, markwright, matt2718, mboes, nushio3, pankajmore, peterbecich, qnikst, roman, rrnewton, simonmar, tavisrudd, trofi, yoeight, zenzike

distributed-process's Issues

Issues with IPv4 vs IPv6 on OSX

It seems that the current implementation doesn't like IPv4 when binding the server, and doesn't like IPv6 when connecting from a client.

Using distributed-process v0.3.1 installed with cabal install.

Starting the server using IPv4 (DOESN'T WORK):

Workstation :: ~DEV » runhaskell Server.hs 127.0.0.1 12493
Bind to 127.0.0.1:12493
Server.hs: bind: unsupported operation (Can't assign requested address)

Starting the server using IPv6 (WORKS):

Workstation :: ~DEV » runhaskell Server.hs ::ffff:127.0.0.1 12493
Bind to ::ffff:127.0.0.1:12493
Echo server started at ::ffff:127.0.0.1:12493:0

Running the client using IPv4 (WORKS):

Workstation :: ~DEV » runhaskell Client.hs ::ffff:127.0.0.1 11111 127.0.0.1:12493:0
ConnectionOpened 1024 ReliableOrdered 127.0.0.1:12493:0
Received 1024 ["Hello world"]
ConnectionClosed 1024

Running the client using IPv6 (DOESN'T WORK):

Workstation :: ~DEV » runhaskell Client.hs ::ffff:127.0.0.1 11111 ::ffff:127.0.0.1:12493:0
Client.hs: Error connecting: TransportError ConnectFailed "user error (Could not parse)"

testConnectToSelf failing

On line 451 of TestTransport.hs:
https://github.com/haskell-distributed/distributed-process/blob/edsko/network-transport-2/tests/TestTransport.hs#L451

If I add another line beneath this, so that two connections are made between an endpoint and its own address, then 'testConnectToSelf' fails, e.g.:

testConnectToSelf :: Transport -> Int -> IO ()
testConnectToSelf transport numPings = do
  done <- newEmptyMVar
  Right endpoint <- newEndPoint transport

  tlog "Creating self-connection"
  Right conn <- connect endpoint (address endpoint) ReliableOrdered
  Right dummyConn1 <- connect endpoint (address endpoint) ReliableOrdered

It is throwing an error at line 473:
https://github.com/haskell-distributed/distributed-process/blob/edsko/network-transport-2/tests/TestTransport.hs#L473

where (Received cid' msg) is not correctly being pattern matched:

Running "ConnectToSelf": failed (exception: TestTransport.hs:473:46-99: Non-exhaustive patterns in lambda

So, having multiple connections from an endpoint to its own address appears to cause problems.

please improve API docs for: Closure, Static

(sorry Edsko, here's another beginner's request)

From reading the API docs I guess that Control.Distributed.Process.call
is one basic method to "run a process remotely and wait for it to reply".
(I guess "to reply" means "execute 'return x' in the Process monad"?)

The type of spawn contains both Static and Closure,
and when I follow their API docs, I just see data declarations with hidden
constructors. What I'm missing is a note (right there) about how such objects
should be created, or how constructing them can be avoided.

Backends for Platform LSF, Oracle Grid Engine

Hello,

I'm primarily interested in using Cloud Haskell for running computations on clusters that use job schedulers like Platform LSF and Oracle Grid Engine. The typical pattern is that I submit an array of N jobs to the scheduler, and the scheduler decides which machines to run them on, and at what time. As a user, some key features of this setup are:

  1. Different processes in the job array are started at different times. This is typically because there are other users on the cluster, and the scheduler uses priorities and queues to determine what should be run when.

  2. The number of processes running at any given time is almost always less than N. The simplest example of this is if I schedule 1000 jobs on a cluster with 100 machines, then obviously some of the jobs must be run in sequence. A more common example for me is that the cluster is busy, and my jobs get interleaved with jobs from other users, reducing the effective number of available machines.

  3. I have no control over which machines processes are started on. There's also no way to know which machine a process will be started on before the process actually starts.

  4. Individual processes may be killed or suspended at any time. This is most common when they happen to be running on a machine for which another user has higher priority (enough to kick me off).

I'm wondering what would be involved in writing a Cloud Haskell backend for this type of environment. I have written some ad-hoc programs to deal with this sort of thing before. An example situation is that I have a function f which is very expensive to compute, and I would like to farm different calls to this function out to different machines. The model I've used is:

  • A single master which decides for which x's to compute f(x).
  • A bunch of workers, each equipped to compute f(x).
  • A process determines its own master/slave status based on its index in the job array (an environment variable). Job index 1 is the master, the rest are slaves.
  • When a slave starts up, it uses the job scheduler to find the IP address of the master, and sends a "ready" message to the master.
  • The master keeps a queue of available slaves which is updated whenever a "ready" message arrives, or whenever the result of a computation arrives.
  • The master also keeps a list of running slaves and what computations they're performing.
  • If a slave dies, it's discarded from the list of running slaves, and its computation is sent to the next available slave.

This is all to cope with the (somewhat frustrating) fact that the "cloud" is dynamic, and many of its properties are only known at runtime. The number of available slaves can grow or shrink during the course of the computation. From what I've read, it looks like Cloud Haskell prefers to assume that the size and topology of the cloud is static. Is this necessary? Any recommendations on writing a backend for the environment above?
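
For concreteness, the bookkeeping described above can be sketched independently of any particular transport or scheduler. Everything below (WorkerId, Event, MasterState, step, assign) is a hypothetical illustration, not part of an existing backend:

import qualified Data.Map as Map

type WorkerId = Int

data Event a r
  = Ready  WorkerId      -- a worker announced itself
  | Result WorkerId r    -- a worker returned a result
  | Died   WorkerId      -- a worker was killed or suspended

data MasterState a r = MasterState
  { pending :: [a]                 -- inputs not yet handed out
  , running :: Map.Map WorkerId a  -- inputs currently being computed
  , results :: [r]                 -- results collected so far
  }

-- React to one event; possibly hand a new piece of work to a worker.
step :: MasterState a r -> Event a r -> (MasterState a r, Maybe (WorkerId, a))
step st (Ready w) = assign st w
step st (Result w r) =
  let st' = st { running = Map.delete w (running st), results = r : results st }
  in assign st' w                                       -- this worker is free again
step st (Died w) =
  let st' = case Map.lookup w (running st) of
              Nothing -> st
              Just x  -> st { pending = x : pending st }  -- requeue its work
  in (st' { running = Map.delete w (running st') }, Nothing)

-- Give worker w the next pending input, if any work remains.
assign :: MasterState a r -> WorkerId -> (MasterState a r, Maybe (WorkerId, a))
assign st w = case pending st of
  []     -> (st, Nothing)
  x : xs -> ( st { pending = xs, running = Map.insert w x (running st) }
            , Just (w, x) )

A backend for LSF/Grid Engine would then mostly be glue: translating "ready"/"result"/"died" notifications from the cluster into these events.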

CH or NT must implement keep-alive

When one CH process A monitors another B, it expects to be notified if the connection between them breaks, even when A never sends anything to B (but only receives messages from B). This means that it is not enough to rely on send to detect network problems. This can be solved at the CH level or at the NT level.
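
For illustration only, a user-level approximation (an assumption about what can be done today with existing primitives, not the proposed CH/NT mechanism) might be a watchdog process like the sketch below; it assumes the peer runs a loop of the form forever (expect >>= \pid -> send pid ()):

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Distributed.Process

-- Every five seconds, send our ProcessId to the peer (which is assumed to
-- echo a () back to whatever ProcessId it receives) and wait up to one
-- second for the reply; a missing reply is treated as a broken connection.
watchdog :: ProcessId -> Process ()
watchdog peer = forever $ do
  liftIO (threadDelay 5000000)
  self  <- getSelfPid
  send peer self
  reply <- expectTimeout 1000000 :: Process (Maybe ())
  case reply of
    Just () -> return ()
    Nothing -> die "keep-alive to peer failed"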

Automatically GC connections

Consider a process such as

pingServer :: Process ()
pingServer = forever $ do
  client <- expect
  send client ()

This server "leaks" connections. Since CH guarantees ordering, we cannot simply close idle connections after a timeout, for instance. The programmer can fix this manually:

pingServer :: Process ()
pingServer = forever $ do
  client <- expect
  send client ()
  reconnect client

(perhaps the name reconnect is confusing?), but ideally we would take care of this at the CH level instead.

Tutorial for the task layer?

Is there a chance we'll see a tutorial on the task/promise layer as implemented in the prototype?

Or is that the job of another package?

I would like to implement simple tasks on a master connected to many worker nodes, like:

do promises <- mapM startComputation inputList
   results  <- mapM redeemPromise promises

And I have no idea where to start...
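
As a rough starting point only: a local-only sketch using spawnLocal and typed channels could back the snippet above. The names Promise, startComputation and redeemPromise are hypothetical, and distributing the work to remote nodes would additionally need spawn and Closures:

import Control.Distributed.Process
import Control.Distributed.Process.Serializable (Serializable)

-- A promise is just the receiving end of a typed channel.
newtype Promise a = Promise (ReceivePort a)

-- Run an action in a separate (here: local) process and return a promise
-- for its result.
startComputation :: Serializable a => Process a -> Process (Promise a)
startComputation act = do
  (sendPort, recvPort) <- newChan
  _ <- spawnLocal (act >>= sendChan sendPort)
  return (Promise recvPort)

-- Block until the result is available.
redeemPromise :: Serializable a => Promise a -> Process a
redeemPromise (Promise recvPort) = receiveChan recvPort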

add OTP style framework of standard *behaviours*

It'd be great to have a go at implementing OTP-style behaviours (a la gen_server, supervisor, etc.) now that distributed-process is looking stable. I'm quite happy to contribute, but there are no contribution guidelines, so I'm not sure whether I should fork and add a distributed-process-framework sub-package to the repository, do this as a separate project, or what. Any pointers would be much appreciated.

consider adding a self-contained (compilable, runnable) example for "call" to C.D.P.Closure

I understand that with every library, you can only do so much
with the API doc - at some point users just have to read
the underlying paper to get the fundamental abstractions right, and there's no shortcut.

But here, for me, the problem is not so much the abstraction
as the technicalities of the workarounds for not having "static".

With technicalities, examples are quite helpful.
You have them distributed over the API docs, that's fine,
but a minimal complete example would be a welcome addition.
I tried for the better part of an hour before arriving at the following. Is it idiomatic?

{-# LANGUAGE TemplateHaskell #-}

import System.Environment (getArgs)
import Control.Distributed.Process
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Distributed.Process.Backend.SimpleLocalnet

compute :: Integer -> Process Integer
compute n = do
    liftIO $ putStrLn "the slave does a computation"
    return $ n + 1

$(remotable ['compute])

master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
  liftIO . putStrLn $ "Slaves: " ++ show slaves
  case slaves of
      [] -> liftIO $ putStrLn "no slaves"
      s : _ -> do
           out <- call $(functionSDict 'compute) s
                $ $(mkClosure 'compute) (10 :: Integer)
           liftIO $ putStrLn $ show out
  terminateAllSlaves backend

main :: IO ()
main = do
  args <- getArgs
  let rtable :: RemoteTable
      rtable = Main.__remoteTable $ initRemoteTable
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port rtable
      startMaster backend (master backend)
    ["slave", host, port] -> do
      backend <- initializeBackend host port rtable
      startSlave backend
    _ -> putStrLn "usage: <prog> (master | slave) host port"

Support `closeConnectionTo`

which closes the entire "bundle" of (outgoing and incoming) connections to another endpoint. Basically, "disconnect completely from this other endpoint" (a "heavyweight disconnect").

Once this is implemented we can resolve a TODO in Control.Distributed.Process.Node.

More automated testing

Write a script-driven Network.Transport implementation with which we can test for specific network failures, and then use QuickCheck to generate scripts (similar to how ByteString is tested).
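
By way of illustration only (these types are hypothetical, not part of an actual test harness), a script could be an abstract event list that QuickCheck generates and a fake Network.Transport replays, including injected failures:

import Test.QuickCheck

-- Abstract events that a script-driven transport could replay.
data ScriptCmd
  = NewEndPoint
  | Connect Int Int          -- open a connection from endpoint i to endpoint j
  | Send Int String          -- send a payload on connection i
  | CloseConnection Int
  | BreakNetwork Int Int     -- drop the link between endpoints i and j
  deriving Show

newtype Script = Script [ScriptCmd]
  deriving Show

instance Arbitrary ScriptCmd where
  arbitrary = oneof
    [ pure NewEndPoint
    , Connect <$> endpointId <*> endpointId
    , Send <$> connId <*> arbitrary
    , CloseConnection <$> connId
    , BreakNetwork <$> endpointId <*> endpointId
    ]
    where
      endpointId = choose (0, 3)
      connId     = choose (0, 7)

instance Arbitrary Script where
  arbitrary = Script <$> listOf arbitrary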

killThread hangs on GHC 7.0.4 and 7.2.1, but NOT on 6.12.3 or 7.4.1

The bug is found in DemoTransport.hs, which will hang when trying to "killThread"
(Pipes backend only, demo3 and demo4). Yet this hang happens ONLY under GHC
7.0.4 and 7.2.1. Under GHC 6.12.3 and GHC 7.4.1 it works fine!

At first I thought this may be an issue with non-allocating threads not
being preempted by the runtime system (and therefore not servicing the
ThreadKilled asynchronous exception). But it's hard to explain that
pattern of outcomes on different GHC versions.

See commit: 48e257a

Asynchronous exception bug in spawn

The following code in Network.Transport.Util is unsafe with regard to asynchronous exceptions. If an asynchronous exception is thrown to the forked thread before the endpoint address is put into the addr MVar, the final takeMVar will deadlock:

spawn :: Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn transport proc = do
  addr <- newEmptyMVar
  forkIO $ do
    Right endpoint <- newEndPoint transport
    putMVar addr (address endpoint)
    proc endpoint
  takeMVar addr

One way to solve this is a combination of mask and try. However, a much simpler implementation is:

spawn :: Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn transport proc = do
  Right endpoint <- newEndPoint transport
  forkIO $ proc endpoint
  return $ address endpoint

Since the original code has to wait for newEndPoint to complete anyway, we could just as well execute it in the main thread and only run proc endpoint in a separate thread. No need for an MVar, so no possibility of deadlock.

However, since this code is so simple, I wonder whether there's actually a need for this combinator at all. Is it used anywhere? If not, I propose to remove it.

Reduce message overhead

Small CH messages have a large overhead. For instance, consider sending (Nothing :: Maybe ProcessId). The binary encoding of Nothing is one byte, but the total message sent looks like

<channel ID>                4 bytes
<message length>            4 bytes
<message type fingerprint>  16 bytes
<payload>                   1 byte

That's an overhead of 2400% :) Obvious ways to reduce the overhead (see the sketch after this list) are:

  • Use a single byte for the first 255 connection IDs
  • Use a single byte for message length <= 255
  • Use a single byte to index into some cache for commonly used fingerprints (since most applications will probably send only a handful of different types of messages across the network, this might be very effective)
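
A sketch of the first two points (an illustration of the general technique, not the actual wire format; the function names are made up):

import Data.Binary.Get (Get, getWord32be, getWord8)
import Data.Binary.Put (Put, putWord32be, putWord8)
import Data.Word (Word32)

-- Connection IDs below 255 take one byte; the escape byte 255 is followed by
-- the full 32-bit ID. The same scheme works for small message lengths.
putConnectionId :: Word32 -> Put
putConnectionId n
  | n < 255   = putWord8 (fromIntegral n)
  | otherwise = putWord8 255 >> putWord32be n

getConnectionId :: Get Word32
getConnectionId = do
  tag <- getWord8
  if tag < 255
    then return (fromIntegral tag)
    else getWord32be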

Is there any way that Closure/Static could be factored into a separate package?

Perhaps this is too much to hope for, but it would be ideal for packages such as meta-par, HDpH and distributed-process to be able to share basic RPC functionality (Closure/Static).

With meta-par we copied and hacked a version of the original Cloud Haskell ("Remote") Closure. The ugly bit was that it had hard-coded recognition of the IO and ProcessM monads, and we had to tweak that to include monad-par's "Par" monad.

I don't understand the current Closure implementation, but it looks like it may be the case that CP.hs (the part that deals with Process values) is pretty well isolated from the rest. Does that mean that everything but CP.hs could become its own package?

It looks like TH.hs also deals with Process presently. TH.hs would either need to be replicated in all consumers of the hypothetical factored library, or it would need to become more extensible. When we were using the Closure mechanism in monad-par/meta-par this was the sticking point -- we didn't see a way to do it without adding extra arguments to the compile-time TH functions (e.g. remotable), which would be very ugly.

Any good ideas here?

Missing receiveChanTimeout

There should be something like receiveTimeout/expectTimeout for channels.

receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)

Can't build examples

I'm having a crack at a zeromq transport and running into some trouble building the examples.

DemoTransport.hs refers to Network.Transport.MVar, which doesn't seem to exist. When I comment DemoTransport out of the makefile (and get rid of the ".exe" suffixes) I get:

ghc -package-conf ../network-transport-zmq/cabal-dev/packages-7.4.2.conf -O2 -rtsopts -threaded --make  DemoProcess.hs -o DemoProcess
[1 of 1] Compiling Main             ( DemoProcess.hs, DemoProcess.o )

DemoProcess.hs:7:31:
    Module `Network.Transport.TCP' does not export `mkTransport'

DemoProcess.hs:7:44:
    Module `Network.Transport.TCP' does not export `TCPConfig(..)'
make: *** [DemoProcess] Error 1

Are the examples still the preferred method to play around with the library or should I be looking at other things now, like benchmarks and tests?

Cheers,
Ben

TCP Transport appears to run messages together

In the course of adding a new named pipes transport, I noticed the following behavior of the TCP transport on demo0.

Ten messages are sent, but a single concatenated message is received, resulting in the output below.

Note: I'm testing this in rev 1611171 (not yet merged) but it should apply to rev 8c57693 as well.

MVAR Transport:
logServer rcvd: ["hello 1"]
logServer rcvd: ["hello 2"]
logServer rcvd: ["hello 3"]
logServer rcvd: ["hello 4"]
logServer rcvd: ["hello 5"]
logServer rcvd: ["hello 6"]
logServer rcvd: ["hello 7"]
logServer rcvd: ["hello 8"]
logServer rcvd: ["hello 9"]
logServer rcvd: ["hello 10"]

TCP Transport:
logServer rcvd: ["hello 1hello 2hello 3hello 4hello 5hello 6hello 7hello 8hello 9hello 10"]

Pipes Transport:
logServer rcvd: ["hello 1"]
logServer rcvd: ["hello 2"]
logServer rcvd: ["hello 3"]
logServer rcvd: ["hello 4"]
logServer rcvd: ["hello 5"]
logServer rcvd: ["hello 6"]
logServer rcvd: ["hello 7"]
logServer rcvd: ["hello 8"]
logServer rcvd: ["hello 9"]
logServer rcvd: ["hello 10"]
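
For reference, the usual cure on the receiving side of a TCP stream is length-prefixed framing. The sketch below shows the general technique only (an assumption about the kind of fix needed, not code from the TCP transport):

import qualified Data.ByteString as B
import Data.Bits (shiftL, (.|.))

-- Split a buffer of concatenated frames into complete messages plus any
-- trailing partial frame. Each frame is assumed to start with a 4-byte
-- big-endian length header.
unframe :: B.ByteString -> ([B.ByteString], B.ByteString)
unframe buf
  | B.length buf < 4 = ([], buf)
  | otherwise =
      let (header, rest) = B.splitAt 4 buf
          len = B.foldl' (\acc w -> (acc `shiftL` 8) .|. fromIntegral w) 0 header
      in if B.length rest < len
           then ([], buf)                          -- wait for more data
           else let (msg, rest')     = B.splitAt len rest
                    (msgs, leftover) = unframe rest'
                in (msg : msgs, leftover)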

Check for ByteString memory leaks

Since ByteStrings can point into (larger) ByteStrings, it is easy to create "ByteString memory leaks". We have already fixed some of these, but there may be others.
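
For reference, the typical shape of such a leak and its fix (a generic sketch, not a pointer to any specific place in the code):

import qualified Data.ByteString as B

-- B.take alone returns a slice that keeps the whole original buffer alive;
-- forcing a copy lets the large message be garbage collected.
retainHeader :: B.ByteString -> B.ByteString
retainHeader msg = B.copy (B.take 16 msg)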

Network.Transport.TCP may reject incoming connection request

Suppose A and B are connected, but the connection breaks. If A realizes this immediately and sends a new (heavyweight) connection request to B, it /might/ happen that B has not yet realized that the current connection is broken, and will therefore reject the incoming request from A as invalid.

This is low priority because

  • the window of opportunity for the problem to occur is small, especially because in the case of a true network failure it will take some time before a new connection can be established
  • even if the problem does arise, A can simply try to connect again (A might have to do that anyway, to find out whether the network problem has been resolved).

Meta package to install all the other packages?

I had some issues where cabal had installed more than one version of these packages and apps were compiled against a mix of versions. Wondering whether a meta package (no code) that depends on the other ones would help manage this.

I had to do something like this to get out of the cabal mess:

ghc-pkg list | grep -e distributed -e network-transport | xargs -t -I pkg ghc-pkg unregister pkg --force

And then reinstall the packages.

Maybe the following is already a good way?

cabal install distributed-process-simplelocalnet

please improve (API) doc for newcomers

I just want to play around with distributed-process
(and later I want my students to do this).

So I find it very useful to have a working example in the docs.
http://hackage.haskell.org/packages/archive/distributed-process-simplelocalnet/0.2.0.1/doc/html/Control-Distributed-Process-Backend-SimpleLocalnet.html

I can compile this (I put -threaded -rtsopts just out of habit, but
is it necessary? If so, the doc should say.)

but for running,
I am missing a sentence that says how this program should be used.
I guess, start slave(s) and master - but on the same machine?

I do not understand what the second command-line argument ("host") is used for.
When I start slave and master on one machine,
and use "localhost" for host, it does not seem to work, but it does with "127.0.0.1"
(so perhaps this is an issue with my resolver).

I do not understand whether this example is supposed to work with slaves/master
on different machines. If so, then again I do not see what "host" arguments to use.

The API doc of "findPeers" does not help:
"findPeers t sends out a who's there? request,..."
since it does not say to whom this message is sent.

Sure, these are very basic questions, but if the package is
going to be used widely,
then I guess they will come up over and over again.
If you think the answer would be too long to put into the API doc,
then put it elsewhere, and refer to it.

Thanks, Johannes.

Network backend with security measures

Erlang uses cookies to authenticate nodes; we could do something similar or something much more sophisticated. This should be done in individual backends, as security might vary widely from one setup to another. Ideally, security is handled entirely within the backend (within the network-transport?) so that the core Cloud Haskell library is unaffected.

rank1dynamic not ghc 7.6 compatible (requires base 4.5.*)

If I change the cabal constraints to

Build-Depends: base < 4.7 && > 4.4,
               ghc-prim >= 0.2 && < 0.4,
               binary >= 0.5 && < 0.6

it seems to build fine, and then I can cabal install distributed-process without any compile-time problems.
(I'll start playing around and test whether it works as desired on GHC 7.6 shortly :))

Polymorphic expect

With the standard Cloud Haskell primitives it is impossible to write processes such as a proxy; for instance, we cannot write something like

proxy :: Process ()
proxy = forever $ do
  msg <- expect
  send someOtherProcess msg

which forwards messages of any type. The most recent version of Cloud Haskell supports matchAny, with which the above process can be written as

proxy :: Process ()
proxy = forever $ do
  msg <- receiveWait [ matchAny return ]
  forward someOtherProcess msg

but we still cannot write something like

proxy :: Process ()
proxy = forever $ do
  (destination, msg) <- expect
  send destination msg

For this we would need an alternative Binary encoding (maybe even something like protocol buffers?) which would allow us to decode a message into a pair of messages without knowing the types of the pair components, i.e., something like

decodePair :: ByteString -> (ByteString, ByteString)
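
One possible shape for this (a sketch assuming a simple length-prefixed pair encoding; it does not reflect any actual CH wire format):

import Data.Binary (decode, encode)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)

-- Prefix the first component with its (8-byte) encoded length, so the pair
-- can be split again without knowing either component's type.
encodePair :: BL.ByteString -> BL.ByteString -> BL.ByteString
encodePair a b = BL.concat [encode (BL.length a), a, b]

decodePair :: BL.ByteString -> (BL.ByteString, BL.ByteString)
decodePair bs =
  let (lenBytes, rest) = BL.splitAt 8 bs      -- an Int64 length encodes to 8 bytes
      n = decode lenBytes :: Int64
  in BL.splitAt n rest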

build problems: time-1.4 -> 1.2.0.5 (with ghc-7.4.2)

I am not able to install distributed-process alongside other packages (e.g., snap).
It seems this is due to version conflicts with random and time.

cabal install distributed-process

Resolving dependencies...
In order, the following would be installed:
random-1.0.1.1 (reinstall) changes: time-1.4 -> 1.2.0.5
distributed-process-0.2.1 (reinstall) changes: random-1.0.1.1 added

When I pass '--force-reinst', other packages appear broken (shown red in ghc-pkg list);
when I force-reinstall those, distributed-process is broken.

Messages from the "main channel" as a receivePort

At the moment it is not possible to wait simultaneously for a message from the main channel and for one from a typed channel. It might be useful to add a function

expectChan :: Serializable a => Process (ReceivePort a)

which creates a ReceivePort for messages of a specific type sent to the main channel. This ReceivePort can then be merged with other ReceivePorts as usual.
