One of the things I have to do fairly often in multithreaded programming is send off a whole bunch of threads to do their thing while I do something else on the main thread until they’re done. For example, imagine you’re downloading a bunch of images from the web: you don’t want to call httpGet on one image right after another, because network requests are slow and processing them takes almost no CPU time. On the other hand, forkIO doesn’t return the thread’s result (only its ThreadId), so each thread has to put its result somewhere you can access it later. Thus, my short, simple solution, far too small to bother putting up on Hackage:
module Control.Concurrent.Future where

import Control.Concurrent

future :: IO a -> IO (MVar a)
future thunk = do
  ref <- newEmptyMVar
  forkIO $ thunk >>= putMVar ref
  return ref

forceAll :: [MVar a] -> IO [a]
forceAll = mapM takeMVar
To use this, simply do mapM (future . thunk) parms, where thunk :: a -> IO b builds the IO action for one parameter set and parms :: [a] is the list of parameter sets (one item per thread). This sparks off a whole bunch of threads that do their thing while you continue along in the program. Then, when you need all the data, forceAll (also known as a “barrier” in multithreaded programming lingo) takes the list of MVars and makes the program wait until every thread has delivered its result. If you need all the results immediately after you spark the threads, the following simple line will do:
stuff' <- mapM future stuff >>= forceAll
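To see the pattern run without touching the network, here is a minimal self-contained sketch. It repeats future and forceAll from above and swaps httpGet for a hypothetical fakeGet that just sleeps for 100 ms before returning a made-up body; fetchAll is likewise a name chosen only for illustration.

```haskell
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar, threadDelay)

-- future and forceAll as defined in the post above.
future :: IO a -> IO (MVar a)
future thunk = do
  ref <- newEmptyMVar
  _ <- forkIO (thunk >>= putMVar ref)
  return ref

forceAll :: [MVar a] -> IO [a]
forceAll = mapM takeMVar

-- Hypothetical stand-in for an HTTP fetch: sleeps 100 ms and returns
-- a fake "body", so the example needs no network access.
fakeGet :: String -> IO String
fakeGet url = do
  threadDelay 100000
  return ("body of " ++ url)

-- Start one thread per URL, then block until every result is in.
fetchAll :: [String] -> IO [String]
fetchAll urls = mapM (future . fakeGet) urls >>= forceAll
```

Because the three sleeps overlap, fetchAll ["a", "b", "c"] should take roughly 100 ms rather than 300 ms, and forceAll returns the bodies in the same order as the URLs, whichever thread happens to finish first.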
Hi Jeff,
If you’re interested in this, you might want to take a look at Reactive from Conal Elliott.
Conal and co. found a bug in GHC that can affect code like this, at least when you catch exceptions and kill threads. So if you experience weird behavior, don’t lose the many hours we did: it might be the GHC bug. It seems the bug will most likely be fixed in the next release.
See http://www.nabble.com/black-hole-detection-and-concurrency-td21172133i20.html for the discussion.
Thanks, I will check that out; I’ve had an interesting problem that sounds similar and that I’ve been ignoring until now.
I’ve been looking at something similar to this, and I’ve folded your stuff into mine. Here’s a snippet of my thoughts on this:
import Control.Concurrent (MVar, forkIO, takeMVar)
import Control.Exception (evaluate)

-- future and forceAll are the definitions from the post above.

forceAll_ :: [MVar a] -> IO ()
forceAll_ = mapM_ takeMVar

parMapIO :: (a -> IO b) -> [a] -> IO [b]
parMapIO f xs = mapM (future . f) xs >>= forceAll

parForIO :: [a] -> (a -> IO b) -> IO [b]
parForIO = flip parMapIO

-- Strict in return values: each result is forced to weak head normal
-- form by evaluate inside its worker thread.
parMapIO' :: (a -> IO b) -> [a] -> IO [b]
parMapIO' f = parMapIO (\x -> f x >>= evaluate)

parForIO' :: [a] -> (a -> IO b) -> IO [b]
parForIO' = flip parMapIO'

-- Waits for completion, but returns ().
parMapIO_ :: (a -> IO b) -> [a] -> IO ()
parMapIO_ f xs = mapM (future . f) xs >>= forceAll_

parForIO_ :: [a] -> (a -> IO b) -> IO ()
parForIO_ = flip parMapIO_

-- Doesn't wait for the map to complete (fire and forget).
parMapIO__ :: (a -> IO b) -> [a] -> IO ()
parMapIO__ f = mapM_ (\a -> forkIO $ f a >> return ())

parForIO__ :: [a] -> (a -> IO b) -> IO ()
parForIO__ = flip parMapIO__
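A short, self-contained check of how two of these combinators behave. future, forceAll, and forceAll_ are repeated so the block compiles on its own; slowSquare and countRuns are hypothetical helpers for the demo: slowSquare gives later inputs shorter delays, and countRuns counts how many workers actually ran.

```haskell
import Control.Concurrent
  (MVar, forkIO, modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar, threadDelay)

future :: IO a -> IO (MVar a)
future thunk = do
  ref <- newEmptyMVar
  _ <- forkIO (thunk >>= putMVar ref)
  return ref

forceAll :: [MVar a] -> IO [a]
forceAll = mapM takeMVar

forceAll_ :: [MVar a] -> IO ()
forceAll_ = mapM_ takeMVar

parMapIO :: (a -> IO b) -> [a] -> IO [b]
parMapIO f xs = mapM (future . f) xs >>= forceAll

parMapIO_ :: (a -> IO b) -> [a] -> IO ()
parMapIO_ f xs = mapM (future . f) xs >>= forceAll_

-- Hypothetical worker (intended for inputs 1..4): larger inputs sleep
-- less, so the threads finish in reverse order of the input list.
slowSquare :: Int -> IO Int
slowSquare n = do
  threadDelay ((5 - n) * 20000)
  return (n * n)

-- parMapIO_ blocks until every worker has run, so the counter is
-- complete by the time it is read.
countRuns :: Int -> IO Int
countRuns k = do
  counter <- newMVar 0
  parMapIO_ (\_ -> modifyMVar_ counter (return . (+ 1))) [1 .. k]
  readMVar counter
```

parMapIO slowSquare [1, 2, 3, 4] should give [1, 4, 9, 16]: results come back in input order even though the threads finish in reverse. parMapIO__, by contrast, returns before its workers finish, so a counter read right after it could still come up short.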
Very nice… I like that, and that seems more or less large enough to slip into Hackage. Let me know if you decide to do so…
Sure
I’ve got unit tests and docs for this. I’m working on putting together a ‘real’ module, putting it on darcs, and uploading it to Hackage. This will be my first Haskell module, so I’m paying some one-time setup costs, but I’m getting pretty close.
I was thinking of calling this Control.Concurrent.ParIO
By the way, here’s a link to my current scratch pad for this:
http://code.google.com/p/eng-chi-haskell/source/browse/crutcher/libs/ParIO/ParIO.hs
You can also eliminate the MVar from the interface by returning ‘takeMVar ref’ instead of ref, which gives future the type IO a -> IO (IO a).
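A sketch of that suggestion: future hands back the waiting action itself, so callers never see an MVar. One deliberate deviation, which is an assumption of this sketch rather than part of the comment: it returns readMVar ref instead of takeMVar ref, so running the returned action twice does not block forever on an already-emptied MVar. With futures as plain actions, the barrier reduces to sequence.

```haskell
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, readMVar)

-- The MVar stays private; the caller only gets an action that waits.
-- readMVar (not takeMVar) leaves the value in place, so the returned
-- action can safely be run more than once.
future :: IO a -> IO (IO a)
future thunk = do
  ref <- newEmptyMVar
  _ <- forkIO (thunk >>= putMVar ref)
  return (readMVar ref)

-- Waiting for a list of futures is just running their wait actions.
forceAll :: [IO a] -> IO [a]
forceAll = sequence
```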
BTW, a very similar forkIO/MVar trick is at the heart of the implementation of unambiguous choice (unamb), which has functional (non-IO) semantics but a concurrent implementation.
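The core of that trick is a race: fork one thread per contender, let the first result into an MVar, and stop the rest. raceFirst below is a hypothetical helper written for illustration, not the API of any package. A real unambiguous-choice operator additionally wraps this in unsafePerformIO to get a pure a -> a -> a, evaluates pure arguments rather than running IO actions, handles exceptions, and relies on both arguments agreeing whenever they are defined; those parts are left out here.

```haskell
import Control.Concurrent (forkIO, killThread, newEmptyMVar, takeMVar, threadDelay, tryPutMVar)

-- Hypothetical helper: run both actions concurrently and return
-- whichever result arrives first. tryPutMVar makes the loser's write
-- a harmless no-op instead of blocking.
raceFirst :: IO a -> IO a -> IO a
raceFirst left right = do
  box <- newEmptyMVar
  t1 <- forkIO (left >>= tryPutMVar box >> return ())
  t2 <- forkIO (right >>= tryPutMVar box >> return ())
  result <- takeMVar box
  -- The other contender is no longer needed, so stop it.
  killThread t1
  killThread t2
  return result
```

For example, raceFirst (threadDelay 2000000 >> return 1) (return 2) returns 2 almost immediately and kills the sleeping thread.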
Thanks… Elegant, I like it.
And this is what I came up with a few months ago when I wanted to perform a couple of HTTP requests in parallel:
module Parallel (parallel, parIO) where

import Control.Concurrent
import Control.Exception
import System.IO.Unsafe

-- | Start evaluating the actions in the list in parallel, and return a lazy
-- list with the results.
parallel :: [IO a] -> IO [a]
parallel = mapM parIO

-- | Start the action in parallel and defer the result lazily.
parIO :: IO a -> IO a
parIO a = do
  mvar <- newEmptyMVar
  forkIO $ try a >>= putMVar mvar
  fmap fromEither $ unsafeInterleaveIO $ takeMVar mvar

-- | Extract the value or raise an exception.
-- (SomeException replaces the old Exception type, which became a type
-- class in GHC 6.10's Control.Exception.)
fromEither :: Either SomeException b -> b
fromEither = either throw id
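One consequence of this design is that a failure in one thread stays hidden until its element is demanded. The sketch below repeats parallel and parIO (written against the current Control.Exception API, with SomeException) and adds a hypothetical demo in which the middle action throws; the exception is re-raised only when that element is forced.

```haskell
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, evaluate, throw, throwIO, try)
import System.IO.Unsafe (unsafeInterleaveIO)

parallel :: [IO a] -> IO [a]
parallel = mapM parIO

-- The worker catches any exception with try, so the MVar is always
-- filled; the lazy read rethrows it on demand.
parIO :: IO a -> IO a
parIO a = do
  mvar <- newEmptyMVar
  _ <- forkIO (try a >>= putMVar mvar)
  fmap fromEither (unsafeInterleaveIO (takeMVar mvar))

fromEither :: Either SomeException b -> b
fromEither = either throw id

-- Hypothetical demo: three jobs, the middle one failing. Each element
-- is forced separately and reported as True (value) or False (threw).
demo :: IO [Bool]
demo = do
  results <- parallel [return 1, throwIO (userError "boom"), return (3 :: Int)]
  mapM (\r -> fmap isRight (try (evaluate r) :: IO (Either SomeException Int))) results
  where
    isRight (Right _) = True
    isRight (Left _) = False
```

demo should yield [True, False, True]: the first and third results are available, and forcing the second rethrows the userError raised in its thread.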
I recently also thought about implementing futures in Haskell. It is also possible to implement implicit futures (the result looks like an ordinary value, and demanding it blocks until the thread is done) using unsafeInterleaveIO:
future :: IO a -> IO a
future code = do
  ack <- newEmptyMVar
  thread <- forkIO (code >>= putMVar ack)
  unsafeInterleaveIO (
    do result <- takeMVar ack
       killThread thread
       return result
    )
I started to write a paper about it, but it is still a draft and not finished.
Its current version is available here:
http://www.ki.informatik.uni-frankfurt.de/persons/sabel/futures.pdf
Sorry, there was a copy-and-paste error in that code; here is the corrected version:
import Control.Concurrent
import System.IO.Unsafe (unsafeInterleaveIO)

future :: IO a -> IO a
future code = do
  ack <- newEmptyMVar
  thread <- forkIO (code >>= putMVar ack)
  unsafeInterleaveIO (do
    result <- takeMVar ack
    killThread thread
    return result)
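A small demonstration of the implicit future above, repeated with its imports so the block is self-contained. slowDouble and sumOfDoubles are hypothetical names for the demo: four futures start at once, and only the strict sum at the end waits on them.

```haskell
import Control.Concurrent (forkIO, killThread, newEmptyMVar, putMVar, takeMVar, threadDelay)
import System.IO.Unsafe (unsafeInterleaveIO)

-- Implicit future: the caller gets what looks like a plain value;
-- demanding it blocks until the worker has put its result.
future :: IO a -> IO a
future code = do
  ack <- newEmptyMVar
  thread <- forkIO (code >>= putMVar ack)
  unsafeInterleaveIO $ do
    result <- takeMVar ack
    killThread thread
    return result

-- Hypothetical slow computation used only for the demo.
slowDouble :: Int -> IO Int
slowDouble n = threadDelay 50000 >> return (2 * n)

-- All four futures are started before anything is demanded; the
-- strict sum ($!) then forces each one in turn.
sumOfDoubles :: IO Int
sumOfDoubles = do
  xs <- mapM (future . slowDouble) [1, 2, 3, 4]
  return $! sum xs
```

sumOfDoubles should return 20, and since the four 50 ms delays overlap, the whole thing takes roughly one delay rather than four. In this example killThread runs only after the worker has already delivered its result, so it has essentially nothing left to interrupt.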