Erlang Twitter 流客户端 - 处理分块响应

Erlang Twitter streaming client - handling of chunked responses

我正在使用底部的 Erlang 代码连接到 Twitter firehose。

现在我可以正常接收数据流,但想知道 Erlang httpc 客户端是否正确处理了 CRLF ('\r\n') 分块响应定界符;因为我原本期望三个 handle_info 子句会依次被调用(stream_start、若干次 stream、stream_end):

handle_info({http, {_RequestId, stream_start, _Headers}}, State) ->
    io:format("start~n"),
    {noreply, State};
handle_info({http, {_RequestId, stream, Data}}, State) ->
    io:format("~p~n", [Data]),
    {noreply, State};
handle_info({http, {_RequestId, stream_end, _Headers}}, State) ->
    io:format("end~n"),
    {noreply, State};

但实际情况是:'stream_start' 子句只在开始时被调用一次,之后所有数据都由 'stream' 子句处理,而 'stream_end' 子句从未被调用。

然而,当我查看 'stream' 子句收到的数据块时,发现很多数据块都以 CRLF 定界符结尾。

所以我想知道:httpc 客户端是否正确处理了分块传输中每个块的终止符,还是我的配置有误?

TIA

%% https://dev.twitter.com/streaming/reference/post/statuses/filter

-module(twitter_streaming_demo).

-behaviour(gen_server).

%% API.

-export([start_link/5]).

%% gen_server.

-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).

-define(METHOD, "POST").

-define(URL, "https://stream.twitter.com/1.1/statuses/filter.json").

-define(APPLICATION_FORM_URLENCODED, "application/x-www-form-urlencoded").

-define(TRACK, "track").

%% gen_server state of the streaming client.
-record(state, {consumer,   % {ConsumerKey, ConsumerSecret, hmac_sha1}, built in init/1
        tokens,             % {Token, TokenSecret}, built in init/1
        url,                % endpoint the signed POST is sent to (?URL)
        query,              % value sent as the "track" parameter
        request_id}).       % httpc request id; undefined until handle_info(timeout, ...) issues the request

%% API.

%% twitter_streaming_demo:start_link("", "", "", "", "").

%% Starts a streaming client linked to the caller.
%% ConsumerKey/ConsumerSecret and Token/TokenSecret are the OAuth credentials;
%% Query is the value to track. The server is not registered under a name.
start_link(ConsumerKey, ConsumerSecret, Token, TokenSecret, Query) ->
    InitArgs = [ConsumerKey, ConsumerSecret, Token, TokenSecret, Query],
    gen_server:start_link(?MODULE, InitArgs, []).

%% gen_server.

%% Builds the initial state from the OAuth credentials and the track query.
%% Returning a 0 timeout makes gen_server deliver `timeout' to handle_info/2
%% unless another message arrives first, so the HTTP request is issued there
%% rather than inside init/1.
init([ConsumerKey, ConsumerSecret, Token, TokenSecret, Query]) ->
    InitialState = #state{consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
                          tokens = {Token, TokenSecret},
                          url = ?URL,
                          query = Query},
    {ok, InitialState, 0}.

%% This server offers no synchronous API: every call is answered with `ignored'
%% and the state is left untouched.
handle_call(_Call, _Caller, State) ->
    Reply = ignored,
    {reply, Reply, State}.

%% No cast protocol is defined; casts are accepted and dropped.
handle_cast(_Cast, State) -> {noreply, State}.

%% Issues the streaming request and consumes the messages httpc sends back.
%%
%% `timeout' is delivered right after init/1 (which returns a 0 timeout): the
%% request is OAuth-signed and sent as an asynchronous POST whose body is
%% streamed to this process. httpc then sends stream_start, any number of
%% stream messages carrying body parts, and stream_end only once the body is
%% complete, which an endless stream never is.
%%
%% If the request fails, or the server answers with a response httpc does not
%% stream (anything other than 200/206, e.g. 401), the stream never runs, so
%% the server stops instead of idling forever; a supervisor can then reconnect.
handle_info(timeout, #state{consumer=Consumer, tokens=Tokens, url=Url, query=Query}=State) ->
    {Token, TokenSecret} = Tokens,
    Params = [{?TRACK, Query}],
    Signed = oauth:sign(?METHOD, Url, Params, Consumer, Token, TokenSecret),
    %% oauth_* parameters go into the Authorization header; the rest form the body.
    {AuthorizationParams, QueryParams} =
        lists:partition(fun({K, _}) -> lists:prefix("oauth_", K) end, Signed),
    %% Percent-encode the body with the same encoder the signature used, so a
    %% query containing spaces, '&', '=' or non-ASCII characters is sent intact
    %% and the server-side signature check still matches.
    Request = {oauth:uri(Url, []),
               [oauth:header(AuthorizationParams)],
               ?APPLICATION_FORM_URLENCODED,
               oauth:uri_params_encode(QueryParams)},
    {ok, RequestId} = httpc:request(post, Request, [], [{sync, false}, {stream, self}]),
    {noreply, State#state{request_id=RequestId}};
handle_info({http, {_RequestId, stream_start, _Headers}}, State) ->
    io:format("start~n"),
    {noreply, State};
handle_info({http, {_RequestId, stream, Data}}, State) ->
    %% Body parts need not line up with message boundaries. Presumably httpc
    %% has already removed the chunked-encoding framing, so any \r\n inside
    %% Data belongs to the payload itself -- TODO confirm.
    io:format("~p~n", [Data]),
    {noreply, State};
handle_info({http, {_RequestId, stream_end, _Headers}}, State) ->
    io:format("end~n"),
    {noreply, State};
handle_info({http, {RequestId, {error, Reason}}}, #state{request_id=RequestId}=State) ->
    %% Our request failed; no further stream messages will arrive for it.
    {stop, {stream_error, Reason}, State};
handle_info({http, {RequestId, {{_Version, Status, _Phrase}, _Headers, Body}}},
            #state{request_id=RequestId}=State) ->
    %% httpc only streams 200/206 bodies; any other status is delivered as one
    %% complete response, meaning the stream was never established.
    {stop, {unexpected_response, Status, Body}, State};
handle_info(Info, State) ->
    io:format("~p~n", [Info]),
    {noreply, State}.

%% No teardown is performed on shutdown; the callback only acknowledges it.
terminate(_Why, _LastState) -> ok.

%% Hot code upgrade: the state is carried over to the new version unchanged.
code_change(_PreviousVsn, State, _UpgradeExtra) -> {ok, State}.

看起来 OTP 中的 httpc 客户端确实实现了分块传输编码的处理(尽管存在一些 bug)。

如果 CRLF 落在块长度所覆盖的范围之内(即块长度延伸到该 CRLF 之后),那么它就是块数据的合法组成部分,而不是块定界符。也许您看到的正是这类换行符?维基百科中的示例:

e\r\n
 in\r\n\r\nchunks.\r\n

此处,块长度 e(十六进制,即 14)涵盖了开头的空格、in、in 与 chunks. 之间的两个 CRLF(共 4 字节)以及 chunks.,即 1 + 2 + 4 + 7 = 14;块数据之后的结尾 CRLF 从不计入长度。

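至于 Twitter API(我不熟悉),它的流式响应可能永远不会结束,也就是永远只给你数据块?httpc 只有在整个响应体接收完毕(收到长度为 0 的终止块)时才会发送 stream_end 消息,所以对于一个永不结束的流,'stream_end' 子句自然不会被调用。另外,根据 Twitter 流式 API 的文档,它本身就用 \r\n 来分隔各条消息,并会定期发送空行作为保活(keep-alive),这很可能就是您在数据块末尾看到的 CRLF。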