使用 redux-observable 并订阅 websocket
Using redux-observable and subscribing to a websocket
试图弄清楚如何让我的史诗继续下去,它将订阅一个 websocket,然后在从 websocket 发出的事件滚入时调度一些动作。
我看到的示例使用的是多路复用,实际上并没有在 websocket 上调用订阅,我对更改它有点困惑。
我是这样开始的。但我相信 redux observable 需要
const socket$ = Observable.webSocket<DataEvent>(
"ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
action$.ofType(START_BANK_STREAM).mergeMap(action => {
console.log("in epic mergeMap");
socket$
.subscribe(
e => {
console.log("dispatch event " + e);
distributeEvent(e);
},
e => {
logger.log("AmbassadorsDataService", "Unclean socket closure");
},
() => {
logger.log("AmbassadorsDataService", "Socket connection closed");
}
)
});
function distributeEvent(event: DataEvent) : void {
//this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
if(event.source === '/ambassadors/bank') {
if( event.command === 'REMOVE') {
removeDataEvent(event);
}
else if(event.command == 'ADD') {
loadDataEvent(event);
}
}
}
它抛出一个错误:
Uncaught TypeError:您提供了 'undefined' 预期流的位置。您可以提供 Observable、Promise、Array 或 Iterable。
如有任何帮助,我们将不胜感激!
谢谢
在 redux-observable 中,你几乎永远不会(除非你知道我为什么说 "almost")自己调用 subscribe
。相反,Observables 是链式的,中间件和其他操作符将为您处理订阅。
如果您只想为收到的每个事件发送一个动作,那很简单:
const socket$ = Observable.webSocket<DataEvent>(
"ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
action$.ofType('START_BANK_STREAM')
.mergeMap(action =>
socket$
.map(payload => ({
type: 'BANK_STREAM_MESSAGE',
payload
}))
);
您可能(或可能不需要)根据从套接字接收到的消息内容进行更多自定义,但实际上您可能会更好地服务于 that减速器中的其他逻辑,因为它可能与副作用无关。
您可能需要一种停止流的方法,这只是一个 takeUntil
:
const socket$ = Observable.webSocket<DataEvent>(
"ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
action$.ofType('START_BANK_STREAM')
.mergeMap(action =>
socket$
.map(payload => ({
type: 'BANK_STREAM_MESSAGE',
payload
}))
.takeUntil(
action$.ofType('STOP_BANK_STREAM')
)
);
我用了 mergeMap
因为你用了,但在这种情况下我认为 switchMap
更合适,因为每个都有多个这样的东西似乎是多余的,除非你需要有多个而你的问题只是省略了每一个的独特之处。
试图弄清楚如何让我的史诗继续下去,它将订阅一个 websocket,然后在从 websocket 发出的事件滚入时调度一些动作。
我看到的示例使用的是多路复用,实际上并没有在 websocket 上调用订阅,我对更改它有点困惑。
我是这样开始的。但我相信 redux observable 需要
const socket$ = Observable.webSocket<DataEvent>(
"ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
action$.ofType(START_BANK_STREAM).mergeMap(action => {
console.log("in epic mergeMap");
socket$
.subscribe(
e => {
console.log("dispatch event " + e);
distributeEvent(e);
},
e => {
logger.log("AmbassadorsDataService", "Unclean socket closure");
},
() => {
logger.log("AmbassadorsDataService", "Socket connection closed");
}
)
});
function distributeEvent(event: DataEvent) : void {
//this.logger.log('AmbassadorsDataService', 'Event Received: ' + event.command + ' and id: ' + event.id);
if(event.source === '/ambassadors/bank') {
if( event.command === 'REMOVE') {
removeDataEvent(event);
}
else if(event.command == 'ADD') {
loadDataEvent(event);
}
}
}
它抛出一个错误: Uncaught TypeError:您提供了 'undefined' 预期流的位置。您可以提供 Observable、Promise、Array 或 Iterable。
如有任何帮助,我们将不胜感激!
谢谢
在 redux-observable 中,你几乎永远不会(除非你知道我为什么说 "almost")自己调用 subscribe
。相反,Observables 是链式的,中间件和其他操作符将为您处理订阅。
如果您只想为收到的每个事件发送一个动作,那很简单:
const socket$ = Observable.webSocket<DataEvent>(
"ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
action$.ofType('START_BANK_STREAM')
.mergeMap(action =>
socket$
.map(payload => ({
type: 'BANK_STREAM_MESSAGE',
payload
}))
);
您可能(或可能不需要)根据从套接字接收到的消息内容进行更多自定义,但实际上您可能会更好地服务于 that减速器中的其他逻辑,因为它可能与副作用无关。
您可能需要一种停止流的方法,这只是一个 takeUntil
:
const socket$ = Observable.webSocket<DataEvent>(
"ws://thewebsocketurl"
);
const bankStreamEpic = (action$, store) =>
action$.ofType('START_BANK_STREAM')
.mergeMap(action =>
socket$
.map(payload => ({
type: 'BANK_STREAM_MESSAGE',
payload
}))
.takeUntil(
action$.ofType('STOP_BANK_STREAM')
)
);
我用了 mergeMap
因为你用了,但在这种情况下我认为 switchMap
更合适,因为每个都有多个这样的东西似乎是多余的,除非你需要有多个而你的问题只是省略了每一个的独特之处。