从服务 [kotlin coroutines] 异步获取下一批的迭代器

Iterator which asynchronously fetches the next batch from a service [kotlin coroutines]

我已经实现了一个同步迭代器,它从外部服务中获取批量数据(可能需要一些时间或挂起)。

我想在调用者代码循环访问当前批次序列的条目时异步获取下一批次。

理想情况下,我想使用 kotlin 协程

所需进程:

  1. 迭代器获取第一批
  2. 迭代器开始在后台获取第二批
  3. 同时,调用方处理第一批
  4. 处理完第一批后,调用者可以立即处理迭代器在后台获取的第二批
  5. 迭代器开始在后台获取第 3 个批次
  6. 处理完第二批后,调用者可以立即处理迭代器在后台获取的第三批
  7. 迭代器开始在后台获取第 4 个批次
  8. 等等

我的同步实现:

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return object : Iterator<LogPayloadType>, AutoCloseable {
            val logging: Logging = options.service
            var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...))
            var i = 0
            var batch: List<LogEntry> = currentPage.values.toList()
            var isClosed = false

            override fun close() {
                logging.close()
                isClosed = true
            }

            override fun hasNext(): Boolean {
                if (isClosed) 
                    return false
                val hasNext = i < batch.size || currentPage.hasNextPage()
                if (!hasNext) {
                    logging.close()
                }
                return hasNext
            }

            override fun next(): LogPayloadType {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                if (i == batch.size - 1 && currentPage.hasNextPage()) {
                    currentPage = currentPage.nextPage
                    batch = currentPage.values.toList()
                    i = 0
                }
                val logEntry = batch[i++]
                return logEntry.getPayload()
            }
        }.asSequence()
    }

Ofc 我对完全不同的解决方案持开放态度,只要它们可以包装在 kotlin 的序列中即可。

编辑

这是一个使用 thread { } 的异步实现。我无法通过协同程序实现这一点

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return object : Iterator<LogPayloadType> {
            private var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            private var i = 0
            private var batch: List<LogEntry> = currentPage.values.toList()
            private var nextPage: Page<LogEntry>? = null
            private var job: Thread? = null

            override fun hasNext() = i < batch.size || currentPage.hasNextPage()

            override fun next(): LogPayloadType {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                if (currentPage.hasNextPage()) {
                    if (nextPage == null && job == null) {
                        job = thread { nextPage = readNextPage(currentPage) }
                    }
                    if (i == batch.size) {
                        job!!.join()
                        currentPage = nextPage!!
                        job = thread { nextPage = readNextPage(currentPage) }
                        batch = currentPage.values.toList()
                        i = 0
                    }
                }
                val logEntry = batch[i++]
                return logEntry.getPayload<Payload<*>>()
            }

            private fun readNextPage(curPage: Page<LogEntry>): Page<LogEntry>? = curPage.nextPage

        }.asSequence()
    }

请记住,由于您使用的所有 class 我都不知道,因此我无法测试以下任何代码,因此可能会出现错误。

首先,使用 sequence 构建器可以大大简化您使用线程的代码:

// Not sure about how this should behave as you treat it like a blocking function.
// I return null when it's exhausted to simplify while loop.
private fun <T> readNextPageOrNull(page: Page<T>): Page<T>? = 
    if (page.hasNextPage()) page.nextPage!! else null

fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return sequence {
        var jobResult: Page<LogEntry>? = null
        var job = thread {
            jobResult = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            job.join()
            val page = jobResult ?: break
            job = thread { jobResult = readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}

您可以进一步简化它并利用线程池而不是使用一堆新线程,方法是使用 CompletableFuture.supplyAsync {} 而不是 thread {}:

fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return sequence {
        var job = CompletableFuture.supplyAsync {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = job.join() ?: break
            job = CompletableFuture.supplyAsync { readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}

我们可以将其转换为使用协程,但是将协程代码转换为阻塞代码很尴尬。你必须使用 runBlocking。这不会给你带来太多好处,但它会使用协程 Dispatcher 线程池,如果你正在使用协程,你可能已经在使用它。在这里,coroutineScope 是当前 class.

中合适的任何范围
fun readStructuredLogs(): Sequence<Payload.JsonPayload> {
    return sequence {
        var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = runBlocking { job.await() } ?: break
            job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}

如果您已经在使用协程,您可以考虑使用 Flow 而不是 Sequence,这样当您等待下一个项目时它会挂起而不是阻塞。使用 Flow 运算符可能有更简单的方法,但我只是对上面的序列代码进行了快速修改:

fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
    return flow {
        var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = job.await() ?: break
            job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
            emitAll(page.values.asFlow())
        }
    }
}

编辑:使用 buffer 执行此操作的可能方法:

fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
    return flow {
        var page = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        while (true) {
            emit(page)
            page = readNextPageOrNull(page) ?: break
        }
    }
        .flowOn(Dispatchers.IO)
        .buffer(1)
        .flatMapConcat { it.values.asFlow() }
}