Redux Observable - 如何发送一个动作以开始一个单独的史诗,然后等待 epics 响应(或超时)

Redux Observable - How to send off an action to start a separate epic, then wait for that epics response (or timeout)

所以我基本上有一个 ,这允许我通过 WEBSOCKET_MESSAGE_SEND 发送通用消息并通过 WEBSOCKET_MESSAGE_RECEIVED 操作接收它们。

然而,在某些情况下,我想以类似于 Ajax REST 调用的方式发出请求。例如,为我可能想要一个史诗的用户请求文档列表:

  1. 收到一个动作,例如({ type: GET_DOCUMENTS })
  2. 生成一个随机密钥来跟踪当前请求,我们将其称为'request_id'
  3. 发送 ({ type: WEBSOCKET_MESSAGE_SEND, request_id }) 操作。
  4. 等待其中一个
    1. 一个动作({ type: WEBSOCKET_MESSAGE_RECEIVED, request_id, message }) **必须有一个匹配的'request_id'否则它应该被忽略。
      • -> 发出一个动作,例如 ({ type: GET_DOCUMENTS_SUCCESS, documents: message })
    2. 超时,例如 10 秒
      • -> 发出一个动作,例如 ({ type: GET_DOCUMENTS_TIMEOUT })

我一直在努力将其放入代码中,我认为整个史诗中最尴尬的部分是我想在我的史诗中间发出一个动作并等待。这对我来说不太合适... ani-pattern?但是我真的不确定我应该怎么做。

没错。没有什么好的方法可以在史诗的中间发出一个动作。将史诗一分为二如何?

const getDocumentsEpic = action$ =>
    action$.pipe(
        ofType("GET_DOCUMENTS"),
        map(() => {
          const requestId = generateRequestId();
          return {
            type: "WEBSOCKET_MESSAGE_SEND",
            requestId
          };
        })
    );

const websocketMessageEpic = action$ =>
    action$.pipe(
        ofType("WEBSOCKET_MESSAGE_SEND"),
        switchMap(requestId => {
          return action$.pipe(
              ofType("WEBSOCKET_MESSAGE_RECEIVED"),
              filter(action => action.requestId === requestId),
              timeout(10000),
              map(({ message }) => ({
                type: "GET_DOCUMENTS_SUCCESS",
                documents: message
              })),
              catchError(() => of({ type: "GET_DOCUMENTS_TIMEOUT" }))
          );
        })
    );

更新答案 (2020-04-17):

我对原来的答案不满意,所以决定再试一次。

NotificationOperators.js

import { of } from 'rxjs';
import { map, switchMap, filter, timeout, catchError, first, mergeMap } from 'rxjs/operators';
import { notificationActionTypes } from '../actions';

const NOTIFICATION_TIMEOUT = 60 * 1000;

const generateRequestId = () => Math.random().toString(16).slice(2);

const toNotificationRequest = notificationRequest => input$ =>
    input$.pipe(mergeMap(async action => ({
        type: notificationActionTypes.WEBSOCKET_MESSAGE_SEND,
        message: {
            request_id: generateRequestId(),
            ...(
                typeof notificationRequest === "function" ?
                    await Promise.resolve(notificationRequest(action)) :
                    ({ eventType: notificationRequest })
            )
        }
    })));

const mapNotificationRequestResponses = (notificationRequest, mapper) => $input =>
    $input.pipe(
        filter(action =>
            action.type === notificationActionTypes.WEBSOCKET_MESSAGE_SEND &&
            action.message.eventType === notificationRequest),
        concatMap(sendAction =>
            $input.pipe(
                filter(receiveAction => {
                    return (
                        receiveAction.type === notificationActionTypes.WEBSOCKET_MESSAGE_RECEIVED &&
                        receiveAction.message.request_id === sendAction.message.request_id
                    )
                }),
                first(),
                timeout(NOTIFICATION_TIMEOUT),
                map(({ message }) => mapper(message.success ? false : message.error, message.result, sendAction.message)),
                catchError(errorMessage => of(mapper(errorMessage && errorMessage.message, null, sendAction.message))))));

export { toNotificationRequest, mapNotificationRequestResponses };

用法:

export const getDocumentsReqEpic = action$ => action$.pipe(
    ofType(documentActionTypes.REFRESH_DOCUMENTS_REQUEST),
    toNotificationRequest(EventTypes.get_user_documents_req)
);

export const getDocumentsRecEpic = action$ => action$.pipe(
    mapNotificationRequestResponses(
        EventTypes.get_user_documents_req,
        (error, result) => error ? refreshDocumentsError(error) : refreshDocumentsSuccess(result))
);

原回答:

我觉得我可能需要多次重复这个过程,这似乎是一个合理数量的重复样板,我应该创建一个方法来根据需要生成 epics。出于这个原因,我扩展了@sneas 很棒的答案,并在下面发布,以防它帮助其他人。

请注意,此实现假定来自 的 websocket 实现。它还假定服务器 websocket 实现将接受 'request_id' 并使用相同的 'request_id' 进行响应,以便可以链接请求和响应消息。可能还值得注意的是 'epicLinkId' 只是 client-side,并且只是使正在创建的 2 epics 能够相互链接,否则您将只能调用 createNotifyReqResEpics()一次。

createNotifyReqResEpics.js(基于上述代码的助手)

import { ofType } from 'redux-observable';
import { of } from 'rxjs';
import { map, switchMap, filter, timeout, catchError, first } from 'rxjs/operators';
import { notificationActionTypes } from '../actions';

const generateRequestId = () => Math.random().toString(16).slice(2);

export default ({
    requestFilter,
    requestMessageMapper,
    responseMessageMapper
}) => {

    if (typeof requestFilter !== "function")
        throw new Error("Invalid function passed into createNotifyReqResEpics 'requestFilter' argument.");
    if (typeof requestMessageMapper !== "function")
        throw new Error("Invalid function passed into createNotifyReqResEpics 'requestMessageMapper' argument.");
    if (typeof responseMessageMapper !== "function")
        throw new Error("Invalid function passed into createNotifyReqResEpics 'responseMessageMapper' argument.");

    const epicLinkId = generateRequestId();

    const websocketSendEpic = action$ =>
        action$.pipe(
            filter(requestFilter),
            map(action => ({
                epic_link_id: epicLinkId,
                type: notificationActionTypes.WEBSOCKET_MESSAGE_SEND,
                message: {
                    request_id: generateRequestId(),
                    ...requestMessageMapper(action)
                }
            }))
        );

    const websocketReceiveEpic = action$ =>
        action$.pipe(
            ofType(notificationActionTypes.WEBSOCKET_MESSAGE_SEND),
            filter(action => action.epic_link_id === epicLinkId),
            switchMap(sendAction =>
                action$.pipe(
                    ofType(notificationActionTypes.WEBSOCKET_MESSAGE_RECEIVED),
                    filter(receiveAction => receiveAction.request_id === sendAction.request_id),
                    first(),
                    timeout(10000),
                    map(receiveAction => responseMessageMapper(false, receiveAction.message)),
                    catchError(errorMessage => of(responseMessageMapper(errorMessage && errorMessage.message, null))))));

    return [websocketSendEpic, websocketReceiveEpic];
};

documents.js (epics)

import EventTypes from '../shared-dependencies/EventTypes';
import { documentActionTypes, refreshDocumentsError, refreshDocumentsSuccess } from '../actions';
import { createNotifyReqResEpics } from '../utils';

const [getDocumentsReqEpic, getDocumentsRespEpic] = createNotifyReqResEpics({
    requestFilter: action => action.type === documentActionTypes.REFRESH_DOCUMENTS_REQUEST,
    requestMessageMapper: action => ({ eventType: EventTypes.get_user_documents_req }),
    responseMessageMapper: (error, action) => error ? refreshDocumentsError(error) : refreshDocumentsSuccess(action.result)
});

export { getDocumentsReqEpic, getDocumentsRespEpic };

其中 2 从 documents.js 导出 epics 进入 combineEpics。