django+celery+ansibleApi return None

django+celery+ansibleApi return None

在 Python 中通过 Celery 调用 Ansible API 时返回 None。这个问题我已经搜索了好几天。不使用 Celery 时,部署函数可以正常调用;但通过 Celery 调用时,我的代码调用 Ansible API 却返回 None。

重现步骤。

1.tasks.py

from celery import shared_task
from .deploy_tomcat2 import django_process


@shared_task
def deploy(jira_num):
    """Celery task entry point for a deployment.

    Hands ``jira_num`` to ``django_process`` unchanged and returns whatever
    that call returns.
    """
    outcome = django_process(jira_num)
    return outcome

2.deploy_tomcat2.py

from .AnsibleApi import CallApi

def django_process(jira_num):
    """Run the Tomcat deployment when ``jira_num`` is an all-digit string.

    Returns the value of ``CallApi.run_task()`` for a digit-only
    ``jira_num``; returns ``None`` without doing anything otherwise.
    """
    # Guard clause: anything that is not purely digits is ignored.
    if not str.isdigit(jira_num):
        return None

    # Fixed deployment parameters, in CallApi's positional order:
    # server, name, port, code, jdk, jvm.
    deploy_args = ('10.10.10.30', 'abc', 11011, 'efs', '1.12.13', 'xxxx')
    api = CallApi(*deploy_args)
    return api.run_task()

3.AnsibleApi.py

#!/usr/bin/env python

import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)


class ResultCallback(CallbackBase):
    """Callback plugin that remembers the latest result seen for each host.

    Results are sorted into three dicts keyed by host name: ``host_ok``,
    ``host_failed`` and ``host_unreachable``. A later result for the same
    host and outcome replaces the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        # One independent dict per outcome category.
        self.host_ok, self.host_unreachable, self.host_failed = {}, {}, {}

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store ``result`` as the successful result of its host."""
        hostname = result._host.get_name()
        self.host_ok[hostname] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store ``result`` as the failed result of its host."""
        hostname = result._host.get_name()
        self.host_failed[hostname] = result

    def v2_runner_on_unreachable(self, result):
        """Store ``result`` as the unreachable result of its host."""
        hostname = result._host.get_name()
        self.host_unreachable[hostname] = result


class CallApi(object):
    """Run a small Ansible play against one host and collect the results.

    The play copies the Tomcat deploy script to the target host and prints
    the registered copy result with a ``debug`` task. Call ``run_task()`` to
    execute it; the per-host results come back as plain dicts.
    """

    # SSH login user and private key, taken from the Django settings.
    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    # Class-level default kept for backward compatibility; every instance
    # replaces it with its own callback in __init__.
    results_callback = ResultCallback()
    # Option container handed to the play and the TaskQueueManager.
    Options = namedtuple('Options',
                         ['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method',
                          'become_user', 'check'])

    def __init__(self, ip, name, port, code, jdk, jvm):
        """Remember the target host and the deploy-script arguments.

        Args:
            ip: address of the host the play runs against.
            name, port, code, jdk, jvm: values formatted into the deploy
                script's command line (``port`` is formatted with ``%d``).
        """
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        # Per-instance callback so results never leak between instances.
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Build the task list for the play and store it in ``self.tasks``."""
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755')
        # Command line for the deploy script; only the disabled command
        # tasks below consume it.
        args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm)
        tasks = [
            # Copy the script to the target and register the module result...
            dict(action=dict(module='copy', args=cargs), register='shell_out'),
            # ...then echo that registered result.
            dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))),
        ]
        # tasks.append(dict(action=dict(module='command', args=args)))
        # tasks.append(dict(action=dict(module='command', args=args), register='result'))
        # tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
        self.tasks = tasks

    def _set_option(self):
        """Create the loader, inventory, options and play for this run."""
        self._gen_user_task()

        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(connection='smart', module_path=None,
                                    private_key_file=self.ssh_private_key_file, forks=None,
                                    become=True, become_method='sudo', become_user='root', check=False)
        # Single-host inventory built from the target address.
        self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager,
                                   host_list=[self.ip])
        self.variable_manager.set_inventory(self.inventory)

        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks,
        )
        self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

    def run_task(self):
        """Execute the play and return the collected per-host results.

        Returns:
            dict with the keys ``'success'``, ``'failed'`` and
            ``'unreachable'``; each maps a host name to the raw result dict
            recorded by the callback for that host.

        NOTE(review): when this runs inside a Celery prefork worker, the
        multiprocessing assertion "daemonic processes are not allowed to
        have children" is reportedly what makes the call misbehave - confirm
        against the worker log.
        """
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        tqm = None
        # The leftover rdb.set_trace() breakpoint was removed here: it blocked
        # every call until a debugger client attached.
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            # The numeric return code is not used; the outcome is read from
            # the callback below.
            tqm.run(self.play)
        finally:
            if tqm is not None:
                tqm.cleanup()

        # Flatten the callback's result objects into plain dicts per outcome.
        collected = (
            ('success', self.results_callback.host_ok),
            ('failed', self.results_callback.host_failed),
            ('unreachable', self.results_callback.host_unreachable),
        )
        for status, by_host in collected:
            for host, task_result in by_host.items():
                self.results_raw[status][host] = task_result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw

4.启动 Celery worker

celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info

5.发送消息:

deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')

代码看起来没有问题。
唯一的疑问是:deploy 任务返回的真的是 None 吗?
如果你能贴出 Celery worker 的日志就更好了。

有两种方法可以解决这个问题,本质上都是禁用断言:1. 在启动 Celery 的环境中设置 export PYTHONOPTIMIZE=1,或者使用 -O OPTIMIZATION 参数启动 Celery;2. 禁用 Python multiprocessing 包中 process.py 第 102 行的断言:

assert not _current_process._config.get('daemon'), \
               'daemonic processes are not allowed to have children'