F# 性能:是什么让这段代码如此缓慢?
F# Performance: What is making this code so slow?
此 F# 代码试图解决 Project Euler problem #58:
let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false
| n ->
[3..2..(int (sqrt (float n)))]
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone
let spir = Seq.initInfinite (fun i ->
let n = i%4
let a = 2 * (i/4 + 1)
(a*n) + a + (a-1)*(a-1))
let rec accum se p n =
match se with
| x when p*10 < n && p <> 0 -> 2*(n/4) + 1
| x when is_prime (Seq.head x) -> accum (Seq.tail x) (inc p) (inc n)
| x -> accum (Seq.tail x) p (inc n)
| _ -> 0
printfn "%d" (accum spir 0 1)
我不知道这个程序的运行宁时间,因为我拒绝等待它完成。相反,我用 C++ 命令式地编写了这段代码:
#include "stdafx.h"
#include "math.h"
#include <iostream>
using namespace std;
int is_prime(int n)
{
if (n % 2 == 0) return 0;
for (int i = 3; i <= sqrt(n); i+=2)
{
if (n%i == 0)
{
return 0;
}
}
return 1;
}
int spir(int i)
{
int n = i % 4;
int a = 2 * (i / 4 + 1);
return (a*n) + a + ((a - 1)*(a - 1));
}
int main()
{
int n = 1, p = 0, i = 0;
cout << "start" << endl;
while (p*10 >= n || p == 0)
{
p += is_prime(spir(i));
n++; i++;
}
cout << 2*(i/4) + 1;
return 0;
}
上面的代码 运行s 在不到 2 秒的时间内得到了正确答案。
是什么让 F# 代码 运行 如此缓慢?即使在使用了 an old Whosebug post 中提到的一些分析工具之后,我仍然无法弄清楚发生了什么昂贵的操作。
编辑#1
使用 rmunn 的 post,我能够想出一个不同的实现,在不到 30 秒的时间内得到答案:
let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false
| n ->
[3..2..(int (sqrt (float n)))]
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone
let spir2 =
List.unfold (fun state ->
let p = fst state
let i = snd state
let n = i%4
let a = 2 * (i/4 + 1)
let diag = (a*n) + a + (a-1)*(a-1)
if p*10 < (i+1) && p <> 0 then
printfn "%d" (2*((i+1)/4) + 1)
None
elif is_prime diag then
Some(diag, (inc p, inc i))
else Some(diag, (p, inc i))) (0, 0)
编辑#2
利用 FuleSnabel 的信息 post,他的 is_prime
函数使上述代码 运行 在十分之一秒内完成,使其比 C++ 代码更快:
let inc = function
| n -> n + 1
let is_prime = function
| 1 -> false
| 2 -> true
| v when v % 2 = 0 -> false
| v ->
let stop = v |> float |> sqrt |> int
let rec loop vv =
if vv <= stop then
if (v % vv) <> 0 then
loop (vv + 2)
else
false
else
true
loop 3
let spir2 =
List.unfold (fun state ->
let p = fst state
let i = snd state
let n = i%4
let a = 2 * (i/4 + 1)
let diag = (a*n) + a + (a-1)*(a-1)
if p*10 < (i+1) && p <> 0 then
printfn "%d" (2*((i+1)/4) + 1)
None
elif i <> 3 && is_prime diag then
Some(diag, (inc p, inc i))
else Some(diag, (p, inc i))) (0, 0)
核心 F# 库中没有 Seq.tail
函数 (更新:是的,有,请参阅评论),所以我假设您使用的是 Seq.tail
函数来自 FSharpx.Collections。如果您使用 Seq.tail
的不同实现,它可能是相似的——而且它几乎肯定是您的问题的原因,因为它不像您认为的那样是 O(1)。获取 List 的尾部是 O(1),因为 List 的实现方式(作为一系列 cons 单元)。但是获取 Seq 的尾部最终会从原始可枚举创建一个 全新的 Seq,丢弃其中的一项,并返回其其余项。当您第二次执行 accum
循环时,您会在 "skip 1 then return" seq 上调用 Seq.tail
。所以现在你有一个 Seq
,我将其称为 S2,它向 S1 询问 IEnumerable,跳过 S1 的第一项,并 returns 其余部分。 S1,当被问到它的第一个项目时,向 S0(原始 Seq)询问一个可枚举的,跳过它的第一个项目,然后 returns 剩下的。所以要让 S2 跳过两个项目,它必须创建两个序列。现在在你的下一个 运行 通过当你要求 S2 的 Seq.tail
时,你创建 S3 要求 S2 一个 IEnumerable,它要求 S1 一个 IEnumerable,它要求 S0 一个 IEnumerable ...和很快。这实际上是 O(N^2),当您认为您正在编写 O(N) 操作时。
恐怕我现在没有时间为您找出解决方案;使用 List.tail
无济于事,因为您需要一个无限序列。但也许仅仅了解 Seq.tail
陷阱就足以让你开始,所以我现在 post 这个答案,即使它不完整。
如果您需要更多帮助,请对这个答案发表评论,我会在有时间的时候回来——但这可能不会持续几天,所以希望其他人也能回答您的问题。
编写高性能的 F# 是很有可能的,但需要了解在紧密循环中相对 CPU 成本较高的模式的一些知识。我建议使用像 ILSpy 这样的工具来查找隐藏的开销。
例如,可以想象 F# 将此表达式扩展为有效的 for 循环:
[3..2..(int (sqrt (float n)))]
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone
但是目前还没有。相反,它使用内部运算符创建一个跨越范围的 List
,并将其传递给 List.tryFind
。与我们喜欢做的实际工作(模运算)相比,这是昂贵的。 ILSpy 将上面的代码反编译为如下内容:
public static bool is_prime(int _arg1)
{
switch (_arg1)
{
case 2:
return true;
default:
return _arg1 >= 2 && _arg1 % 2 != 0 && ListModule.TryFind<int>(new Program.Original.is_prime@10(_arg1), SeqModule.ToList<int>(Operators.CreateSequence<int>(Operators.OperatorIntrinsics.RangeInt32(3, 2, (int)Math.Sqrt((double)_arg1))))) == null;
}
}
这些运算符的性能不如它们应有的性能(据我所知,目前正在改进)但无论分配 List
多么高效,然后搜索它都不会击败 for 循环。
这意味着 is_prime
没有达到应有的效果。相反,可以这样做:
let is_prime = function
| 1 -> false
| 2 -> true
| v when v % 2 = 0 -> false
| v ->
let stop = v |> float |> sqrt |> int
let rec loop vv =
if vv <= stop then
(v % vv) <> 0 && loop (vv + 2)
else
true
loop 3
此版本的 is_prime
依靠 F# 中的尾调用优化将循环扩展为高效的 for 循环(您可以使用 ILSpy 看到这一点)。 ILSpy 将循环反编译为如下内容:
while (vv <= stop)
{
if (_arg1 % vv == 0)
{
return false;
}
int arg_13_0 = _arg1;
int arg_11_0 = stop;
vv += 2;
stop = arg_11_0;
_arg1 = arg_13_0;
}
这个循环不分配内存,只是一个相当高效的循环。有人看到一些无意义的分配,但希望 JIT:er 消除这些。我相信 is_prime
可以进一步改进。
在高性能代码中使用 Seq
时,必须记住它是惰性的,默认情况下不使用记忆(请参阅 Seq.cache
)。因此,一个人可能很容易一遍又一遍地做同样的工作(见@rmunn 回答)。
此外,由于 IEnumerable/IEnumerator
的设计方式,Seq
并不是特别有效。更好的选择是例如 Nessos Streams(在 nuget 上可用)。
如果您有兴趣,我做了一个快速实现,它依赖于一个看起来性能不错的简单推送流:
// Receiver<'T> is a callback that receives a value.
// Returns true if it wants more values, false otherwise.
type Receiver<'T> = 'T -> bool
// Stream<'T> is function that accepts a Receiver<'T>
// This means Stream<'T> is a push stream (as opposed to Seq that uses pull)
type Stream<'T> = Receiver<'T> -> unit
// is_prime returns true if the input is prime, false otherwise
let is_prime = function
| 1 -> false
| 2 -> true
| v when v % 2 = 0 -> false
| v ->
let stop = v |> float |> sqrt |> int
let rec loop vv =
if vv <= stop then
(v % vv) <> 0 && loop (vv + 2)
else
true
loop 3
// tryFind looks for the first value in the input stream for f v = true.
// If found tryFind returns Some v, None otherwise
let tryFind f (s : Stream<'T>) : 'T option =
let res = ref None
s (fun v -> if f v then res := Some v; false else true)
!res
// diagonals generates a tuple stream of all diagonal values
// The first value is the side length, the second value is the diagonal value
let diagonals : Stream<int*int> =
fun r ->
let rec loop side v =
let step = side - 1
if r (side, v + 1*step) && r (side, v + 2*step) && r (side, v + 3*step) && r (side, v + 4*step) then
loop (side + 2) (v + 4*step)
if r (1, 1) then loop 3 1
// ratio computes the streaming ratio for f v = true
let ratio f (s : Stream<'T>) : Stream<float*'T> =
fun r ->
let inc r = r := !r + 1.
let acc = ref 0.
let count = ref 0.
s (fun v -> (inc count; if f v then inc acc); r (!acc/(!count), v))
let result =
diagonals
|> ratio (snd >> is_prime)
|> tryFind (fun (r, (_, v)) -> v > 1 && r < 0.1)
此 F# 代码试图解决 Project Euler problem #58:
let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false
| n ->
[3..2..(int (sqrt (float n)))]
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone
let spir = Seq.initInfinite (fun i ->
let n = i%4
let a = 2 * (i/4 + 1)
(a*n) + a + (a-1)*(a-1))
let rec accum se p n =
match se with
| x when p*10 < n && p <> 0 -> 2*(n/4) + 1
| x when is_prime (Seq.head x) -> accum (Seq.tail x) (inc p) (inc n)
| x -> accum (Seq.tail x) p (inc n)
| _ -> 0
printfn "%d" (accum spir 0 1)
我不知道这个程序的运行宁时间,因为我拒绝等待它完成。相反,我用 C++ 命令式地编写了这段代码:
#include "stdafx.h"
#include "math.h"
#include <iostream>
using namespace std;
int is_prime(int n)
{
if (n % 2 == 0) return 0;
for (int i = 3; i <= sqrt(n); i+=2)
{
if (n%i == 0)
{
return 0;
}
}
return 1;
}
int spir(int i)
{
int n = i % 4;
int a = 2 * (i / 4 + 1);
return (a*n) + a + ((a - 1)*(a - 1));
}
int main()
{
int n = 1, p = 0, i = 0;
cout << "start" << endl;
while (p*10 >= n || p == 0)
{
p += is_prime(spir(i));
n++; i++;
}
cout << 2*(i/4) + 1;
return 0;
}
上面的代码 运行s 在不到 2 秒的时间内得到了正确答案。
是什么让 F# 代码 运行 如此缓慢?即使在使用了 an old Whosebug post 中提到的一些分析工具之后,我仍然无法弄清楚发生了什么昂贵的操作。
编辑#1
使用 rmunn 的 post,我能够想出一个不同的实现,在不到 30 秒的时间内得到答案:
let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false
| n ->
[3..2..(int (sqrt (float n)))]
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone
let spir2 =
List.unfold (fun state ->
let p = fst state
let i = snd state
let n = i%4
let a = 2 * (i/4 + 1)
let diag = (a*n) + a + (a-1)*(a-1)
if p*10 < (i+1) && p <> 0 then
printfn "%d" (2*((i+1)/4) + 1)
None
elif is_prime diag then
Some(diag, (inc p, inc i))
else Some(diag, (p, inc i))) (0, 0)
编辑#2
利用 FuleSnabel 的信息 post,他的 is_prime
函数使上述代码 运行 在十分之一秒内完成,使其比 C++ 代码更快:
let inc = function
| n -> n + 1
let is_prime = function
| 1 -> false
| 2 -> true
| v when v % 2 = 0 -> false
| v ->
let stop = v |> float |> sqrt |> int
let rec loop vv =
if vv <= stop then
if (v % vv) <> 0 then
loop (vv + 2)
else
false
else
true
loop 3
let spir2 =
List.unfold (fun state ->
let p = fst state
let i = snd state
let n = i%4
let a = 2 * (i/4 + 1)
let diag = (a*n) + a + (a-1)*(a-1)
if p*10 < (i+1) && p <> 0 then
printfn "%d" (2*((i+1)/4) + 1)
None
elif i <> 3 && is_prime diag then
Some(diag, (inc p, inc i))
else Some(diag, (p, inc i))) (0, 0)
核心 F# 库中没有 Seq.tail
函数 (更新:是的,有,请参阅评论),所以我假设您使用的是 Seq.tail
函数来自 FSharpx.Collections。如果您使用 Seq.tail
的不同实现,它可能是相似的——而且它几乎肯定是您的问题的原因,因为它不像您认为的那样是 O(1)。获取 List 的尾部是 O(1),因为 List 的实现方式(作为一系列 cons 单元)。但是获取 Seq 的尾部最终会从原始可枚举创建一个 全新的 Seq,丢弃其中的一项,并返回其其余项。当您第二次执行 accum
循环时,您会在 "skip 1 then return" seq 上调用 Seq.tail
。所以现在你有一个 Seq
,我将其称为 S2,它向 S1 询问 IEnumerable,跳过 S1 的第一项,并 returns 其余部分。 S1,当被问到它的第一个项目时,向 S0(原始 Seq)询问一个可枚举的,跳过它的第一个项目,然后 returns 剩下的。所以要让 S2 跳过两个项目,它必须创建两个序列。现在在你的下一个 运行 通过当你要求 S2 的 Seq.tail
时,你创建 S3 要求 S2 一个 IEnumerable,它要求 S1 一个 IEnumerable,它要求 S0 一个 IEnumerable ...和很快。这实际上是 O(N^2),当您认为您正在编写 O(N) 操作时。
恐怕我现在没有时间为您找出解决方案;使用 List.tail
无济于事,因为您需要一个无限序列。但也许仅仅了解 Seq.tail
陷阱就足以让你开始,所以我现在 post 这个答案,即使它不完整。
如果您需要更多帮助,请对这个答案发表评论,我会在有时间的时候回来——但这可能不会持续几天,所以希望其他人也能回答您的问题。
编写高性能的 F# 是很有可能的,但需要了解在紧密循环中相对 CPU 成本较高的模式的一些知识。我建议使用像 ILSpy 这样的工具来查找隐藏的开销。
例如,可以想象 F# 将此表达式扩展为有效的 for 循环:
[3..2..(int (sqrt (float n)))]
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone
但是目前还没有。相反,它使用内部运算符创建一个跨越范围的 List
,并将其传递给 List.tryFind
。与我们喜欢做的实际工作(模运算)相比,这是昂贵的。 ILSpy 将上面的代码反编译为如下内容:
public static bool is_prime(int _arg1)
{
switch (_arg1)
{
case 2:
return true;
default:
return _arg1 >= 2 && _arg1 % 2 != 0 && ListModule.TryFind<int>(new Program.Original.is_prime@10(_arg1), SeqModule.ToList<int>(Operators.CreateSequence<int>(Operators.OperatorIntrinsics.RangeInt32(3, 2, (int)Math.Sqrt((double)_arg1))))) == null;
}
}
这些运算符的性能不如它们应有的性能(据我所知,目前正在改进)但无论分配 List
多么高效,然后搜索它都不会击败 for 循环。
这意味着 is_prime
没有达到应有的效果。相反,可以这样做:
let is_prime = function
| 1 -> false
| 2 -> true
| v when v % 2 = 0 -> false
| v ->
let stop = v |> float |> sqrt |> int
let rec loop vv =
if vv <= stop then
(v % vv) <> 0 && loop (vv + 2)
else
true
loop 3
此版本的 is_prime
依靠 F# 中的尾调用优化将循环扩展为高效的 for 循环(您可以使用 ILSpy 看到这一点)。 ILSpy 将循环反编译为如下内容:
while (vv <= stop)
{
if (_arg1 % vv == 0)
{
return false;
}
int arg_13_0 = _arg1;
int arg_11_0 = stop;
vv += 2;
stop = arg_11_0;
_arg1 = arg_13_0;
}
这个循环不分配内存,只是一个相当高效的循环。有人看到一些无意义的分配,但希望 JIT:er 消除这些。我相信 is_prime
可以进一步改进。
在高性能代码中使用 Seq
时,必须记住它是惰性的,默认情况下不使用记忆(请参阅 Seq.cache
)。因此,一个人可能很容易一遍又一遍地做同样的工作(见@rmunn 回答)。
此外,由于 IEnumerable/IEnumerator
的设计方式,Seq
并不是特别有效。更好的选择是例如 Nessos Streams(在 nuget 上可用)。
如果您有兴趣,我做了一个快速实现,它依赖于一个看起来性能不错的简单推送流:
// Receiver<'T> is a callback that receives a value.
// Returns true if it wants more values, false otherwise.
type Receiver<'T> = 'T -> bool
// Stream<'T> is function that accepts a Receiver<'T>
// This means Stream<'T> is a push stream (as opposed to Seq that uses pull)
type Stream<'T> = Receiver<'T> -> unit
// is_prime returns true if the input is prime, false otherwise
let is_prime = function
| 1 -> false
| 2 -> true
| v when v % 2 = 0 -> false
| v ->
let stop = v |> float |> sqrt |> int
let rec loop vv =
if vv <= stop then
(v % vv) <> 0 && loop (vv + 2)
else
true
loop 3
// tryFind looks for the first value in the input stream for f v = true.
// If found tryFind returns Some v, None otherwise
let tryFind f (s : Stream<'T>) : 'T option =
let res = ref None
s (fun v -> if f v then res := Some v; false else true)
!res
// diagonals generates a tuple stream of all diagonal values
// The first value is the side length, the second value is the diagonal value
let diagonals : Stream<int*int> =
fun r ->
let rec loop side v =
let step = side - 1
if r (side, v + 1*step) && r (side, v + 2*step) && r (side, v + 3*step) && r (side, v + 4*step) then
loop (side + 2) (v + 4*step)
if r (1, 1) then loop 3 1
// ratio computes the streaming ratio for f v = true
let ratio f (s : Stream<'T>) : Stream<float*'T> =
fun r ->
let inc r = r := !r + 1.
let acc = ref 0.
let count = ref 0.
s (fun v -> (inc count; if f v then inc acc); r (!acc/(!count), v))
let result =
diagonals
|> ratio (snd >> is_prime)
|> tryFind (fun (r, (_, v)) -> v > 1 && r < 0.1)