How to wait for tasks to complete with a Celery chain?

I'm trying to create a Celery chain here:

chain(getAllProducts.s(shopname, hdrs),
    editOgTags.s(title, description, whichImage, readableShopname, currentThemeId),
    notifyBulkEditFinish.si(email, name, readableShopname, totalProducts),
    updateBulkEditTask.si(taskID))()

In editOgTags, there are 3 subtasks:

from celery import shared_task

@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId):
    for product in products:
        # .delay() only enqueues each subtask and returns immediately,
        # so editOgTags finishes long before the edits themselves do.
        editOgTitle.delay(product, title, readableShopname)
        editOgDescription.delay(product, description, readableShopname)
        editOgImage.delay(product, int(whichImage), currentThemeId)

Each of the editOgXXX functions calls a rate-limited function:

import shopify
from celery import shared_task

@shared_task(rate_limit='1/s')
def updateMetafield(index, loop_var, target_id, type_value):
    resource = type_value + 's'
    # print(f"loop_var key = {loop_var[index]['key']}")
    if type_value in ('product', 'collection', 'article', 'page'):
        meta = shopify.Metafield.find(resource=resource, resource_id=target_id, namespace='global', key=loop_var[index]['key'])
        checkAndWaitShopifyAPICallLimit()
    else:
        print("Not available metafield type! Cannot update.")
        return

    if meta:
        # meta[0].destroy()
        # Update the existing metafield in place
        meta[0].value = loop_var[index]['value']
        meta[0].save()
    else:
        # No metafield with this key yet: create one
        metafield = shopify.Metafield.create({
            'value_type': 'string',
            'namespace': 'global',
            'value': loop_var[index]['value'],
            'key': loop_var[index]['key'],
            'resource': resource,
            'resource_id': target_id,
            })
        metafield.save()

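Under the leaky bucket algorithm, the API allows 40 calls at once, replenished at 2 requests/s. Since the Shopify functions are rate-limited to 2 requests/s, I set the rate limit to 1/s. When the API quota is used up, I call time.sleep(20) inside checkAndWaitShopifyAPICallLimit() to wait for it to be replenished.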
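To make the leaky-bucket arithmetic concrete, here is a minimal sketch of such a bucket. The class `LeakyBucket` and its methods are hypothetical illustrations, not part of the Shopify API or of `checkAndWaitShopifyAPICallLimit()`; it assumes a capacity of 40 calls draining at 2 calls/s. A full bucket drains completely in 40 / 2 = 20 seconds, which is presumably where the `time.sleep(20)` comes from, while room for a single extra call frees up after only (40 + 1 - 40) / 2 = 0.5 s.

```python
import time


class LeakyBucket:
    """Hypothetical model of an API leaky bucket (defaults: 40 calls, drains 2 calls/s)."""

    def __init__(self, capacity=40, leak_rate=2.0, clock=time.monotonic):
        self.capacity = capacity
        self.leak_rate = leak_rate  # calls drained per second
        self.clock = clock          # injectable so the model can be tested without sleeping
        self.level = 0.0            # calls currently in the bucket
        self.last = clock()

    def _drain(self):
        # Remove whatever has leaked out since the last check.
        now = self.clock()
        self.level = max(0.0, self.level - (now - self.last) * self.leak_rate)
        self.last = now

    def record_call(self):
        """Account for one API call."""
        self._drain()
        self.level += 1

    def wait_time(self):
        """Seconds to wait until one more call fits in the bucket."""
        self._drain()
        overflow = self.level + 1 - self.capacity
        return max(0.0, overflow / self.leak_rate)

    def seconds_until_empty(self):
        """Seconds until the bucket is fully drained (what a fixed sleep waits for)."""
        self._drain()
        return self.level / self.leak_rate
```

With an injectable clock the model can be checked without actually sleeping; under these assumptions a caller could sleep for `wait_time()` seconds instead of a fixed 20 s.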

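The problem is that the email notification function (notifyBulkEditFinish) is called before all the tasks have finished. How can I make sure the email function is called after all tasks finish?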

I suspect the sleep makes the tasks fall behind the email function in the queue.

Expanding on @bruno's comment: use a chord, and modify the editOgTags function to build a group of edit tasks whose callback is the notification:

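from celery import chord, shared_task

@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId, name, email, totalProducts):
    # The outer chain must now pass name, email and totalProducts, and must no
    # longer contain notifyBulkEditFinish itself. editOgTags still returns as
    # soon as the chord is launched, so anything chained after it still runs early.
    tasks = []
    for product in products:
        # .si() builds immutable signatures: nothing is executed here.
        tasks.append(editOgTitle.si(product, title, readableShopname))
        tasks.append(editOgDescription.si(product, description, readableShopname))
        tasks.append(editOgImage.si(product, int(whichImage), currentThemeId))
    # Kick off the chord: notifyBulkEditFinish is called only after all of
    # these edit tasks complete. Chords need a result backend, and the header
    # tasks (editOgTitle etc.) must not be defined with ignore_result=True.
    chord(tasks)(notifyBulkEditFinish.si(email, name, readableShopname, totalProducts))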

Your problem is with the definition of "after all tasks finish".

editOgTags launches len(products) * 3 subtasks, each of which apparently launches yet another async subtask. If you want to wait until all those tasks have been executed before sending the email, you need some synchronization mechanism, and Celery's built-in solution for this is the chord primitive. At the moment, your chain waits for editOgTags to finish, but the only thing this task does is launch other subtasks; then it returns, whether or not those subtasks have finished.
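The timing issue can be reproduced without Celery. This sketch is an analogy only: a standard-library thread pool stands in for Celery workers, `submit()` plays the role of `.delay()`, and waiting on every future plays the role of a chord header. The function `simulate` and its parameters are hypothetical. An event holds the subtasks back, much like tasks stuck behind a rate limit, so the outcome is deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def simulate(products, wait_for_subtasks):
    """Return the order in which edits and the 'email' happen (analogy, not Celery)."""
    log = []
    lock = threading.Lock()
    release = threading.Event()  # holds subtasks back, like a slow rate-limited queue

    def edit(product):
        release.wait()
        with lock:
            log.append(f"edited {product}")

    with ThreadPoolExecutor(max_workers=4) as pool:
        # Like editOgTags: enqueue one subtask per product; submit() returns at once.
        futures = [pool.submit(edit, p) for p in products]
        if wait_for_subtasks:
            release.set()
            wait(futures)  # chord-like: block until every header task is done
        with lock:
            log.append("email sent")  # stands in for notifyBulkEditFinish
        release.set()  # let any still-blocked subtasks run before the pool shuts down
    return log


print(simulate(["a", "b"], wait_for_subtasks=False)[0])   # email sent
print(simulate(["a", "b"], wait_for_subtasks=True)[-1])   # email sent
```

In the first call the "email" is logged before any edit, mirroring the chain; in the second it is logged last, which is the guarantee a chord provides.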

A chord is just like a group but with a callback. The chain primitive lets us link together signatures so that one is called after the other, essentially forming a chain of callbacks. So what is the difference if I change the chain to a chord?

Note that I'm not saying you should necessarily replace your whole chain with a chord. Hint: chains, groups and chords are tasks too, so you can create complex workflows by combining tasks, chains, groups and chords.

And the difference, as mentioned above, is that a chord waits until all tasks in its header are finished before it executes the callback. This allows N async tasks to run in parallel while still waiting for all of them to finish before running the callback. This will of course require some thinking and possibly some reorganisation of your code (so that sub-sub-tasks are taken into account if they need to be), but it does answer your question: "How can I make sure the email function is called after all tasks finish?"
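The composition hint can be illustrated with a toy model of the three primitives in plain Python. This is not Celery's API: `chain`, `group` and `chord` below are hypothetical stand-ins that return ordinary callables, which is enough to show how a chord nested inside a chain step makes the callback wait for every parallel edit.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy model of Celery's canvas primitives (NOT Celery's API): each primitive
# returns a plain callable, so primitives can be nested inside one another.


def chain(*steps):
    """Run steps one after another, feeding each result into the next."""
    def run(value=None):
        for step in steps:
            value = step(value)
        return value
    return run


def group(*steps):
    """Run steps in parallel on the same input; return their results in order."""
    def run(value=None):
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(step, value) for step in steps]
            return [f.result() for f in futures]  # blocks until every step is done
    return run


def chord(header, callback):
    """A group plus a callback that runs only once the whole header has finished."""
    def run(value=None):
        return callback(group(*header)(value))
    return run


# Hypothetical workflow shaped like the question: fetch products, then edit them
# in parallel, then "send the email" once every edit has returned.
def get_products(_):
    return ["p1", "p2"]


def edit_all(products):
    edits = [lambda _, p=p: f"edited {p}" for p in products]
    return chord(edits, lambda results: f"email after {len(results)} edits")(None)


workflow = chain(get_products, edit_all)
print(workflow())  # email after 2 edits
```

Because each primitive is itself callable, a chord can sit inside a chain step, which mirrors nesting a chord inside `editOgTags` while the outer chain stays in place.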