Firebase Python SDK 发生的阻塞是否有解决方法?比如添加完成回调?

Is there a workaround for the blocking that happens with Firebase Python SDK? Like adding a completion callback?

最近,我已将 express.js 中的 REST 服务器代码移至使用 FastAPI。到目前为止,直到最近,我一直在过渡中取得成功。我注意到基于 firebase python admin sdk 文档,与 node.js 不同,python sdk 是阻塞的。文档说 here:

In Python and Go Admin SDKs, all write methods are blocking. That is, the write methods do not return until the writes are committed to the database.

我认为这个特性对我的代码有一定的影响。这也可能是我构建代码的方式。下面是我的一个文件中的一些代码:

from app.services.new_service import nService
from firebase_admin import db
import json
import redis

class TryNewService:
  async def tryNew_func(self, request):
    # I've already initialized everything in another file for firebase
    ref = db.reference()
    r = redis.Redis()
    holdingData = await nService().dialogflow_session(request)
    fulfillmentText = json.dumps(holdingData[-1])
    body = await request.json()

    if ("user_prelimInfo_address" in holdingData):
      holdingData.append("session")
      holdingData.append(body["session"])
      print(holdingData)
      return(holdingData)
    else:
      if (("Default Welcome Intent" in holdingData)):
        pass
      else:
        UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
        ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]})
        print(holdingData)
      return(fulfillmentText)

对于在我的代码中使用 ref.set() 行的阻塞效果有什么解决方法吗?有点像在 node.js 中添加回调?我是 python 3.

异步世界的新手

Update as of 06/13/2020: So I added following code and am now getting a RuntimeError: Task attached to a different loop. In my second else statement I do the following:

loop = asyncio.new_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
  result = await loop.run_in_executor(pool, ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
  print("custom thread pool:{}".format(result))

对于这个新的 RuntimeError,我将不胜感激,希望能帮助我找出答案。

运行 使用 ThreadPoolExecutor 在事件循环中阻止数据库调用。参见 https://medium.com/@hiranya911/firebase-python-admin-sdk-with-asyncio-d65f39463916

如果你想运行异步协程中的同步代码,那么步骤是:

  1. 循环=get_event_loop()
    注意:获取而不是新的。获取当前 event_loop 和 new_even_loop returns 新的
  2. 等待loop.run_in_executor(None, sync_method)
    • 第一个参数=None -> 使用默认执行器实例
    • 第二个参数(sync_method)是要调用的同步代码

请记住 sync_method 使用的资源需要正确同步:

  • a) 或者使用 asyncio.Lock
  • b) 或使用 asyncio.run_coroutine_threadsafe 函数(参见下面的示例)

忘记关于 ThreadPoolExecutor 的这种情况(它提供了一种 I/O 并行性的方法,而不是 asyncio 提供的并发性)。

您可以试试下面的代码:

loop = asyncio.get_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData)
print("custom thread pool:{}".format(result))

有了新功能:

def sync_method(ref, UserVal, holdingData):
    result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
    return result

请让我知道您的反馈

注意:之前的代码未经测试。我只测试了下一个最小示例(使用 pytest 和 pytest-asyncio):

import asyncio
import time

import pytest


@pytest.mark.asyncio
async def test_1():
    loop = asyncio.get_event_loop()
    delay = 3.0
    result = await loop.run_in_executor(None, sync_method, delay)
    print(f"Result = {result}")

def sync_method(delay):
    time.sleep(delay)
    print(f"dddd {delay}")
    return "OK"

回答@jeff-ridgeway 评论:

让我们尝试更改之前的答案以阐明如何使用 run_coroutine_threadsafe,从同步工作线程执行收集这些共享资源的协程:

  1. 在 run_in_executor
  2. 中添加循环作为附加参数
  3. 将所有共享资源从sync_method移动到一个新的sync_method,用run_coroutine_threadsafe
  4. 执行
loop = asyncio.get_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData, loop)
print("custom thread pool:{}".format(result))

def sync_method(ref, UserVal, holdingData, loop):
    coro = async_method(ref, UserVal, holdingData)
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    future.result()
async def async_method(ref, UserVal, holdingData)
    result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
    return result

注意:之前的代码未经测试。现在我测试的最小示例已更新:

@pytest.mark.asyncio
async def test_1():
    loop = asyncio.get_event_loop()
    delay = 3.0
    result = await loop.run_in_executor(None, sync_method, delay, loop)
    print(f"Result = {result}")

def sync_method(delay, loop):
    coro = async_method(delay)
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()

async def async_method(delay):
    time.sleep(delay)
    print(f"dddd {delay}")
    return "OK"

希望对您有所帮助