Haskell concurrent

De Viquipèdia
Dreceres ràpides: navegació, cerca

Haskell concurrent amplia Haskell98 amb concurrència explícita.

Els dos conceptes principals en què es basa Haskell concurrent són les Mutable variables MVar α i la possibilitat d'engegar un nou fil d'execució via forkIO.

Concurrència[modifica | modifica el codi]

forkIO
engega un fil d'execució lleuger del planificador del Run Time System que, segons el compilador de Haskell concurrent, pot implementar multitasca cooperativa o bé arrabassadora (ang:preemptive)).[1]

Vegeu "La implementació de concurrència de GHC".[2]

mecanismes de comunicació
tipus mònada generadors mòdul descripció
MVar a[3] IO newMVar x
-- var buida per a un tipus T
newEmptyMVar :: IO (MVar T)
Control.Concurrent.MVar vars. globals sincronitzades que es buiden en consultar-les
doblen la funcionalitat com a
* bústia de comunicació per baldes (ang:locks)
* semàfor binari
Chan a[4] IO newChan x
Control.Concurrent.Chan bústia de comunicació per baldes (ang:locks), amb cua il·limitada
TVar a[5] STM / IO newTVar x
-- 'top level' (IO) TVar
newTVarIO x
Control.Concurrent.STM.TVar posicions de memòria compartida
suporten transaccions de memòria atòmiques
TMVar a[6] STM / IO newTMVar x
-- 'top level' TMVar
newTMVarIO x
Control.Concurrent.STM.TMVar bústia de comunicació transaccional
TChan a[7] STM / IO newTChan x
-- 'top level' TChan
newTChanIO x
Control.Concurrent.STM.TChan bústia de comunicació transaccional amb cua iŀlimitada


accés sincronitzat amb variables MVar (Mutable variable)[modifica | modifica el codi]

Una MVar té un símil en el món real en un torn (tambor giratori d'intercanvi entre comunitats separades, en un convent o en farmàcies de guàrdia a la nit, on hi ha dos actors o més si hi ha cua (esperen el torn) i dues operacions: esperar que sigui buit per omplir-lo i esperar que l'omplin per recuperar-ne el contingut).[8]

La consulta d'una variable MVar (takeMVar) en buida el contingut causant que altres fils d'execució que la consultin posteriorment quedin blocats en espera que se'n reposi el valor actualitzat (amb putMVar).[9]

  • "takeMVar mvar" bloca el fil d'exec. si mvar estava buida
  • "putMVar mvar valor" bloca si mvar estava plena

Generadors:

do 
  mvarPlenaInicialment <- newMVar contingut     -- newMVar :: t -> IO (MVar t)

  mvarBuidaInicialment <- newEmptyMVar :: IO (MVar T)   -- mvar per a un contingut de tipus T

Una MVar té tres facetes:[9]

  • Variable amb accés sincronitzat
  • Bústia de comunicació d'un sol element (takeMVar com a receive, putMVar com a send)
  • Semàfor binari (takeMVar com a wait, putMVar com a signal)
mvarSemàfor <- newEmptyMVar :: IO (MVar Bool)

-- engega fil d'exec. i en acabar 
--        desperta el primer dels fils suspesos pendents de la MVar
threadId <- forkIO procés `finally` putMVar mvarSemàfor True
...
-- suspèn, tot esperant que el fil d'exec. de ''procés'' acabi
takeMVar mvarSemàfor

Darrerament s'hi han afegit crides no blocants (tryTakeMVar, tryPutMVar).

També tenim modifyMVar_ (composició de takeMVar i putMVar que retorna IO ()) i altres novetats.

L'accés a una variable MVar és d'avaluació tardana, per tant el contingut serà avaluat en el consumidor i no en el productor!![9]

Els fils blocats es desperten per ordre de suspensió (FIFO).[9] "putMVar mvar" desperta el primer dels fils que ha cridat takeMVar amb la mateixa mvar, que s'emporta el valor i la deixa buida.

bústies de comunicació amb MVar, Chan, BoundedChan[modifica | modifica el codi]

Canals amb sincronització per baldes (ang: locks).

  • MVar: bústies d'un sol element (de Control.Concurrent)
  • Chan: bústies amb cua il·limitada (de Control.Concurrent.Chan)
  • BoundedChan: bústies limitades (del paquet BoundedChan)

Exemples:

forkOS, Bound threads: fils d'execució per a crides FFI externes o a biblioteques amb allotjament local al fil.[modifica | modifica el codi]

GHC assigna els fils d'exec. lleugers de la multitasca GHC llançats amb forkIO, en relació N-M amb els fils del sistema (hi ha un fil del sistema per cada processador elemental, anomenats capability sobre els quals s'executen els llançats amb forkIO).[2][10]

forkOS: Per poder fer ús de característiques específiques dels fils d'execució del sistema, forkOS facilita el llançament d'un fil d'execució en relació 1-1 amb els del sistema (Fils d'execució "lligats als del sistema" (Bound threads)[11]), facilitant l'ús de crides externes (FFI: Foreign Function Interface) en un fil, o bé l'ús d'allotjament lligat al fil d'execució. Biblioteques com OpenGL requereixen aquest ús.[11][12]

memòria transaccional: variables TVar, i comunicació amb TMVar i TChan[modifica | modifica el codi]

  • TVar són variables de "Memòria transaccional per programari".
  • TMVar i TChan són els equivalents transaccionals de MVar i Chan

Vegeu exemple #Concurrència condicionada amb TVars - Mònada STM - Memòria transaccional

Futuribles -- Encadenament de càlculs asíncrons - la mònada Par[modifica | modifica el codi]

Permet combinar càlculs funcionals purs en paral·lel, encadenant resultats com a paràmetres de paral·lelitzacions subseqüents. Vegeu refs.[13][14]

  • La classe ParFuture especialitza una mònada modelant el comportament d'un futurible: un objecte que havent engegat una acció asíncronament, possibilita l'obtenció del resultat posant el fil d'execució original en espera, fins que l'acció asíncrona acabi.
  • La classe ParIVar especialitza la ParFuture modelant una promesa: un objecte que possibilita una assignació única de valor a un futurible, pel cas que l'acció asíncrona tardi en excés.
  • El tipus (IVar a) és el de les variables d'assignació única, accessibles des de la tasca asíncrona i l'original.
  • El tipus (Par (Ivar a)) instancia les classes ParFuture i ParIvar
{-# LANGUAGE PackageImports #-}

import "monad-par" Control.Monad.Par (Par, runPar, spawn, get, put) -- put permet donar valor a un futurible abans no acabi

f = (*2)
g = (/2)
h (a, b) = a + b
k (a, b) = a - b

calcula_paral·lelitzant :: Double -> Par (Double, Double)
calcula_paral·lelitzant x = do
      futur_fx <- spawn (return (f x))  -- engega l'avaluació en paral·lel de (f x)
      futur_gx <- spawn (return (g x))  -- engega l'avaluació en paral·lel de (g x)
      a <- get futur_fx       -- espera que acabi el càlcul ''fx'' i n'obté el valor
      b <- get futur_gx       -- espera que acabi el càlcul ''gx'' ...
      futur_hab <- spawn (return (h (a,b))) -- engega en paral·lel (h (a,b))
      futur_kab <- spawn (return (k (a,b))) -- ...
      c <- get futur_hab      -- espera ''hab''
      d <- get futur_kab      -- espera ''kab''
      return (c,d)

main = do
        let res = runPar $ calcula_paral·lelitzant 4
        print res
cabal install monad-par
ghc --make -threaded -rtsopts -with-rtsopts=-N prova.hs
./prova
(10.0,6.0)

Per a càlculs que poden llançar excepcions, la biblioteca Async ofereix més varietats de tractament.

Operacions d'Entrada/Sortida asíncrones i simultànies[modifica | modifica el codi]

Amb la biblioteca Async.[15][16] -- Futuribles com a efecte global IO, amb implementació com a functor aplicatiu en el tipus Concurrently (classes Applicative i Alternative).

operacions asíncrones[modifica | modifica el codi]

  • withAsync llança una operació IO asíncronament (sense esperar-ne la finalització) en un fil d'execució nou i n'assegura la terminació (basat en bracket sobre el recurs fil d'execució (obtenció de recurs: async io) (alliberament de recurs: cancel)).
  • wait async (async és el descriptor) atura el fil actual en espera que s'acabi l'op. asíncrona, oferint-ne el resultat, i propagant-ne l'excepció en cas que n'hi hagi.
  • waitCatch retorna o bé excepció, o bé resultat, en un tipus Either.
  • waitAny [Async a] espera el resultat de la primera que acabi d'una llista oferint-ne el resultat o bé llanci una excepció propagant-la. waitAnyCancel afegeix cancelació de les altres. Hi ha versions waitAnyCatch i waitAnyCatchCancel per incloure l'excepció en el retorn (tipus Either).
  • waitEither permet esperar la primera de dues ops. asíncrones amb tipus del retorn diferent.
import Control.Concurrent.Async (withAsync, wait)

obtenirURLs url1 url2 = do
    -- engega l'execució asíncrona d'una acció i la lliga a la var. async1
    withAsync (getURL url1) $ \async1 -> do       -- el bloc que segueix s'executa al fil d'execució original
        withAsync (getURL url2) $ \async2 -> do   -- engega l'execució asíncrona i la lliga a async2
            page1 <- wait async1                  -- espera que acabi async1 i en retorna el resultat
            page2 <- wait async2                  -- espera async2 ...
            return (page1, page2)

combinadors d'operacions simultànies[modifica | modifica el codi]

  • Simultaneja dues operacions, i en retorna ambdós resultats
concurrently :: IO a -> IO b -> IO (a, b)
  • Simultaneja dues operacions avortant la que no acabi primer.
race :: IO a -> IO b -> IO (Either a b)
  • Mapeja una funció amb efectes col·laterals (a -> IO b) a una col·lecció de valors, i les executa simultàniament, retornant-ne la col·lecció de resultats.
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)

L'efecte Concurrently[modifica | modifica el codi]

A més alt nivell el tipus Concurrently embolcalla l'operativa precedent en la implementació de les classes Applicative (aplicant l'esmentat combinador concurrently) i Alternative (aplicant race, per esperar la primera que acabi, cancel·lant les altres).

import Control.Concurrent.Async (Concurrently (runConcurrently) )

obtenirURLs url1 url2 url3 = do
    (page1, page2, page3) <- runConcurrently $
       -- combina aplicativament els resultats dels efectes Concurrently en una Tupla 
       (,,) <$> Concurrently (getURL url1)   
            <*> Concurrently (getURL url2)
            <*> Concurrently (getURL url3)

-- la implementació d'Alternative (<|>) sobre Concurrently retorna la que acaba primer, cancel·lant les altres
ghci
Prelude>:m + Control.Applicative Control.Concurrent Control.Concurrent.Async Control.Exception
Prelude ...> let esperaSegons secs = threadDelay (secs * 1000000) >> return secs :: IO Int
Prelude ...> let ca = Concurrently $ esperaSegons 5
Prelude ...> let cb = Concurrently $ esperaSegons 3
Prelude ...> runConcurrently $ ca <|> cb    -- avorta la que no acabi primer
3  
Prelude ...> let cc = Concurrently $ throwIO $ userError "excepció"
Prelude ...> runConcurrently $ ca <|> cb <|> cc
*** Exception: user error (excepció)

model d'àlgebra de processos CSP[modifica | modifica el codi]

Communicating Haskell Processes[modifica | modifica el codi]

La biblioteca "Communicating Haskell Processes" de la universitat de Kent implementa una àlgebra de processos CSP, basada en mònades.[17][18][19]

Aquesta biblio. consta de diversos paquets. Cal el paquet chp-plus del Hackage per fer córrer els exemples de les guies.

Si p i q són processos, s'hi defineix, entre d'altres, els següents operadors de composició:

  stop        -- :: CHP () --   CSP: STOP o bé 0
  skip        -- :: CHP a  --   CSP: SKIP
  p <-> q     -- alternativa,   CSP: P | Q
  p <||> q    -- paral·lelisme,  CSP: P || Q
  p >> q      -- seqüència,     CSP: P; Q
  forever p   -- iteració,     CSP: ∗P
  p </> q     -- alternativa amb prioritat
  p <&> q     -- sincronia (''join''), prospera si ambdós processos estan a punt (missatge disponible) simultàniament.

El codi (per a GHC 6.10) ha quedat una mica desfasat. Per compilar a GHC 7.4 cal solucionar ambigüitats per la col·lisió de símbols afegits a les noves versions de les biblioteques emprades i un símbol desaparegut de les noves versions de QuickCheck que es troba a la versió 2.3.* Passos:

# -- Module 'Test.QuickCheck.Property' does not export `liftIOResult'
# -- Al compilar QuickCheck-2.3.0.2 -- couldn't deduce (Show a) from (Integral a) -- a la signatura de ''ranges''
# descarregar QuickCheck-2.3.0.2
# actualitzar la versió al fitxer .cabal afegint-hi .1 (Version: 2.3.0.2.1)
# modif. Test.QuickCheck.Text afegint (Show a) a la signatura de la funció ''ranges''
cabal install

# -- solucionar al paquet "chp" mòdul Control.Concurrent.CHP.Clocks, l'ambigüitat dels símbols modifyTVar i modifyTVar'
# -- modifyTVar i modifyTVar' corresponen a funcions definides localment al mòdul
# import Control.Concurrent.STM hiding (modifyTVar, modifyTVar') 
# actualitzar la versió al fitxer .cabal afegint-hi .1
cabal install

cabal install chp-plus --constraint=QuickCheck==2.3.0.2.1 --constraint=pretty==1.1.0.* --constraint=chp==num_versió_modificada

model d'Actors[modifica | modifica el codi]

la biblio Actor[modifica | modifica el codi]

Implementa actors multi-canal despatxant, a la recepció, en comptes de per missatge, per llista de missatges de canals diferents.[20]

Està codificada en Haskell98. Per compilar a GHC >= 7.2[21] cal evitar la dependència de Haskell98, reanomenant les importacions de mòduls H98 {IO, Monad, Control.Exception} a nomenclatura GHC equivalent {System.IO, Control.Monad, Control.OldException}, i treure la dependència de haskell98 del fitxer de projecte .cabal

Cloud Haskell[modifica | modifica el codi]

Concurrència per pas de missatges per a sistemes distribuïts, a l'estil de l'Erlang. Aplica el model d'Actors als processos distribuïts de manera similar a les construccions del llenguatge i sistema concurrent distribuït de nodes de l'Erlang.[22][23][24]

Gestor d'excepcions al fil principal de les excepcions no tractades dels fils subordinats[modifica | modifica el codi]

Amb setUncaughtExceptionHandler.[25]

setUncaughtExceptionHandler :: (SomeException -> IO ()) -> IO ()

Vegeu exemple a "The Unhandled Exception Handler".[26]

Paral·lelisme[modifica | modifica el codi]

Vegeu-ho a GHC

Exemples de concurrència[modifica | modifica el codi]

Concurrència simple amb MVars - Productor-consumidor[modifica | modifica el codi]

Amb variables de sincronització per baldes (blocants) MVar.[27]

forkIO
Engega fil d'execució lleuger del planificador del Haskell en multiprocés cooperatiu.
putMVar mvar valor
bloca si la variable MVar és plena (ocupada) fins que estigui disponible (buida) i llavors l'omple amb el valor.
takeMVar mvar
bloca el fil d'execució si la variable MVar és buida fins que la li omplin, llavors en retorna el valor i la deixa buida.
module Main( main ) where

import Control.Concurrent (forkIO, threadDelay, MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import qualified Control.Monad as Monad
import System.IO (stdout, hFlush)
import Text.Printf (printf)
import Data.Time.Clock (getCurrentTime)
import Data.Time.LocalTime (utcToLocalZonedTime)
import Data.Time.Format (FormatTime, formatTime, defaultTimeLocale)

obtenir_hora :: IO String
obtenir_hora = do
    local_t <- (getCurrentTime >>= utcToLocalZonedTime)   -- els parèntesis hi són per legibilitat

    return $ formatTime defaultTimeLocale "%T" local_t

productor :: MVar Int -> IO ()
productor mv_bústia = do
  Monad.forM_ [3,2..0] $ \compte_enrere -> do  -- per als valors de la llista
    threadDelay 1000000            -- espera microsegons
    putMVar mv_bústia compte_enrere    -- posa valor a la MVar

consumidor :: MVar Int -> IO ()
consumidor mv_bústia = do
        x <- takeMVar mv_bústia    -- bloca a l'espera que omplin la MVar
        hora <- obtenir_hora

        printf "%s - consumidor: recollit %d\n" hora x
        hFlush stdout
        if x == 0 then return ()                  -- s'ha acabat
                  else consumidor mv_bústia       -- tornem-hi

main = do
  -- nova MVar per la sicronització productor / consumidor
  mv_bústia <- newEmptyMVar :: IO (MVar Int)

  -- nova MVar per la sincronització de finalització de fil d'exec.
  mv_fi_prod <- newEmptyMVar :: IO (MVar Bool)
  mv_fi_consum <- newEmptyMVar :: IO (MVar Bool)

  -- forkIO: engega fil d'execució
  consumidor_id <- forkIO $ consumidor mv_bústia `finally`
                             putMVar mv_fi_consum True  -- assenyala l'acabament a la MVar
                                  -- despertant el primer dels fils blocats per la mateixa

  productor_id <- forkIO $ productor mv_bústia `finally`
                            putMVar mv_fi_prod True

  -- emulem amb MVar's la feina de ''pthread_join()'' de l'Unix
  --  per esperar la finalització dels fils d'exec. creats

  takeMVar mv_fi_prod       -- bloca fins a la fi del productor
  takeMVar mv_fi_consum      -- bloca fins a la fi del consumidor
  putStrLn "fi del programa"

produeix la sortida següent:

11:32:03 - consumidor: recollit 3
11:32:04 - consumidor: recollit 2
11:32:05 - consumidor: recollit 1
11:32:06 - consumidor: recollit 0
fi del programa

Client-servidor - Canals amb cues d'entrada (Chan)[modifica | modifica el codi]

Client-servidor, canalitzant la impressió

Canals no acotats (en la dimensió de la cua) (Control.Concurrent.Chan)[28] "de primera classe" (és a dir, que es pot passar com a paràmetre)[29]

  • forkIO: Engega fil d'execució lleuger del planificador del Haskell.
  • forkOS: engega un fil lligat a un del sistema operatiu, per al cas de voler utilitzar característiques dels fils d'exec. del sistema, com ara crides FFI o bé allotjament lligat al fil.[11]
  • comunic. per cues il·limitades
writeChan canal
afegeix a la cua il·limitada i retorna tot seguit (sense blocar) (comunicació asíncrona).
readChan canal
bloca si la cua del canal és buida
  • resposta per MVars
  • semàfors:
newQSem valorInicial
nou semàfor
signalQSem semàfor
incrementa semàfor i assenyala
waitQSem semàfor
si semàfor > 0 llavors decrementa i continua, sinó espera
  • A l'exemple el client encua al canal torn, el parell (comanda, ref. resposta (mv_resposta)), i queda a l'espera de la resposta.
module Main( main ) where

import Control.Concurrent (forkIO, forkOS, threadDelay,
                           MVar, newEmptyMVar, putMVar, takeMVar, tryPutMVar)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)

import Control.Exception (finally)
import Data.IORef (IORef, newIORef, readIORef, writeIORef, modifyIORef)
import qualified Control.Monad as Monad
import System.IO (stdout, hFlush)
import Text.Printf (printf)
import Data.Time.Clock (getCurrentTime)
import Data.Time.LocalTime (utcToLocalZonedTime)
import Data.Time.Format (FormatTime, formatTime, defaultTimeLocale)

data TInfo = InfoDelClient Int | InfoDelServidor String Int | InfoPlega  -- missatges a l'informador

type Canal_Comanda = Chan (Int, MVar_Resposta)
type MVar_Resposta = MVar Int
type Canal_Info = Chan TInfo


obtenir_hora :: IO String
obtenir_hora = do
        local_t <- (getCurrentTime >>= utcToLocalZonedTime)  -- els parèntesis hi són per legibilitat però, de fet, no calen
        return $ formatTime defaultTimeLocale "%T" local_t

client :: Canal_Comanda -> Canal_Info -> IO ()
client chan_torn chan_info = do
  Monad.forM_ [3,2..0] $ \cnt -> do    -- llista de valors a passar, finalitzant en zero
    threadDelay 1000000                -- espera microsegons
    writeChan chan_info (InfoDelClient cnt)  -- no bloca (asíncron, encua al canal i continua)

    mv_resposta <- newEmptyMVar :: IO (MVar Int)
    -- encua la comanda passant la ref. de la mvar de resposta.
    writeChan chan_torn (cnt, mv_resposta)
    takeMVar mv_resposta                     -- bloca (espera resposta per continuar)

obtenir_resposta :: Int -> IORef Int -> IO Int
obtenir_resposta x ref_estat = readIORef ref_estat >>= (return. (+x))  -- la que vulgueu

servidor :: Canal_Comanda -> Canal_Info -> IORef Int -> IO ()
servidor chan_torn chan_info ref_estat = do

        (x, mv_resposta) <- readChan chan_torn               -- bloca fins obtenir comanda al chan_torn
        hora <- obtenir_hora
        resp <- obtenir_resposta x ref_estat

        tryPutMVar mv_resposta resp    -- respon si és possible

        writeChan chan_info (InfoDelServidor hora x)   -- no bloca (asíncron, encua al canal i continua)
        -- mentre rebem x > 0 fem bucle, altrament acaba.
        Monad.when (x > 0) $ servidor chan_torn chan_info ref_estat    -- tornem-hi

-- informador: gestiona sortides a ''stdout'' en un sol fil d'execució
informador :: Canal_Info -> IO ()
informador chan_info = do
        info <- readChan chan_info           -- bloca si la cua és buida
        case info of
          InfoDelClient intValor -> do
                printf "client: comanda %d\n" intValor
                hFlush stdout
                informador chan_info         -- tornem-hi

          InfoDelServidor strHora intValor -> do
                printf "%s - servidor: recollit %d\n" strHora intValor
                hFlush stdout
                informador chan_info         -- tornem-hi

          InfoPlega -> return ()             -- acaba

main = do
  ref_estat <- newIORef 0 :: IO (IORef Int) -- ref. no sincronitzada (la manipula un sol fil)
  chan_torn <- newChan :: IO (Canal_Comanda)  -- cua de comandes al servidor

  chan_informacio <- newChan :: IO (Canal_Info)  -- cua d'impressió

  -- semàfors per a l'espera d'acabament dels fils d'execució
  semàfor <- newQSem 0                           -- semàfor per a client i servidor
  mv_fi_info <- newEmptyMVar :: IO (MVar Bool)   -- semàfor amb MVar per a l'informador

  -- malgrat que la gestió de ''stdout'' pel fil d'exec. de l'informador
  --   no requereix ''bound threads'', li poso el forkOS per trencar el tabú.

  informador_id <- forkOS {- per les crides externes -} $ informador chan_informacio
                                 `finally` putMVar mv_fi_info True

  servidor_id <- forkIO $ servidor chan_torn chan_informacio ref_estat
                                 `finally` signalQSem semàfor          -- incrementa semàfor i assenyala
  client_id <- forkIO $ client chan_torn chan_informacio
                                 `finally` signalQSem semàfor          -- incrementa semàfor i assenyala

  -- espera finalització dels dos processos, el client i el servidor
  waitQSem semàfor              -- si semàfor > 0 llavors decrementa sinó espera
  waitQSem semàfor              -- si semàfor > 0 llavors decrementa sinó espera

  writeChan chan_informacio InfoPlega
  takeMVar mv_fi_info        -- espera fi informador

  putStrLn "fi del programa"

dóna la següent sortida:

client: comanda 3
15:18:55 - servidor: recollit 3
client: comanda 2
15:18:56 - servidor: recollit 2
client: comanda 1
15:18:57 - servidor: recollit 1
client: comanda 0
15:18:58 - servidor: recollit 0
fi del programa

Concurrència condicionada amb variables transaccionals (TVar) - Mònada STM (transaccions de memòria per programari)[modifica | modifica el codi]

Només al compilador GHC.[30] Les transaccions de memòria eviten blocar els fils d'execució, descartant canvis a les variables transaccionals si no es completa, excepte en cas que hi posem condicions forçant el reintent amb la clàusula "retry".

L'evolució de la transacció es modela com a aplicacions en una Mònada STM, inicials de "Software Transactional Memory".

En aquest exemple les transaccions[31] s'efectuen sobre variables transaccionals[32] TVar (accés sincronitzat per STM) i s'encapsulen en una mònada STM.

També substituïm les les MVar (comunic. síncrona) per TMVar, i les Chan (comunic. asíncrona) per TChan, per quedar lliures de problemes de bloquejos.[33]

atomically
avalua una expressió de la mònada STM, admetent o tot o no res dels canvis a les variables transaccionals, oferint el resultat com a efecte global (mònada IO).
retry
provoca el reintent si no es donen les condicions esperades i reintenta una transacció alternativa per la branca "orElse" si existeix, i si no, bloca el fil d'execució fins que es modifiqui alguna de les variables transaccionals implicades en la transacció.
orElse
introdueix una transacció alternativa, que s'avalua si la primera fa "retry"
always
comprova invariant i si falla, genera un error "Transactional invariant violation" finalitzant el programa
catchSTM
atrapa excepcions dins la mònada STM

A partir de GHC 6.12, STM desapareix de la biblioteca pral. i, si no s'ha fet la instal·lació amb la Plataforma Haskell que l'incorpora, caldrà carregar el paquet stm del Hackage.[34]

  • La concurrència a l'exemple utilitza la biblioteca Async:[35]
withAsync acció_io $ \ async -> altres accions al fil d'exec. original
llença l'acció_io en un altre fil d'execució, asíncronament i continua amb les altres accions
wait async
espera finalització del fil d'exec. llançat asíncronament, identificat per async i en retorna el resultat
-- transaccions als comptes --fitxer stm_part1.hs

module Stm_part1 (aporta_quan_cal_i_obtenir_saldo, 
                  retira_fons_de_dos_comptes
                 ) where  
 
import Control.Monad.STM (STM, retry, orElse, always)
import Control.Concurrent.STM.TVar (TVar, readTVar, writeTVar)
import qualified Control.Monad as Monad
 
saldo_baix = 4
 
aporta_quan_saldo_baix :: TVar Int -> Int -> STM ()
aporta_quan_saldo_baix tv_compte aportacio = do 
 
		saldo <- readTVar tv_compte   
		Monad.when (saldo > saldo_baix) retry   -- bloca si no es dóna la condició, 
                                                        -- fins que es modifiqui alguna TVar, llavors reintenta
		let nou_saldo = saldo + aportacio
		writeTVar tv_compte nou_saldo
 
invariant :: TVar Int -> STM Bool
invariant tv_compte = do
                saldo <- readTVar tv_compte 
                return $ saldo >= 0
 
retira_fons :: TVar Int -> Int -> STM ()
retira_fons tv_compte quantitat = do
 
		saldo <- readTVar tv_compte
		Monad.when (saldo < quantitat) retry      -- si no hi ha saldo reintenta la transacció alternativa 
                                                          -- o bloca i torna a la inicial si no hi ha més alternatives
		writeTVar tv_compte $ saldo - quantitat
		always $ invariant tv_compte              -- comprova invariant de la transacció
 
retira_fons_de_dos_comptes :: TVar Int -> TVar Int -> Int -> STM Int
retira_fons_de_dos_comptes tv_compteA tv_compteB quantitat = do
 
                retira_fons tv_compteA quantitat `orElse`  -- alternativa de transacció
                       retira_fons tv_compteB quantitat

                saldoA <- readTVar tv_compteA
                saldoB <- readTVar tv_compteB
                return $ saldoA + saldoB
 
aporta_quan_cal_i_obtenir_saldo :: TVar Int -> TVar Int -> Int -> STM Int
aporta_quan_cal_i_obtenir_saldo tv_compteA tv_compteB quantitat = do
 
                aporta_quan_saldo_baix tv_compteB quantitat
                saldoA <- readTVar tv_compteA
                saldoB <- readTVar tv_compteB
                return $ saldoA + saldoB

Principal engegant fils d'execució per a creditor, deutor i informador (gestiona stdout).

{-# LANGUAGE PackageImports, ScopedTypeVariables #-}
module Main( main ) where  --fitxer stm_main.hs

import Prelude hiding (catch)
import Stm_part1

import "async" Control.Concurrent.Async (withAsync, wait, asyncThreadId)
import Control.Concurrent (forkIO, threadDelay, killThread
            ,MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad.STM (STM, atomically)
import Control.Concurrent.STM.TVar (TVar, newTVar)
import Control.Concurrent.STM.TMVar (TMVar, newEmptyTMVarIO, putTMVar, takeTMVar)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan)
import Control.Exception (finally, mask_)
import qualified Control.Monad as Monad
import System.IO (stdout, hFlush)
import Text.Printf (printf)
import Control.Exception (catch, SomeException)

default (Int, Double)  -- seq. de tipus per resoldre ambigüitats dels literals numèrics

data TInfo = InfoDelCreditor Int Int | InfoDelDeutor Int | InfoPlega    -- missatges a l'informador

pagament = 3

-- creditor passa rebuts al cobrament de manera periòdica

creditor :: TVar Int -> TVar Int -> TChan TInfo -> IO ()
creditor tv_compteA tv_compteB tchan_informacio = do

  Monad.forM_ ([1,2..6]) $ \període -> do    -- per als períodes de la llista
    threadDelay 1000000                             -- espera microsegons
    saldo_conjunt <- atomically $ retira_fons_de_dos_comptes tv_compteA tv_compteB pagament
    atomically $ writeTChan tchan_informacio $ InfoDelCreditor període saldo_conjunt

-- deutor aporta diners al compte, quan el saldo baixa per sota d'un valor ''saldo_baix''

deutor :: TVar Int -> TVar Int -> TChan TInfo -> IO ()
deutor  tv_compteA tv_compteB tchan_informacio = do

     ( Monad.forever $ do
           -- mask_ emmascara interrupcions asíncrones (externes) (ex. killThread que llançarem des del fil principal)
           mask_ $ do   -- el bloc no serà interromput 
               saldo_conjunt <- atomically $ aporta_quan_cal_i_obtenir_saldo tv_compteA tv_compteB pagament
               atomically $ writeTChan tchan_informacio $ InfoDelDeutor saldo_conjunt
           )
     `catch` (\(_excep :: SomeException) -> return ()) -- cas d'excepció asíncrona "killThread" del fil principal, acaba

-- informador: gestiona sortides a ''stdout'' en un sol fil d'execució
-- vehicula missatges a imprimir a través del canal transaccional TChan (versió transac. de Chan)

informador :: TChan TInfo -> IO ()
informador tchan_informacio = do
        info <- atomically $ readTChan tchan_informacio               -- bloca mentre canal buit
        case info of
          InfoDelCreditor període saldo -> do
                    printf "creditor: període %d, saldo %d\n" període saldo
                    hFlush stdout
                    informador tchan_informacio    -- tornem-hi
                    
          InfoDelDeutor saldo -> do
                    printf "deutor: saldo %d\n" saldo
                    hFlush stdout
                    informador tchan_informacio    -- tornem-hi
                    
          InfoPlega -> return ()

main = do
  tv_compteA <- atomically $ newTVar 10                -- compte A
  tv_compteB <- atomically $ newTVar 4                 -- compte B

  tchan_informacio <- atomically (newTChan :: STM (TChan TInfo))   -- canal per a la informació a imprimir

  
  withAsync (informador tchan_informacio) $ \asyncInformador -> do
          
      withAsync (deutor tv_compteA tv_compteB tchan_informacio) $ \asyncDeutor -> do
                  
          withAsync (creditor tv_compteA tv_compteB tchan_informacio) $ \asyncCreditor -> do

              wait asyncCreditor  -- espera que acabi el fil del creditor 
                          
              killThread (asyncThreadId asyncDeutor)         -- genera excepció asíncrona al fil de asyncDeutor
              wait asyncDeutor    -- espera que acabi el fil del deutor
                  
              atomically $ writeTChan tchan_informacio InfoPlega    -- afegeix ordre de plegar al canal de l'informador

              wait asyncInformador  -- espera que acabi el fil de l'informador
          
              putStrLn "fi del programa"

Compilació i exec.

ghc --make stm_part1.hs stm_main.hs -o stm_main
./stm_main

Referències[modifica | modifica el codi]

  1. mòdul Control.Concurrent Vegeu Scheduling
  2. 2,0 2,1 GHC's implementation of concurrency(anglès)
  3. Control.Concurrent.MVar(anglès)
  4. Control.Concurrent.Chan(anglès)
  5. Control.Concurrent.STM.TVar(anglès)
  6. Control.Concurrent.STM.TMVar(anglès)
  7. Control.Concurrent.STM.TChan(anglès)
  8. Diccionari de l'Institut - Torn
  9. 9,0 9,1 9,2 9,3 Variables MVar(anglès)
  10. «Concurrency – Haskell Prime».(anglès)
  11. 11,0 11,1 11,2 Bound threads (fils d'execució lligats als del sistema)(anglès)
  12. Concurrència i crides externes (FFI) al compilador GHC(anglès)
  13. La mònada Par - presentació (anglès)
  14. La mònada Par(anglès)
  15. Haskell Exchange 2012 - Simon Marlow - High performance concurrency
  16. El paquet Async
  17. CHP.pdf(anglès) Communicating Haskell Processes: Composable Explicit Concurrency Using Monads
  18. Univ. de Kent - Communicating Haskell Processes - pàgina inicial
  19. paquet chp
  20. Actors with Multi-Headed Receive Clauses (anglès)
  21. GHC >= 7.2 no suporta simultanejar les dependències de les biblios. haskell98 i base
  22. Cloud Haskell(anglès)
  23. A Cloud Haskell Appetiser(anglès) Un aperitiu de Cloud Haskell
  24. Vídeo: HIW 2012. Duncan Coutts: Cloud Haskell 2.0(anglès)
  25. GHC.Conc.Sync.setUncaughtExceptionHandler(anglès)
  26. The Unhandled Exception Handler(anglès)
  27. Variables de sicronització MVar's (anglès)
  28. Control.Concurrent.Chan (anglès)
  29. Communicating .. - First Class Channels(anglès)
  30. API de concurrència del compilador GHC (anglès)
  31. HaskellWiki - STM - Transaccions de memòria per software (anglès)
  32. Variables transaccionals
  33. Avaluació de models de programació multicor (PDF)(anglès)
  34. Què se n'ha fet de Control.Concurrent.STM(anglès)
  35. mòdul Control.Concurrent.Async(anglès)

Enllaços externs[modifica | modifica el codi]