redux-saga、websockets 和动作队列
redux-saga, websockets and actions queue
我遇到以下问题:服务器通过 websocket 向客户端发送消息。在客户端,我需要向用户显示此消息。但问题是有时消息来得很快,我需要组织某种队列并一个接一个地显示这些消息。
我的传奇:
import { eventChannel, effects, takeEvery } from 'redux-saga';
import { types, actionCreators } from './actions';
const { call, put, take, race } = effects;
function watchMessages(socket) {
return eventChannel((emitter) => {
socket.onopen = (e) => (emitter(actionCreators.socketOpen(e)));
socket.onclose = (e) => (emitter(actionCreators.socketClose(e)));
socket.onerror = (e) => (emitter(actionCreators.socketError(e)));
socket.onmessage = (e) => (emitter(actionCreators.socketMessage(e)));
return () => {
socket.close();
};
});
}
function* internalListener(socket) {
while (true) {
const data = yield take(types.SOCKET_SEND);
socket.send(data.payload);
}
}
function* externalListener(socketChannel) {
while (true) {
const action = yield take(socketChannel);
yield put(action);
}
}
function* wsHandling(action) {
const socket = action.payload.socket;
while (true) {
const socketChannel = yield call(watchMessages, socket);
const { cancel } = yield race({
task: [call(externalListener, socketChannel), call(internalListener, socket)],
cancel: take(types.SOCKET_CLOSE),
});
if (cancel) {
socketChannel.close();
}
}
}
export default function* rootSaga(action) {
yield takeEvery(types.SOCKET_CONNECT, wsHandling);
}
我的减速器:
function dataReducer(state = initialStateData, action) {
switch (action.type) {
case types.SOCKET_MESSAGE:
if (action.payload.channel === 'channel1') {
return state
.set('apichannel1', action.payload);
} else if (action.payload.channel === 'channel2') {
return state
.set('apichannel2', action.payload);
} else if (action.payload.channel === 'channel3') {
return state
.set('apichannel3', action.payload);
}
return state;
default:
return state;
}
}
现在,当新消息到达时,我正在更改状态并将其显示在屏幕上。
有什么想法可以将其变成以下内容:将到达的消息放入某种队列,并在自定义时间内在屏幕上一条一条地显示它们?
您可以使用 actionChannel
效果在 redux saga 中缓冲(排队)操作。
https://redux-saga.js.org/docs/advanced/Channels.html#using-the-actionchannel-effect
然后您可以按照自己的速度从通道的缓冲区中读取。
我创建了一个简化的示例,用于侦听消息并一次显示最多 3 条消息,每条消息显示 5 秒。参见:
https://codesandbox.io/s/wt8uu?file=/src/index.js
底部有一个控制台面板,用它来调用addMsg('Msg: ' + Math.random())
模拟新的socket.io消息。
所以我这样做了,这是代码,也许对某人有用
let pendingTasks = [];
let activeTasks = [];
function watchMessages(socket) {
return eventChannel((emitter) => {
socket.onopen = (e) => (emitter(actionCreators.socketOpen(e)));
socket.onclose = (e) => (emitter(actionCreators.socketClose(e)));
socket.onerror = (e) => (emitter(actionCreators.socketError(e)));
socket.onmessage = (e) => (emitter(actionCreators.socketMessage(e)));
return () => {
socket.close();
};
});
}
function* internalListener(socket) {
while (true) {
const data = yield take(types.SOCKET_SEND);
socket.send(data.payload);
}
}
function* externalListener(socketChannel) {
while (true) {
const action = yield take(socketChannel);
pendingTasks = [...pendingTasks, action];
}
}
function* wsHandling(action) {
const socket = action.payload.socket;
while (true) {
const socketChannel = yield call(watchMessages, socket);
const { cancel } = yield race({
task: [call(externalListener, socketChannel), call(internalListener, socket)],
cancel: take(types.SOCKET_CLOSE),
});
if (cancel) {
socketChannel.close();
}
}
}
function* tasksScheduler() {
while (true) {
const canDisplayTask = activeTasks.length < 1 && pendingTasks.length > 0;
if (canDisplayTask) {
const [firstTask, ...remainingTasks] = pendingTasks;
pendingTasks = remainingTasks;
yield fork(displayTask, firstTask);
yield call(delay, 300);
}
else {
yield call(delay, 50);
}
}
}
function* displayTask(task) {
activeTasks = [...activeTasks, task];
yield put(task);
yield call(delay, 3000);
activeTasks = _.without(activeTasks, task);
}
export default function* rootSaga(action) {
yield [
takeEvery(types.SOCKET_CONNECT, wsHandling),
takeEvery(types.SOCKET_CONNECT, tasksScheduler),
];
}
我遇到以下问题:服务器通过 websocket 向客户端发送消息。在客户端,我需要向用户显示此消息。但问题是有时消息来得很快,我需要组织某种队列并一个接一个地显示这些消息。
我的传奇:
import { eventChannel, effects, takeEvery } from 'redux-saga';
import { types, actionCreators } from './actions';
const { call, put, take, race } = effects;
function watchMessages(socket) {
return eventChannel((emitter) => {
socket.onopen = (e) => (emitter(actionCreators.socketOpen(e)));
socket.onclose = (e) => (emitter(actionCreators.socketClose(e)));
socket.onerror = (e) => (emitter(actionCreators.socketError(e)));
socket.onmessage = (e) => (emitter(actionCreators.socketMessage(e)));
return () => {
socket.close();
};
});
}
function* internalListener(socket) {
while (true) {
const data = yield take(types.SOCKET_SEND);
socket.send(data.payload);
}
}
function* externalListener(socketChannel) {
while (true) {
const action = yield take(socketChannel);
yield put(action);
}
}
function* wsHandling(action) {
const socket = action.payload.socket;
while (true) {
const socketChannel = yield call(watchMessages, socket);
const { cancel } = yield race({
task: [call(externalListener, socketChannel), call(internalListener, socket)],
cancel: take(types.SOCKET_CLOSE),
});
if (cancel) {
socketChannel.close();
}
}
}
export default function* rootSaga(action) {
yield takeEvery(types.SOCKET_CONNECT, wsHandling);
}
我的减速器:
function dataReducer(state = initialStateData, action) {
switch (action.type) {
case types.SOCKET_MESSAGE:
if (action.payload.channel === 'channel1') {
return state
.set('apichannel1', action.payload);
} else if (action.payload.channel === 'channel2') {
return state
.set('apichannel2', action.payload);
} else if (action.payload.channel === 'channel3') {
return state
.set('apichannel3', action.payload);
}
return state;
default:
return state;
}
}
现在,当新消息到达时,我正在更改状态并将其显示在屏幕上。
有什么想法可以将其变成以下内容:将到达的消息放入某种队列,并在自定义时间内在屏幕上一条一条地显示它们?
您可以使用 actionChannel
效果在 redux saga 中缓冲(排队)操作。
https://redux-saga.js.org/docs/advanced/Channels.html#using-the-actionchannel-effect
然后您可以按照自己的速度从通道的缓冲区中读取。
我创建了一个简化的示例,用于侦听消息并一次显示最多 3 条消息,每条消息显示 5 秒。参见:
https://codesandbox.io/s/wt8uu?file=/src/index.js
底部有一个控制台面板,用它来调用addMsg('Msg: ' + Math.random())
模拟新的socket.io消息。
所以我这样做了,这是代码,也许对某人有用
let pendingTasks = [];
let activeTasks = [];
function watchMessages(socket) {
return eventChannel((emitter) => {
socket.onopen = (e) => (emitter(actionCreators.socketOpen(e)));
socket.onclose = (e) => (emitter(actionCreators.socketClose(e)));
socket.onerror = (e) => (emitter(actionCreators.socketError(e)));
socket.onmessage = (e) => (emitter(actionCreators.socketMessage(e)));
return () => {
socket.close();
};
});
}
function* internalListener(socket) {
while (true) {
const data = yield take(types.SOCKET_SEND);
socket.send(data.payload);
}
}
function* externalListener(socketChannel) {
while (true) {
const action = yield take(socketChannel);
pendingTasks = [...pendingTasks, action];
}
}
function* wsHandling(action) {
const socket = action.payload.socket;
while (true) {
const socketChannel = yield call(watchMessages, socket);
const { cancel } = yield race({
task: [call(externalListener, socketChannel), call(internalListener, socket)],
cancel: take(types.SOCKET_CLOSE),
});
if (cancel) {
socketChannel.close();
}
}
}
function* tasksScheduler() {
while (true) {
const canDisplayTask = activeTasks.length < 1 && pendingTasks.length > 0;
if (canDisplayTask) {
const [firstTask, ...remainingTasks] = pendingTasks;
pendingTasks = remainingTasks;
yield fork(displayTask, firstTask);
yield call(delay, 300);
}
else {
yield call(delay, 50);
}
}
}
function* displayTask(task) {
activeTasks = [...activeTasks, task];
yield put(task);
yield call(delay, 3000);
activeTasks = _.without(activeTasks, task);
}
export default function* rootSaga(action) {
yield [
takeEvery(types.SOCKET_CONNECT, wsHandling),
takeEvery(types.SOCKET_CONNECT, tasksScheduler),
];
}