Haskellによる並列・並行プログラミングの4.2.1

Haskellによる並列・並行プログラミング
Simon Marlow 山下 伸夫
4873116899

こちらの演習問題の解答が stackoverflow とか github に上がってるんだけど、間違えてそうなので自分が書いたものを解説しておく。

問題の例を読むと、一回のforkで生産する要素数は200だけど、消費者側が100個目を消費したところで次のforkが走り始める必要があることがわかる。これがなかなか難しい。

まず簡単なところから。型は問題文で与えられているので、streamFoldは自明である。ForkConsを持っているので、forkする以外はConsと同じ事をやるだけでいい。

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Nil      -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork kick (Cons h t) -> fork kick >> streamFold fn (fn acc h) t

streamFromListは問題文よりchunkの大きさとforkするタイミングを渡せって言われてるので以下の型。

type ForkSetting = (Int, Int)
streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList' :: NFData a => ForkSetting -> [a] -> Par (Stream a)

これにあわせてloopも型を変える必要がある。が、さらにもうひと工夫 Maybe (IVar (IList a)) という引数が必要で、これは後述する。 

loop :: (Int, Int) -> [a] -> IVar (IList a) -> Maybe (IVar (IList a)) -> Par ()

loop の通常時の動作は改修前と変わらない。loopの引数の変更にあわせてパラメータを適切に受け渡すだけである。ただし、追加したカウンタのデクリメントは必要である。 

  loop _ [] var _ = put var Nil   -- <4>
  loop (n, k) (x:xs) var nVar = do      -- <5>
    tail <- new                         -- <6>
    put var (Cons x tail)               -- <7>
    loop (n - 1, k - 1) xs tail nVar    -- <8>

n == 1 のときも簡単。これは指定された数のチャンクを生産し終わった時なので、再帰を停止すればよい。ただ、再帰をやめるのにあたり、新しいIVarをここで生成してしまうと、このIVarへ値をputしてくれるワーカーがどこにもいなくなってしまう。そこで前述した loop へ追加した最後の引数を使う。

  loop (1, _) (x:_) var (Just next) = put var (Cons x next)

この next は、次のchunkの生産時に結果を put する Stream の先頭要素である。なので、消費者側はこの nextget することで続きのデータが得られる。

k == 0 の時が次のchunkの生産を fork すべきときで、ここで loop の最後の引数も作る必要がある。あと、 ForkCons も兼ねているので、通常の Cons を生成する場合の処理も必要となる。 nVar が次のchunkの結果を含むStreamの先頭で、これを loop で引き回して今のchunkの生産が終わったら nVar へ繋げてやる。forkする際、現在のワーカーが残り n - 1 個の生産をする予定のはずなので、それらは drop でリストから捨ててやる。

  loop (n, 0) (x:xs) var Nothing = do
    tail <- new
    nVar <- new
    put var (Fork (kick (drop (n - 1) xs) nVar)
                  (Cons x tail))
    loop (n - 1, -1) xs tail (Just nVar)

  kick xs var = loop (chunkSize, forkPoint) xs var Nothing

ここまでできてしまえば、 streamMap は生産者と消費者の両方の役割を兼ねるだけなのでさくっと実装できるかと思うが、実はそうはいかない。 streamFromList はlistを相手にしていたために Fork を消費者へ渡す時点で次のchunkがどの要素から生成を始めればよいかがわかったが、言語今回は入力が Stream なので上流からの値を chunkSize 個すべて受け取るまでは次のchunkの対象とすべき要素が確定しない。

この問題を解決するには、 loop の最後の引数を Maybe (IVar (IVar (IList a), IVar (IList b))) というように入れ個の IVar に変えてやるとよい。そして、現在のforkがchunkSize個のデータの生産を終えた時点で、次のforkに対して入力と出力の2つの Stream を表す IVar を伝えてやればよい。

Forkを要求する側は、だいたいこうなる(Nil は再帰を終えるので自明、Forkを消費する部分はConsの消費と同じなので、それぞれ省略している)。次のchunkの入出力の先頭を表すStreamを受け取るために nextstrm という IVar を生成し、これを loop の引数としてchunkの末尾まで引き回している。chunkの末尾までいって対象となる Stream が確定した時点で、(instrm, outstrm) <- get var と結果を受け取って次のchunkの処理を開始する。

  loop (n, 0) instrm outstrm Nothing = do
    ilst <- get instrm
    case ilst of
...(snip)...
      Cons h t -> do
        newtl <- new
        nextstrm <- new
        put outstrm (Fork (kick' nextstrm)
                          (Cons (fn h) newtl))
        loop (n - 1, -1) t newtl (Just nextstrm)
...(snip)...

  kick instrm outstrm = loop (chunkSize, forkPoint) instrm outstrm Nothing
  kick' var = do
    (instrm, outstrm) <- get var
    kick instrm outstrm

chunkの末尾でやることは、だいたい以下の通り。次のchunkの入力の先頭である tail がここで確定し、また、出力を書き込むために newtl という新たな Stream を生成し、 loop の末尾の引数を使って次のchunkにそれらを伝えると共に、消費者には続きを得るために newtl を返している。

  loop (1, _) instrm outstrm (Just next) = do
    ilst <- get instrm
    case ilst of
...(snip)...
      Cons h tail -> do
        newtl <- new
        put next (tail, newtl)
        put outstrm (Cons (fn h) newtl)
...(snip)...

この実装でthreadscopeをとると以下のようになった。本に載っていたサンプルは消費者の方が速いタイプだったので本件の実装が正しく働いているかは判断できないが、きちんと並列に動作していることは見て取れる。fork の回数が増えたことで、コアは2つだけではなくすべてがまんべんなく使われるようになった。

threadwatch

threadwatch

完全な実装はgithubの方を参照して欲しい。ただし、面倒だったので本に載っていない streamFilter の方は改修していないので使えない(forkしないので最初のchunkで止まる)。