分布式去中心化环

Distributed decentralized ring

我正在尝试在 Erlang 中实现一个分布式环,其中每个节点都会存储数据。

我的想法是创建一个 gen_server 模块 node_ring 它将提供环中节点的状态:

-record(nodestate, {id, hostname, previd, nextid, prevnodename, nextnodename, data}).

接下来,我通过以下方式创建了虚拟主机:

werl -sname node -setcookie cook
werl -sname node1 -setcookie cook
werl -sname node2 -setcookie cook

在第一个节点中:node@Machine 我在环中启动第一个项目:

**(node@Machine)1> node_ring:start_link()**

函数:

start_link() ->
    {Hostname, Id} = {'node@Machine', 0},
    {ok, Pid} = gen_server:start_link({local, ?MODULE}, ?MODULE, [first, Hostname, Id], []).

和:

init([first, Hostname, Id]) ->          
            State = #nodestate{id = Id, hostname = Hostname, previd = Id, nextid = Id, prevnode = Hostname, nextnode = Hostname, data = dict:new()},
            {ok, State}.

在下一个节点中:**(node1@Machine)1>**我想启动同一个模块node_ring, 但我不知道如何 link 它与环中的前一个项目以及下一个节点如何知道哪个节点和 node_ring 已启动。

谁能解释一下,如何在 Erlang 中制作分布式环?我知道有一些已实施的系统,例如 Riak。我查看了源代码,但我是分布式Erlang编程的新手,看不懂。

分布式系统编程很难。很难理解。很难正确实施。

riak_core 的源代码一开始可能很难理解。这里有一些资源可以帮助我更好地理解 riak_core:

  • Where to Start with Riak Core (specifically, Try Try Try 作者:瑞安泽泽斯基)
  • project-fifo. howl 中的任何 riak_core 项目可能是建立在 riak_core 之上的最小项目,并且相当容易理解。
  • 了解 riak_core 的核心是一种一致的哈希算法,它允许它以统一的方式使用分区分布数据并跨环工作:Why Riak Just Works
    • 不久前我写了erlang-ryng,这是一个通用的环一致性哈希算法处理程序。这可能有助于理解环上下文中一致性哈希的目的。
  • 了解 riak_pipe 的工作原理也有助于我更好地理解如何以统一的方式分配工作。

关于 "It's hard to implement correctly",您可以阅读 Jepsen posts by aphyr 以了解主要数据库和分布式存储系统在它们自己的实现中已经或以前存在问题的示例和案例。

也就是说,这是一个非常简单的环在 Erlang 中的实现,但是它仍然有许多漏洞,下面将解决:

-module(node_ring).
-behaviour(gen_server).

% Public API
-export([start_link/0]).
-export([erase/1]).
-export([find/1]).
-export([store/2]).

% Ring API
-export([join/1]).
-export([nodes/0]).
-export([read/1]).
-export([write/1]).
-export([write/2]).

% gen_server
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).

-record(state, {
    node = node()        :: node(),
    ring = ordsets:new() :: ordsets:ordset(node()),
    data = dict:new()    :: dict:dict(term(), term())
}).

% Public API
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

erase(Key) ->
    write({erase, Key}).

find(Key) ->
    read({find, Key}).

store(Key, Value) ->
    write({store, Key, Value}).

% Ring API
join(Node) ->
    gen_server:call(?MODULE, {join, Node}).

nodes() ->
    gen_server:call(?MODULE, nodes).

read(Request) ->
    gen_server:call(?MODULE, {read, Request}).

write(Request) ->
    gen_server:call(?MODULE, {write, Request}).

write(Node, Request) ->
    gen_server:call(?MODULE, {write, Node, Request}).

% gen_server
init([]) ->
    State = #state{},
    {ok, State}.

handle_call({join, Node}, _From, State=#state{node=Node}) ->
    {reply, ok, State};
handle_call({join, Peer}, From, State=#state{node=Node, ring=Ring}) ->
    case net_adm:ping(Peer) of
        pong ->
            case ordsets:is_element(Peer, Ring) of
                true ->
                    {reply, ok, State};
                false ->
                    monitor_node(Peer, true),
                    NewRing = ordsets:add_element(Peer, Ring),
                    spawn(fun() ->
                        rpc:multicall(Ring, ?MODULE, join, [Peer])
                    end),
                    spawn(fun() ->
                        Reply = rpc:call(Peer, ?MODULE, join, [Node]),
                        gen_server:reply(From, Reply)
                    end),
                    {noreply, State#state{ring=NewRing}}
            end;
        pang ->
            {reply, {error, connection_failed}, State}
    end;
handle_call(nodes, _From, State=#state{node=Node, ring=Ring}) ->
    {reply, ordsets:add_element(Node, Ring), State};
handle_call({read, Request}, From, State) ->
    handle_read(Request, From, State);
handle_call({write, Request}, From, State=#state{node=Node, ring=Ring}) ->
    spawn(fun() ->
        rpc:multicall(Ring, ?MODULE, write, [Node, Request])
    end),
    handle_write(Request, From, State);
handle_call({write, Node, _Request}, _From, State=#state{node=Node}) ->
    {reply, ok, State};
handle_call({write, _Peer, Request}, From, State) ->
    handle_write(Request, From, State);
handle_call(_Request, _From, State) ->
    {reply, ignore, State}.

handle_cast(_Request, State) ->
    {noreply, State}.

handle_info({nodedown, Peer}, State=#state{ring=Ring}) ->
    NewRing = ordsets:del_element(Peer, Ring),
    {noreply, State#state{ring=NewRing}};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% @private
handle_read({find, Key}, _From, State=#state{data=Data}) ->
    {reply, dict:find(Key, Data), State}.

%% @private
handle_write({erase, Key}, _From, State=#state{data=Data}) ->
    {reply, ok, State#state{data=dict:erase(Key, Data)}};
handle_write({store, Key, Value}, _From, State=#state{data=Data}) ->
    {reply, ok, State#state{data=dict:store(Key, Value, Data)}}.

如果我们启动 3 个不同的节点,-sname 设置为 node0node1node2

erl -sname node0 -setcookie cook -run node_ring start_link
erl -sname node1 -setcookie cook -run node_ring start_link
erl -sname node2 -setcookie cook -run node_ring start_link

以下是我们如何将节点加入环:

(node0@localhost)1> node_ring:nodes().
['node0@localhost']
(node0@localhost)2> node_ring:join('node1@localhost').
ok
(node0@localhost)3> node_ring:nodes().
['node0@localhost', 'node1@localhost']

如果我们 运行 node_ring:nodes()node1 我们得到:

(node1@localhost)1> node_ring:nodes().
['node0@localhost', 'node1@localhost']

现在让我们去 node2 并加入另外两个节点之一:

(node2@localhost)1> node_ring:nodes().
['node2@localhost']
(node2@localhost)2> node_ring:join('node0localhost').
ok
(node2@localhost)3> node_ring:nodes().
['node0@localhost', 'node1@localhost',
 'node2@localhost']

请注意 node0node1 是如何添加到 node2 的,即使我们只在联接中指定了 node0。这意味着如果我们有数百个节点,我们只需要加入其中一个节点就可以加入整个环。

现在我们可以在任何节点上使用 store(Key, Value),它将被复制到其他两个:

(node0@localhost)4> node_ring:store(mykey, myvalue).
ok

让我们尝试从其他两个读取 mykey,首先 node1:

(node1@localhost)2> node_ring:find(mykey).
{ok,myvalue}

然后node2:

(node2@localhost)4> node_ring:find(mykey).
{ok,myvalue}

让我们在 node2 上使用 erase(Key) 并尝试在其他节点上再次读取密钥:

(node2@localhost)5> node_ring:erase(mykey).
ok

node0:

(node0@localhost)5> node_ring:find(mykey).
error

node1:

(node1@localhost)3> node_ring:find(mykey).
error

太棒了!我们有一个分布式的去中心化环,可以充当一个简单的 key/value 商店!那很容易,一点也不难!只要我们没有任何节点宕机、数据包丢失、网络分区、节点添加到环中或其他形式的混乱,我们这里就有一个近乎完美的解决方案。然而,实际上,您必须考虑所有这些因素,才能拥有一个不会让您在长期 运行.

发疯的系统

这是我们的小 node_ring 无法处理的事情的简短示例:

  1. node1 下降
  2. node0 存储键 a 和值 1
  3. node1回来加入擂台
  4. node1 试图找到密钥 a

先杀node1。如果我们检查 node0:

上的节点
(node0@localhost)6> node_ring:nodes().
['node0@localhost','node2@localhost']

node2 上:

(node2@localhost)6> node_ring:nodes().
['node0@localhost','node2@localhost']

我们看到 node1 已自动从戒指中删除。让我们在 node0:

上存储一些东西
(node0@localhost)7> node_ring:store(a, 1).
ok

并从 node2 阅读:

(node2@localhost)7> node_ring:find(a).
{ok,1}

让我们再次启动node1并加入圆环:

(node1@localhost)1> node_ring:join('node0@localhost').
ok
(node1@localhost)2> node_ring:nodes().
['node0@localhost','node1@localhost',
 'node2@localhost']
(node1@localhost)3> node_ring:find(a).
error

哎呀,环上的数据不一致。进一步研究其他分布式系统和 CAP theorem 是必要的,然后我们才能决定我们希望我们的小 node_ring 在这些不同情况下如何表现(比如我们是否希望它表现得像 AP 或 CP 系统) .