如何在 OCaml 中创建大量线程?

How to create a big number of threads in OCaml?

我在 Racket 群里找到了 topic 关于 channel 创建性能的

我想写一个OCaml的版本来测试。

let post (c,x) = Event.sync (Event.send c x);;

let accept c = Event.sync (Event.receive c);;

let get_chan c = let n = accept c in print_int n;print_newline ();;

let chan_trans (old_chan, new_chan) =
  let s = accept old_chan in
  post (new_chan,(s+1));;

let rec whisper count init_val =
  let rec aux n chan =
    if n >= count then chan
    else
      let new_chan = Event.new_channel ()
      in Thread.create chan_trans (chan, new_chan);
      aux (n+1) new_chan
  in let leftest_chan = Event.new_channel ()
  in let t0 = Thread.create post (leftest_chan, init_val)
  in let rightest_chan = aux 0 leftest_chan
  in get_chan rightest_chan;;

whisper 10000 1;;

问题是,当我测试 whisper 1000 1 时,它按预期生成了 1001。但是,当我尝试测试 whisper 10000 1 时,出现了错误

Fatal error: exception Sys_error("Thread.create: Resource temporarily unavailable")

我用这个命令编译和运行

ocamlc -thread unix.cma threads.cma -o prog whisper.ml&&./prog -I +threads

OCaml 线程模块使用真正的系统(内核)线程。线程总数受内核限制:

 cat /proc/sys/kernel/threads-max
 251422

你当然可以增加这个,

 echo 100000 > /proc/sys/kernel/threads-max

但更好的方法是将线程视为一种资源并相应地管理它们。

let rec whisper count init_val =
  let rec aux n t chan =
    if n >= count then chan
    else
      let new_chan = Event.new_channel () in
      let t' = Thread.create chan_trans (chan, new_chan) in
      Thread.join t;
      aux (n+1) t' new_chan in
  let leftest_chan = Event.new_channel () in
  let t = Thread.create post (leftest_chan, init_val) in
  let rightest_chan = aux 0 t leftest_chan in
  get_chan rightest_chan

在这种情况下,它将 运行 与任何大小的管道。例如:

$ ocamlbuild -use-ocamlfind -tag thread -pkg threads ev.native
$ time ./ev.native 
100001

real    0m1.581s

但是 Chinese Whispers 的这种实现非常粗糙且效率低下。你不应该为此使用重量级的本机线程(而且 go 也不使用它们)。相反,您应该使用 Lwt or Async 库中的协作轻量级线程。这将是非常有效和美好的。

使用 Lwt 实现

此实现与 blog post 中的 Go 实现非常相似,但我认为我们可以在不使用邮箱的情况下在 OCaml 中更高效、更简洁地完成此操作(但我不确定它是否符合基准规则)。

open Lwt.Infix

let whispers n =
  let rec whisper i p =
    if i < n then
      Lwt_mvar.take p >>= fun x ->
      whisper (i+1) (Lwt_mvar.create (x+1))
    else Lwt_mvar.take p in
  whisper 0 (Lwt_mvar.create 1)

let () = print_int @@ Lwt_main.run (whispers 100000)

结果是:

$ ocamlbuild -use-ocamlfind -tag thread -pkg lwt.unix lev.native --
$ time ./lev.native 
100001
real    0m0.007s

与矿机上的 Go 实现比较:

$ go build whispers.go 
$ time ./whispers 
100001

real    0m0.952s

"Slow"实施

上面的代码是对原始 Go 版本的完全诚实的重新实现。但它如此之快的原因之一是 OCaml 和 Lwt 非常聪明,尽管它创建了 100_000 线程和 100_001 通道,但没有线程会屈服于后台,因为每次whisper 被称为通道已经包含数据,因此线程处于就绪状态。结果,这只是一个创建线程和通道的高效循环。它可以在 50 毫秒内创建一百万个线程。

所以这是一种惯用且正确的做事方式。但是为了真正的比较,让我们模仿 Go 的行为。下面的实现会先在堆中创建100_001个通道,然后100_000个线程,等待从左通道向右通道传输数据。并且只有在之后它才会将一个值放入最左边的通道以引发连锁反应。这基本上会模仿 Go 引擎盖下发生的事情。

let whispers n =
  let rec loop i p =
    if i < n then
      let p' = Lwt_mvar.create_empty () in
      let _t =
        Lwt_mvar.take p >>= fun x ->
        Lwt_mvar.put p' (x+1) in
      loop (i+1) p'
    else Lwt_mvar.take p in
  let p0 = Lwt_mvar.create_empty () in
  let t = loop 1 p0 in
  Lwt_mvar.put p0 1 >>= fun () -> t

$ time ./lev.native
100001
real    0m0.111s

所以它稍微慢了一点,实际上它比以前的实现慢了20倍(我用100万个线程来比较它们),但它仍然比Go快10倍。

阅读链接 post 看来您可能想使用 "cooperative threads library for OCaml" 的 lwt。结果看起来像这样:

let whisper left right =
  let%lwt n = Lwt_mvar.take right in
  Lwt_mvar.put left (n+1)

let main () =
  let n = 100_000 in
  let%lwt () = Lwt_io.printf "With %d mvars!\n" n in
  let leftmost = Lwt_mvar.create_empty () in
  let rec setup_whispers left i =
    if  i >= n
    then left
    else let right = Lwt_mvar.create_empty () in
      let () = Lwt.async (fun () -> whisper left right) in
      setup_whispers right (i+1) in
  let rightmost = setup_whispers leftmost 0 in
  let%lwt () = Lwt_mvar.put rightmost 1 in
  let%lwt res = Lwt_mvar.take leftmost in
  Lwt_io.printf "%d\n" res

let () = Lwt_main.run (main ())

然后编译运行它

$ ocamlbuild -use-ocamlfind -pkg lwt,lwt.ppx,lwt.unix whisper.native
$ time ./whisper.native
With 100000 mvars!
100001

real    0m0.169s
user    0m0.156s
sys 0m0.008s