React 组件不会重新渲染(Redux Observable + rxjs + rx-http-request)

React component does not re-render reactively (Redux Observable + rxjs + rx-http-request)

我有一个 API 端点,它以 'application/stream+json' 类型的响应返回用户列表,各条目之间用换行符分隔。示例数据可在此处查看。

组件

class UserList extends Component {
    componentDidMount() {
        const { fetchUsers } = this.props;
        fetchUsers();
    }

    render() {
        const { isFetching = false, users = [] } = this.props;
        if (isFetching) {
            return <Loader message="Users are loading..." />;
        }
        if (!users || users.length === 0) {
            return 'No users found.';
        }
        const children = users
            .map(user => <UserListItem key={user.id} user={user} />);
        return (
            <div className="UserList">
                <Paper>
                    <List>
                        <Subheader>Users</Subheader>
                        {children}
                    </List>
                </Paper>
            </div>
        );
    }
}

// Fallback used when the parent/store does not pass `users` at all.
UserList.defaultProps = {
    users: [],
};

// Runtime prop validation: `users` is optional, while the fetching flag and
// the fetch trigger must always be supplied.
UserList.propTypes = {
    users: PropTypes.arrayOf(PropTypes.any),
    isFetching: PropTypes.bool.isRequired,
    fetchUsers: PropTypes.func.isRequired,
};

/**
 * Selects the props UserList needs from the `users` slice of the store.
 *
 * @param {object} state - Root store state; must contain a `users` slice.
 * @returns {{users: *, isFetching: *}} The slice's `users` and `isFetching` values.
 */
function mapStateToProps(state) {
    const usersSlice = state.users;
    return {
        users: usersSlice.users,
        isFetching: usersSlice.isFetching,
    };
}

/**
 * Exposes the `fetchUsers` action creator to the component, pre-bound to
 * the store's dispatch via `bindActionCreators`.
 *
 * @param {Function} dispatch - The store's dispatch function.
 * @returns {{fetchUsers: Function}} Props object holding the bound creator.
 */
function mapDispatchToProps(dispatch) {
    const fetchUsers = bindActionCreators(actions.fetchUsers, dispatch);
    return { fetchUsers };
}

// Build the store-connecting wrapper first, then apply it to the component.
const withUsers = connect(mapStateToProps, mapDispatchToProps);
export default withUsers(UserList);

Reducer

// State of the users slice before any action has been handled.
const initialState = {
    // User objects received so far; fetchUsersItemReceived() appends to it.
    users: [],
    // Set to true by fetchUsers() and back to false by fetchUsersItemReceived().
    isFetching: false,
};

/**
 * Reducer helper: marks the slice as loading.
 *
 * @param {object} state - Current users slice.
 * @returns {object} Shallow copy of `state` with `isFetching` set to true.
 */
function fetchUsers(state) {
    const next = { ...state };
    next.isFetching = true;
    return next;
}

/**
 * Reducer helper: appends the user carried by the action and clears the
 * loading flag, so the list can be shown as soon as the first item arrives.
 *
 * @param {object} state - Current users slice; `state.users` must be iterable.
 * @param {{user: *}} action - Action carrying the received user.
 * @returns {object} Shallow copy of `state` with the user appended and
 *   `isFetching` set to false.
 */
function fetchUsersItemReceived(state, action) {
    const incoming = action.user;
    const nextUsers = [...state.users, incoming];
    return { ...state, users: nextUsers, isFetching: false };
}

/**
 * Reducer for the users slice.
 *
 * Delegates the request and item-received actions to their helpers and
 * returns the incoming state untouched for every other action type.
 *
 * @param {object} [state=initialState] - Current users slice.
 * @param {{type: *}} action - Dispatched action.
 * @returns {object} The next users slice.
 */
export default function (state = initialState, action) {
    const { type } = action;

    if (type === actionTypes.FETCH_USERS_REQUEST) {
        return fetchUsers(state);
    }
    if (type === actionTypes.FETCH_USERS_ITEM_RECEIVED) {
        return fetchUsersItemReceived(state, action);
    }
    return state;
}

Action(解析器使用的是此处链接的 Streaming JSON Parser)

/**
 * Action creator that asks for the user list to be fetched.
 *
 * @returns {{type: *}} Action typed `actionTypes.FETCH_USERS_REQUEST`.
 */
export function fetchUsers() {
    const type = actionTypes.FETCH_USERS_REQUEST;
    return { type };
}

/**
 * Action creator for a single user that has arrived from the stream.
 *
 * @param {*} user - The received user.
 * @returns {{type: *, user: *}} Action typed
 *   `actionTypes.FETCH_USERS_ITEM_RECEIVED` carrying `user`.
 */
function fetchUsersItemReceived(user) {
    const type = actionTypes.FETCH_USERS_ITEM_RECEIVED;
    return { type, user };
}


/**
 * Action creator signalling that fetching users succeeded.
 *
 * @returns {{type: *}} Action typed `actionTypes.FETCH_USERS_SUCCESS`.
 */
function fetchUsersSuccess() {
    const type = actionTypes.FETCH_USERS_SUCCESS;
    return { type };
}

/**
 * Action creator signalling that fetching users failed.
 *
 * @param {*} error - The failure to report.
 * @returns {{type: *, error: *}} Action typed
 *   `actionTypes.FETCH_USERS_FAILURE` carrying `error`.
 */
function fetchUsersFailure(error) {
    const type = actionTypes.FETCH_USERS_FAILURE;
    return { type, error };
}

/**
 * Requests `url` and re-emits the values produced by the streaming JSON
 * parser as the response body chunks arrive.
 *
 * The parser callbacks are installed once, BEFORE any chunk is written.
 * Previously `parser.onValue` was assigned after `parser.write(data)` inside
 * the data callback, so every value parsed from the first chunk was lost and
 * the handler was re-installed on each later chunk.
 *
 * NOTE(review): `parser` is the shared instance from the enclosing scope, so
 * overlapping calls would overwrite each other's callbacks — confirm whether
 * a parser per call is needed.
 *
 * @param {string} url - Endpoint to request.
 * @returns {Subject} Subject that emits each parsed value, errors when the
 *   request or the parser fails, and completes when the response ends.
 */
function getJsonStream(url) {
    const emitter = new Subject();

    parser.onValue = (value) => {
        // Only forward values reported while the parser has no current key;
        // presumably these are the root-level items rather than nested fields.
        if (!parser.key) {
            emitter.next(value);
        }
    };
    // Surface parse failures to subscribers instead of leaving them to the
    // parser's default error handling.
    parser.onError = err => emitter.error(err);

    RxHR
        .get(url)
        .flatMap(resp => resp.body)
        .subscribe(
            data => parser.write(data),
            err => emitter.error(err),
            () => emitter.complete(),
        );
    return emitter;
}

/**
 * Epic: for every FETCH_USERS_REQUEST, streams the users from the API and
 * emits one item-received action per user.
 *
 * Errors are handled on the INNER stream. An error that reaches the outer
 * chain terminates the epic, after which later requests would be ignored.
 * Instead, a failed request is turned into a single `fetchUsersFailure`
 * action and the epic keeps listening. The `catch` selector returns an
 * array, which RxJS accepts as an observable input.
 *
 * @param {Observable} action$ - Stream of dispatched actions.
 * @returns {Observable} Stream of actions to dispatch.
 */
export const fetchUsersEpic = action$ =>
    action$.ofType(actionTypes.FETCH_USERS_REQUEST)
        .concatMap(() =>
            getJsonStream(`${api.API_BASE_URL}/user`)
                .map(user => fetchUsersItemReceived(user))
                .catch(error => [fetchUsersFailure(error)]));

configureStore.js

// Middleware instances for the store: the action logger and the
// redux-observable middleware that runs the root epic.
const logger = createLogger();
const epicMiddleware = createEpicMiddleware(rootEpic);

// Store factory with both middlewares applied (epic middleware first,
// logger second).
const withMiddleware = applyMiddleware(epicMiddleware, logger);
const createStoreWithMiddleware = withMiddleware(createStore);

/**
 * Creates the application store from the root reducer using the
 * middleware-enhanced store factory.
 *
 * @param {*} initialState - Optional preloaded state handed to the store.
 * @returns {*} Whatever `createStoreWithMiddleware` returns (the store).
 */
export default function configureStore(initialState) {
    const store = createStoreWithMiddleware(rootReducer, initialState);
    return store;
}

列表组件本应在每收到一个条目后就刷新,但实际上它要等整个列表接收完毕才刷新。有人能指出代码中的阻塞点吗?

这并不是针对您具体问题的解决方案(除非碰巧解决了,哈哈),但我认为自定义 Observable 比 Subject 更适合这种情况。这样您还可以挂接解析器的错误回调。

我想以后搜索如何用 rxjs 处理流式 JSON 的人可能会觉得下面的代码有用(未经测试):

/**
 * Wraps a stream of raw chunks in an Observable of parsed JSON values.
 *
 * A fresh `Parser` is created per subscription. Chunks from `data$` are
 * written into it, values reported while the parser has no current key are
 * emitted, and both parser errors and source errors are forwarded.
 *
 * @param {Observable} data$ - Source of raw chunks to feed the parser.
 * @returns {Observable} Observable of the parsed values.
 */
function streamingJsonParse(data$) {
  const subscribeToParsedValues = (observer) => {
    const parser = new Parser();

    parser.onError = (parseError) => observer.error(parseError);
    parser.onValue = (parsed) => {
      // Skip values that belong to a key; only keyless values are emitted.
      if (parser.key) {
        return;
      }
      observer.next(parsed);
    };

    const chunkHandlers = {
      next: (chunk) => parser.write(chunk),
      error: (sourceError) => observer.error(sourceError),
      complete: () => observer.complete(),
    };

    // Hand back the inner subscription so that unsubscribing from the
    // result also unsubscribes from data$.
    return data$.subscribe(chunkHandlers);
  };

  return new Observable(subscribeToParsedValues);
}

/**
 * Requests `url` and maps each response onto the stream of JSON values
 * parsed from its body.
 *
 * @param {string} url - Endpoint to request.
 * @returns {Observable} Result of merging `streamingJsonParse(resp.body)`
 *   for every response emitted by the request.
 */
function getJsonStream(url) {
  const response$ = RxHR.get(url);
  const parseBody = ({ body }) => streamingJsonParse(body);
  return response$.mergeMap(parseBody);
}

如果你有空做一个 jsbin 示例,请告诉我!

原来问题出在 jsonParse 库上,换成 oboe.js 之后问题就解决了。使用 "!" 节点选择器来选取多个根 JSON 元素,我就能把字符流转换为用户对象流。

Action

/**
 * Streams the JSON document(s) at `url` with oboe and re-emits every node
 * matched by the `'!'` pattern through a Subject.
 *
 * `emitter` was previously declared twice with `const`, which is a
 * SyntaxError and stopped the whole module from loading.
 *
 * NOTE(review): nothing calls `emitter.complete()`, so subscribers never see
 * completion — confirm whether that is intended.
 *
 * @param {string} url - Endpoint to stream from.
 * @returns {Subject} Subject that emits each matched item and errors when
 *   oboe reports a failure.
 */
function getJsonStream(url) {
    const emitter = new Subject();
    oboe(url)
        .node('!', (item) => {
            emitter.next(item);
        })
        .fail((error) => {
            emitter.error(error);
        });
    return emitter;
}

/**
 * Epic: for every FETCH_USERS_REQUEST, streams the users from the API and
 * emits one item-received action per user. A newer request replaces the
 * stream of the previous one (`switchMap`).
 *
 * Errors are handled on the INNER stream. An error that reaches the outer
 * chain terminates the epic, after which later requests would be ignored.
 * Instead, a failed request is turned into a single `fetchUsersFailure`
 * action and the epic keeps listening. The `catch` selector returns an
 * array, which RxJS accepts as an observable input.
 *
 * NOTE(review): assumes `fetchUsersFailure` is in scope in this actions
 * module alongside `fetchUsersItemReceived` — confirm.
 *
 * @param {Observable} action$ - Stream of dispatched actions.
 * @returns {Observable} Stream of actions to dispatch.
 */
export const fetchUsersEpic = action$ =>
    action$.ofType(actionTypes.FETCH_USERS_REQUEST)
        .switchMap(() =>
            getJsonStream(`${api.API_BASE_URL}/user`)
                .map(user => fetchUsersItemReceived(user))
                .catch(error => [fetchUsersFailure(error)]));