Redux Observables:具有多个事件的自定义观察者?
Redux Observables: Custom observer with multiple events?
您好,我正在尝试将 redux-observables 与 react native 和一个名为 Phoenix 的 websocket 包装器一起使用,它基本上允许您在通过 websocket 接收到某些消息时执行回调。
这是我要执行的基本设置:
import 'rxjs';
import { Observable } from 'rxjs';
import { Socket, Channel } from 'phoenix';
import * as channelActions from '../ducks/channel';
export default function connectSocket(action$, store) {
return action$.ofType(channelActions.SETUP)
.mergeMap(action => {
return new Observable(observer => {
const socket = new Socket('http://localhost:4000/socket');
const userId = store.getState().user.id;
const channel = socket.channel(`user:${userId}`);
channel
.join()
.receive('ok', response => observer.next({ type: 'join', payload: 'something' }))
.receive('error', reason => observer.next({ type: 'error', payload: 'reason' }))
channel.on('rooms:add', room => observer.next({ type: 'rooms:add', payload: '123' }))
channel.on('something:else', room => observer.next({ type: 'something:else', payload: '123' }))
});
})
.map(action => {
switch (action.type) {
case 'join':
return channelActions.join(action.payload);
case 'error':
return channelActions.error(action.payload);
case 'rooms:add':
return channelActions.add(action.payload);
case 'something:else':
return channelActions.something(action.payload);
}
});
};
如您所见,此方法存在几个问题:
一个观察者正在触发不同的事件。除了 .map()
函数
中的 switch 语句外,我不知道如何将它们拆分
我不知道在哪里调用 observer.complete()
因为我希望它继续监听所有这些事件,而不仅仅是一次。
如果我可以将 channel
常量提取到一个可供多个 epics 使用的单独文件中,这将解决这些问题。但是,channel
依赖于socket
,依赖于user
,来自于redux状态
所以我对如何解决这个问题感到很困惑。我认为提取全局 channel
object 的能力可以修复它,但这也取决于来自 redux 状态的用户 ID。感谢您的帮助。
P.S。如果值得的话,我的用例与这个人非常相似 (https://github.com/MichalZalecki/connect-rxjs-to-react/issues/1)。一位响应者建议使用 Rx.Subject
但我不知道从哪里开始...
看起来你让你的生活变得更加困难。您已经映射了事件,那么为什么要组合和重新映射它们呢?将每个事件映射到它自己的流中并将结果合并在一起。
// A little helper function to map your nonstandard receive
// function into its own stream
const receivePattern = (channel, signal, selector) =>
Observable.fromEventPattern(
h => channel.receive(signal, h),
h => {/*However you unsubscribe*/},
selector
)
export default function connectSocket(action$, store) {
return action$.ofType(channelActions.SETUP)
// Observable.create is usually a crutch pattern
// Use defer or using here instead as it is more semantic and handles
// most of the stream lifecycle for you
.mergeMap(action => Observable.defer(() => {
const socket = new Socket('http://localhost:4000/socket');
const userId = store.getState().user.id;
const channel = socket.channel(`user:${userId}`);
const joinedChannel = channel.join();
// Convert the ok message into a channel join
const ok = receivePattern(joinedChannel, 'ok')
// Just an example of doing some arbitrary mapping since you seem to be doing it
// in your example
.mapTo('something')
.map(channelActions.join);
// Convert the error message
const error = receivePattern(joinedChannel, 'error')
.map(channelActions.error);
// Since the listener methods follow a standard event emitter interface
// You can use fromEvent to capture them
// Rather than the more verbose fromEventPattern we used above
const addRooms = Observable.fromEvent(channel, 'rooms:add')
.map(channelActions.add);
const somethingElse = Observable.fromEvent(channel, 'something:else')
.map(channelActions.somethingElse);
// Merge the resulting stream into one
return Observable.merge(ok, error, addRooms, somethingElse);
});
);
};
您好,我正在尝试将 redux-observables 与 react native 和一个名为 Phoenix 的 websocket 包装器一起使用,它基本上允许您在通过 websocket 接收到某些消息时执行回调。
这是我要执行的基本设置:
import 'rxjs';
import { Observable } from 'rxjs';
import { Socket, Channel } from 'phoenix';
import * as channelActions from '../ducks/channel';
export default function connectSocket(action$, store) {
return action$.ofType(channelActions.SETUP)
.mergeMap(action => {
return new Observable(observer => {
const socket = new Socket('http://localhost:4000/socket');
const userId = store.getState().user.id;
const channel = socket.channel(`user:${userId}`);
channel
.join()
.receive('ok', response => observer.next({ type: 'join', payload: 'something' }))
.receive('error', reason => observer.next({ type: 'error', payload: 'reason' }))
channel.on('rooms:add', room => observer.next({ type: 'rooms:add', payload: '123' }))
channel.on('something:else', room => observer.next({ type: 'something:else', payload: '123' }))
});
})
.map(action => {
switch (action.type) {
case 'join':
return channelActions.join(action.payload);
case 'error':
return channelActions.error(action.payload);
case 'rooms:add':
return channelActions.add(action.payload);
case 'something:else':
return channelActions.something(action.payload);
}
});
};
如您所见,此方法存在几个问题:
一个观察者正在触发不同的事件。除了
.map()
函数 中的 switch 语句外,我不知道如何将它们拆分
我不知道在哪里调用
observer.complete()
因为我希望它继续监听所有这些事件,而不仅仅是一次。如果我可以将
channel
常量提取到一个可供多个 epics 使用的单独文件中,这将解决这些问题。但是,channel
依赖于socket
,依赖于user
,来自于redux状态
所以我对如何解决这个问题感到很困惑。我认为提取全局 channel
object 的能力可以修复它,但这也取决于来自 redux 状态的用户 ID。感谢您的帮助。
P.S。如果值得的话,我的用例与这个人非常相似 (https://github.com/MichalZalecki/connect-rxjs-to-react/issues/1)。一位响应者建议使用 Rx.Subject
但我不知道从哪里开始...
看起来你让你的生活变得更加困难。您已经映射了事件,那么为什么要组合和重新映射它们呢?将每个事件映射到它自己的流中并将结果合并在一起。
// A little helper function to map your nonstandard receive
// function into its own stream
const receivePattern = (channel, signal, selector) =>
Observable.fromEventPattern(
h => channel.receive(signal, h),
h => {/*However you unsubscribe*/},
selector
)
export default function connectSocket(action$, store) {
return action$.ofType(channelActions.SETUP)
// Observable.create is usually a crutch pattern
// Use defer or using here instead as it is more semantic and handles
// most of the stream lifecycle for you
.mergeMap(action => Observable.defer(() => {
const socket = new Socket('http://localhost:4000/socket');
const userId = store.getState().user.id;
const channel = socket.channel(`user:${userId}`);
const joinedChannel = channel.join();
// Convert the ok message into a channel join
const ok = receivePattern(joinedChannel, 'ok')
// Just an example of doing some arbitrary mapping since you seem to be doing it
// in your example
.mapTo('something')
.map(channelActions.join);
// Convert the error message
const error = receivePattern(joinedChannel, 'error')
.map(channelActions.error);
// Since the listener methods follow a standard event emitter interface
// You can use fromEvent to capture them
// Rather than the more verbose fromEventPattern we used above
const addRooms = Observable.fromEvent(channel, 'rooms:add')
.map(channelActions.add);
const somethingElse = Observable.fromEvent(channel, 'something:else')
.map(channelActions.somethingElse);
// Merge the resulting stream into one
return Observable.merge(ok, error, addRooms, somethingElse);
});
);
};