Previous Up Next

 7  スレッド

スレッド とは一つのプログラムの中で互いに並列に実行できる制御の流れであり、軽量プロセス とも言われます。

この章ではスレッドを作成し (Thread モジュール) 同期を行うプログラムを説明します。同期にはロック (Mutex モジュール)、 条件変数 (Condition モジュール)、そして同期イベント (Event モジュール) が使われます。

7.1  イントロダクション

fork は現在のプロセスのコピーを作成します (したがってプログラムのコピーも作成します) が、スレッドの作成はこれとは大きく異なります。フォークを行った場合親プロセスと子プロセスのアドレス空間は完全に分離し、二つのプロセスは (ファイルやパイプに対する入出力といった) システムコールを通じてでしかやり取りをすることができません。

これに対して、あるプログラム内の全てのスレッドは同じアドレス空間を共有します。スレッド間で共有されず、スレッド同士で異なる唯一の情報はスレッドの ID と実行スタック (シグナルマスクやロックと条件変数の状態などのシステム情報を含む) です。この観点では、スレッドはコルーチンに似ています。プログラム内のスレッドは全て同じ方法で扱われますが、プログラムが始まって最初に作られるスレッドは別です。このスレッドが終了すると、そのプログラムが持つ全てのスレッドが終了ししたがってプログラム全体が終了します(これから複数のスレッドといった場合には考えているプログラム内の複数のスレッドを意味し、別のプログラム内で動いているスレッドのことは意味しないことにします)。

制御を明示的にに受け渡し並列に実行できないコルーチンと違い、スレッドは並列に実行でき、(他のスレッドやプロセスではなく) システムによってプリエンプティブにスケジュールされます。この観点ではスレッドはプロセスに似ています。

アドレス空間が共有されることで、スレッド同士は共有メモリを使って直接通信することができます。スレッドは並列に実行されるために、全てのスレッドは共有データに対するアクセスを同期しなければいけません。あるスレッドが書き込み終わってから別のスレッドが読み込むようにするためです。原理上は必要なわけではありませんが、実際にはこれを行うためにオペレーティングシステムを介する必要があります。スレッドの同期はロックや条件変数を使うか、あるいはイベントを使って高レベルに行われます。

プロセスに対するスレッドの利点は低い作成コストと大きなデータを受け渡すときにデータ全体をコピーするのではなくポインタを渡すだけですむことです。

一方、スレッドを使うとスレッド同士の同期を管理しなければならず、スレッドが致命的なエラーを出す場合にも対処する必要があります。例えばスレッドは停止する前にロックを開放し不変量が保存されるようにする必要があります。上記の利点による恩恵があまりないのであればスレッドよりもプロセスのほうが望ましいでしょう。

OCaml における実装

ネイティブスレッドを使うアプリケーションをコンパイルするには次のようにします:

ocamlc -thread unix.cma threads.cma -o prog mod1.ml mod2.ml mod3.ml ocamlopt -thread unix.cmxa threads.cmxa -o prog mod1.ml mod2.ml mod3.ml

ocamlbuild ツールを使うのであれば、_tags ファイルに以下の内容を追加するだけですみます:

<mod{1,2,3}.ml> : thread <prog.{native,byte}> : use_unix, thread

インストールされた OCamlがネイティブスレッドをサポートしない場合、7.8 節およびマニュアルにあるシミュレートされた “vm レベル” スレッドを使う方法を参照してください。この章のこれ以降の内容はネイティブスレッドを仮定し、vm レベルスレッドには一般的では成り立ちません。

7.2  スレッドの作成と終了

この章で説明される関数は Thread モジュールに定義されています。

システムコール create f v は関数適用 f v を実行する新しいスレッドを作成し、スレッド ID を返します。呼び出し側はこの ID を使って新しく作られたスレッドを制御することができます。

val create : ('a -> 'b) -> 'a -> t

新しいスレッドで実行される f v という関数適用はプログラム内の他のスレッドと並行に実行されます。関数適用が値を返すとスレッドは終了し、返り値は無視されます。スレッドが捕捉されないエラーによって終了したとしても、そのエラーが他のスレッドに及ぶことはありません。そのエラーに関するメッセージが標準エラー出力に出力されるか、エラーは完全に無視されます。他のスレッドは独立して実行されるので、このエラーを受け取ることはできません。

スレッドはこの他にも Thread モジュールのシステムコール exit を呼ぶことによってでも終了できます。 Pervasives.exit と混同しないようにしてください。これはプログラム全体、つまり全てのスレッドを終了します。

val exit : unit -> unit

プログラムの最初のスレッドは終了するときに暗黙のうちに Pervasives.exit を呼びます。

最初のスレッドが終了する前にそれ以外のスレッドが終了すると、そのスレッドは OCamlのランタイムライブラリによってすぐに開放されます。fork で作成された Unix プロセスのようにゾンビになることはありません。

システムコール self は呼び出したスレッドのスレッド ID を返します。

val self : unit -> t

前の章で作成した “フォーク” と “ダブルフォーク” を使った並行サーバを、子プロセスではなくスレッドを使うように書き換えるための準備が整いました。そのようなサーバを作るためには、6.7 節で定義した Misc.fork_treatment に似た Misc.co_treatment を作成します。

let co_treatment server_sock service (client_descr, _ as client) = try ignore (Thread.create service client) with exn -> close client_descr; raise exn;;

スレッドの作成に成功した場合、サービスと client_descr を閉じる処理は service 関数によって処理されます。スレッドの作成に失敗した場合、ディスクリプタ client_descr を閉じてクライアントを放棄し、エラーに対する処理をメインプログラムに任せます。

スレッドを使ったサーバでは難しい部分が service 関数に隠れていることに注意してください。この関数は接続を切断するまでロバストに処理しなければいけません。プロセスを使った並行サーバの例ではサービスが他のプロセスによって実行されており、エラーが起こってサービスが途中で終了したとしても望みどおりの処理 — 接続の処理 — が行われます。これはプロセスが終了するとシステムがファイルディスクリプタを閉じるためです。しかしサービスがスレッドによって実行されている場合、デフォルトの設定ではディスクリプタはスレッド間で共有され、スレッドの終了時に閉じられることがありません。そのためスレッドは終了する前にディスクリプタを閉じる必要があります。加えて、サービスは致命的なエラーのときに Pervasives.exit を呼ぶこともできません。なぜならこの呼び出しはサービスだけではなくサーバ全体を終了させるからです。致命的なエラーのときに Thread.exit を使っても解決にならないことが多いです。なぜならスレッドの持つ開いたままのリソース、つまり接続が閉じられていない可能性があるからです。

一つの解決法は終了処理を実行するコードがハンドラとして登録されている例外 (例えば Exit 例外) を使うことです。また同様の理由でサービスの実行中に sigpipe シグナルをブロックし、シグナルに対する動作を動作をスレッドの終了から EPIPE 例外に変えておくことが重要です。

7.3  スレッドの終了待ち

この節で説明される関数は Thread モジュールに定義されています。

システムコール join を使うとあるスレッドから他のスレッドの終了を待つことができます。

val join : t -> unit

この関数を呼んだスレッドの実行は指定された ID のスレッドの実行が終了するまで中断されます。主スレッドがこの関数を使えば終了する前に他の全てのスレッドが終了するのを待つことができます(この関数を使わないと主スレッドが終了した時点で他の全てのスレッドも終了します)。

join はブロックするので “遅い” システムコールですが、この関数はシグナルを受け取ったときに自動的に再開します。正確にはシグナルによって実行が中断され、ハンドラが呼び出されてから、もう一度関数が呼ばれます。そのためこの関数は指定したスレッドが本当に終了するまで返らず、EINTR 例外を出すことはありません。OCamlプログラマの観点からは、join はシグナルは関数から返った瞬間に実行されるように振る舞います。

スレッドは非同期に実行されるので、値を返すことがありません。しかしその動作は — 幸運にも! — 副作用によって観測することができます。例えばスレッドは計算の結果を参照値に保存し、他のスレッドは計算が終わったことを確認してからその値を読み込むことができます。次の例がこのことを説明します。

exception Exited type 'a result = Value of 'a | Exception of exn let eval f x = try Value (f x) with z -> Exception z let coexec (f : 'a -> 'b) (x : 'a) : unit -> 'b = let result = ref (Exception Exited) in let p = Thread.create (fun x -> result := eval f x) x in function () -> match (join p; !result) with | Value v -> v | Exception exn -> raise exn;; let v1 = coexec succ 4 and v2 = coexec succ 5 in v1 () + v2 ();;

システムは他のスレッドに制御を移すため、あるいはスレッドが他のスレッドやプロセスが利用中のリソース (例えばロックや条件変数、ファイルディスクリプタ) を待つために、スレッドを停止することができます。yeild 関数を使うとシステムによるプリエンプションを待つことなく明示的に実行を停止することができます。

val yield : unit -> unit

この関数はスレッドスケジューラに対してヒントを出しますが、なんの効果も持たない場合もあります。例えば yield が呼ばれた時点で他に実行できるスレッドがない場合は、システムは同じスレッドの実行を続けます。

また他のスレッドが実行されるためには yeild を呼ばなくてはいけないわけではありません。システムは yield コマンドを任意のタイミングで実行する権限を持っているからです。実際にはシステムはこの権限を頻繁に使って他のスレッドを実行し、プロセスが一つしか無いマシンでもスレッドが並列に実行されているように見せています。

3.3 節の例をプロセスではなくスレッドを使って書き直します。

1 let rec psearch k cond v = 2 let n = Array.length v in 3 let slice i = Array.sub v (i * k) (min k (n - i * k)) in 4 let slices = Array.init (n/k) slice in 5 let found = ref false in 6 let pcond v = if !found then (Thread.exit (); cond v) else cond v in 7 let search v = if simple_search pcond v then found := true in 8 let proc_list = Array.map (Thread.create search) slices in 9 Array.iter Thread.join proc_list; 10 !found;;

psearch k f vk このスレッドを使って配列の要素のうち f を満たすものを並列に探索します。pcond は探索がすでに成功している場合にスレッドを終了させることで探索を打ち切ります。全てのスレッドは参照 found を共有し、アクセスは並列に起こります。この参照への書き込みは必ず値を true にするので、クリティカルセクションは存在しません。探索対象が見つからなかったときに全体の探索結果をセットしてはいけません。例えば 7 行目を

let search v = found := !found && simple_search pcond v

あるいは:

let search v = let r = simple_search pcond v in found := !found && r

に入れ替えると正しく動かなくなります。

* * *

要素の比較が (デイスクアクセスやネットワークへの接続で) 一時的なブロックを引き起こすならば、並列探索はプロセスが一つしか無いマシンでも意味があります。この場合リソースを待っているスレッドが他のスレッドに実行を譲ることで配列の他の部分の計算を続けることができ、ブロックされたスレッドの実行はリソースが利用可能になってから再開されます。

特定の要素へのアクセスがとても大きなレイテンシを持つという場合があり、とくにネットワーク越しにデータを取得しなければいけない場合にはレイテンシが他の要素に比べて数百倍となります。このようなケースではシーケンシャルな探索と並列な探索の差が明確になります。

練習問題 18

配列に対するクイックソートを並列化してください。 解答

* * *

これから紹介するスレッドを停止させる別の方法は、オペレーティングシステムのリソースに関係しています。スレッドは delay s を呼ぶことで任意の時間だけ実行を停止することができます。実行は s 秒後に再開されます。

val delay : float -> unit

この関数は vm-レベルのスレッドでもポータブルに使うことができますが、delay signore (Unix.select [] [] [] s) の省略形に過ぎません。join と違ってこの関数はシグナルで中断されても自動的に再開しません。

スレッドを外部の操作と同期させるために select を使うことができます。この関数がブロックするのはプログラム全体ではなくこの関数を読んだスレッドだけであることに注意してください。シミュレートされたスレッドが Unix モジュールの select を呼ぶとプログラム全体すなわち全てのスレッドをブロックしてしまうために、Thread モジュールはこの関数を再定義しています。ネイティブスレッドを利用するプログラムでは Unix.selectThread.select の動作に違いはありませんが、この理由からネイティブスレッドを利用するプログラムでも Unix モジュールではなく Thread モジュールの select 関数を使うことが必要になります。

5.2 節のエラトステネスのふるいの例を Unix プロセスではなくスレッドを使って動かすためには、 filter 関数の 516 行目を

let p = Thread.create filter (in_channel_of_descr fd_in) in let output = out_channel_of_descr fd_out in try while true do let n = input_int input in if List.exists (fun m -> n mod m = 0) first_primes then () else output_int output n done; with End_of_file -> close_out output; Thread.join p

に変え、さらに sieve 関数の 410 行目を

let k = Thread.create filter (in_channel_of_descr fd_in) in let output = out_channel_of_descr fd_out in generate len output; close_out output; Thread.join k;;

に変えれば十分です。ただし、計算時間に比べてプロセスをあまり使わないこの例では実行速度の高速化は望めません。

* * *

7.4  スレッドの同期: ロック

この節で説明される関数は Mutex (Mutual exclusion) モジュールに定義されています。

前の節で変更可能なリソースに対する並行アクセスの問題に触れました。次のシナリオは共有リソースへのアクセスの問題を説明します。二つのスレッド pq がカウンタ c の値を進めようとしているとします。

6にあるシナリオを考えます。スレッド p がカウンタ c の値を読み、実行が q へ移ります。この次に qc の値を読み、値 k+1 を c に書き込みます。次に実行は p へと戻りますが、このスレッドは k+1 を c に書き込みます。この結果、最小的な c の値は k+2 ではなく k+1 となってしまいます。

Figure 6 — 共有リソースへのアクセスの競合

この古典的な問題はロックを使って pq が交互に実行されることをなくすことで解決できます。

ロックとは共有オブジェクトであり、プログラムの各時点でロックを所有できるのは多くとも一つのスレッドだけです。ロックは create 関数で作成します。

val create : unit -> t

この関数はどのスレッドにも所有されていない新しいロックを返します。作成したロックを取得するには、そのロックに対してシステムコール lock を呼ぶ必要があります。目的のロックが他のスレッドによって所有されていた場合、 lock を読んだスレッドはロックが開放されるまでブロックします。ロックを所有しているスレッドはシステムコール unlock によってロックを開放しなければいけません。

val lock : t -> unit val unlock : t -> unit

シグナルへの動作について、lockThread.join に似ています。もし lock の実行中にシグナルを受け取った場合、そのシグナルは記録されOCamlランタイムがシグナルの到着を通知しますが、関数を呼び出したスレッドは lock がロックが取得できるまでブロックされ、lock から ENITR 例外が出ることはありません。OCamlによるシグナルの処理は lock が返るタイミングで行われます。

システムコール trylock を使うとブロックせずにロックの取得を試みることができます。

val try_lock : t -> bool

この関数はロックが取得できた場合に true を返し、それ以外のときには false を返します。後者の場合、ロックは取得されないのでスレッドの実行が停止することはありません。そのためスレッドは他の処理を行ってからもう一度ロックの取得を試みることができます。

複数のスレッドによって利用されるグローバルなカウンタを進める処理には同期の問題があります。カウンタの値を読んでから新しい値を書き込むまでの間はクリティカルセクション、つまり二つ以上のスレッドが同時に実行してはいけない区間になります。ロックを使えば同期を簡単に管理できます。

type counter = { lock : Mutex.t; mutable counter : int } let newcounter () = { lock = Mutex.create (); counter = 0 } let addtocounter c k = Mutex.lock c.lock; c.counter <- c.counter + k; Mutex.unlock c.lock;;

カウンタの値を読むだけであれば同期の問題は起こりません。カウンタの変更と同時に読み込みが起こることはありえます。しかしその場合書き込みの直前または直後の値を読み込むだけであり、いずれにせよ矛盾が起きることはありません。

* * *

ある関数を実行する間ロックを保持するというパターンがよく必要になります。このパターンでは関数が成功するか失敗するかにかかわらず最後にロックを開放することが当然必要になります。この処理を次のライブラリ関数にまとめます:

let run_with_lock l f x = Mutex.lock l; try_finalize f x Mutex.unlock l

一つ前の例は次のように書けます:

let addtocounter c = Misc.run_with_lock c.lock (fun k -> c.counter <- c.counter + k)

スレッドを使ったサーバのモデルの代わりに、リクエストを並列に処理するスレッドをいくつか事前に作っておくことができます。

val tcp_farm_server : int -> (file_descr -> file_descr * sockaddr -> 'a) -> sockaddr -> unit

tcp_farm_server 関数は tcp_server と同じように動作しますが、開始するスレッドの数を表す引数を追加で取ります。スレッドは同じアドレスに対するサーバになります。スレッドプールを使う利点はスレッドを一括して事前に作っておくことで接続を処理するときにスレッドを作る処理の分だけ時間を節約できる点です。

let tcp_farm_server n treat_connection addr = let server_sock = Misc.install_tcp_server_socket addr in let mutex = Mutex.create () in let rec serve () = let client = Misc.run_with_lock mutex (Misc.restart_on_EINTR accept) server_sock in treat_connection server_sock client; serve () in for i = 1 to n-1 do ignore (Thread.create serve ()) done; serve ();;

唯一注意するべき点は accept 関数を排他的に実行することです。これによって接続を受け付ける関数が常に多くとも一つとなります。ここでは treat_connection 関数がシーケンシャルな処理を行うことを想定していますが、これが必須なわけではありません。スレッドプールとスレッドの作成を同時に行って使用率に応じてスレッドの数を調整することもできます。

* * *

ロックの取得がブロックせずに成功した場合、処理はすぐに終了します。ロックの取得は全ての現代的なプロセッサで提供される “test-and-set” 命令 (とキャシュの更新などのコストの小さないくつかの処理) で一般的に実装されているためです。一方ロックが利用可能でない場合は、スレッドは実行を停止しもう一度スケジュールされるまで待たなくてはならず、大量のコストがかさみます。そのためこのコストを取るのは他のスレッドに実行を受け渡すために本当に実行を中断するときだけにするべきであり、ロックを取得しようとした結果実行がブロックされたというのはできる限り避けるべきです。したがってほとんど常にロックはなるべく早く開放して必要になったらもう一度取得するようにするべきで、ロックを保持したままでいることは避けるべきです。ロックの開放を行わないとクリティカルセクションが大きくなり、他のスレッドのロックの取得が競合し、実行を停止しなければならなくなる頻度が上がります。

ロックによってスレッドが交互に実行されなくなりますが、そのかわりデッドロックの危険性があります。例えばスレッド p がロック v の開放を待っていて、v を持っているスレッド qp が持っているロック u を待っている場合です (最悪な場合には、スレッドが自分が持っているロックを待つこともあります) 。並行プログラミングは難しく、デッドロックの排除は常に簡単なわけではありません。デッドロックを避ける単純で実行可能なことが多い方法は、ロックに階層を定義してロックの取得が階層をたどるようにするというものです。スレッドがあるロックを取得できるのはそのロックよりも下のロックを全て取得しているときだけです。

7.5  完全な例: http リレー

6.14 節で作成した httpリレーを改変し、リクエストをスレッドを使って処理するようにします。

直感的には、プロセスのクローンを作成する establish_server 関数をスレッドを作るようにすればすみます。しかし、いくつか注意点があります。複数のスレッドを使用する上での困難はメモリ空間を全て共有することです。そのためあるスレッド行ったことを他のスレッドが打ち消してしまうような、“互いのつま先を踏み合う” ことのないようにしなければなりません。これは二つのスレッドが変更可能な同じ構造体を並列に変更するときによく起こります。

httpサーバにはいくつかの変更が必要になります。まずリソースへのアクセスの問題から始めます。 6.14 節で説明した proxy_service 関数は接続への対応を処理します。parse_hostparse_url そして parse_request を途中で使って、proxy_serviceStr ライブラリの regexp_match 関数を呼びます。しかし、このライブラリは再入可能ではありません (最後の探索の結果がグローバル変数にか格納されます)。一見害の無いように見える関数呼び出しも内部にデータの衝突を隠しているかもしれないので注意しなければいけないことをこの例は示しています。今回は Str ライブラリを書き直すのではなく単純にその利用をシーケンシャルにします。ライブラリ関数の呼び出しをロックで防御するだけで十分です (他の選択肢があるわけでもありません)。例外によって関数が異常終了した場合にもロックを開放するように注意する必要があります。

現在のコードへの変更を最小限にするために、Url モジュールのregexp_match 関数の名前を unsafe_regexp_match に変更し、新たに regexp_match 関数を防御された unsafe_regexp_match として作成します。

let strlock = Mutex.create ();; let regexp_match r string = Misc.run_with_lock strlock (unsafe_regexp_match r) string;;

変更は最小限だと言えるでしょう。regexp_match 関数が正規表現に対するマッチングとマッチしてグループの取得を両方を含むことを指摘しておきます。Str.string_matchStr.matched_group を個別にロックで防御するのは完全な間違いです。

もう一つの解法は Str ライブラリを使わずに解析関数を書き直すことです。しかしライブラリ関数を同期して実行するのは簡単であり、実行を遅くしないことが実行すれば分かるので、この方法を取るべき理由は何もありません。ただ Str が最初から再入可能であればそのほうが良かったでしょう。

他の関数はすでに再入可能です。例えば Misc.retransmit 関数は呼び出しごとに違うバッファを確保します。

これ以外にエラー処理に関連して気をつけなければいけないことがあります。前に書いたように、スレッドによる接続の処理はロバストでなければいけません。特にエラーが起こった場合に他のスレッドが影響を受けてはいけません。つまりそのスレッドは受け持った接続を適切に閉じて他の待っている接続を受け付ける状態に戻るという “通常” 終了をする必要があります。

このためにまず handle_error 関数中の exit の呼び出しを Thread.exit に変更します。これは exit がプロセス全体を終了させるためです。しかしただ Thread.exit を読ぶだけでは正しくなりません。なぜならプロセスの終了と違ってスレッドの終了は (共有された) ディスクリプタを閉じないからです。接続処理中にエラーが起きても接続は開いたままなので、 Exit 例外を出して最終処理を行うようにする必要があります。そして treat_connection 関数は EPIIE だけでなく Exit を含んだ全ての例外を補足しなければいけません。例外を捕捉するように関数を呼び出す関数を使ってこれを行います:

let allow_connection_errors f s = try f s with Exit | Unix_error(EPIPE,_,_) -> ()
let treat_connection s = Misc.co_treatment s (allow_connection_errors proxy_service) in
練習問題 19

http/1.1 プロトコルのプロキシをスレッドを使って書き直してください。

* * *
練習問題 20

コルーチンは別のスレッドへの実行の受け渡しを常に明示的に行う必要がある非常に特殊なスレッドと見ることができます。スレッドを使ってコルーチンを実装してください。

* * *

7.6  条件変数

この節で説明される関数は Condition モジュールに定義されています。

ロックを使った同期はとてもシンプルですが、機能が十分ではありません。ロックを使うと共有データが開放されるのを待つことができますが、データがある状態になるのを待つことはできません。

カウンタの例をスレッド感で共有される (先入れ先出しの) キューを使って書き直してみましょう。キューに要素を追加することはこれまで説明してきたロックによって行えます。これはキューの状態にかかわらず要素を追加することは常に可能なためです。

しかしキューから要素を削除する場合はどうでしょうか? キューが空のときは何をすればいいでしょうか? キューが空の場合にロックを持ちながら新しい要素を待つことはできません。なぜならキューに要素を追加するにはロックが必要なために、ロックを持っている限りキューに新しい要素が足されることはないからです。だとすれば、キューが空でないことをどうやって知ることができるでしょうか?キューの状態を定期的にチェックする方法は “ビジーウェイト” と呼ばれ、望ましい解決法ではありません。(周期が短ければ) 計算機のサイクルを無駄遣いし、 (周期が長ければ) 反応時間が長くなるからです。

条件変数 がこの問題を解決します。ロックを持つスレッドは条件変数に対して別のスレッドがシグナルを送るまで待つことができます。ロックと同じように、条件変数は同期関数によって生成される中身の見えないオブジェクトです。create 関数で条件変数を作ることができます。

val create : unit -> t

システムコール wait を使うと、ロック vすでに持つ プロセス p は条件変数 c とロック vに対して待つことができます。この呼び出しによって プロセス p はシステムに条件変数 c とロック v に対して待っていると伝え、ロック v を開放して実行を中断します。他のスレッド q が条件変数 c の変更をシグナルし、かつロック v が利用可能になると v を持った状態で実行が再開されます。

val wait : t -> Mutex.t -> unit

ロック v を取得していない状態で wait c v を呼ぶとエラーとなります。またシグナルに対する wait c v の動作は Mutex.lock と同じです。

スレッドが条件変数が変更されたシグナルを送ると、その条件変数に対して待っている全てのスレッドを起動する (システムコール broadcast) か、それらのうち一つを起動する (システムコール signal) かのどちらかが行われます。

val signal : t -> unit val broadcast : t -> unit

条件変数のシグナルの送信またはブロードキャストは (wait と違って) ロックを必要としません。そのためシステムのエラーは起きませんが、それでもプログラミング上のエラーは起きます。

スレッドを一つだけ起動するのか全て起動するかは問題によります。キューの例をもう一度考えると、スレッドが空のキューに要素を入れたときに待っているスレッドを全て起動する必要はありません。キューから要素を取り出して処理を行えるのは一つのスレッドだけだからです。一方、もしスレッドが複数の要素をキューに入れ、その要素の数が静的にわからない場合には待っている全てのスレッドを起動する必要があります。

空でないキューに要素を追加するときにシグナルを出さない場合には、空のキューへの要素の追加は常にブロードキャストを行う必要があることに注意してください。空のキューへの要素の追加されてからシグナルが発せられるまでに他の要素が追加されることがあり、このときは要素が複数追加されたとして対処しなければならないためです。

通常スレッドは他のスレッドが条件変数に対して待っている理由を近似的にしか知りません。そのため他のスレッドが条件変数を待っているかもしれない状況では常にシグナルすることが求められます。また起動されたスレッドは期待した状況が整っていると仮定してはいけません。一般的には、起動されたスレッドは共有状態のデータを読み取って所望の条件が成り立っていることを確認し、必要ならばもう一度待つ必要があります。この処理は他のスレッドがシグナルしたときにだけ起こるのでビジーウェイトとは違います。

このアプローチを正当化する理由がもう一つあります。スレッドが大量のリソースを生み出して broadcast を使って全てのスレッドを起動したときに、最初に実行されるスレッドは貪欲に全てのリソースを消費することが可能な点です。このとき二番目以降に起動されるスレッドは次回はもっとラッキーになるように願いながらもう一度実行を中断することになります。

今までの説明でキューに関する完全な回答の準備が整いました。Queue に定義されているキューの構造がロックと条件変数 non_empty で拡張されます。

1 type 'a t = 2 { queue : 'a Queue.t; lock : Mutex.t; non_empty : Condition.t } 3 let create () = 4 { queue = Queue.create (); 5 lock = Mutex.create (); non_empty = Condition.create () } 6 7 let add e q = 8 Mutex.lock q.lock; 9 if Queue.length q.queue = 0 then Condition.broadcast q.non_empty; 10 Queue.add e q.queue; 11 Mutex.unlock q.lock;; 12 13 let take q = 14 Mutex.lock q.lock; 15 while Queue.length q.queue = 0 16 do Condition.wait q.non_empty q.lock done; 17 let x = Queue.take q.queue in 18 Mutex.unlock q.lock; x;;

要素の追加ではスレッドはブロックしませんが、追加の前にキューが空だった場合に条件変数 non_empty をシグナルすることを忘れてはいけません。この条件変数に対して待っているスレッドが存在する可能性があるからです。

要素の削除はもう少し複雑です。スレッドはロックを取得してから削除を試みますが、キューが空だった場合には条件変数 non_emptyに対して待ち起動されたあとにもう一度同じことを行います。同じことができるのは起動されたスレッドがロックを持っているからです。

上で説明したように、broadcast q.non_empty によるシグナル (9 行目) はすでにロック q.lock を持っているスレッド p によって実行されます。これは読み込むを行うために take を実行するスレッド q が 15 行目と 16 行目の間の、キューが空であることは確認したがまだ実行を中断していない状態にないことを意味します。もしスレッド q がこの状態になったとすると、p によって送られるシグナルは無視され効果を持ちません。その後 q は実行を中断しますが、 p がすでにシグナルしているために q が起動されることはありません。スレッド p によるロックの取得は q がすでに実行を中断したかキューの状態を調べていないかのどちらかであり、キューの状態を調べたが実行を中断していないという状況にならないことを保証します。

練習問題 21

有界なキューを実装してください。キューへ追加されるのはキューがある長さに達したときにブロックする機能です (並行プログラミングで限りなく生産する生産者と実行がブロックする消費者が存在する場合にこのキューが必要になります)。 解答

* * *

7.7  イベントベースの同期通信

この節で説明される関数は Event モジュールに定義されています。

ロックと条件変数を使えばどんな形の同期も行うことができますが、必ずしも簡単に行えるというわけではありません。最初は単純だったキューでも同期のためのコードが緻密になることは前節の例で見ました。

イベントベースの同期通信は高レベルの通信プリミティブを集めたものであり、並行プログラミングを容易にすることを意図しています。Event モジュールのプリミティブはまず John Reppy によって Standard ML 言語の拡張 Concurrent ML [16] として開発されました。OCamlではこれらの通信プリミティブはロックと条件変数などの基本的な同期よりも上のレベルに位置しています。

通信は イベントチャンネル を通じて送ることで行います。チャンネルとは “軽量パイプ” のようなものです。これを使うと同じプログラム内のスレッド間で通信を行い、生産者と消費者の間で同期を行うことができます。'a 型の値を運ぶチャンネルは 'a channel 型を持ちます。チャンネルは途中で型を変えることなく常に同じ型の値を運びます。チャンネルは new_channel 関数で作成されます。

val new_channel : unit -> 'a channel

メッセージの送受信は直接的にではなくイベントという中間体を通して行われます。基本的なイベントは “メッセージの送信” と “メッセージの受信” であり、以下のプリミティブによって作成できます。

val send : 'a channel -> 'a -> unit event val receive : 'a channel -> 'a event

メッセージの作成してもすぐに何かが起こるわけではありません。行われるべき動作を示すデータ構造が作られるだけです。イベントを実際に起こすには、スレッドはそのイベントを実行すべきスレッドと同期しなければいけません。sync プリミティブを使うと引数として渡すイベントが実際に実行されるのを待つことができます。

val sync : 'a event -> 'a

まとめると、値 v をチャンネル c に送るには sync (send c v) を実行します。sync を読んだスレッドはイベントが起こるまで、つまり他のスレッドがチャンネル cから値を受信可能になるまでブロックされます。対称的に、スレッドがチャンネル c に対するメッセージを待つには sync (recieve c) を使います。

全ての生産者と全ての消費者は競合します。例えばいくつかのスレッドが一つのチャンネルにメッセージを送りメッセージを受信できるスレッドが一つしか無かった場合、自分の送ったイベントが起きる生産者は一つだけなのは明らかです。他の生産者は実行を中断されたままであり、先に他のスレッドがイベントを受け取ったことに気がつくことはありません。

競合は同じスレッド内でも起こりえます。複数のイベントは choose プリミティブを使うとまとめることができます。

val choose : 'a event list -> 'a event

この関数は引数に渡されたイベントを並列にオファーし、これらのイベントのうちちょうど一つが実行されます。ここではオファーと実行を区別していることに注意してください。sync (choose [e1; e2]) は二つのイベント e1e2 の選択をオファーし、同期したときには二つのうち一つのイベントしか実行されません (実行と同時に同時にもう一つのイベントはキャンセルされます)。wrap_abort プリミティブを使うとイベントがキャンセルされたときの処理を設定できます。

val wrap_abort : 'a event -> (unit -> unit) -> 'a event

wrap_abort e fe と同じイベントを作成しますが、同期したときにこのイベントが実行されなかった場合には f が実行されます (このイベントが複雑なイベントの一部であるときにだけ意味を持ちます)。

poll を使うと (Mutex.try_lock のように) ブロックせずにイベントの同期を試みることができます。

val poll : 'a event -> 'a option

poll e はイベント e をオファーしますが、このイベントがすぐに実行されない場合にはブロックせずにオファーをキャンセルします。このとき関数呼び出しはなんの効果も持ちません (より正確には、poll e という式が None で置き換えられたかのような動作をします)。これに対して、もしイベントがすぐに実行できるならば sync e が実行されたときの動作をします。ただしその時の返り値は v ではなく Some v となります。

7.3 節で示したエラトステネスのふるいの例では、異なるスレッド感の通信には元のプログラムと同じパイプが使われ、システムメモリ (パイプ) が中間体でした。プロセスのメモリを使って直接通信を行ったほうが効率的な可能性もあります。単純な解法はパイプを整数が送られるチャンネルで置き換えることです。

整数をチャンネルに送るだけでは十分ではなく、ストリームの終端を検出する必要もあります。最も単純な解法は整数を Some n の形で送り、None を送ることで終了することです。変更を最小にするために、5.2 節の例で使ったコードを再利用します。パイプとその入出力関数をチャンネルとその入出力関数でシミュレートします。

前のバージョンのプログラムを取ってきて入出力関数を Pervasives ライブラリの入出力バッファを使ったものからチャンネルを使ったものに入れ替えればすみます。例えば次のコードを open Unix 命令の後に追加します。

let pipe () = let c = Event.new_channel () in c, c let out_channel_of_descr x = x let in_channel_of_descr x = x let input_int chan = match Event.sync (Event.receive chan) with | Some v -> v | None -> raise End_of_file let output_int chan x = Event.sync (Event.send chan (Some x)) let close_out chan = Event.sync (Event.send chan None);;

しかしチャンネルを使ったバージョンの実行速度を前のバージョンと比べると、チャンネルを使うほうが二倍程度遅いことがわかります。整数を通信するときに毎回二つのスレッドの同期が必要となり、ロックの取得と開放に何回かのシステムコールが必要となるためです。一方で、バッファされた i/oを使うパイプを用いた通信では一回のシステムコールで数千個の整数がまとめて送られます。

公平な比較のためには、チャンネルを使った通信にバッファを用意していくつかの整数をまとめてチャンネルを利用するようにするべきです。子はプライベートなキューに計算結果を他のスレッドと同期することなく蓄積します。キューが満杯になるか明示的な要求が届くと、チャンネルが同期されてキューが空になります。親は自分用のキューを持ち、このキューはチャンネル同期によって満たされ計算が進むに連れて一つづつ消費されます。

このときのコードは以下のようになります:

type 'a buffered = { c : 'a Queue.t Event.channel; mutable q : 'a Queue.t; size : int } let pipe () = let c = Event.new_channel () in c, c;; let size = 1024;; let out_channel_of_descr chan = { c = chan; q = Queue.create (); size = size };; let in_channel_of_descr = out_channel_of_descr;; let input_int chan = if Queue.length chan.q = 0 then begin let q = Event.sync (Event.receive chan.c) in if Queue.length q > 0 then chan.q <- q else raise End_of_file end; Queue.take chan.q;; let flush_out chan = if Queue.length chan.q > 0 then Event.sync (Event.send chan.c chan.q); chan.q <- Queue.create ();; let output_int chan x = if Queue.length chan.q = size then flush_out chan; Queue.add x chan.q let close_out chan = flush_out chan; Event.sync (Event.send chan.c chan.q);;

このバージョンではパイプを使ったバージョンと比較可能な程度の効率を取り戻すことができます。

プロセスとパイプを使ったオリジナルのバージョンと比べると、チャンネルと使った方法には利点が二つあります。一つ目はスレッドの起動はプロセスのものと比べてコストがかからないことです。二つ目はチャンネルを使った通信がデータ全体のコピーではなくポインタの受け渡しのみを行うことです。今回の例ではスレッドの数とやり取りされるデータのサイズがシステムコールと計算のコストに比べて小さいのでこの利点はわかりにくなっています。

まとめると、スレッド間の通信は (プロセスが中断される場合) システムコールと同程度のコストがかかりますが、このコストは通信をバッファして大きな構造体をやり取りすることで大きく削減できるということになります。

* * *
練習問題 22

httpサーバには高く激しい負荷がかかることがあります。応答にかかる時間を減らすために、新しい接続を処理するためのスレッドを常に数十個保持するように httpサーバのアーキテクチャを変更することができます。この変更は各スレッドが一つのリクエストを処理するのではなく、キューから読んだリクエストを際限なく処理することを意味します。

マシンへの負荷を避けるために、スレッドの数をタスクを管理する時間がリクエストに対応する時間 (ディスクのデータ待ちの時間など) よりも小さくなる数に制限する必要があります。さらに接続が増えた場合は接続を処理待ち状態にしておき、それ以上に増えた場合は接続を拒否します。負荷が減りスレッドの数が “理想的な” 数よりも大きくなればスレッドのいくつかを削除します。

7.5 節の例をこのアーキテクチャに変更してください。

* * *

7.8  実装の詳細

Unix におけるスレッドの実装

もともと Unix システムはスレッドをサポートするようにデザインされていませんでしたが、ほとんどの現代的な Unix 実装はスレッドをサポートします。ただしスレッドが後から追加されたものであるということが明白になることがあります。例えば exec を直後に実行するのでない限り、スレッドを使っているときに fork を使うことは強く非推奨とされています。fork は現在のスレッドをコピーし、そのスレッドは他のスレッドが存在すると信じて実行を続けますが、実際には他のスレッドは存在せず、正しく動作することができません。

fork を実行した親は通常通り実行を続けます。また fork の直後に子プロセスが他のプログラムを起動する特殊ケースについては親プロセスに問題は起こりません。この方法で問題が起きないことはラッキーです。なぜならこれが他のプログラムを起動する唯一の方法だからです。

順番を逆にして、fork の後に親または子プロセスがスレッドを作成することは問題ありません。

OCaml におけるネイティブおよびシミュレート実装

実行されているオペレーティングシステムがスレッドの機能を持つ場合には、OCamlはスレッドのネイティブ実装を使います。このとき OCamlはオペレーティングシステムにスレッドの管理をできるだけ任せます。スレッドは同じアドレス空間を共有する異なる Unix プロセスとなります。

システムがスレッドをサポートしない場合には、OCamlはスレッドをエミュレートすることができます。全てのスレッドは同じ Unix プロセスで実行されそのスケジューリングを含んだ管理はOCamlのランタイムシステムによって行われます。ただし、この実装はバイトコードにコンパイルするときにのみ利用可能です。

OCamlシステムはネイティブとシミュレートされたバージョンのスレッドに同じインターフェースを提供しています。そのためスレッドの実装は二つに分かれています。一つはエミュレートされたバージョンのためのもので、タスクコントローラを含みます。もう一つは posix (1003.1c) に基づくスレッドの実装で、他の言語で書かれたライブラリ関数を OCamlへとつなぐものです。OCamlは単純な管理タスクを行いインターフェースがエミュレートされたバージョンと同じことを確認します。これによってある Unix アーキテクチャでコンパイルできたプログラムがほかの Unix アーキテクチャでもコンパイルできることが保証されます。

ただしネイティブのスレッドとエミュレートされたスレッドでは同期のための C ライブラリへの呼び出しが異なるために、プログラムの意味全体が異なってしまいます。そのためプログラムが二つのバージョンで同じように動くと信じる前にいくつか対処をしなくてはいけません。この節では主に二つの実装の違いについて議論しますが、通常はネイティブ実装について語ることを覚えておいてください。

エミュレートされたスレッドを使うには ocamlc コンパイラに -thread ではなく -vmthread オプションを渡します。このオプションを ocamlopt コンパイラに渡すことはできません。

OCaml コードの逐次化

OCamlによるスレッドの実装は自動メモリ管理やアロケートされたデータの頻繁な利用といった OCamlの特色を持っている必要があります。採用されている解法は一番シンプルで最も効率的なもので、OCamlの全てのスレッドの実行を逐次化するというものです。つまりランタイムシステムが持つロックによって二つ以上の OCamlコードを同時に実行できなくします。これはスレッドという考え方と矛盾するように聞こえますが、そうではありません。ロックはブロックするシステムコールの直前に開放され、システムコールから返るともう一度取得されるからです。この間に他のスレッドが実行されます。このようなシステムコールの特殊なケースは実行中のスレッドで定期的に実行され、他のスレッドに実行を受け渡す sched_yield の呼び出しです。

マルチプロセッサのマシンでは、本当の並列性は C のコードとシステムコールを実行することでしか得ることができません。プロセッサが一つしか無いマシンでは、 OCamlコードが逐次化されたということに気づくことはできません。

スレッドの実行はほぼ任意のタイミングで他のスレッドに移るので、プログラマはこの逐次化を頼ることはできません。ただし例外として、逐次化されたコードではメモリの一貫性が保証されます。つまり、C のコードを実行するときなどを除いて二つのスレッドは同じメモリを目にするということです。その結果として、ロックの受け渡しがメモリの同期を意味するようになります。あるスレッドが書き込んだアドレスを別のスレッドが読み込んだ場合常に新しい値が読み込まれ、同期処理は必要ありません。

スレッドとシグナル

一般的に言って、非同期という特性を持つシグナルを使うことはシングルスレッドのプログラムでさえ注意を要する難しいものでした。シグナルはマルチスレッドのプログラムではさらに難しくなります。どのスレッドにシグナルが送られるべきでしょうか? 全てのスレッド、主スレッド、それとも現在実行しているスレッド? あるスレッドが別のスレッドにシグナルを送りたい場合どうすればいいでしょうか? 実際のところ、スレッドはこのような問題に答えることなく実装されました。そのため実装によってシグナルへの動作は異なります。

Thread.joinMutex.lock そして Condition.wait は遅いシステムコールにもかかわらずシグナルによって中断されることがありません (したがって EINTR エラーを出して失敗することもありません)。待っているときにシグナルが送られた場合、そのシグナルはシステムコールから返ってから受け取られて処理されます。

posix 規格はシグナルハンドラがスレッド間で共有されることとシグナルマスクはスレッドの作成時にもとのスレッドから継承された後はスレッドにプライベートとなることを定めています。しかしシグナルに対するスレッドの動作は主に定められておらず、ポータブルにはなっていません。

したがってスレッドではできる限り非同期シグナル (sigalrmsigvtalrmsigchld など) を避けることが望ましいです。シグナルは Thread.wait_signal によってブロックと調査をすることができます。シグナルの処理だけを行うスレッドを作ることもできます。このスレッドはシグナルを待ち、適切な処理を行い、他のスレッドにシグナルの情報を伝えます。

加えて、(バージョン 3.08 以降の) OCamlのスレッドはスレッドのプリエンプションに内部で sigvtalarm を使います。そのためこのシグナルを使うとシステムと干渉する可能性があるために、sigvtalarm シグナルはプログラムの中で使うことができません。


Previous Up Next