Python Pyramid 的设置在注册表(registry)中不可用
Python Pyramid settings unavailable in registry
根据文档(见此处),即使无法访问请求对象,也可以在整个应用程序中访问 .ini 文件里的设置。这对我非常有用,因为我想为每个 .ini 文件指定不同的数据库连接字符串,并且不仅能在 `__init__` 中访问它们,还能从应用程序的其他位置访问它们。然而,无论我在何时何地尝试,`settings` 的值始终是 `None`。为什么?(Pyramid v1.5.7)
# Look up the registry through pyramid.threadlocal instead of through a request.
# NOTE(review): the report states that `settings` is None at this point --
# presumably no application registry is active yet when this runs; confirm.
registry = pyramid.threadlocal.get_current_registry()
settings = registry.settings
# Subscripting raises the reported TypeError when `settings` is None.
debug_frobnosticator = settings['debug_frobnosticator']
以下是应用程序的初始化方式:
from pyramid.config import Configurator
from pyramid.authentication import AuthTktAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.security import authenticated_userid
from bang.model import RootFactory, groupFinder
from pyramid.renderers import JSON
import json
from datetime import datetime, date
from bson.objectid import ObjectId
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pytz import utc
import platform
from bang.classes import * # the db string is required in this module
import logging
from datetime import timedelta
# main app config
def main(global_config, **settings):
    """Build and return the Pyramid WSGI application.

    Sets up the authentication/authorization policies, re-synchronises the
    Mongo auto-increment sequences, starts a background scheduler when this
    host is registered as an Agent host, and registers the request methods,
    JSON renderer, routes and static view.

    :param global_config: accepted for the caller's benefit; not used here.
    :param settings: keyword settings, handed to ``Configurator`` unchanged.
    :returns: the WSGI application from ``config.make_wsgi_app()``.

    NOTE(review): ``Asset``, ``Contract``, ``User``, ``Agent``, ``cn``,
    ``log``, ``agent_scan`` and ``agent_jobs`` are not defined in this file;
    presumably they come from ``from bang.classes import *`` -- confirm.
    """
    authn_policy = AuthTktAuthenticationPolicy(
        secret='xxxxxx',
        callback=groupFinder,
        hashalg='sha512',
        include_ip=True,
        timeout=36000)
    authz_policy = ACLAuthorizationPolicy()

    config = Configurator(
        settings=settings,
        root_factory=RootFactory)
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)

    # only let errors through from these loggers
    logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
    logging.getLogger("apscheduler.executors.default").setLevel(logging.ERROR)
    logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.ERROR)

    ### first use of db from bang.classes here
    # NOTE(review): the `[0]` lookups below presumably fail when the
    # collection is empty (e.g. a fresh database) -- confirm.

    # set house_id auto_inc to the highest current house_id
    new_int = Asset().find().sort('house_id_int', -1)[0].house_id_int
    Asset()._database['Sequences'].find_and_modify(
        {'_id': 'house_id_int'}, {'$set': {'seq': new_int}}, upsert=True)

    # set contract code auto_inc to the highest current code
    new_int = Contract().find().sort('code_int', -1)[0].code_int
    Contract()._database['Sequences'].find_and_modify(
        {'_id': 'code_int'}, {'$set': {'seq': new_int}}, upsert=True)

    def get_user(request):
        """Return a summary dict for the authenticated user, or None.

        None is returned for anonymous requests and when no matching user
        document is found.
        """
        userid = authenticated_userid(request)
        if userid is None:
            # ObjectId(None) would generate a brand-new id that matches no
            # document, so bail out before querying.
            return None
        user = User().find_one({'_id': ObjectId(userid)}, fields={'password': False})
        if user is None:
            return None
        return {'_id': user._id, 'defaults': user.defaults, 'full_name': user.full_name,
                'first_name': user.first_name, 'last_name': user.last_name,
                'login_count': user.login_count, 'permissions': user.permissions}

    # if this Pyramid instance is an Agent host, then start a scheduler
    agent = Agent().find_one({'host_name': platform.node()})
    if agent:
        # create the background scheduler; `timezone` and `job_defaults` are
        # scheduler options, so they are passed to BackgroundScheduler
        # rather than to the job store (which ignores extra keyword
        # arguments when an explicit client is supplied)
        scheduler = BackgroundScheduler(
            jobstores={
                'mongo': MongoDBJobStore(
                    client=cn,
                    database='foobar',
                    collection='ScheduledJobs_%s' % agent.host_name)},
            job_defaults={'coalesce': True, 'misfire_grace_time': 86400},
            timezone=utc)
        log.info('created scheduler for agent_host %s' % agent.host_name)

        # start the scheduler
        scheduler.start()

        def get_scheduler(request):
            """Expose the process-wide scheduler as ``request.scheduler``."""
            return scheduler

        config.add_request_method(get_scheduler, 'scheduler', reify=True)

        for host_agent in Agent().find({'host_name': platform.node()}):
            # add a job to scan the agent
            log.info('added scan schedule for %s on %s' % (host_agent.name, host_agent.host_name))
            scheduler.add_job(agent_scan, trigger='interval', args=[str(host_agent._id)],
                              jobstore='mongo', id='%s_scan' % host_agent._id,
                              seconds=host_agent.scan_cycle, replace_existing=True)

            # add a job to process the agent's joblist; first run is delayed
            # by 60 seconds from now
            log.info('added jobs schedule for %s on %s' % (host_agent.name, host_agent.host_name))
            scheduler.add_job(agent_jobs, trigger='interval', args=[str(host_agent._id)],
                              jobstore='mongo', id='%s_jobs' % host_agent._id, seconds=10,
                              start_date=datetime.utcnow() + timedelta(seconds=60),
                              replace_existing=True)

    # add the authenticated user to each request
    config.add_request_method(get_user, 'user', reify=True)

    # custom json adapters
    custom_json = JSON()

    # NOTE(review): this adapter is defined but never registered on
    # `custom_json` in the visible code -- presumably elided; confirm.
    def haystack_obj_adapter(obj, request):
        return obj.json

    config.add_renderer('json', custom_json)

    # routes for standard static images
    config.add_route('apple-touch', '/apple-touch-icon.png')
    # etc

    # routes
    config.add_route('remote.files', '/remote/files/{method}')
    # etc

    config.add_static_view('/', 'static', cache_max_age=0)
    config.scan()
    return config.make_wsgi_app()
TypeError: 'NoneType' object has no attribute '__getitem__'
示例代码缺少完整的回溯和视图代码。
它实际上并不像您预期的那样工作。线程局部(thread-local)的注册表可能尚未设置,因为如果我理解正确的话,`get_current_registry()` 这个函数主要是为 `pyramid.testing` 和单元测试而设计的。正常请求与单元测试中的线程生命周期并不相同。
请在视图中使用 `request.registry`,并显式地传递 `registry`。如果您需要在 `main()` 中访问 `registry`,请从 `config.registry` 获取并将其传递过来。
根据文档(见此处),即使无法访问请求对象,也可以在整个应用程序中访问 .ini 文件里的设置。这对我非常有用,因为我想为每个 .ini 文件指定不同的数据库连接字符串,并且不仅能在 `__init__` 中访问它们,还能从应用程序的其他位置访问它们。然而,无论我在何时何地尝试,`settings` 的值始终是 `None`。为什么?(Pyramid v1.5.7)
# Look up the registry through pyramid.threadlocal instead of through a request.
# NOTE(review): the report states that `settings` is None at this point --
# presumably no application registry is active yet when this runs; confirm.
registry = pyramid.threadlocal.get_current_registry()
settings = registry.settings
# Subscripting raises the reported TypeError when `settings` is None.
debug_frobnosticator = settings['debug_frobnosticator']
以下是应用程序的初始化方式:
from pyramid.config import Configurator
from pyramid.authentication import AuthTktAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.security import authenticated_userid
from bang.model import RootFactory, groupFinder
from pyramid.renderers import JSON
import json
from datetime import datetime, date
from bson.objectid import ObjectId
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pytz import utc
import platform
from bang.classes import * # the db string is required in this module
import logging
from datetime import timedelta
# main app config
def main(global_config, **settings):
    """Build and return the Pyramid WSGI application.

    Sets up the authentication/authorization policies, re-synchronises the
    Mongo auto-increment sequences, starts a background scheduler when this
    host is registered as an Agent host, and registers the request methods,
    JSON renderer, routes and static view.

    :param global_config: accepted for the caller's benefit; not used here.
    :param settings: keyword settings, handed to ``Configurator`` unchanged.
    :returns: the WSGI application from ``config.make_wsgi_app()``.

    NOTE(review): ``Asset``, ``Contract``, ``User``, ``Agent``, ``cn``,
    ``log``, ``agent_scan`` and ``agent_jobs`` are not defined in this file;
    presumably they come from ``from bang.classes import *`` -- confirm.
    """
    authn_policy = AuthTktAuthenticationPolicy(
        secret='xxxxxx',
        callback=groupFinder,
        hashalg='sha512',
        include_ip=True,
        timeout=36000)
    authz_policy = ACLAuthorizationPolicy()

    config = Configurator(
        settings=settings,
        root_factory=RootFactory)
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)

    # only let errors through from these loggers
    logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
    logging.getLogger("apscheduler.executors.default").setLevel(logging.ERROR)
    logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.ERROR)

    ### first use of db from bang.classes here
    # NOTE(review): the `[0]` lookups below presumably fail when the
    # collection is empty (e.g. a fresh database) -- confirm.

    # set house_id auto_inc to the highest current house_id
    new_int = Asset().find().sort('house_id_int', -1)[0].house_id_int
    Asset()._database['Sequences'].find_and_modify(
        {'_id': 'house_id_int'}, {'$set': {'seq': new_int}}, upsert=True)

    # set contract code auto_inc to the highest current code
    new_int = Contract().find().sort('code_int', -1)[0].code_int
    Contract()._database['Sequences'].find_and_modify(
        {'_id': 'code_int'}, {'$set': {'seq': new_int}}, upsert=True)

    def get_user(request):
        """Return a summary dict for the authenticated user, or None.

        None is returned for anonymous requests and when no matching user
        document is found.
        """
        userid = authenticated_userid(request)
        if userid is None:
            # ObjectId(None) would generate a brand-new id that matches no
            # document, so bail out before querying.
            return None
        user = User().find_one({'_id': ObjectId(userid)}, fields={'password': False})
        if user is None:
            return None
        return {'_id': user._id, 'defaults': user.defaults, 'full_name': user.full_name,
                'first_name': user.first_name, 'last_name': user.last_name,
                'login_count': user.login_count, 'permissions': user.permissions}

    # if this Pyramid instance is an Agent host, then start a scheduler
    agent = Agent().find_one({'host_name': platform.node()})
    if agent:
        # create the background scheduler; `timezone` and `job_defaults` are
        # scheduler options, so they are passed to BackgroundScheduler
        # rather than to the job store (which ignores extra keyword
        # arguments when an explicit client is supplied)
        scheduler = BackgroundScheduler(
            jobstores={
                'mongo': MongoDBJobStore(
                    client=cn,
                    database='foobar',
                    collection='ScheduledJobs_%s' % agent.host_name)},
            job_defaults={'coalesce': True, 'misfire_grace_time': 86400},
            timezone=utc)
        log.info('created scheduler for agent_host %s' % agent.host_name)

        # start the scheduler
        scheduler.start()

        def get_scheduler(request):
            """Expose the process-wide scheduler as ``request.scheduler``."""
            return scheduler

        config.add_request_method(get_scheduler, 'scheduler', reify=True)

        for host_agent in Agent().find({'host_name': platform.node()}):
            # add a job to scan the agent
            log.info('added scan schedule for %s on %s' % (host_agent.name, host_agent.host_name))
            scheduler.add_job(agent_scan, trigger='interval', args=[str(host_agent._id)],
                              jobstore='mongo', id='%s_scan' % host_agent._id,
                              seconds=host_agent.scan_cycle, replace_existing=True)

            # add a job to process the agent's joblist; first run is delayed
            # by 60 seconds from now
            log.info('added jobs schedule for %s on %s' % (host_agent.name, host_agent.host_name))
            scheduler.add_job(agent_jobs, trigger='interval', args=[str(host_agent._id)],
                              jobstore='mongo', id='%s_jobs' % host_agent._id, seconds=10,
                              start_date=datetime.utcnow() + timedelta(seconds=60),
                              replace_existing=True)

    # add the authenticated user to each request
    config.add_request_method(get_user, 'user', reify=True)

    # custom json adapters
    custom_json = JSON()

    # NOTE(review): this adapter is defined but never registered on
    # `custom_json` in the visible code -- presumably elided; confirm.
    def haystack_obj_adapter(obj, request):
        return obj.json

    config.add_renderer('json', custom_json)

    # routes for standard static images
    config.add_route('apple-touch', '/apple-touch-icon.png')
    # etc

    # routes
    config.add_route('remote.files', '/remote/files/{method}')
    # etc

    config.add_static_view('/', 'static', cache_max_age=0)
    config.scan()
    return config.make_wsgi_app()
TypeError: 'NoneType' object has no attribute '__getitem__'
示例代码缺少完整的回溯和视图代码。
它实际上并不像您预期的那样工作。线程局部(thread-local)的注册表可能尚未设置,因为如果我理解正确的话,`get_current_registry()` 这个函数主要是为 `pyramid.testing` 和单元测试而设计的。正常请求与单元测试中的线程生命周期并不相同。
请在视图中使用 `request.registry`,并显式地传递 `registry`
。如果您需要访问 main()
中的 registry
,请将其从 config.registry
.