Erlang Udp 服务器丢弃大量数据包

Erlang Udp server dropping lots of packets

我一直在为 运行 一个 udp 服务器测试各种 languages/methods,看看哪个性能最好。到目前为止,我已经测试了 .NET、NodeJs,然后是 Erlang。我看到这个代码片段有一个问题,它丢弃了我发送的超过 50% 的数据包,而节点和 .net 大约是 4%。场景是我尽快发送 1000 20 byte messages 并在屏幕上打印一个递增的数字。 Erlang 只得到了大约 400 个。你能建议我做些什么来改善这个结果吗?

-module(udp).
-export([start/0]).

start() ->
    spawn(fun() -> server(41235) end).

server(Port) ->
    {ok, Socket} = gen_udp:open(Port, [binary, {active, false}]),
    io:format("server opened socket:~p~n",[Socket]),
    loop(Socket,0).

loop(Socket,N) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {udp, Socket, Host, Port, Bin} ->
            io:format("~p~n",[N]),
            loop(Socket,N+1)
    end.

如果我完全搞砸了这个 erlang 代码,我不会感到惊讶。我真的很难理解一些概念。

我的服务器感谢借用:http://erlycoder.com/83/erlang-udp-socket-usage-example-with-gen

如果您有兴趣,这是我的客户:

namespace LocalUdpClient
{
    class Program
    {
        private static long _sentCount = 1;
        private static CustomQueue _queue;
        private static bool _continue = true;
        static void Main(string[] args)
        {
            _queue = new CustomQueue();
            _queue.ItemRemovedEventHandler += QueueOnItemRemovedEventHandler;
            PopulateQueue();
            var con = new System.Net.Sockets.UdpClient();
            con.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 41235));

            while (_continue)
            {
                byte[] bits = null;
                if (_queue.TryDequeue(out bits))
                {
                    con.SendAsync(bits, bits.Length);
                    Interlocked.Increment(ref _sentCount);
                    Console.Clear();
                    Console.Write(Interlocked.Read(ref _sentCount));
                }

            }

        }

        private static void QueueOnItemRemovedEventHandler(object sender, EventArgs eventArgs)
        {
            var queue = sender as CustomQueue;
            if (queue.Count <= 0)
            {
                Task.Run(() => PopulateQueue()).Wait();
            }
        }

        public static void PopulateQueue()
        {
            Console.ReadLine();
            RandomNumberGenerator rand2 = new RNGCryptoServiceProvider();
            Parallel.For(0, 1000, p =>
            {
                if (_queue.Count >= 1000) return;
                byte[] bytes = new byte[20];
                rand2.GetBytes(bytes);
                _queue.Enqueue(bytes);
            });



        }

    public class CustomQueue : ConcurrentQueue<byte[]>
    {
        public event EventHandler ItemRemovedEventHandler;

        new public bool TryDequeue(out byte[] item)
        {
            var x = base.TryDequeue(out item);
            if (ItemRemovedEventHandler != null)
            {
                ItemRemovedEventHandler(this,new EventArgs());
            }
            return x;

        }


    }
}

我相信有些数据包丢失了,因为在发送时没有人积极期待它们。 您可以通过这种方式修改您的 Erlang 代码来进一步减少接收到的数据包数量:

        io:format("~p~n",[N]),
        receive after 10 -> ok end,
        loop(Socket,N+1)

其中一个解决方案可能是使用 {active, true} 选项,这将确保将数据包定向到侦听进程的邮箱。

顺便说一句,在我的机器上,原始代码可以达到 ~99% 的数据包接收率,但是在添加延迟之后,Erlang 部分只能接收几十个数据包。

与保证传送的 TCP 不同,UDP 数据包可能会被简单地丢弃。通常这是溢出的结果。在任何有 producer/consumer 关系的地方,您都需要关注当生产者超过消费者时会发生什么。使用 TCP 流量控制会对发送方施加背压,但使用 UDP 则不存在这种背压。

当您在服务器端打开一个 UDP 套接字时,它会分配一定数量的内存来接收来自网络的数据。如果您的客户端从套接字读取的速度不如数据从网络到达时的速度,您将遇到缓冲区溢出,新的 UDP 数据包将覆盖以前收到但尚未读取的 UDP 数据包。

您可以使用 rcvbuf 和 sndbuf 选项增加套接字的接收(和传输)缓冲区的大小。但是,您可能必须先增加内核限制(例如 sysctl -w net.core.rmem_max)。

要查看您是否因内核缓冲区溢出而丢失 UDP 数据包,请使用 netstat -su 检查 UDP 统计信息(查看 'packet receive errors')。