如何在 OCaml 中创建大量线程?
How to create a big number of threads in OCaml?
我在 Racket 群里找到了 topic 关于 channel
创建性能的
我想写一个OCaml的版本来测试。
let post (c,x) = Event.sync (Event.send c x);;
let accept c = Event.sync (Event.receive c);;
let get_chan c = let n = accept c in print_int n;print_newline ();;
let chan_trans (old_chan, new_chan) =
let s = accept old_chan in
post (new_chan,(s+1));;
let rec whisper count init_val =
let rec aux n chan =
if n >= count then chan
else
let new_chan = Event.new_channel ()
in Thread.create chan_trans (chan, new_chan);
aux (n+1) new_chan
in let leftest_chan = Event.new_channel ()
in let t0 = Thread.create post (leftest_chan, init_val)
in let rightest_chan = aux 0 leftest_chan
in get_chan rightest_chan;;
whisper 10000 1;;
问题是,当我测试 whisper 1000 1
时,它按预期生成了 1001
。但是,当我尝试测试 whisper 10000 1
时,出现了错误
Fatal error: exception Sys_error("Thread.create: Resource temporarily unavailable")
我用这个命令编译和运行
ocamlc -thread unix.cma threads.cma -o prog whisper.ml&&./prog -I
+threads
OCaml 线程模块使用真正的系统(内核)线程。线程总数受内核限制:
cat /proc/sys/kernel/threads-max
251422
你当然可以增加这个,
echo 100000 > /proc/sys/kernel/threads-max
但更好的方法是将线程视为一种资源并相应地管理它们。
let rec whisper count init_val =
let rec aux n t chan =
if n >= count then chan
else
let new_chan = Event.new_channel () in
let t' = Thread.create chan_trans (chan, new_chan) in
Thread.join t;
aux (n+1) t' new_chan in
let leftest_chan = Event.new_channel () in
let t = Thread.create post (leftest_chan, init_val) in
let rightest_chan = aux 0 t leftest_chan in
get_chan rightest_chan
在这种情况下,它将 运行 与任何大小的管道。例如:
$ ocamlbuild -use-ocamlfind -tag thread -pkg threads ev.native
$ time ./ev.native
100001
real 0m1.581s
但是 Chinese Whispers 的这种实现非常粗糙且效率低下。你不应该为此使用重量级的本机线程(而且 go 也不使用它们)。相反,您应该使用 Lwt or Async 库中的协作轻量级线程。这将是非常有效和美好的。
使用 Lwt 实现
此实现与 blog post 中的 Go 实现非常相似,但我认为我们可以在不使用邮箱的情况下在 OCaml 中更高效、更简洁地完成此操作(但我不确定它是否符合基准规则)。
open Lwt.Infix
let whispers n =
let rec whisper i p =
if i < n then
Lwt_mvar.take p >>= fun x ->
whisper (i+1) (Lwt_mvar.create (x+1))
else Lwt_mvar.take p in
whisper 0 (Lwt_mvar.create 1)
let () = print_int @@ Lwt_main.run (whispers 100000)
结果是:
$ ocamlbuild -use-ocamlfind -tag thread -pkg lwt.unix lev.native --
$ time ./lev.native
100001
real 0m0.007s
与矿机上的 Go 实现比较:
$ go build whispers.go
$ time ./whispers
100001
real 0m0.952s
"Slow"实施
上面的代码是对原始 Go 版本的完全诚实的重新实现。但它如此之快的原因之一是 OCaml 和 Lwt 非常聪明,尽管它创建了 100_000
线程和 100_001
通道,但没有线程会屈服于后台,因为每次whisper
被称为通道已经包含数据,因此线程处于就绪状态。结果,这只是一个创建线程和通道的高效循环。它可以在 50 毫秒内创建一百万个线程。
所以这是一种惯用且正确的做事方式。但是为了真正的比较,让我们模仿 Go 的行为。下面的实现会先在堆中创建100_001个通道,然后100_000个线程,等待从左通道向右通道传输数据。并且只有在之后它才会将一个值放入最左边的通道以引发连锁反应。这基本上会模仿 Go 引擎盖下发生的事情。
let whispers n =
let rec loop i p =
if i < n then
let p' = Lwt_mvar.create_empty () in
let _t =
Lwt_mvar.take p >>= fun x ->
Lwt_mvar.put p' (x+1) in
loop (i+1) p'
else Lwt_mvar.take p in
let p0 = Lwt_mvar.create_empty () in
let t = loop 1 p0 in
Lwt_mvar.put p0 1 >>= fun () -> t
$ time ./lev.native
100001
real 0m0.111s
所以它稍微慢了一点,实际上它比以前的实现慢了20倍(我用100万个线程来比较它们),但它仍然比Go快10倍。
阅读链接 post 看来您可能想使用 "cooperative threads library for OCaml" 的 lwt。结果看起来像这样:
let whisper left right =
let%lwt n = Lwt_mvar.take right in
Lwt_mvar.put left (n+1)
let main () =
let n = 100_000 in
let%lwt () = Lwt_io.printf "With %d mvars!\n" n in
let leftmost = Lwt_mvar.create_empty () in
let rec setup_whispers left i =
if i >= n
then left
else let right = Lwt_mvar.create_empty () in
let () = Lwt.async (fun () -> whisper left right) in
setup_whispers right (i+1) in
let rightmost = setup_whispers leftmost 0 in
let%lwt () = Lwt_mvar.put rightmost 1 in
let%lwt res = Lwt_mvar.take leftmost in
Lwt_io.printf "%d\n" res
let () = Lwt_main.run (main ())
然后编译运行它
$ ocamlbuild -use-ocamlfind -pkg lwt,lwt.ppx,lwt.unix whisper.native
$ time ./whisper.native
With 100000 mvars!
100001
real 0m0.169s
user 0m0.156s
sys 0m0.008s
我在 Racket 群里找到了 topic 关于 channel
创建性能的
我想写一个OCaml的版本来测试。
let post (c,x) = Event.sync (Event.send c x);;
let accept c = Event.sync (Event.receive c);;
let get_chan c = let n = accept c in print_int n;print_newline ();;
let chan_trans (old_chan, new_chan) =
let s = accept old_chan in
post (new_chan,(s+1));;
let rec whisper count init_val =
let rec aux n chan =
if n >= count then chan
else
let new_chan = Event.new_channel ()
in Thread.create chan_trans (chan, new_chan);
aux (n+1) new_chan
in let leftest_chan = Event.new_channel ()
in let t0 = Thread.create post (leftest_chan, init_val)
in let rightest_chan = aux 0 leftest_chan
in get_chan rightest_chan;;
whisper 10000 1;;
问题是,当我测试 whisper 1000 1
时,它按预期生成了 1001
。但是,当我尝试测试 whisper 10000 1
时,出现了错误
Fatal error: exception Sys_error("Thread.create: Resource temporarily unavailable")
我用这个命令编译和运行
ocamlc -thread unix.cma threads.cma -o prog whisper.ml&&./prog -I +threads
OCaml 线程模块使用真正的系统(内核)线程。线程总数受内核限制:
cat /proc/sys/kernel/threads-max
251422
你当然可以增加这个,
echo 100000 > /proc/sys/kernel/threads-max
但更好的方法是将线程视为一种资源并相应地管理它们。
let rec whisper count init_val =
let rec aux n t chan =
if n >= count then chan
else
let new_chan = Event.new_channel () in
let t' = Thread.create chan_trans (chan, new_chan) in
Thread.join t;
aux (n+1) t' new_chan in
let leftest_chan = Event.new_channel () in
let t = Thread.create post (leftest_chan, init_val) in
let rightest_chan = aux 0 t leftest_chan in
get_chan rightest_chan
在这种情况下,它将 运行 与任何大小的管道。例如:
$ ocamlbuild -use-ocamlfind -tag thread -pkg threads ev.native
$ time ./ev.native
100001
real 0m1.581s
但是 Chinese Whispers 的这种实现非常粗糙且效率低下。你不应该为此使用重量级的本机线程(而且 go 也不使用它们)。相反,您应该使用 Lwt or Async 库中的协作轻量级线程。这将是非常有效和美好的。
使用 Lwt 实现
此实现与 blog post 中的 Go 实现非常相似,但我认为我们可以在不使用邮箱的情况下在 OCaml 中更高效、更简洁地完成此操作(但我不确定它是否符合基准规则)。
open Lwt.Infix
let whispers n =
let rec whisper i p =
if i < n then
Lwt_mvar.take p >>= fun x ->
whisper (i+1) (Lwt_mvar.create (x+1))
else Lwt_mvar.take p in
whisper 0 (Lwt_mvar.create 1)
let () = print_int @@ Lwt_main.run (whispers 100000)
结果是:
$ ocamlbuild -use-ocamlfind -tag thread -pkg lwt.unix lev.native --
$ time ./lev.native
100001
real 0m0.007s
与矿机上的 Go 实现比较:
$ go build whispers.go
$ time ./whispers
100001
real 0m0.952s
"Slow"实施
上面的代码是对原始 Go 版本的完全诚实的重新实现。但它如此之快的原因之一是 OCaml 和 Lwt 非常聪明,尽管它创建了 100_000
线程和 100_001
通道,但没有线程会屈服于后台,因为每次whisper
被称为通道已经包含数据,因此线程处于就绪状态。结果,这只是一个创建线程和通道的高效循环。它可以在 50 毫秒内创建一百万个线程。
所以这是一种惯用且正确的做事方式。但是为了真正的比较,让我们模仿 Go 的行为。下面的实现会先在堆中创建100_001个通道,然后100_000个线程,等待从左通道向右通道传输数据。并且只有在之后它才会将一个值放入最左边的通道以引发连锁反应。这基本上会模仿 Go 引擎盖下发生的事情。
let whispers n =
let rec loop i p =
if i < n then
let p' = Lwt_mvar.create_empty () in
let _t =
Lwt_mvar.take p >>= fun x ->
Lwt_mvar.put p' (x+1) in
loop (i+1) p'
else Lwt_mvar.take p in
let p0 = Lwt_mvar.create_empty () in
let t = loop 1 p0 in
Lwt_mvar.put p0 1 >>= fun () -> t
$ time ./lev.native
100001
real 0m0.111s
所以它稍微慢了一点,实际上它比以前的实现慢了20倍(我用100万个线程来比较它们),但它仍然比Go快10倍。
阅读链接 post 看来您可能想使用 "cooperative threads library for OCaml" 的 lwt。结果看起来像这样:
let whisper left right =
let%lwt n = Lwt_mvar.take right in
Lwt_mvar.put left (n+1)
let main () =
let n = 100_000 in
let%lwt () = Lwt_io.printf "With %d mvars!\n" n in
let leftmost = Lwt_mvar.create_empty () in
let rec setup_whispers left i =
if i >= n
then left
else let right = Lwt_mvar.create_empty () in
let () = Lwt.async (fun () -> whisper left right) in
setup_whispers right (i+1) in
let rightmost = setup_whispers leftmost 0 in
let%lwt () = Lwt_mvar.put rightmost 1 in
let%lwt res = Lwt_mvar.take leftmost in
Lwt_io.printf "%d\n" res
let () = Lwt_main.run (main ())
然后编译运行它
$ ocamlbuild -use-ocamlfind -pkg lwt,lwt.ppx,lwt.unix whisper.native
$ time ./whisper.native
With 100000 mvars!
100001
real 0m0.169s
user 0m0.156s
sys 0m0.008s