Giter VIP home page Giter VIP logo

async's People

Contributors

andreasabel avatar bebesparkelsparkel avatar bitonic avatar cartazio avatar dmjio avatar edsko avatar erikd avatar facundominguez avatar hvr avatar intolerable avatar kirelagin avatar markus1189 avatar nh2 avatar nomeata avatar nominolo avatar ntc2 avatar osa1 avatar parsonsmatt avatar phadej avatar piyush-kurur avatar reiddraper avatar runeksvendsen avatar sergv avatar simonmar avatar sjakobi avatar snoyberg avatar timwspence avatar treeowl avatar ttuegel avatar yuras avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

async's Issues

`uninterruptibleCancel` should be the default

Reasoning: we want to make the common case uninterruptible, since it will often be part of a cleanup operation, after which there is an expectation for the thread to be dead.

Basically, we're arguing that the issues that affected withAsync in #24 will affect most code using cancel.

Explain link/link2 and forkRepeat

Reading through the source to async, it makes a lot of sense and I can follow many of the details, apart from link and link2. That might be my lack of understanding, might be a bug, or might be something that can be improved, or added as comments. My questions are:

  • Why doesn't link use waitCatch, which does the BlockedIndefinitelyOnSTM retry semantics.
  • If link used waitCatch is forkRepeat needed?
  • Is there any risk that throwTo raises an exception? If so, won't that trigger an infinite loop?
  • link2 uses forkRepeat, and waitCatchEither doesn't retry on BlockedIndefinitelyOnSTM. Should waitCatchEither retry? If so, should forkRepeat still live?
  • Shouldn't almost everything retry on BlockedIndefinitelyOnSTM?

FWIW, I'm increasingly unconvinced GHC's deadlock detection is correct... It fundamentally assumes that throwing an exception won't recover a thread, which isn't true. It seems to be causing quite a few weird corner cases (there was on in Shake too).

Idea: Give Async a Monad and MonadPlus instance

We would need to add a Pure :: a -> Async a data constructor to Async, but other than that, the current Async type supports the Monad and MonadPlus typeclasses. This is because STM is a Monad and MonadPlus.

return would equal Pure, obviously. Pure a >>= f = f a. For x >>= f where x isn't Pure, we give the resulting async x's asyncThreadId, and use the STM monad to generate the resulting _asyncWait.

The MonadPlus instance would be similar, with mzero corresponding to STM's mzero (and some junk ThreadId) and mplus corresponding to STM's mplus (and choosing the left ThreadId (or mzero's ThreadId).

If you don't want to add a new data constructor to Async, you can also define an Applicative and Alternative instance without changing the type at all. pure a would have a junk asyncThreadId and would use STM's pure for asyncThreadId. <*> would use STM's <*> for _asyncWait, and would choose the left-most non-junk-asyncThreadId for the resulting asyncThreadId. Alternative would work similarly. (If you do not like using an invalid ThreadId for asyncThreadId, you could slightly change asyncThreadId's type and use Nothing instead. Or you could make it a [ThreadId] of all the threads involved.)

No Monad instance for Concurrently?

Is there any particular reason that there's no Monad Concurrently instance? It seems fairly trivial to implement (at least to me) so I was wondering if there were some kind of rational behind not having it.

mapConcurrently + Memory-hogging operation

I'm using mapConcurrently to map a Memory-heavy IO action over a Tree of values. The Problem I'm experiencing is that the IO actions (mostly, readProcessWithExitCode) consume all the available Memory and starve the host program of Memory. However, I found out earlier that 100% CPU usage is easily possible even when using the number of cores as the maximum number of threads. Using the number of elements in the Tree as number of threads results in a lot of heavy processes being spawned, but not terminating quickly enough.

So, my question is: Is there a convenient way to limit the number of threads being used for a computation? Or is there another way to prevent this from happening?

UnliftIO-ify the API

Here is what unliftio has made of the async API:

https://hackage.haskell.org/package/unliftio-0.2.7.0/docs/UnliftIO-Async.html

The purpose of this abstraction is passing in transformer stacks on top of IO in the negative position, as in:

async :: MonadUnliftIO m => m a -> m (Async a)

What do you think of it? Assuming it is a good abstraction (I happen to think so but as far as I can tell the community is undecided), for organizational purposes, it would be great if async could pick up an unliftio-core dependency, then unliftio can stop mirroring the async API.

Thanks :)

Proposal for `raceBoth` function

I would like to propose adding the following raceBoth function if it's alright with the async maintainers. This function runs two IO actions at the same time and returns both of their results in the order in which they complete:

{-| Run two `IO` actions concurrently, returning their results in the same order
    which the two actions complete
-}
raceBoth :: IO a -> IO b -> IO (Either (a, IO b) (b, IO a))

The most common motivating use case for this function is to run some IO action concurrently with a timeout, and then doing something if the timeout completes first without canceling the other action. For example, you might try to read a fixed number of elements from some handle and if the timeout expires you can flush the current number of elements that you've read so far without canceling the current read:

example = do
    x <- raceBoth (threadDelay 1000000) readElement
    case x of
        -- Timeout reached!  Flush the elements we collected so far
        -- and then run `io` to finish reading the element
        Left  ((), io) -> ...
        -- Element read in time!  Keep going
        Right (e , _ ) -> ...

Here's the code I have been using for this function in my utility modules which is similar to the race/concurrently code in async:

import Control.Exception (SomeException, catch, onException)

import qualified Control.Exception       as Exception
import qualified Control.Concurrent      as Concurrent
import qualified Control.Concurrent.MVar as MVar

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = Exception.catch

{-| Run two `IO` actions concurrently, returning their results in the same order
    which the two actions complete
-}
raceBoth :: IO a -> IO b -> IO (Either (a, IO b) (b, IO a))
raceBoth ioA ioB = do
    mvar <- MVar.newEmptyMVar
    Exception.mask (\restore -> do
        let threadA = do
                a <- ioA
                MVar.putMVar mvar (Right (Left a))
        let threadB = do
                b <- ioB
                MVar.putMVar mvar (Right (Right b))

        let handler e = MVar.putMVar mvar (Left e)
        idA <- Concurrent.forkIO (threadA `catchAll` handler)
        idB <- Concurrent.forkIO (threadB `catchAll` handler)

        let oops = error "raceBoth: the impossible happened!"

        let stopBoth = do
                Concurrent.killThread idB
                Concurrent.killThread idA

        let collectA = do
                e0 <- MVar.takeMVar mvar
                case e0 of
                    Left   exc      -> Exception.throwIO exc
                    Right (Left  a) -> return a
                    _               -> oops
        let collectB = do
                e0 <- MVar.takeMVar mvar
                case e0 of
                    Left   exc      -> Exception.throwIO exc
                    Right (Right b) -> return b
                    _               -> oops
        let collectBoth = do
                e0 <- MVar.takeMVar mvar
                case e0 of
                    Left exc -> do
                        stopBoth
                        Exception.throwIO exc
                    Right e1 -> case e1 of
                        Left  a -> return (Left  (a, collectB))
                        Right b -> return (Right (b, collectA))

        r <- restore collectBoth `onException` stopBoth
        return r )

mapConcurrently / forConcurrently + lenses

async has some functions that can be parameterized by optics from lens, is this something that is worth adding.

mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)

forConcurrently :: Traversable t => t a -> (a -> IO b)-> IO (t b)

mapConcurrently_ :: Foldable f => (a -> IO b) -> f a -> IO ()

forConcurrently_ :: Foldable f => f a -> (a -> IO b) -> IO ()

can be parameterized by optics from lens

mapConcurrentlyOf :: LensLike Concurrently s t a b -> (a -> IO b) -> (s -> IO t)
mapConcurrentlyOf lens = flip (forConcurrentlyOf lens)

forConcurrentlyOf :: forall s t a b. LensLike Concurrently s t a b -> s -> (a -> IO b) -> IO t
forConcurrentlyOf = coerce (forOf @IO @s @t @a @b)

mapConcurrentlyOf_ :: forall s a r. Getting (Traversed r Concurrently) s a -> (a -> IO r) -> (s -> IO ())
mapConcurrentlyOf_ lens = flip (forConcurrentlyOf_ lens)

forConcurrentlyOf_ :: forall s a r. Getting (Traversed r Concurrently) s a -> s -> (a -> IO r) -> IO ()
forConcurrentlyOf_ = coerce (forOf_ @IO @r @s @a)

that run them concurrently

>> forConcurrentlyOf_ (replicated 10) 'a' print
'a''a''
'''aa'''a'
a''aaa'a'

'''
'

Chunked concurrent map

I'm quite new to haskell and it's quite tempting to use mapConcurrently for parallel HTTP requests. However, some configurations don't allow millions of parallel connections. Therefore I tried to write a chunking wrapper for mapConcurrently.

Lacking enough experience to implement this generally for Traversable I implemented this function as Int -> (a -> IO b) -> [a] -> IO [b]. While this is sufficient for my purpose, I'd love to see an improved implementation in async.

Here is my code prototype (public domain):

module AsyncChunk where

import Data.Traversable (Traversable)
import Control.Concurrent.Async (mapConcurrently)
import Control.Monad (mapM, liftM)

-- | split at regular intervals
chunk :: Int -> [a] -> [[a]]
chunk _ [] = []
chunk n xs = y1 : chunk n y2
  where
    (y1, y2) = splitAt n xs

-- | mapConcurrently that only processes a chunk with given size of a time
mapConcurrentlyChunked :: Int -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentlyChunked c f l = liftM concat $ mapM (mapConcurrently f) $ chunk c l

Is anyone of you interested to:

  • Help me improve this implementation
  • Merge it into async?

Any comments are welcome!

Feature request: linkSpecific

The link function rethrows exception of an asycn in the current thread. Sometimes, it's useful to only rethrow certain exceptions. For example, canceling an asyn works by throwing an asynchronous exceptions and there are situations where this asynchronous exception should not be rethrown in the thread linked to the async. See #25.

More specifically, I propose the following new function

-- | Rethrow only specific exceptions of the given @Async@ in the current thread.
linkSpecific :: (SomeException -> Bool) -> Async a -> IO ()

The implementation of linkSpecific is a straightforward generalization of link. I all agree that such a function is useful, I can provide a pull request.

race somehow changes async exception semantics?

Summary: race a waitForever works differently than just a.

I have this program:

{-# LANGUAGE ScopedTypeVariables #-}

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception
import Data.Functor
import System.Mem

doWork
    :: MVar () -- ^ Stop signal
    -> IO (Either () ())
doWork stop_ref =
    race (readMVar stop_ref `catch` \(e :: SomeException) -> print e)
         (threadDelay maxBound)

main :: IO ()
main = do
    var <- newEmptyMVar
    thr <- async (doWork var)
    performMajorGC
    wait thr >>= print

This program blocks forever. However, if I change doWork to this:

doWork stop_ref =
    Right <$> (readMVar stop_ref `catch` \(e :: SomeException) -> print e)

It prints exception messages as expected.

Just as an experiment, I tried using this race function:

race' left right =
  withAsync left $ \a ->
  withAsync right $ \b ->
  waitEither a b

but it still didn't print the expected messages.

I would expect this to hold: forall a . race a waitForever == Left <$> a. I think we should at least understand and explain this behavior in the documentation.

Bounded Parallelism Feature

I'd love to have an async combinator that processes a list in chunks. The purpose is to allow the user to constrain the parallelism. E.g. if a task spawned has heavy requirements (e.g. calls readProcess or chews up an inordinate amount of memory or something), I want to limit the concurrent number to something lest I accidentally create a "fork bomb" or blow out the caches.

I've wired this up with async combinators a couple times and it's always a bit tricky (for me). An optimal implementation might do something with work stealing dynamically sliding the window of concurrent tasks as they finish out of order, but I am always happy enough just to synchronizing at each K (makes exception propagation and what not a little easier and also admits a bit more deterministic behavior).

`linkOnly` and `link2Only` not exported

linkOnly and link2Only are mentioned as more flexible alternatives to link and link2 in the documentation but are not actually made available to the user. Is this intentional? It seems that either the documentation is wrong or the export list is wrong.

Faster concurrently

I was wondering if the following implementation of concurrently has been considered before:

concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = do
  mv <- newEmptyMVar
  myTid <- myThreadId
  mask $ \restore -> do
    tid <- forkIO $
             (restore right `catchAll`
                (\e -> throwTo myTid e >> return undefined)) >>= putMVar mv
    l <- restore left `onException` killThread tid
    r <- takeMVar mv
    return (l, r)

This implementation forks one thread instead of two.

It's 50% faster than the current concurrently in the concasync benchmark.

I expect to also see a significant efficiency boost of the traverse method of Concurrently which is used by mapConcurrently. Currently mapConcurrently forks 2n threads for a list of lengh n. Using this concurrently it only forks n threads.

The only thing I'm not yet sure about (and the reason I haven't created a pull request yet) is what happens when both left and right throw an exception.

MVar deadlock exceptions cause waitCatch to throw an exception

I originally discovered this problem when working around an issue with hlint (so pinging @ndmitchell, who may be interested). Consider the following program:

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Exception

main :: IO ()
main = do
    e <- try $ newEmptyMVar >>= takeMVar
    print (e :: Either SomeException Int)

    _ <- forkIO $ do
        e <- try $ newEmptyMVar >>= takeMVar
        print ("thread", e :: Either SomeException Int)
    threadDelay 1000000

    putStrLn "calling async"
    x <- async $ newEmptyMVar >>= takeMVar
    putStrLn "calling waitCatch"
    y <- waitCatch x
    putStrLn "completed waitCatch"
    print (y :: Either SomeException Int)

    putStrLn "Exiting..."

There are three logical blocks in this program, each doing essentially the same thing: creating an MVar deadlock exception and trying to catch it. The first approach does it in the main program thread, and the second in a child thread. Both of them work as expected: the exception is caught and printed, and the program continues executing.

However, the third block behaves differently. The MVar exception seems to not be got by the call to try in asyncUsing, and therefore the TMVar is never filled. This means that a later call to waitCatch throws an exception, and therefore the program exits prematurely. I've seen the same behavior with asyncBound as well, so I do not believe this is related to rawForkIO.

Here's an example output from my system:

$ ghc --make -O2 -threaded deadlock.hs && ./deadlock 
[1 of 1] Compiling Main             ( deadlock.hs, deadlock.o )
Linking deadlock ...
Left thread blocked indefinitely in an MVar operation
("thread",Left thread blocked indefinitely in an MVar operation)
calling async
calling waitCatch
deadlock: thread blocked indefinitely in an STM transaction

I'm on Ubuntu 12.04 64-bit, using GHC 7.8.3, stm-2.4.3 and async-2.0.1.5.

how to cancel an FFI call?

Hi. It's perhaps more of a documentation request:

For "cancel", you say "In particular, if the target thread is making a foreign call, the exception will not be thrown until the foreign call returns, and in this case cancel may block indefinitely."

So - what if I absolutely do need to kill the foreign call (not waiting for it to return)?

(in my application, the threads are running a SAT solver instances, via https://github.com/niklasso/minisat-haskell-bindings

J.W.

Can you please add a pollAny?

Hello,

Thanks for sharing the library.

Just want to check if you could add a pollAny function? or, if you have any objection to having such a function?

Thanks

Difference between the async/MVar versions of concurrently

There seems to be a small difference between the async version of concurrently...

concurrently left right =
  withAsync left $ \a ->
  withAsync right $ \b ->
  waitBoth a b

...and the optimized version which uses MVars.

When the main thread receives an asynchronous exception, in the async version right is killed first, and left second.

However, in the MVar version, left is killed first, right second.

This can matter in cases where right uses long uninterruptible operations, but cancelling left can trigger the end of right through indirect means. For example, see the following gist that executes an external process in Windows: https://gist.github.com/danidiaz/7e3ad1205c67d21b83a7

Unsupported use case: mapConcurrently-alike with concurrency limit, and ordered job start

Async is a nice abstraction, however as far as I can see the following use case is not supported in any straightforward obvious way:

  • A bunch of tasks to run
  • No more than N running at once (they consume resources other than just CPU so we want limits to work within bounded resources, and reduce thrashing overheads).

If it was just this, we could simply use async + a semaphore. There's one more requirement:

  • The list of tasks is ordered according to which ones's results are likely to be needed first, so though we cannot control when a task completes, we want to ensure we start tasks in order.

This last one is what means we cannot fit it into the async abstraction. But this is not an uncommon requirement. Consider a tool like cabal downloading package tarballs. We want to download e.g. 2-3 concurrently, but we want to start downloading the ones we'll need first, before the ones we'll need later, since we will also start building packages concurrently with the downloading.

The abstraction that cabal-install currently uses is this thing called JobControl. The above use case is handled by putting all the jobs into the job control queue in order, and the N worker threads simply grab the next one available.

Of course I'm not proposing to merge the rather-different JobControl abstraction into async, rather the challenge is if anyone can think of some way the existing async abstraction that would support this use case, and if any extensions would be needed, or if any helper utils would make this easier.

The basic problem is that an async starts a thread immediately, so having created a bunch of them there is no way to control the relative ordering. All of them can contend on a quantity semaphore, but there is no ordering guarantee. One plausible route might be to sequentialise the startup and add a startup synchronisation between the parent and the child async, so e.g. one could have the child thread enqueue onto a quantity semaphore before returning the async in the parent. This approach would not require any extensions, but could benefit from some helper utilities.

`withAsync` should guarantee that the async will be canceled

Right now cancel can be interrupted by async exception. The example at the end demonstrates the issue. Either uninterruptibleMask should be used, or helper thread should be spawned to ensure eventual cancellation.

I prefer eventual guarantees to be provided by cancel itself.

I can prepare pull request if you agree with one of proposed solutions.

Current behavior is documented, so this issue is not a bug, but a feature request.

Note also, that cancel may throw even under uninterruptibleMask when canceling itself. So functions like waitEitherCancel leak orphan threads.

-- uninterruptibleMask_ is used only to make the issue reproducible
main = do

  -- not killable for 5 sec
  let worker1 = do
        uninterruptibleMask_ $ threadDelay (5 * 1000 * 1000)
        forever $ do
          putStrLn "alive"
          threadDelay (1 * 1000 * 1000)

      worker2 =
        withAsync worker1 $ const $
          threadDelay (1 * 1000 * 1000)

  withAsync worker2 $ const $
    threadDelay (2 * 1000 * 1000)
  void getLine

In 1 sec after start worker2 blocks trying to cancel worker1, but in 2 secs itself is interrupted by cancellation. In 5 secs worker1 starts printing message indicating it is alive still.

Inconsistent behavior between cancel and cancelWith

New in async-2.1.1 is that cancel will block until the thread finishes. This is probably a dangerous behavior change to slip in on a minor version bump, but that's a separate discussion. Now the behavior of cancel is inconsistent compared to cancelWith. Shouldn't these two agree with cancelWith merely being a generalization of cancel?

STM exceptions are not written to result MVar

it looks like concurrently (the MVar version) has some issues rethrowing STM exceptions. Take the following program:

import Control.Concurrent.STM
import Control.Concurrent.Async
import Control.Monad

main :: IO ()
main = do
  tvar <- newTVarIO False :: IO (TVar Bool)
  void $ join (concurrently) (atomically $ readTVar tvar >>= check)

when run (with full RTS, i.e. not in ghci):

$ stack runghc async.hs
[1 of 1] Compiling Main             ( async.hs, async.o )
Linking async ...
$./async 
async: thread blocked indefinitely in an MVar operation

however it's not about the exception itself:

ฮป: join (concurrently) (throwIO BlockedIndefinitelyOnSTM)
*** Exception: thread blocked indefinitely in an STM transaction

inspecting (and adding logging to) the async code, it looks like the exception is never written to the MVar that's supposed to communicate it back to the main thread. The exception is thrown here:

takeMVar done

I have no idea why this would be the case. Pinging @qnikst who helped during debugging.

Idea for Monad instance of Concurrently compatible with the Applicative Instance

The obvious way to make Concurrently into a Monad doesn't work, since then Monad would be sequential, but Applicative would be concurrent (whereas Monad is usually supposed to be a generalization of Applicative). There is a way to make a Monad instance though that is as concurrent as possible (in particular, ap would be perfectly concurrent).

The trick is the same one used for fixIO. Given x :: Concurrently a and f :: a -> Concurrently b, we use unsafeInterleaveIO to start a thread to put x into a MVar (or TMVar or Async), and then use unsafeInterleaveIO to read the value from the variable lazily. We can then start f's action. If f needs x's value to do anything, it will be sequential. On the other hand, if f does not use x's value at all (such as in ap), it will be perfectly concurrent. It can also be anywhere in between.

I can do the code if you think this is a good concept. I just thought I'd check with you first to make sure its a good idea though.

`withAsync` leaks space.

withAsync action inner keeps a reference for action until inner finishes.

withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    t <- doFork $ try (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a -- here
      throwIO e
    uninterruptibleCancel a   -- and here
    return r

Here is a test case:

import Control.Concurrent
import Control.Concurrent.Async

testAsync :: IO ()
testAsync = do
  total <- newMVar 0
  let go 0 = takeMVar total
      go i = do
        t <- takeMVar total
        async . putMVar total $! t + i
        go (i - 1)
  go (100000 :: Int) >>= print

testWithAsync :: IO ()
testWithAsync = do
  total <- newMVar 0
  let go 0 = takeMVar total
      go i = do
        t <- takeMVar total
        withAsync (putMVar total $! t + i) $ \_ ->
          go (i - 1)
  go (100000 :: Int) >>= print

Running with -O2 -threaded -rtsopts -with-rtsopts=-sstderr gives for testAsync the following stats:

5000050000
     129,818,832 bytes allocated in the heap
         476,344 bytes copied during GC
          56,832 bytes maximum residency (2 sample(s))
          25,088 bytes maximum slop
               1 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       249 colls,     0 par    0.008s   0.003s     0.0000s    0.0000s
  Gen  1         2 colls,     0 par    0.000s   0.000s     0.0001s    0.0001s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.001s elapsed)
  MUT     time    0.384s  (  0.540s elapsed)
  GC      time    0.008s  (  0.003s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    0.416s  (  0.543s elapsed)

and the following for testWithAsync:

5000050000
     173,941,664 bytes allocated in the heap
     434,310,464 bytes copied during GC
     104,267,376 bytes maximum residency (9 sample(s))
      10,062,504 bytes maximum slop
             216 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       319 colls,     0 par    0.028s   0.046s     0.0001s    0.0003s
  Gen  1         9 colls,     0 par    0.084s   0.144s     0.0160s    0.0587s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.001s elapsed)
  MUT     time    0.816s  (  0.889s elapsed)
  GC      time    0.112s  (  0.190s elapsed)
  EXIT    time    0.008s  (  0.018s elapsed)
  Total   time    0.960s  (  1.099s elapsed)

In order to circumvent this situation I'm using an ugly (and largely untested) hack. The idea is that we have a mother thread that spawns child threads which register themselves in a shared MVar, run and then delete themselves as well. And if the mother thread finishes or dies, then all remaining child threads get cancelled. However, I couldn't implement this in a natural way, because the Async constructor is not exposed, hence the use of mdo. It also took me a while to figure out where and how my program leaks, so here are the requests:

  1. Document this behaviour of withAsync. Or better improve it, if you know how.
  2. Expose the Async constructor (via the Control.Concurrent.Async.Internal module for example), so people can easily extend the library without being forced to make a local clone.
  3. If withAsync cannot be improved in this regard, could you suggest an idiomatic way to define an alternative to withAsync that doesn't leak? Some monad on top of IO (like in the lpaste above) would be fine.

cocurrently cause memory leak

I use Control.Concurrent.Async.concurrently_ to combine two service IO monad. A memory leak occurs.

My code looks like the following:

concurrently_ (server (show port)) $ loop metric

Using forkIO instead, there is no memory leak.

do
   forkIO $ server (show port)
   loop metric

I don't think this is a bug of async. Just record this issue in here.

_20180301160935

`withAsync` hangs if called during `uninterruptibleMask`

Consider the following example:

import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async

main = uninterruptibleMask_ (withAsync (threadDelay maxBound) (const $ return ()))

This program hangs forever.
As documentation states, withAsync behaves like withAsync action inner = bracket (async action) uninterruptibleCancel inner. In this case uninterruptibleCancel hangs because the thread which it's trying to kill has MaskedUninterruptible masking state.
I am not sure if it's supposed to work this way. Maybe it's a bug.
Note that it has the same effect for race_ and concurrently_ functions. Both examples below hang:

main = uninterruptibleMask_ (race_ (threadDelay maxBound) (return ()))
main = uninterruptibleMask_ (concurrently_ (threadDelay maxBound) (error "foo"))

There is a simple workaround for it:

main = uninterruptibleMask_ (withAsyncWithUnmask (\unmask -> unmask $ threadDelay maxBound) (const $ return ()))

This program doesn't hang. Shouldn't it be the default behaviour? I. e. shouldn't withAsync change masking state of spawned thread automatically?

New cancel behavior hurts multiple tear downs

The new cancel behavior in 2.1.1 has implications for operations like waitAnyCancel. These operations will no longer allow threads to teardown concurrently as they wait to cancel the next thread until the previous thread is completely finished. These operations should probably not be written in terms of cancel if its going to have this new behavior.

Support for base-4.9.0.0?

I tried compiling this library with GHC 8.0.1-rc1, but got a configure-time warning saying:

Setup: At least the following dependencies are missing:
base >=4.3 && <4.9

It would be great to get a new async release that remedies this issue.

Pooled mapConcurrently

The current implementation of mapConcurrently triggers a list of asynchronous actions all at once. For large lists, this may have 2 problems :

  • resource consumption is unbounded ;
  • some use cases (e.g. network connections) will inherently fail if too many concurrent tasks are executed.

Could the async library provide a pooled version of mapConcurrently that allows limiting the number of concurrent threads ?
Here is a possible implementation :

{-# LANGUAGE TupleSections #-}
import Control.Concurrent.Async (async, waitAny)
import Data.List                (delete, sortBy)
import Data.Ord                 (comparing)

mapConcurrentlyPooled :: Int -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentlyPooled n tasks args = concurrentlyPooled n $ map tasks args

concurrentlyPooled :: Int -> [IO a] -> IO [a]
concurrentlyPooled n tasks = concurrentlyPooled' n (zip [0..] tasks) [] []

concurrentlyPooled' _ [] [] results = return . map snd $ sortBy (comparing fst) results
concurrentlyPooled' 0 todo ongoing results = do
    (task, newResult) <- waitAny ongoing
    concurrentlyPooled' 1 todo (delete task ongoing) (newResult:results)
concurrentlyPooled' n [] ongoing results = concurrentlyPooled' 0 [] ongoing results
concurrentlyPooled' n ((i, task):otherTasks) ongoing results = do
    t <- async $ (i,) <$> task
    concurrentlyPooled' (n-1) otherTasks (t:ongoing) results

Managing thread lifetime

Would others think it useful to have a function in Control.Concurrent.Async that would parametrize the lifetime of a thread. For example consider the case when a thread is only needed for 1,000 microseconds. The result could either be discarded, or return an exception if not executed fully, in time.

I've put together a gist the describes the sort of thing I'm thinking of, showing 'waitTimeout' 'waitTimeoutWith' and 'asyncTimeout' :
https://gist.github.com/4195051

Or have I missed something the API which already provides this?

`cancel` is not synchronous

cancel returning does not mean that the canceled Async has indeed been terminated, and this resulted in very surprising behavior in some parts of our codebase. This is due to the fact that there is no guarantee that throwTo has reached the thread (and obviously that the thread has terminated) after it has returned.

This means that when using withAsync, or race, or concurrently if one of the two threads throws an exception, the "late" thread will linger on past their invocation.

Repro by @snoyberg :

#!/usr/bin/env stack
-- stack --resolver lts-6.4 runghc --package async
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Exception
import Control.Concurrent.Async
import Control.Monad
import qualified Data.ByteString.Char8 as S8

main :: IO ()
main = do
    race quick infinite >>= (S8.putStr . (`S8.append` "\n") . S8.pack . show)
    S8.putStr "definitely left race\n"
    threadDelay 10000000

quick :: IO ()
quick = do
    S8.putStr "quick\n"
    threadDelay 100000

infinite :: IO ()
infinite =
    (forever $
     do S8.putStr "infinite\n"
        threadDelay 10000) `onException`
    cleanup
  where
    cleanup = do
        threadDelay 2000000
        S8.putStr "still alive!\n"

running this script will result in

infinite
quick
infinite
infinite
infinite
infinite
infinite
infinite
infinite
infinite
infinite
Left ()
definitely left race
still alive!

Many thanks to @kantp for finding out about this behavior.

Make `Async a` `Hashable`

Async a already has instances of Eq and Ord, which makes it possible to store it in a Set, but it has no instance of Hashable, so it is not possible to store it in a HashSet, while the idea of having a HashSet of running threads seems to be quite useful.

Given that ThreadId already has an instance of Hashable and Eq and Ord of Async a delegate to the internal ThreadId, providing such an instance would be trivial, although it would require depending on hashable.

If this is acceptable, I can prepare a PR.

Build error of async 2.1.1.1 on GHC 8.4.1-alpha1

GHC 8.4.1-alpha1 was announced. While testing Agda with this version of GHC, I got the following error:

$ cabal install
Resolving dependencies...
cabal: Could not resolve dependencies:
trying: async-2.1.1.1 (user goal)
next goal: base (dependency of async-2.1.1.1)
rejecting: base-4.11.0.0/installed-4.1... (conflict: async => base>=4.3 &&
<4.11)
...

The current version on the master branch (commit 08d2f33) compiles with GHC 8.4.1-alpha1, so could you make a release, please?

Blocking agda/agda#2878.

'cancel' not waiting for async to finish

I asked about this code example on Stack Overflow and somebody suggested I report it here.

import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Data.Char
import Prelude
import System.IO
main :: IO ()
main = do
    a <- async $ forConcurrently_ ['a'..'z'] $ \c ->
        (print c *> delay 2) `finally` print (toUpper c)
    delay 1
    cancel a
  where
    delay sec = threadDelay (sec * 1000000)
    print c = hPutStr stderr [c]

The output from this is

abcdefghijklmnopqrstuvwxyzABC

If I add a delay after the cancel to allow enough time for all threads to complete...

main :: IO ()
main = do
    a <- async $ forConcurrently_ ['a'..'z'] $ \c ->
        (print c *> delay 2) `finally` print (toUpper c)
    delay 1
    cancel a
    delay 3
  where
    delay sec = threadDelay (sec * 1000000)
    print c = hPutStr stderr [c]

then I get the output I expect:

abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ

I'm using async 2.1.1.

Ability to cancel a linked thread without killing self

I wonder if anybody has run into this pattern (and whether I'm missing something in the existing functionality that could already do it):

data InternalCancel = InternalCancel deriving (Eq,Show,Read,Ord,Typeable)
instance Exception InternalCancel

-------------------------------------------------------------------------------
-- | Spawn action and monitor it by linking it to current thread. If
-- we later cancel it with 'cancelLinked', it dies but does not
-- re-raise the exception in our current thread.
asyncLinked :: IO () -> IO (Async ())
asyncLinked f = do
    a <- async (f `catch` (\ InternalCancel -> return ()))
    link a
    return a


-------------------------------------------------------------------------------
-- | Cancel a linked action without killing its supervising thread.
cancelLinked :: Async a -> IO ()
cancelLinked a = cancelWith a InternalCancel

Expose Async constructor

Consider some code that races two IO actions that themselves have a cleanup action to perform:

foo = bracket acquire1 release1 inner1
bar = bracket acquire1 release2 inner2

main = race_ foo bar

This will not necessarily leave time for foo and bar to complete release1 and release2 respectively.

I modified my code to essentially be:

main = 
  bracket (async foo) cleanup
    (\a1 -> bracket (async bar) cleanup
      (\a2 -> waitEither_ a1 a2))

cleanup a = cancel a >> waitCatch a

Which is fine, but as a comment in the code mentions, this is slower than just combining the async and bracket manually, as withAsync does.

So, it would be nice to have access to the Async constructor so that withAsync can be written in user code!

Provide MonadIO instance for Concurrently

I don't know if the dependency footprint of async is limited to base and stm on purpose, but it would be great if it was possible to add a dependency on transformers and implement a MonadIO instance for Concurrently, this would let Concurrently be used more easily in combination with other transformers.

For example, I just implemented a quick scraper for my own use where I used ReaderT QSem Concurrently a as my monad stack (the QSem to limit the amount of concurrent requests). This way I could simply use traverse instead of mapConcurrently to get the desired behaviour in an easy way. However, the lack of liftIO instances means that for more complicated monad stacks it becomes a hassle where one has to explicitly lift all the layers when using Concurrently at the bottom. This is easily avoided by a MonadIO instance so we can simply lift IO values through Concurrently into our stack.

instance MonadIO Concurrently where
    liftIO = Concurrently

`race` behaves differently from its simple implementation

Consider the following code: https://gist.github.com/gromakovsky/018b4169ae2ab2adffd744ce6d510f3a
There I defined race0 using the same code as in the documentation.
When I launch it (using stack async-strange-race.hs) and press Ctrl-C, release B is printed and release main is printed in a second. That's exactly what I expect.
However, if I change race0 to actual race, the behavior is different. release B and release main are printed concurrently, there is no delay (should be 1 second) and the output is sometimes messy (something like relreealseea sBe).
It looks like a significant inconsistency to me. Is it a bug?

forConcurrently

This would be quite convenient.

forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently = flip mapConcurrently

and it's usage

result <- forConcurrently [1..num] $ \x -> do
   getItem

I find myself doing flip mapConcurrently a fair amount. Seems to fit in with mapM / forM pattern. What do you think?

Here's an example usage of some code I'm using that would benefit.

------------------------------------------------------------------------------                                                                                                    
-- | User Feed Creation function                                                                                                                                                  
createUserFeed :: [Feed] -> Vertigo [FeedJSON]                                                                                                                                    
createUserFeed list =                                                                                                                                                             
  flip mapConcurrently list $ \Feed {..} -> do                                                                                                                                    
    let ContentId uuid = feedId                                                                                                                                                   
    case feedType of                                                                                                                                                              
      FeedLike -> do                                                                                                                                                              
        like :: Like <- dynamoGet $ getLikeItem (LikeId uuid)                                                                                                                     
        return $ FeedJSON like feedSeqNum FeedLike                                                                                                                                
      FeedFollow  -> do                                                                                                                                                           
        let fo = Follow (FollowerId uuid) feedUserId                                                                                                                              
        return $ FeedJSON fo feedSeqNum FeedFollow                                                                                                                                
      FeedComment -> do                                                                                                                                                           
        comment :: Comment <- dynamoGet $ getComment (CommentId uuid)                                                                                                             
        return $ FeedJSON comment feedSeqNum FeedComment                                                                                                                          
      FeedPost -> do                                                                                                                                                              
        post :: Post <- dynamoGet $ getPost (PostId uuid)                                                                                                                         
        return $ FeedJSON post feedSeqNum FeedPost                                                                                                                                

link memory leak

Usage of link may lead to a memory leak. Here is a test case:

module Main where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad

main = do
  to <- async $ forever $ threadDelay 50000000
  forever $ do
    async $ link to >> return ()

I believe the problem is link waits only for an async and doesn't check a thread it should kill. Then link thread still lives even if it's not needed anymore.

waitAll

I noticed there is no waitAll :: [Async a] -> IO [a] function that simply waits for and returns the results of all operations, or throws the first exception.

I've been using waitAll = mapM wait but I'm not sure if this is correct (could this incorrectly be fused to wait . async . print?) or the most efficient way. It appears to work for i.e.

do jobs <- forM [1..20 :: Int] (async . print)
   waitAll jobs

Catching UserInterrupt inside threads

I have the following use case:

Given a list of computations to be done, execute N at the time and if someone presses ctrl-c in the program, wait for those currently running to finish and cancel the rest.

I've tried using plain mask_ inside mapConcurrently, but turns out AsyncCancelled exception is used within this package. It's currently not possible to mask it inside threads or distinguish by which AsyncException it was triggered.

What am I missing? :)

Add Monad instance for Concurrently

In light of the ApplicativeDo extension made available in GHC 8.0.1, I'd like to propose the following Monad instance for Concurrently.

instance Monad Concurrently where
    return = pure
    x >>= f = Concurrently $ runConcurrently x >>= (runConcurrently . f)

Where bind simply runs two Concurrently actions sequentially. This should allow one to write things of the form:

do x <- a
   y <- b
   f x y

The desugared expression will require join, and should allow f to wait on a and b, which may run concurrently.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.