`.pipe()` 没有执行 `debounceTime`

`.pipe()` not executing `debounceTime`

我正在尝试 debounce() 一个 Observablepipe() 并链接 .subscribe() 但由于某种原因,订阅中的函数仍然被调用了十几个一气呵成。

我想做的是通过管道传输 withChangesForTables 并消除同步调用的抖动,因为我希望仅在进行了整批更改后才调用它。所以我为同步创建了一个提供程序并将其包装在我的 RootNavigator

周围

withChangesForTables on WatermelonDB source code

const SyncContext = createContext();
function useSync() {
    return useContext(SyncContext);
}

function SyncProvider({children}) {
    const [isSyncing, setIsSyncing] = useState(false);
    const [hasUnsynced, setHasUnsynced] = useState(false);

    async function checkUnsyncedChanges() {
        const hasChanges = await hasUnsyncedChanges({
            database
        });
        setHasUnsynced(hasChanges);
    }
    async function sync() {
        await checkUnsyncedChanges();
        if (!isSyncing && hasUnsynced) {
            setIsSyncing(true);
            await synchronizeWithServer();
            setIsSyncing(false);
        }
    }

    
    database.withChangesForTables([
        'table_name',
        'table_name2'
    ]).pipe(
        skip(1),
        // ignore records simply becoming `synced`
        filter(changes => !changes.every(change => change.record.syncStatus === 'synced')),
        // debounce to avoid syncing in the middle of related actions - I put 100000 to test only
        debounceTime(100000),
    ).subscribe({
        //calls API endpoint to sync local DB with server
        next: () => sync(), 
        error: e => console.log(e)
    });

    const value = {
        isSyncing,
        hasUnsynced,
        checkUnsyncedChanges,
        sync
    };

    return (
        <SyncContext.Provider value={value}>
            {children}
        </SyncContext.Provider>
    );
}

我不得不将 withChangesForTables 移动到 useEffect 并返回它以取消订阅,这似乎已经解决了问题。代码现在看起来像这样:

useEffect(() => {
    return database.withChangesForTables([
        'table_name',
        'table_name2'
    ]).pipe(
        skip(1),
        filter(changes => !changes.every(change => change.record.syncStatus === 'synced')),
        debounceTime(500),
    ).subscribe({
        next: () => sync(), 
        error: e => console.log(e)
    });
}, [])