Redis not returning result after upgrading Celery from 3.1 to 4.0

I recently upgraded my Celery installation to 4.0. After a few days of working through the upgrade process, I finally got it working... sort of. Some tasks return results, but the last one doesn't.

I have a class, SFF, that takes in and parses a file:

# Constructor with I/O file
def __init__(self, file):

    # File data that's gonna get used a lot
    sffDescriptor = file.fileno()
    fileName = abspath(file.name)

    # Get the pointer to the file
    filePtr = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)

    # Get the header info
    hdr = filePtr.read(HEADER_SIZE)
    self.header = SFFHeader._make(unpack(HEADER_FMT, hdr))

    # Read in the palette maps
    print self.header.onDemandDataSize
    print self.header.onLoadDataSize
    palMapsResult = getPalettes.delay(fileName, self.header.palBankOff - HEADER_SIZE, self.header.onDemandDataSize, self.header.numPals)

    # Read the sprite list nodes
    nodesStart = self.header.sprListOff
    nodesEnd = self.header.palBankOff
    print nodesEnd - nodesStart
    sprNodesResult = getSprNodes.delay(fileName, nodesStart, nodesEnd, self.header.numSprites)

    # Get palette data
    self.palettes = palMapsResult.get()

    # Get sprite data
    spriteNodes = sprNodesResult.get()

    # TESTING
    spritesResultSet = ResultSet([])
    numSpriteNodes = len(spriteNodes)
    # Split the nodes into chunks of size 32 elements
    for x in xrange(0, numSpriteNodes, 32):
        spritesResult = getSprites.delay(spriteNodes, x, x+32, fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal)
        spritesResultSet.add(spritesResult)
        break  # REMEMBER TO REMOVE FOR ENTIRE SFF

    self.sprites = spritesResultSet.join_native()

Whether I return the entire spritesResult as a single task or split it up with a ResultSet, the result is always the same: my Python console just hangs at spritesResultSet.join_native() or spritesResult.get() (depending on how I structure it).

Here is the offending task:

@task
def getSprites(nodes, start, end, fileName, palettes, palBankOff, onDemandDataSizeTotal):
    sprites = []

    with open(fileName, "rb") as file:
        sffDescriptor = file.fileno()
        sffData = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)

        for node in nodes[start:end]:
            # Convert it to a dict, since values may change for linked sprites.
            sprListNode = dict(SprListNode._make(node)._asdict())

            # If it's a linked sprite, the data length is 0, so get the linked index.
            if sprListNode['dataLen'] == 0:
                sprListNodeTemp = SprListNode._make(nodes[sprListNode['index']])
                sprListNode['dataLen'] = sprListNodeTemp.dataLen
                sprListNode['dataOffset'] = sprListNodeTemp.dataOffset
                sprListNode['compression'] = sprListNodeTemp.compression

            # What does the offset need to be?
            dataOffset = sprListNode['dataOffset']
            if sprListNode['loadMode'] == 0:
                dataOffset += palBankOff #- HEADER_SIZE
            elif sprListNode['loadMode'] == 1:
                dataOffset += onDemandDataSizeTotal #- HEADER_SIZE

            # Seek to the data location and "read" it in. First 4 bytes are just the image length
            start = dataOffset + 4
            end = dataOffset + sprListNode['dataLen']

            compressedSprite = sffData[start:end]

            # Create the sprite
            sprite = Sprite(sprListNode, palettes[sprListNode['palNo']], np.fromstring(compressedSprite, dtype=np.uint8))
            sprites.append(sprite)

    return json.dumps(sprites, cls=SpriteJSONEncoder)

I know it reaches the return statement, because if I put a print above it, it shows up in the Celery window. I also know the task runs to completion, because I get the following message from the worker:

[2016-11-16 00:03:33,639: INFO/PoolWorker-4] Task framedatabase.tasks.getSprites[285ac9b1-09b4-4cf1-a251-da6212863832] succeeded in 0.137236133218s: '[{"width": 120, "palNo": 30, "group": 9000, "xAxis": 0, "yAxis": 0, "data":...'

Here are my Celery settings in settings.py:

# Celery settings
BROKER_URL='redis://localhost:1717/1'
CELERY_RESULT_BACKEND='redis://localhost:1717/0'
CELERY_IGNORE_RESULT=False
CELERY_IMPORTS = ("framedatabase.tasks", )
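One thing worth double-checking with Celery 4: when `config_from_object` is called with `namespace='CELERY'` (as the celery.py here does), only Django settings whose names start with `CELERY_` are read, so a bare `BROKER_URL` is silently ignored. A prefixed equivalent of the settings above would look like this (a sketch; host, port, and db numbers copied from the question's settings):

```python
# Celery settings -- with namespace='CELERY', every setting needs the prefix,
# and the new-style names apply (broker_url -> CELERY_BROKER_URL, etc.)
CELERY_BROKER_URL = 'redis://localhost:1717/1'
CELERY_RESULT_BACKEND = 'redis://localhost:1717/0'
CELERY_TASK_IGNORE_RESULT = False
CELERY_IMPORTS = ("framedatabase.tasks", )
```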

...and my celery.py:

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'framedatabase.settings')

from django.conf import settings  # noqa

app = Celery('framedatabase', backend='redis://localhost:1717/1', broker="redis://localhost:1717/0",
    include=['framedatabase.tasks'])

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Found the problem. Apparently it was causing a deadlock, as mentioned in the "Avoid launching synchronous subtasks" section of the Celery docs: http://docs.celeryproject.org/en/latest/userguide/tasks.html#tips-and-best-practices

So I got rid of this line:

sprNodesResult.get()

and changed the final result into a chain:

self.sprites = chain(getSprNodes.s(fileName, nodesStart, nodesEnd, self.header.numSprites),
    getSprites.s(0,32,fileName,self.palettes,self.header.palBankOff,self.header.onDemandDataSizeTotal))().get()

And it works! Now I just need to find a way to split it up the way I want!
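For the splitting part, one non-blocking option is a chord: fan out one getSprites signature per 32-node chunk and let a callback join the partial results, so .get() again only happens at the outermost caller. A sketch, assuming the task signatures from the question and a made-up callback task `collect_sprites`:

```python
# from celery import chord  # the chord primitive ships with Celery

def chunk_ranges(n, size=32):
    """Yield (start, end) index pairs covering n items in chunks of `size`."""
    for start in range(0, n, size):
        yield start, min(start + size, n)

# Hypothetical wiring, reusing the tasks from the question:
# job = chord(
#     (getSprites.s(spriteNodes, s, e, fileName, self.palettes,
#                   self.header.palBankOff, self.header.onDemandDataSizeTotal)
#      for s, e in chunk_ranges(len(spriteNodes))),
#     collect_sprites.s(),  # assumed callback that concatenates the chunk results
# )
# self.sprites = job().get()  # safe: called from regular code, not inside a task
```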