Firebase Python SDK 发生的阻塞是否有解决方法?比如添加完成回调?
Is there a workaround for the blocking that happens with Firebase Python SDK? Like adding a completion callback?
最近,我已将 express.js 中的 REST 服务器代码移至使用 FastAPI。到目前为止,直到最近,我一直在过渡中取得成功。我注意到基于 firebase python admin sdk 文档,与 node.js 不同,python sdk 是阻塞的。文档说 here:
In Python and Go Admin SDKs, all write methods are blocking. That is, the write methods do not return until the writes are committed to the database.
我认为这个特性对我的代码有一定的影响。这也可能是我构建代码的方式。下面是我的一个文件中的一些代码:
from app.services.new_service import nService
from firebase_admin import db
import json
import redis
class TryNewService:
async def tryNew_func(self, request):
# I've already initialized everything in another file for firebase
ref = db.reference()
r = redis.Redis()
holdingData = await nService().dialogflow_session(request)
fulfillmentText = json.dumps(holdingData[-1])
body = await request.json()
if ("user_prelimInfo_address" in holdingData):
holdingData.append("session")
holdingData.append(body["session"])
print(holdingData)
return(holdingData)
else:
if (("Default Welcome Intent" in holdingData)):
pass
else:
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]})
print(holdingData)
return(fulfillmentText)
对于在我的代码中使用 ref.set()
行的阻塞效果有什么解决方法吗?有点像在 node.js 中添加回调?我是 python 3.
异步世界的新手
Update as of 06/13/2020: So I added following code and am now getting a RuntimeError: Task attached to a different loop
. In my second else statement I do the following:
loop = asyncio.new_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
result = await loop.run_in_executor(pool, ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
print("custom thread pool:{}".format(result))
对于这个新的 RuntimeError,我将不胜感激,希望能帮助我找出答案。
运行 使用 ThreadPoolExecutor
在事件循环中阻止数据库调用。参见 https://medium.com/@hiranya911/firebase-python-admin-sdk-with-asyncio-d65f39463916
如果你想运行异步协程中的同步代码,那么步骤是:
- 循环=get_event_loop()
注意:获取而不是新的。获取当前 event_loop 和 new_even_loop returns 新的
- 等待loop.run_in_executor(None, sync_method)
- 第一个参数=None -> 使用默认执行器实例
- 第二个参数(sync_method)是要调用的同步代码
请记住 sync_method 使用的资源需要正确同步:
- a) 或者使用 asyncio.Lock
- b) 或使用 asyncio.run_coroutine_threadsafe 函数(参见下面的示例)
忘记关于 ThreadPoolExecutor 的这种情况(它提供了一种 I/O 并行性的方法,而不是 asyncio 提供的并发性)。
您可以试试下面的代码:
loop = asyncio.get_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData)
print("custom thread pool:{}".format(result))
有了新功能:
def sync_method(ref, UserVal, holdingData):
result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
return result
请让我知道您的反馈
注意:之前的代码未经测试。我只测试了下一个最小示例(使用 pytest 和 pytest-asyncio):
import asyncio
import time
import pytest
@pytest.mark.asyncio
async def test_1():
loop = asyncio.get_event_loop()
delay = 3.0
result = await loop.run_in_executor(None, sync_method, delay)
print(f"Result = {result}")
def sync_method(delay):
time.sleep(delay)
print(f"dddd {delay}")
return "OK"
回答@jeff-ridgeway 评论:
让我们尝试更改之前的答案以阐明如何使用 run_coroutine_threadsafe,从同步工作线程执行收集这些共享资源的协程:
- 在 run_in_executor
中添加循环作为附加参数
- 将所有共享资源从sync_method移动到一个新的sync_method,用run_coroutine_threadsafe
执行
loop = asyncio.get_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData, loop)
print("custom thread pool:{}".format(result))
def sync_method(ref, UserVal, holdingData, loop):
coro = async_method(ref, UserVal, holdingData)
future = asyncio.run_coroutine_threadsafe(coro, loop)
future.result()
async def async_method(ref, UserVal, holdingData)
result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
return result
注意:之前的代码未经测试。现在我测试的最小示例已更新:
@pytest.mark.asyncio
async def test_1():
loop = asyncio.get_event_loop()
delay = 3.0
result = await loop.run_in_executor(None, sync_method, delay, loop)
print(f"Result = {result}")
def sync_method(delay, loop):
coro = async_method(delay)
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
async def async_method(delay):
time.sleep(delay)
print(f"dddd {delay}")
return "OK"
希望对您有所帮助
最近,我已将 express.js 中的 REST 服务器代码移至使用 FastAPI。到目前为止,直到最近,我一直在过渡中取得成功。我注意到基于 firebase python admin sdk 文档,与 node.js 不同,python sdk 是阻塞的。文档说 here:
In Python and Go Admin SDKs, all write methods are blocking. That is, the write methods do not return until the writes are committed to the database.
我认为这个特性对我的代码有一定的影响。这也可能是我构建代码的方式。下面是我的一个文件中的一些代码:
from app.services.new_service import nService
from firebase_admin import db
import json
import redis
class TryNewService:
async def tryNew_func(self, request):
# I've already initialized everything in another file for firebase
ref = db.reference()
r = redis.Redis()
holdingData = await nService().dialogflow_session(request)
fulfillmentText = json.dumps(holdingData[-1])
body = await request.json()
if ("user_prelimInfo_address" in holdingData):
holdingData.append("session")
holdingData.append(body["session"])
print(holdingData)
return(holdingData)
else:
if (("Default Welcome Intent" in holdingData)):
pass
else:
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]})
print(holdingData)
return(fulfillmentText)
对于在我的代码中使用 ref.set()
行的阻塞效果有什么解决方法吗?有点像在 node.js 中添加回调?我是 python 3.
Update as of 06/13/2020: So I added following code and am now getting a
RuntimeError: Task attached to a different loop
. In my second else statement I do the following:
loop = asyncio.new_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
result = await loop.run_in_executor(pool, ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
print("custom thread pool:{}".format(result))
对于这个新的 RuntimeError,我将不胜感激,希望能帮助我找出答案。
运行 使用 ThreadPoolExecutor
在事件循环中阻止数据库调用。参见 https://medium.com/@hiranya911/firebase-python-admin-sdk-with-asyncio-d65f39463916
如果你想运行异步协程中的同步代码,那么步骤是:
- 循环=get_event_loop()
注意:获取而不是新的。获取当前 event_loop 和 new_even_loop returns 新的 - 等待loop.run_in_executor(None, sync_method)
- 第一个参数=None -> 使用默认执行器实例
- 第二个参数(sync_method)是要调用的同步代码
请记住 sync_method 使用的资源需要正确同步:
- a) 或者使用 asyncio.Lock
- b) 或使用 asyncio.run_coroutine_threadsafe 函数(参见下面的示例)
忘记关于 ThreadPoolExecutor 的这种情况(它提供了一种 I/O 并行性的方法,而不是 asyncio 提供的并发性)。
您可以试试下面的代码:
loop = asyncio.get_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData)
print("custom thread pool:{}".format(result))
有了新功能:
def sync_method(ref, UserVal, holdingData):
result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
return result
请让我知道您的反馈
注意:之前的代码未经测试。我只测试了下一个最小示例(使用 pytest 和 pytest-asyncio):
import asyncio
import time
import pytest
@pytest.mark.asyncio
async def test_1():
loop = asyncio.get_event_loop()
delay = 3.0
result = await loop.run_in_executor(None, sync_method, delay)
print(f"Result = {result}")
def sync_method(delay):
time.sleep(delay)
print(f"dddd {delay}")
return "OK"
回答@jeff-ridgeway 评论:
让我们尝试更改之前的答案以阐明如何使用 run_coroutine_threadsafe,从同步工作线程执行收集这些共享资源的协程:
- 在 run_in_executor 中添加循环作为附加参数
- 将所有共享资源从sync_method移动到一个新的sync_method,用run_coroutine_threadsafe 执行
loop = asyncio.get_event_loop()
UserVal = r.hget(name='{}'.format(body["session"]), key="userId").decode("utf-8")
result = await loop.run_in_executor(None, sync_method, ref, UserVal, holdingData, loop)
print("custom thread pool:{}".format(result))
def sync_method(ref, UserVal, holdingData, loop):
coro = async_method(ref, UserVal, holdingData)
future = asyncio.run_coroutine_threadsafe(coro, loop)
future.result()
async def async_method(ref, UserVal, holdingData)
result = ref.child("users/{}".format(UserVal)).child("c_data").set({holdingData[0]:holdingData[1]}))
return result
注意:之前的代码未经测试。现在我测试的最小示例已更新:
@pytest.mark.asyncio
async def test_1():
loop = asyncio.get_event_loop()
delay = 3.0
result = await loop.run_in_executor(None, sync_method, delay, loop)
print(f"Result = {result}")
def sync_method(delay, loop):
coro = async_method(delay)
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
async def async_method(delay):
time.sleep(delay)
print(f"dddd {delay}")
return "OK"
希望对您有所帮助