Haskell concurrent
Haskell concurrent amplia Haskell98 amb concurrència explícita.
Els dos conceptes principals en què es basa Haskell concurrent són les Mutable variables MVar α i la possibilitat d'engegar un nou fil d'execució via forkIO.
Concurrència [modifica]
- forkIO
- engega un fil d'execució lleuger del planificador del Run Time System que, segons el compilador de Haskell concurrent, pot implementar multitasca cooperativa o bé arrabassadora (ang:preemptive)).[1]
Vegeu "La implementació de concurrencia de GHC".[2]
accés sincronitzat amb variables MVar (Mutable variable) [modifica]
La consulta d'una variable MVar (takeMVar) en buida el contingut causant que altres fils d'execució que hi accedeixin posteriorment quedin blocats en espera que se'n reposi el valor actualitzat (amb putMVar).[3]
- "takeMVar mvar" bloca el fil d'exec. si mvar estava buida
- "putMVar mvar valor" bloca si mvar estava plena
Generadors:
do mvarPlenaInicialment <- newMVar contingut -- newMVar :: t -> IO (MVar t) mvarBuidaInicialment <- newEmptyMVar :: IO (MVar T) -- mvar per a un contingut de tipus T
Una MVar té tres facetes:[3]
- Variable amb accés sincronitzat
- Bústia de comunicació d'un sol element (takeMVar com a receive, putMVar com a send)
- Semàfor binari (takeMVar com a wait, putMVar com a signal)
mvarSemàfor <- newEmptyMVar :: IO (MVar Bool) -- engega fil d'exec. i en acabar -- desperta el primer dels fils suspesos pendents de la MVar threadId <- forkIO procés `finally` putMVar mvarSemàfor True ... -- suspèn en espera que el fil d'exec. de ''procés'' acabi takeMVar mvarSemàfor
Darrerament s'hi han afegit crides no blocants (tryTakeMVar, tryPutMVar).
També tenim modifyMVar_ (composició de takeMVar i putMVar que retorna IO ()) i altres novetats.
L'accés a una variable MVar és d'avaluació tardana, per tant el contingut serà avaluat en el consumidor i no en el productor!![3]
Els fils blocats es desperten per ordre de suspensió (FIFO).[3] "putMVar mvar" desperta el primer dels fils que ha cridat takeMVar amb la mateixa mvar, que s'emporta el valor i la deixa buida.
bústies de comunicació amb MVar, Chan, BoundedChan [modifica]
Canals amb sincronització per baldes (ang: locks).
- MVar: bústies d'un sol element (de Control.Concurrent)
- Chan: bústies amb cua il·limitada (de Control.Concurrent.Chan)
- BoundedChan: bústies limitades (del paquet BoundedChan)
Exemples:
- #Concurrència simple amb MVars - Productor-consumidor
- i afegint-hi un canal asíncron a #Client-servidor - Cues d'entrada (Chan)
forkOS, Bound threads: fils d'execució amb crides FFI externes o a biblioteques amb allotjament local al fil. [modifica]
GHC assigna els fils d'exec. lleugers llançats amb forkIO en relació N-M amb els fils del sistema (anomenats capability) un per cada processador elemental.[2][4]
forkOS: llança un fil d'execució lligat als del sistema (Bound threads), per poder fer ús de crides externes (FFI: Foreign Function Interface) en un fil, o l'ús d'allotjament lligat al fil d'execució. Biblioteques com OpenGL requereixen aquest ús.[5][6]
memòria transaccional: variables TVar, i comunicació amb TMVar i TChan [modifica]
- TVar són variables de "Memòria transaccional per programari".
- TMVar i TChan són els equivalents transaccionals de MVar i Chan
Vegeu exemple #Concurrència condicionada amb TVars - Mònada STM - Memòria transaccional
Encadenament de càlculs simultanis - la mònada Par [modifica]
Permet combinar tasques en paral·lel encadenant resultats com a paràmetres de paral·lelitzacions subseqüents. Vegeu refs.[7][8]
{-# LANGUAGE PackageImports #-} import "monad-par" Control.Monad.Par (Par, runPar, spawn, get) f = (*2) g = (/2) h (a, b) = a + b k (a, b) = a - b calcula_paraŀlelitzant :: Double -> Par (Double, Double) calcula_paraŀlelitzant x = do fx <- spawn (return (f x)) -- start evaluating (f x) gx <- spawn (return (g x)) -- start evaluating (g x) a <- get fx -- wait for fx b <- get gx -- wait for gx hab <- spawn (return (h (a,b))) kab <- spawn (return (k (a,b))) c <- get hab -- wait for hab d <- get kab -- wait for kab return (c,d) main = do let res = runPar $ calcula_paraŀlelitzant 4 print res
cabal install monad-par ghc --make -threaded prova.hs ./prova (10.0,6.0)
Operacions d'Entrada/Sortida asíncrones i simultànies [modifica]
Amb la biblioteca Async.[9][10]
operacions asíncrones [modifica]
withAsync llança una operació IO asíncronament (sense esperar-ne la finalitzazió) en un fil d'execució nou i n'assegura la terminació (basat en bracket (obtenció de recurs: async io) (alliberament de recurs: cancel)).
wait la_meva_async_io atura el fil actual en espera que s'acabi l'op. asíncrona, propagant-ne l'excepció, cas que n'hi hagi. waitCatch la retorna en un tipus Either.
waitAny [Async a] espera la primera que finalitzi o llanci una excepció. waitAnyCancel afegeix cancelació de les altres. Hi ha versions waitAnyCatch i waitAnyCatchCancel per incloure l'excepció en el retorn (tipus Either).
waitEither permet esperar la primera de dues ops. asíncrones amb tipus del retorn diferent.
obtenirURLs url1 url2 = do withAsync (getURL url1) $ \async1 -> do withAsync (getURL url2) $ \async2 -> do page1 <- wait async1 page2 <- wait async2 return (page1, page2)
combinació d'operacions simultànies [modifica]
- Simultaneja dues operacions, i en retorna ambdós resultats
concurrently :: IO a -> IO b -> IO (a, b)
- Simultaneja dues operacions, canceŀlant la que tardi més
race :: IO a -> IO b -> IO (Either a b)
- Mapeja una funció en la mònada IO a una coŀlecció de valors, i les executa simultàniament, retornant-ne la coŀlecció de resultats.
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
el tipus Concurrently [modifica]
A més alt nivell el tipus Concurrently embolcalla l'operativa precedent en la implementació de les classes Applicative (aplicant l'op. concurrently) i Alternative (aplicant race, per esperar la primera que acabi, canceŀlant les altres).
obtenirURLs url1 url2 url3 = do (page1, page2, page3) <- runConcurrently $ (,,) -- (,,) constructor de Tupla3 <$> Concurrently (getURL url1) -- ''functor aplicatiu'' <*> Concurrently (getURL url2) <*> Concurrently (getURL url3) -- la implementació d'Alternative (<|>) sobre Concurrently retorna la que acaba primer, canceŀlant les altres ghci Prelude>:m + Control.Applicative Control.Concurrent Control.Concurrent.Async Control.Exception Prelude ...> let esperaSegons secs = threadDelay (secs * 1000000) >> return secs :: IO Int Prelude ...> let ca = Concurrently $ esperaSegons 5 Prelude ...> let cb = Concurrently $ esperaSegons 3 Prelude ...> runConcurrently $ ca <|> cb -- la primera que acabi 3 Prelude ...> let cc = Concurrently $ throwIO $ userError "excepció" Prelude ...> runConcurrently $ ca <|> cb <|> cc *** Exception: user error (excepció)
CSP [modifica]
Communicating Haskell Processes [modifica]
La biblioteca "Communicating Haskell Processes" de la universitat de Kent implementa una àlgebra de processos CSP, basada en mònades.[11][12][13]
Aquesta biblio. consta de diversos paquets. Cal el paquet chp-plus del Hackage per fer córrer els exemples de les guies.
Si p i q són processos, s'hi defineix els següents operadors de composició:
stop -- :: CHP () -- CSP: STOP o bé 0 skip -- :: CHP a -- CSP: SKIP p <-> q -- alternativa, CSP: P | Q p <||> q -- paraŀlelisme, CSP: P || Q p >> q -- seqüència, CSP: P ; Q forever p -- iteració , CSP: ∗P p </> q -- alternativa amb prioritat p <&> q -- sincronia (''join''), prospera si ambdós processos estan a punt (missatge disponible) simultàniament.
El codi (per a GHC 6.10) ha quedat una mica desfasat. Per compilar a GHC 7.4 cal soŀlucionar ambigüetats per la coŀlisió de símbols afegits a les noves versions de les biblioteques emprades i un símbol desaparegut de les noves versions de QuickCheck que es troba a la versió 2.3.* Passos:
# -- Module 'Test.QuickCheck.Property' does not export `liftIOResult' # -- Al compilar QuickCheck-2.3.0.2 -- couldn't deduce (Show a) from (Integral a) -- a la signatura de ''ranges'' # descarregar QuickCheck-2.3.0.2 # actualitzar la versió al fitxer .cabal afegint-hi .1 (Version: 2.3.0.2.1) # modif. Test.QuickCheck.Text afegint (Show a) a la signatura de la funció ''ranges'' cabal install # -- soŀlucionar al paquet "chp" mòdul Control.Concurrent.CHP.Clocks, l'ambigüetat dels símbols modifyTVar i modifyTVar' # -- modifyTVar i modifyTVar' corresponen a funcions definides localment al mòdul # import Control.Concurrent.STM hiding (modifyTVar, modifyTVar') # actualitzar la versió al fitxer .cabal afegint-hi .1 cabal install cabal install chp-plus --constraint=QuickCheck==2.3.0.2.1 --constraint=pretty==1.1.0.* --constraint=chp==num_versió_modificada
model d'Actors [modifica]
la biblio Actor [modifica]
Implementa actors multi-canal despatxant, a la recepció, en comptes de per missatge, per llista de missatges de canals diferents.[14]
Per compilar a GHC >= 7.2[15] cal evitar la dependència de Haskell98, reanomenant les importacions de mòduls H98 {IO, Monad, Control.Exception} a nomenclatura GHC equivalent {System.IO, Control.Monad, Control.OldException}, i treure la dependència de haskell98 del fitxer de projecte .cabal
Cloud Haskell [modifica]
Concurrència per pas de missatges a l'estil de l'Erlang, per a sistemes distribuïts. Aplica el model d'Actors als processos distribuïts de manera similar a les construccions del llenguatge i sistema concurrent distribuit de nodes de l'Erlang.[16][17][18]
Paral·lelisme [modifica]
Vegeu-ho a GHC
Exemples de concurrència [modifica]
Concurrència simple amb MVars - Productor-consumidor [modifica]
Amb variables de sincronització per baldes (blocants) MVar.[19]
- forkIO
- Engega fil d'execució lleuger del planificador del Haskell en multiprocés cooperatiu.
- putMVar mvar valor
- bloca si la variable MVar és plena (ocupada) fins que estigui disponible (buida) i llavors l'omple amb el valor.
- takeMVar mvar
- bloca el fil d'execució si la variable MVar és buida fins que la li omplin i en retorna el valor buidant-la.
module Main( main ) where import Control.Concurrent (forkIO, threadDelay, MVar, newEmptyMVar, putMVar, takeMVar) import Control.Exception (finally) import qualified Control.Monad as Monad import System.IO (stdout, hFlush) import Text.Printf (printf) import Data.Time (FormatTime, formatTime, getCurrentTime, utcToLocalZonedTime) import System.Locale (defaultTimeLocale) obtenir_hora :: IO String obtenir_hora = do local_t <- (getCurrentTime >>= utcToLocalZonedTime) -- els parèntesis hi són per legibilitat return $ formatTime defaultTimeLocale "%T" local_t productor :: MVar Int -> IO () productor mv_bústia = do Monad.forM_ [3,2..0] $ \compte_enrere -> do -- per als valors de la llista threadDelay 1000000 -- espera microsegons putMVar mv_bústia compte_enrere -- posa valor a la MVar consumidor :: MVar Int -> IO () consumidor mv_receptacle = do x <- takeMVar mv_receptacle -- bloca a l'espera que omplin la MVar local_t <- obtenir_hora let h = formatTime defaultTimeLocale "%T" local_t printf "%s - consumidor: recollit %d\n" h x hFlush stdout if x == 0 then return () -- s'ha acabat else consumidor mv_receptacle -- tornem-hi main = do -- nova MVar per la sicronització productor / consumidor mv_bústia <- newEmptyMVar :: IO (MVar Int) -- nova MVar per la sincronització de finalització de fil d'exec. mv_fi_prod <- newEmptyMVar :: IO (MVar Bool) mv_fi_consum <- newEmptyMVar :: IO (MVar Bool) -- forkIO: engega fil d'execució consumidor_id <- forkIO $ consumidor mv_bústia `finally` putMVar mv_fi_consum True -- assenyala l'acabament a la MVar -- despertant el primer dels fils blocats per la mateixa productor_id <- forkIO $ productor mv_bústia `finally` putMVar mv_fi_prod True -- emulem amb MVar's la feina de ''pthread_join()'' de l'Unix -- per esperar la finalització dels fils d'exec. creats takeMVar mv_fi_prod -- bloca fins a la fi del productor takeMVar mv_fi_consum -- bloca fins a la fi del consumidor putStrLn "fi del programa"
produeix la sortida següent:
11:32:03 - consumidor: recollit 3 11:32:04 - consumidor: recollit 2 11:32:05 - consumidor: recollit 1 11:32:06 - consumidor: recollit 0 fi del programa
Client-servidor - Cues d'entrada (Chan) [modifica]
- Client-servidor, canalitzant la impressió
Canals no acotats (en la dimensió de la cua) (Control.Concurrent.Chan)[20] "de primera classe" (és a dir, que es pot passar com a paràmetre)[21]
- forkIO: Engega fil d'execució lleuger del planificador del Haskell.
- forkOS: engega un fil lligat a un del sistema operatiu, per al cas de crides FFI o allotjament lligat al fil.[5]
- comunic. per cues il·limitades
- writeChan canal
- afegeix a la cua il·limitada i retorna tot seguit (sense blocar) (comunicació asíncrona).
- readChan canal
- bloca si la cua del canal és buida
- resposta per MVars
- semàfors:
- newQSem valorInicial
- nou semàfor
- signalQSem semàfor
- incrementa semàfor i assenyala
- waitQSem semàfor
- si semàfor > 0 llavors decrementa i continua sinó espera
- A l'exemple el client encua el parell (comanda, ref. resposta (mv_resposta)), i queda a l'espera de la resposta.
module Main( main ) where import Control.Concurrent (forkIO, forkOS, threadDelay, MVar, newEmptyMVar, putMVar, takeMVar, isEmptyMVar) import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem) import Control.Exception (finally) import Data.IORef (IORef, newIORef, readIORef, writeIORef, modifyIORef) import qualified Control.Monad as Monad import System.IO (stdout, hFlush) import Text.Printf (printf) import Data.Time (FormatTime, formatTime, getCurrentTime, utcToLocalZonedTime) import System.Locale (defaultTimeLocale) data TInfo = InfoDelClient Int | InfoDelServidor String Int | InfoPlega -- missatges a l'informador type Canal_Comanda = Chan (Int, MVar_Resposta) type MVar_Resposta = MVar Int type Canal_Info = Chan TInfo obtenir_hora :: IO String obtenir_hora = do local_t <- (getCurrentTime >>= utcToLocalZonedTime) -- els parèntesis hi són per legibilitat però, de fet, no calen return $ formatTime defaultTimeLocale "%T" local_t client :: Canal_Comanda -> MVar_Resposta -> Canal_Info -> IO () client chan_torn mv_resposta chan_info = do Monad.forM_ [3,2..0] $ \cnt -> do -- llista de valors a passar, finalitzant en zero threadDelay 1000000 -- espera microsegons writeChan chan_info (InfoDelClient cnt) -- no bloca (asíncron, encua al canal i continua) -- assegura que la mvar de resposta sigui buida mv_resp_esBuida <- isEmptyMVar mv_resposta Monad.when (not mv_resp_esBuida) (takeMVar mv_resposta >> return ()) -- encua la comanda passant la ref. de la mvar de resposta. writeChan chan_torn (cnt, mv_resposta) takeMVar mv_resposta -- bloca (espera resposta per continuar) obtenir_resposta :: Int -> IORef Int -> IO Int obtenir_resposta x ref_estat = readIORef ref_estat >>= (return . (+x)) -- la que vulgueu servidor :: Canal_Comanda -> Canal_Info -> IORef Int -> IO () servidor chan_torn chan_info ref_estat = do (x, mv_resposta) <- readChan chan_torn -- bloca fins obtenir comanda al chan_torn h <- obtenir_hora resp <- obtenir_resposta x ref_estat mv_resp_esBuida <- isEmptyMVar mv_resposta Monad.when mv_resp_esBuida $ putMVar mv_resposta resp -- respon si és possible writeChan chan_info (InfoDelServidor h x) -- no bloca (asíncron, encua al canal i continua) if x == 0 then return () -- acaba else servidor chan_torn chan_info ref_estat -- tornem-hi informador :: Canal_Info -> IO () informador chan_info = do info <- readChan chan_info -- bloca si la cua és buida case info of InfoDelClient intValor -> do printf "client: comanda %d\n" intValor hFlush stdout informador chan_info -- tornem-hi InfoDelServidor strHora intValor -> do printf "%s - servidor: recollit %d\n" strHora intValor hFlush stdout informador chan_info -- tornem-hi InfoPlega -> return () -- acaba main = do ref_estat <- newIORef 0 :: IO (IORef Int) -- ref. no sincronitzada (la manipula un sol fil) chan_torn <- newChan :: IO (Canal_Comanda) -- cua de comandes al servidor mv_resposta <- newEmptyMVar :: IO (MVar_Resposta) -- ref. sincronitzada chan_informacio <- newChan :: IO (Canal_Info) -- cua d'impressió -- semàfors per a l'espera d'acabament dels fils d'execució semàfor <- newQSem 0 -- semàfor per a client i servidor mv_fi_info <- newEmptyMVar :: IO (MVar Bool) -- semàfor amb MVar per a l'informador -- malgrat que la gestió de ''stdout'' pel fil d'exec. de l'informador -- no requereix ''bound threads'', li poso el forkOS per trencar el tabú. informador_id <- forkOS {- per les crides externes -} $ informador chan_informacio `finally` putMVar mv_fi_info True servidor_id <- forkIO $ servidor chan_torn chan_informacio ref_estat `finally` signalQSem semàfor -- incrementa semàfor i assenyala client_id <- forkIO $ client chan_torn mv_resposta chan_informacio `finally` signalQSem semàfor -- incrementa semàfor i assenyala -- espera finalització dels dos processos, el client i el servidor waitQSem semàfor -- si semàfor > 0 llavors decrementa sinó espera waitQSem semàfor -- si semàfor > 0 llavors decrementa sinó espera writeChan chan_informacio InfoPlega takeMVar mv_fi_info -- espera fi informador putStrLn "fi del programa"
dóna la següent sortida:
client: comanda 3 15:18:55 - servidor: recollit 3 client: comanda 2 15:18:56 - servidor: recollit 2 client: comanda 1 15:18:57 - servidor: recollit 1 client: comanda 0 15:18:58 - servidor: recollit 0 fi del programa
Concurrència condicionada amb TVars - Mònada STM - Memòria transaccional [modifica]
Només al compilador GHC.[22] Les transaccions de memòria eviten blocar els fils d'execució, descartant canvis a les variables transaccionals si no es completa, excepte en cas que hi posem condicions forçant el reintent amb la clàusula "retry".
L'evolució de la transacció es modela com a aplicacions en una Mònada STM, inicials de "Software Transactional Memory".
En aquest exemple les transaccions[23] s'efectuen sobre variables transaccionals[24] TVar (accés sincronitzat per STM) i s'encapsulen en una mònada STM.
També substituïm les les MVar (comunic. síncrona) per TMVar, i les Chan (comunic. asíncrona) per TChan, per quedar lliures de problemes de bloquejos.[25]
- atomically
- admet o tot o no res dels canvis a les variables transaccionals, passa el resultat Mònada STM a Mònada IO
- retry
- provoca el reintent si no es donen les condicions esperades i reintenta una transacció alternativa per la branca "orElse" si existeix, i si no, bloca el fil d'execució fins que es modifiqui alguna de les variables transaccionals implicades en la transacció.
- orElse
- introdueix una transacció alternativa, que s'avalua si la primera fa "retry"
- always
- comprova invariant i si falla, genera un error "Transactional invariant violation" finalitzant el programa
- catchSTM
- atrapa excepcions dins la mònada STM
A partir de GHC 6.12, STM desapareix de la biblioteca pral. i, si no s'ha fet la instal·lació amb la Plataforma Haskell que l'incorpora, caldrà carregar el paquet stm del Hackage.[26]
- biblioteca Async[27]
- withAsync io $ \async -> accions al fil d'exec. actual
- llença l'op. io en un altre fil d'execució, asíncronament (sense esperar que acabi)
- wait async
- espera finalització del fil d'exec. llançat i en retorna el resultat
-- transaccions als comptes --fitxer stm_part1.hs module Stm_part1 (aporta_quan_cal_i_obtenir_saldo, retira_fons_de_dos_comptes ) where import Control.Monad.STM (STM, retry, orElse, always) import Control.Concurrent.STM.TVar (TVar, readTVar, writeTVar) import qualified Control.Monad as Monad saldo_baix = 4 aporta_quan_saldo_baix :: TVar Int -> Int -> STM () aporta_quan_saldo_baix tv_compte aportacio = do saldo <- readTVar tv_compte Monad.when (saldo > saldo_baix) retry -- bloca si no es dóna la condició, -- fins que es modifiqui alguna TVar, llavors reintenta let nou_saldo = saldo + aportacio writeTVar tv_compte nou_saldo invariant :: TVar Int -> STM Bool invariant tv_compte = do saldo <- readTVar tv_compte return $ saldo >= 0 retira_fons :: TVar Int -> Int -> STM () retira_fons tv_compte quantitat = do saldo <- readTVar tv_compte Monad.when (saldo < quantitat) retry -- si no hi ha saldo reintenta la transacció alternativa -- o bloca i torna a la inicial si no hi ha més alternatives writeTVar tv_compte $ saldo - quantitat always $ invariant tv_compte -- comprova invariant de la transacció retira_fons_de_dos_comptes :: TVar Int -> TVar Int -> Int -> STM Int retira_fons_de_dos_comptes tv_compteA tv_compteB quantitat = do retira_fons tv_compteA quantitat `orElse` -- alternativa de transacció retira_fons tv_compteB quantitat saldoA <- readTVar tv_compteA saldoB <- readTVar tv_compteB return $ saldoA + saldoB aporta_quan_cal_i_obtenir_saldo :: TVar Int -> TVar Int -> Int -> STM Int aporta_quan_cal_i_obtenir_saldo tv_compteA tv_compteB quantitat = do aporta_quan_saldo_baix tv_compteB quantitat saldoA <- readTVar tv_compteA saldoB <- readTVar tv_compteB return $ saldoA + saldoB
Principal engegant fils d'execució per a creditor, deutor i informador (gestiona stdout).
{-# LANGUAGE PackageImports #-} module Main( main ) where --fitxer stm_main.hs import Prelude hiding (catch) import Stm_part1 import "async" Control.Concurrent.Async (withAsync, wait, asyncThreadId) import Control.Concurrent (forkIO, threadDelay, killThread ,MVar, newEmptyMVar, putMVar, takeMVar) import Control.Monad.STM (STM, atomically) import Control.Concurrent.STM.TVar (TVar, newTVar) import Control.Concurrent.STM.TMVar (TMVar, newEmptyTMVarIO, putTMVar, takeTMVar) import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan) import Control.Exception (finally, block, mask_) import qualified Control.Monad as Monad import System.IO (stdout, hFlush) import Text.Printf (printf) import Control.Exception (catch, SomeException) data TInfo = InfoDelCreditor Int Int | InfoDelDeutor Int | InfoPlega -- missatges a l'informador pagament = 3 -- creditor passa rebuts al cobrament de manera periòdica creditor :: TVar Int -> TVar Int -> TChan TInfo -> IO () creditor tv_compteA tv_compteB tchan_informacio = do Monad.forM_ ([1,2..6]::[Int]) $ \periode -> do -- per als periodes de la llista threadDelay 1000000 -- espera microsegons saldo_conjunt <- atomically $ retira_fons_de_dos_comptes tv_compteA tv_compteB pagament atomically $ writeTChan tchan_informacio $ InfoDelCreditor periode saldo_conjunt -- deutor aporta diners al compte, quan el saldo baixa per sota d'un valor ''saldo_baix'' deutor :: TVar Int -> TVar Int -> TChan TInfo -> IO () deutor tv_compteA tv_compteB tchan_informacio = do ( Monad.forever $ do mask_ $ do -- mask_: no interrompible per excepcions asíncrones, tractar-les en completar el bloc saldo_conjunt <- atomically $ aporta_quan_cal_i_obtenir_saldo tv_compteA tv_compteB pagament atomically $ writeTChan tchan_informacio $ InfoDelDeutor saldo_conjunt ) `catch` esperaExcepcióProvocada where esperaExcepcióProvocada :: SomeException -> IO () esperaExcepcióProvocada = \_excep -> return () -- informador: gestiona sortides a ''stdout'' en un sol fil d'execució -- vehicula missatges a imprimir a través del canal transaccional TChan (versió transac. de Chan) informador :: TChan TInfo -> IO () informador tchan_informacio = do info <- atomically $ readTChan tchan_informacio -- bloca mentre canal buit case info of InfoDelCreditor periode saldo -> do printf "creditor: periode %d, saldo %d\n" periode saldo hFlush stdout informador tchan_informacio -- tornem-hi InfoDelDeutor saldo -> do printf "deutor: saldo %d\n" saldo hFlush stdout informador tchan_informacio -- tornem-hi InfoPlega -> return () main = do tv_compteA <- atomically $ newTVar 10 -- compte A tv_compteB <- atomically $ newTVar 4 -- compte B tchan_informacio <- atomically (newTChan :: STM (TChan TInfo)) -- canal per a la informació a imprimir withAsync (informador tchan_informacio) $ \asyncInformador -> do withAsync (deutor tv_compteA tv_compteB tchan_informacio) $ \asyncDeutor -> do withAsync (creditor tv_compteA tv_compteB tchan_informacio) $ \asyncCreditor -> do wait asyncCreditor killThread (asyncThreadId asyncDeutor) -- genera excepció asíncrona al fil de asyncDeutor wait asyncDeutor atomically $ writeTChan tchan_informacio InfoPlega -- afegeix ordre de plegar al canal de l'informador wait asyncInformador putStrLn "fi del programa"
Compilació i exec.
ghc --make stm_part1.hs stm_main.hs -o stm_main ./stm_main
Referències [modifica]
- ↑ mòdul Control.Concurrent Vegeu Scheduling
- ↑ 2,0 2,1 GHC's implementation of concurrency(anglès)
- ↑ 3,0 3,1 3,2 3,3 Variables MVar(anglès)
- ↑ [1](anglès)
- ↑ 5,0 5,1 Control.Concurrent - Bound threads(anglès) fils d'exec. lligats als del sistema.
- ↑ Concurrència i crides externes (FFI) al compilador GHC(anglès)
- ↑ La mònada Par - presentació (anglès)
- ↑ La mònada Par(anglès)
- ↑ Haskell Exchange 2012 - Simon Marlow - High performance concurrency
- ↑ El paquet Async
- ↑ CHP.pdf(anglès) Communicating Haskell Processes: Composable Explicit Concurrency Using Monads
- ↑ Univ. de Kent - Communicating Haskell Processes - pàgina inicial
- ↑ paquet chp
- ↑ Actors with Multi-Headed Receive Clauses (anglès)
- ↑ GHC >= 7.2 no suporta simultanejar les dependències de les biblios. haskell98 i base
- ↑ Cloud Haskell(anglès)
- ↑ A Cloud Haskell Appetiser(anglès) Un aperitiu de Cloud Haskell
- ↑ Vídeo: HIW 2012. Duncan Coutts: Cloud Haskell 2.0(anglès)
- ↑ Variables de sicronització MVar's (anglès)
- ↑ Control.Concurrent.Chan (anglès)
- ↑ Communicating .. - First Class Channels(anglès)
- ↑ API de concurrència del compilador GHC (anglès)
- ↑ HaskellWiki - STM - Transaccions de memòria per software (anglès)
- ↑ Variables transaccionals
- ↑ Avaluació de models de programació multicor (PDF)(anglès)
- ↑ Què se n'ha fet de Control.Concurrent.STM(anglès)
- ↑ mòdul Control.Concurrent.Async(anglès)
Enllaços externs [modifica]
- Haskellwiki - Concurrència
- Haskellwiki - Paral·lelisme
- The Unhandled Exception Handler (anglès) De com recollir al fil d'exec. principal les excepcions no caçades als fils d'execució fills.