Python Pyramid 的设置在注册表(registry)中不可用

Python Pyramid settings unavailable in registry

根据此处的文档,即使无法访问请求对象,也可以访问 .ini 文件中的设置,供整个应用程序使用。这对我非常有用,因为我想在每个 .ini 中指定不同的数据库连接字符串,并且不仅能在 __init__ 中访问它们,还能从应用程序的其他地方访问。然而,无论我在何时何地尝试,settings 的值都是 None。为什么?(Pyramid v1.5.7)

# Fetch the registry through Pyramid's thread-local API instead of a request.
registry = pyramid.threadlocal.get_current_registry()
# As reported in the surrounding text, `settings` comes back as None here.
settings = registry.settings
# With settings being None, this subscript raises the TypeError quoted below.
debug_frobnosticator = settings['debug_frobnosticator']

以下是应用程序的初始化方式:

from pyramid.config import Configurator
from pyramid.authentication import AuthTktAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.security import authenticated_userid
from bang.model import RootFactory, groupFinder
from pyramid.renderers import JSON
import json
from datetime import datetime, date
from bson.objectid import ObjectId
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pytz import utc
import platform
from bang.classes import *  # the db string is required in this module
import logging
from datetime import timedelta

# main app config
def main(global_config, **settings):
    """Build the Pyramid configuration and return the WSGI application.

    Besides configuring auth policies, renderers and routes, this function
    also touches the database (sequence counters, agent lookup) and may start
    a background scheduler before the app is returned.

    Args:
        global_config: Accepted but not used in the visible body.
        **settings: Passed through unchanged to ``Configurator(settings=...)``.

    Returns:
        The result of ``config.make_wsgi_app()``.

    NOTE(review): ``cn``, ``log``, ``Asset``, ``Contract``, ``User``,
    ``Agent``, ``agent_scan`` and ``agent_jobs`` are not defined in this
    function; presumably they come from ``from bang.classes import *`` --
    confirm.
    """
    authn_policy = AuthTktAuthenticationPolicy(
        secret='xxxxxx',
        callback=groupFinder,
        hashalg='sha512',
        include_ip=True,
        timeout=36000)
    authz_policy = ACLAuthorizationPolicy()
    # The .ini-derived settings are handed to the Configurator here.
    config = Configurator(
        settings=settings,
        root_factory=RootFactory)
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)



    # Only ERROR and above are kept for these three loggers.
    logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
    logging.getLogger("apscheduler.executors.default").setLevel(logging.ERROR)
    logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.ERROR)

    ### first use of db from bang.classes here
    # set house_id auto_inc to the highest current house_id
    # NOTE(review): the [0] index assumes at least one Asset exists -- confirm.
    new_int = Asset().find().sort('house_id_int', -1)[0].house_id_int
    Asset()._database['Sequences'].find_and_modify({'_id': 'house_id_int'}, {'$set': {'seq': new_int}}, upsert=True)

    # set code_int auto_inc to the highest current code_int
    # NOTE(review): the [0] index assumes at least one Contract exists -- confirm.
    new_int = Contract().find().sort('code_int', -1)[0].code_int
    Contract()._database['Sequences'].find_and_modify({'_id': 'code_int'}, {'$set': {'seq': new_int}}, upsert=True)

    def get_user(request):
        """Return a dict of selected fields for the authenticated user.

        The user is looked up by ``ObjectId(authenticated_userid(request))``
        with ``fields={'password': False}``.
        NOTE(review): a ``None`` result from ``find_one`` is not handled, so
        the attribute access below would fail in that case -- confirm intent.
        """
        user = User().find_one({'_id': ObjectId(authenticated_userid(request))}, fields={'password': False})
        return {'_id': user._id, 'defaults': user.defaults, 'full_name': user.full_name, 'first_name': user.first_name,
            'last_name': user.last_name, 'login_count': user.login_count, 'permissions': user.permissions}

    # if this Pyramid instance is an Agent host, then start a scheduler
    agent = Agent().find_one({'host_name': platform.node()})
    if agent:

        # created background scheduler
        # The job collection name is suffixed with this agent's host_name.
        scheduler = BackgroundScheduler(
            jobstores={
            'mongo': MongoDBJobStore(client=cn, database='foobar', collection='ScheduledJobs_%s' % agent.host_name, timezone=utc,
                                     job_defaults={'coalesce': True, 'misfire_grace_time': 86400})})
        log.info('created scheduler for agent_host %s' % agent.host_name)
        # start the scheduler
        scheduler.start()

        def get_scheduler(request):
            """Return the scheduler created above (closed over from main)."""
            return scheduler
        config.add_request_method(get_scheduler, 'scheduler', reify=True)

    # NOTE(review): `scheduler` is only bound inside the `if agent:` branch
    # above. This loop uses the same host_name filter, so presumably it yields
    # nothing when no agent was found -- confirm.
    for agent in Agent().find({'host_name': platform.node()}):
        # add a job to scan the agent
        log.info('added scan schedule for %s on %s' % (agent.name, agent.host_name))
        scheduler.add_job(agent_scan, trigger='interval', args=[str(agent._id)], jobstore='mongo',
                      id='%s_scan' % agent._id, seconds=agent.scan_cycle, replace_existing=True)
        # add a job to process the agent's joblist
        # The first run is scheduled 60 seconds from now, then every 10 seconds.
        log.info('added jobs schedule for %s on %s' % (agent.name, agent.host_name))
        scheduler.add_job(agent_jobs, trigger='interval', args=[str(agent._id)], jobstore='mongo',
                      id='%s_jobs' % agent._id, seconds=10, start_date=datetime.utcnow() + timedelta(seconds=60), replace_existing=True)

    # add the authenticated user to each request
    config.add_request_method(get_user, 'user', reify=True)

    # custom json adapters
    custom_json = JSON()

    def haystack_obj_adapter(obj, request):
        """Return ``obj.json``.

        NOTE(review): this adapter is never registered on ``custom_json`` in
        the visible code -- confirm whether an ``add_adapter`` call is missing.
        """
        return obj.json

    config.add_renderer('json', custom_json)

    # routes for standard static images
    config.add_route('apple-touch', '/apple-touch-icon.png')
    # etc

    # routes
    config.add_route('remote.files', '/remote/files/{method}')
    # etc

    config.add_static_view('/', 'static', cache_max_age=0)

    config.scan()

    return config.make_wsgi_app()


TypeError: 'NoneType' object has no attribute '__getitem__'

示例代码缺少完整的回溯和视图代码。

它实际上并不像您预期的那样工作。线程局部(thread-local)变量可能尚未设置,因为如果我理解正确的话,这个函数主要是为 pyramid.testing 和单元测试而设计的。正常请求和单元测试的线程生命周期不同。

请在视图中使用 request.registry,并显式地传递 registry。如果您需要在 main() 中访问 registry,请从 config.registry 获取并将其传递过去。