Erlang Udp 服务器丢弃大量数据包
Erlang Udp server dropping lots of packets
我一直在为 运行 一个 udp 服务器测试各种 languages/methods,看看哪个性能最好。到目前为止,我已经测试了 .NET、NodeJs,然后是 Erlang。我看到这个代码片段有一个问题,它丢弃了我发送的超过 50% 的数据包,而节点和 .net 大约是 4%。场景是我尽快发送 1000 20 byte messages
并在屏幕上打印一个递增的数字。 Erlang 只得到了大约 400 个。你能建议我做些什么来改善这个结果吗?
-module(udp).
-export([start/0]).
start() ->
spawn(fun() -> server(41235) end).
server(Port) ->
{ok, Socket} = gen_udp:open(Port, [binary, {active, false}]),
io:format("server opened socket:~p~n",[Socket]),
loop(Socket,0).
loop(Socket,N) ->
inet:setopts(Socket, [{active, once}]),
receive
{udp, Socket, Host, Port, Bin} ->
io:format("~p~n",[N]),
loop(Socket,N+1)
end.
如果我完全搞砸了这个 erlang 代码,我不会感到惊讶。我真的很难理解一些概念。
我的服务器感谢借用:http://erlycoder.com/83/erlang-udp-socket-usage-example-with-gen
如果您有兴趣,这是我的客户:
namespace LocalUdpClient
{
class Program
{
private static long _sentCount = 1;
private static CustomQueue _queue;
private static bool _continue = true;
static void Main(string[] args)
{
_queue = new CustomQueue();
_queue.ItemRemovedEventHandler += QueueOnItemRemovedEventHandler;
PopulateQueue();
var con = new System.Net.Sockets.UdpClient();
con.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 41235));
while (_continue)
{
byte[] bits = null;
if (_queue.TryDequeue(out bits))
{
con.SendAsync(bits, bits.Length);
Interlocked.Increment(ref _sentCount);
Console.Clear();
Console.Write(Interlocked.Read(ref _sentCount));
}
}
}
private static void QueueOnItemRemovedEventHandler(object sender, EventArgs eventArgs)
{
var queue = sender as CustomQueue;
if (queue.Count <= 0)
{
Task.Run(() => PopulateQueue()).Wait();
}
}
public static void PopulateQueue()
{
Console.ReadLine();
RandomNumberGenerator rand2 = new RNGCryptoServiceProvider();
Parallel.For(0, 1000, p =>
{
if (_queue.Count >= 1000) return;
byte[] bytes = new byte[20];
rand2.GetBytes(bytes);
_queue.Enqueue(bytes);
});
}
public class CustomQueue : ConcurrentQueue<byte[]>
{
public event EventHandler ItemRemovedEventHandler;
new public bool TryDequeue(out byte[] item)
{
var x = base.TryDequeue(out item);
if (ItemRemovedEventHandler != null)
{
ItemRemovedEventHandler(this,new EventArgs());
}
return x;
}
}
}
我相信有些数据包丢失了,因为在发送时没有人积极期待它们。
您可以通过这种方式修改您的 Erlang 代码来进一步减少接收到的数据包数量:
io:format("~p~n",[N]),
receive after 10 -> ok end,
loop(Socket,N+1)
其中一个解决方案可能是使用 {active, true}
选项,这将确保将数据包定向到侦听进程的邮箱。
顺便说一句,在我的机器上,原始代码可以达到 ~99% 的数据包接收率,但是在添加延迟之后,Erlang 部分只能接收几十个数据包。
与保证传送的 TCP 不同,UDP 数据包可能会被简单地丢弃。通常这是溢出的结果。在任何有 producer/consumer 关系的地方,您都需要关注当生产者超过消费者时会发生什么。使用 TCP 流量控制会对发送方施加背压,但使用 UDP 则不存在这种背压。
当您在服务器端打开一个 UDP 套接字时,它会分配一定数量的内存来接收来自网络的数据。如果您的客户端从套接字读取的速度不如数据从网络到达时的速度,您将遇到缓冲区溢出,新的 UDP 数据包将覆盖以前收到但尚未读取的 UDP 数据包。
您可以使用 rcvbuf 和 sndbuf 选项增加套接字的接收(和传输)缓冲区的大小。但是,您可能必须先增加内核限制(例如 sysctl -w net.core.rmem_max
)。
要查看您是否因内核缓冲区溢出而丢失 UDP 数据包,请使用 netstat -su
检查 UDP 统计信息(查看 'packet receive errors')。
我一直在为 运行 一个 udp 服务器测试各种 languages/methods,看看哪个性能最好。到目前为止,我已经测试了 .NET、NodeJs,然后是 Erlang。我看到这个代码片段有一个问题,它丢弃了我发送的超过 50% 的数据包,而节点和 .net 大约是 4%。场景是我尽快发送 1000 20 byte messages
并在屏幕上打印一个递增的数字。 Erlang 只得到了大约 400 个。你能建议我做些什么来改善这个结果吗?
-module(udp).
-export([start/0]).
start() ->
spawn(fun() -> server(41235) end).
server(Port) ->
{ok, Socket} = gen_udp:open(Port, [binary, {active, false}]),
io:format("server opened socket:~p~n",[Socket]),
loop(Socket,0).
loop(Socket,N) ->
inet:setopts(Socket, [{active, once}]),
receive
{udp, Socket, Host, Port, Bin} ->
io:format("~p~n",[N]),
loop(Socket,N+1)
end.
如果我完全搞砸了这个 erlang 代码,我不会感到惊讶。我真的很难理解一些概念。
我的服务器感谢借用:http://erlycoder.com/83/erlang-udp-socket-usage-example-with-gen
如果您有兴趣,这是我的客户:
namespace LocalUdpClient
{
class Program
{
private static long _sentCount = 1;
private static CustomQueue _queue;
private static bool _continue = true;
static void Main(string[] args)
{
_queue = new CustomQueue();
_queue.ItemRemovedEventHandler += QueueOnItemRemovedEventHandler;
PopulateQueue();
var con = new System.Net.Sockets.UdpClient();
con.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 41235));
while (_continue)
{
byte[] bits = null;
if (_queue.TryDequeue(out bits))
{
con.SendAsync(bits, bits.Length);
Interlocked.Increment(ref _sentCount);
Console.Clear();
Console.Write(Interlocked.Read(ref _sentCount));
}
}
}
private static void QueueOnItemRemovedEventHandler(object sender, EventArgs eventArgs)
{
var queue = sender as CustomQueue;
if (queue.Count <= 0)
{
Task.Run(() => PopulateQueue()).Wait();
}
}
public static void PopulateQueue()
{
Console.ReadLine();
RandomNumberGenerator rand2 = new RNGCryptoServiceProvider();
Parallel.For(0, 1000, p =>
{
if (_queue.Count >= 1000) return;
byte[] bytes = new byte[20];
rand2.GetBytes(bytes);
_queue.Enqueue(bytes);
});
}
public class CustomQueue : ConcurrentQueue<byte[]>
{
public event EventHandler ItemRemovedEventHandler;
new public bool TryDequeue(out byte[] item)
{
var x = base.TryDequeue(out item);
if (ItemRemovedEventHandler != null)
{
ItemRemovedEventHandler(this,new EventArgs());
}
return x;
}
}
}
我相信有些数据包丢失了,因为在发送时没有人积极期待它们。 您可以通过这种方式修改您的 Erlang 代码来进一步减少接收到的数据包数量:
io:format("~p~n",[N]),
receive after 10 -> ok end,
loop(Socket,N+1)
其中一个解决方案可能是使用 {active, true}
选项,这将确保将数据包定向到侦听进程的邮箱。
顺便说一句,在我的机器上,原始代码可以达到 ~99% 的数据包接收率,但是在添加延迟之后,Erlang 部分只能接收几十个数据包。
与保证传送的 TCP 不同,UDP 数据包可能会被简单地丢弃。通常这是溢出的结果。在任何有 producer/consumer 关系的地方,您都需要关注当生产者超过消费者时会发生什么。使用 TCP 流量控制会对发送方施加背压,但使用 UDP 则不存在这种背压。
当您在服务器端打开一个 UDP 套接字时,它会分配一定数量的内存来接收来自网络的数据。如果您的客户端从套接字读取的速度不如数据从网络到达时的速度,您将遇到缓冲区溢出,新的 UDP 数据包将覆盖以前收到但尚未读取的 UDP 数据包。
您可以使用 rcvbuf 和 sndbuf 选项增加套接字的接收(和传输)缓冲区的大小。但是,您可能必须先增加内核限制(例如 sysctl -w net.core.rmem_max
)。
要查看您是否因内核缓冲区溢出而丢失 UDP 数据包,请使用 netstat -su
检查 UDP 统计信息(查看 'packet receive errors')。