Convert a Python script to a directory with __main__.py

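Although I thought I understood Python's import system, I still find myself lost...

I want to change a file (which is the main entry point of my program) into a directory, but I cannot get the imports to work when I run it.

I can't seem to understand how to get sys.path to match.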

$ cat > prog.py << EOF
> import sys
> print(sys.path[0])
> EOF

$ python3 prog.py 
/home/me/pyprogram

$ mkdir prog

$ mv prog.py prog/__main__.py

$ python3 prog
prog

$ mv prog/__main__.py prog/__init__.py

$ python3 prog/__init__.py 
/home/me/pyprogram/prog
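
To make the three transcripts above easier to compare, here is a minimal sketch that reproduces them in a scratch directory. The helper names (`show_path0`, `compare_entry_points`) are hypothetical. It assumes only that Python puts the script's directory in `sys.path[0]` when given a file, and the directory itself when given a directory containing `__main__.py`. Whether the value is printed as a relative or absolute path may vary between Python versions, so the sketch resolves it before comparing.

```python
import os
import subprocess
import sys
import tempfile

# Body shared by both entry points: report where Python put sys.path[0].
SNIPPET = "import sys\nprint(sys.path[0])\n"


def show_path0(workdir, target):
    """Run `python <target>` inside workdir and return the resolved sys.path[0]."""
    result = subprocess.run(
        [sys.executable, target],
        cwd=workdir, capture_output=True, text=True, check=True,
    )
    # The printed value may be relative to workdir, so resolve it from there.
    return os.path.realpath(os.path.join(workdir, result.stdout.strip()))


def compare_entry_points():
    """Return sys.path[0] for `prog.py` and for `prog/`, relative to the scratch dir."""
    with tempfile.TemporaryDirectory() as tmp:
        tmp = os.path.realpath(tmp)
        with open(os.path.join(tmp, "prog.py"), "w") as f:
            f.write(SNIPPET)
        os.mkdir(os.path.join(tmp, "prog"))
        with open(os.path.join(tmp, "prog", "__main__.py"), "w") as f:
            f.write(SNIPPET)
        as_script = show_path0(tmp, "prog.py")
        as_directory = show_path0(tmp, "prog")
        return os.path.relpath(as_script, tmp), os.path.relpath(as_directory, tmp)


if __name__ == "__main__":
    print(compare_entry_points())
```

In other words, moving the entry point into `prog/` also moves `sys.path[0]` one level down, so modules that sit next to `prog/` are no longer found by default.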

Some more background on what I am trying to achieve (I may have designed the program wrong, and I am happy to receive feedback):

$ tree --dirsfirst
.
├── prog
│   ├── data_process.py
│   └── __init__.py
├── destination.py
└── source.py

1 directory, 4 files

$ cat source.py 
def get():
    return 'raw data'

$ cat destination.py 
def put(data):
    print(f"{data} has been passed successfully")

$ cat prog/__init__.py 
#!/usr/bin/env python

import os


class Task:
    def __init__(self, func, args=None, kwargs=None):
        self.func = func
        self.args = args if args else []
        self.kwargs = kwargs if kwargs else {}

    def run(self):
        self.func(*self.args, **self.kwargs)


tasks = []

def register_task(args=None, kwargs=None):
    def registerer(func):
        tasks.append(Task(func, args, kwargs))
        return func
    return registerer

for module in os.listdir(os.path.dirname(os.path.abspath(__file__))):
    if module.startswith('_') or module.startswith('.'):
        continue
    __import__(os.path.splitext(module)[0])
del module

for task in tasks:
    task.run()

$ cat prog/data_process.py 
from source import get
from destination import put
from . import register_task

@register_task(kwargs={'replace_with': 'cleaned'})
def process(replace_with):
    raw = get()
    cleaned = raw.replace('raw', replace_with)
    put(cleaned)

$ python3 prog/__init__.py 
Traceback (most recent call last):
  File "prog/__init__.py", line 27, in <module>
    __import__(os.path.splitext(module)[0])
  File "/home/me/pyprogram/prog/data_process.py", line 1, in <module>
    from source import get
ModuleNotFoundError: No module named 'source'

$ mv prog/__init__.py prog/__main__.py

$ python3 prog/
Traceback (most recent call last):
  File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "prog/__main__.py", line 27, in <module>
    __import__(os.path.splitext(module)[0])
  File "prog/data_process.py", line 1, in <module>
    from source import get
ModuleNotFoundError: No module named 'source'
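
Both tracebacks are consistent with `sys.path[0]` pointing at `prog/` rather than at its parent, so `source.py` is not on the search path. One possible workaround, sketched below under that assumption, is for `__main__.py` to add the project root to `sys.path` itself. The `MAIN_PY` body and the `run_prog_directory` helper are hypothetical and are not part of the program above.

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical __main__.py: put the project root (the parent of prog/) on
# sys.path so that top-level modules such as source.py become importable.
MAIN_PY = '''\
import os
import sys

project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from source import get

print(get())
'''

SOURCE_PY = '''\
def get():
    return 'raw data'
'''


def run_prog_directory():
    """Build a scratch project and return the stdout of `python prog`."""
    with tempfile.TemporaryDirectory() as tmp:
        os.mkdir(os.path.join(tmp, "prog"))
        with open(os.path.join(tmp, "source.py"), "w") as f:
            f.write(SOURCE_PY)
        with open(os.path.join(tmp, "prog", "__main__.py"), "w") as f:
            f.write(MAIN_PY)
        result = subprocess.run(
            [sys.executable, "prog"],
            cwd=tmp, capture_output=True, text=True, check=True,
        )
        return result.stdout.strip()


if __name__ == "__main__":
    print(run_prog_directory())
```

Running `python3 -m prog` from the project root is another option worth trying, since `-m` puts the current directory on `sys.path` instead of the package directory.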

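Update: project structure

I changed the structure:

  1. Put all libraries into utils.
  2. Put all projects into projects (using __init__.py to easily import all projects created in the folder).
  3. The main program script, program.py, sits in the top-level project directory.

Project structure: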

$ tree
.
├── utils
│   ├── source.py
│   ├── remote_dest.py
│   ├── local_dest.py
│   └── __init__.py
├── projects
│   ├── process2.py
│   ├── process1.py
│   └── __init__.py
└── program.py

Contents of the libraries defined in the utils directory:

$ cat utils/source.py
"""
Emulates expensive resource to get,
bringing the need to cache it for all client projects.
"""

import time

class _Cache:
    def __init__(self):
        self.data = None

_cache = _Cache()

def get():
    """
    Exposed source API for getting the data,
    fetches from the remote resource or returns the cached copy.
    """
    if _cache.data is None: # As well as cache expiration.
        _cache.data = list(_expensive_get())
    return _cache.data

def _expensive_get():
    """
    Emulate an expensive `get` request,
    prints to console if it was invoked.
    """
    print('Invoking expensive get')
    sample_data = [
        'some random raw data',
        'which is in some raw format',
        'it is so raw that it will need cleaning',
        'but now it is very raw'
    ]
    for row in sample_data:
        time.sleep(1)
        yield row

$ cat utils/remote_dest.py
"""
Emulate limited remote resource.
Use a thread and a queue to have the data sent in the background.
"""

import time
import threading
import queue

_q = queue.Queue()

def put(data):
    """
    Exposed remote API `put` method
    """
    _q.put(data)

def _send(q):
    """
    Emulate remote resource,
    prints to console when data is processed.
    """
    while True:
        time.sleep(1)
        data = q.get()
        print(f"Sending {data}")

threading.Thread(target=_send, args=(_q,), daemon=True).start()

$ cat utils/local_dest.py
"""
Emulate a second data destination,
to demonstrate the need for shared libraries.
"""

import datetime
import os

# Create `out` dir if it doesn't yet exist.
_out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'out')
if not os.path.exists(_out_dir):
    os.makedirs(_out_dir)

def save(data):
    """
    Exposed API to store data locally.
    """
    out_file = os.path.join(_out_dir, 'data.txt')
    with open(out_file, 'a') as f:
        f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {data}\n")

Contents of the main program script:

$ cat program.py
#!/usr/bin/env python

import os

class Task:
    """
    Class storing `func` along with its `args` and `kwargs` to be run with.
    """
    def __init__(self, func, args=None, kwargs=None):
        self.func = func
        self.args = args if args else []
        self.kwargs = kwargs if kwargs else {}

    def run(self):
        """
        Executes stored `func` with its arguments.
        """
        self.func(*self.args, **self.kwargs)

    def __repr__(self):
        return f"<Task({self.func.__name__})>"

# List that will store the registered tasks to be executed by the main program.
tasks = []

def register_task(args=None, kwargs=None):
    """
    Registers decorated function along with the passed `args` and `kwargs` in the `tasks` list
    as a `Task` for maintained execution.
    """
    def registerer(func):
        print(f"Appending '{func.__name__}' in {__name__}")
        tasks.append(Task(func, args, kwargs)) # Saves the function as a task.
        print(f"> tasks in {__name__}: {tasks}")
        return func # returns the function untouched.
    return registerer

print(f"Before importing projects as {__name__}. tasks: {tasks}")
import projects
print(f"After importing projects as {__name__}. tasks: {tasks}")

print(f"Iterating over tasks: {tasks} in {__name__}")
while True:
    for task in tasks:
        task.run()
    break # Only run once in the simulation

Contents of the individual projects defined in the projects directory:

$ cat projects/process1.py
"""
Sample project that uses the shared remote resource to get data
and passes it on to another remote resource after processing.
"""

from utils.source import get
from utils.remote_dest import put
from program import register_task

@register_task(kwargs={'replace_with': 'cleaned'})
def process1(replace_with):
    raw = get()
    for record in raw:
        put(record.replace('raw', replace_with))

$ cat projects/process2.py
"""
Sample project that uses the shared remote resource to get data
and saves it locally after processing.
"""

from utils.source import get
from utils.local_dest import save
from program import register_task

@register_task()
def process2():
    raw = get()
    for record in raw:
        save(record.replace('raw', '----'))

Contents of the __init__.py file in the projects directory:

$ cat projects/__init__.py
"""
use __init__ file to import all projects
that might have been registered with `program.py` using `register_task`
"""

from . import process1, process2

# TODO: Dynamically import all projects (whether a file or a directory (as a project)) that will be created in the `projects` directory (ignoring any modules whose names start with an `_`)
# Something in the sense of:
# ```
# for module in os.listdir(os.path.dirname(os.path.abspath(__file__))):
#     if module.startswith('_') or module.startswith('.'):
#         continue
#     __import__(os.path.splitext(module)[0])
# ```
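
For the TODO above, a minimal sketch of one way to do the dynamic import, assuming `pkgutil.iter_modules` and `importlib.import_module` from the standard library are acceptable. The helper name `import_submodules` and the throwaway package `demo_projects` are hypothetical and used only for illustration.

```python
import importlib
import os
import pkgutil
import sys
import tempfile


def import_submodules(package):
    """Import every public module or subpackage directly inside `package`.

    Names starting with an underscore are skipped. Returns the imported names.
    """
    imported = []
    for info in pkgutil.iter_modules(package.__path__):
        if info.name.startswith("_"):
            continue
        importlib.import_module(f"{package.__name__}.{info.name}")
        imported.append(info.name)
    return sorted(imported)


def demo():
    """Build a throwaway package (hypothetical name) and import its contents."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg_dir = os.path.join(tmp, "demo_projects")
        os.makedirs(os.path.join(pkg_dir, "process3"))
        for relative in ("__init__.py", "process1.py", "_private.py",
                         os.path.join("process3", "__init__.py")):
            with open(os.path.join(pkg_dir, relative), "w") as f:
                f.write("")
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            package = importlib.import_module("demo_projects")
            return import_submodules(package)
        finally:
            sys.path.remove(tmp)


if __name__ == "__main__":
    print(demo())
```

Inside `projects/__init__.py` itself, the call would presumably be `import_submodules(sys.modules[__name__])`. Unlike the bare `__import__(name)` in the comment, this imports each project as `projects.<name>`, so it does not depend on the `projects` directory being on `sys.path`.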

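However, when I run the program I see that:

  1. program.py is executed twice (once as __main__ and once as program).
  2. The tasks are appended (during the second execution). However, when iterating over the tasks, none are found.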

$ python3 program.py
Before importing projects as __main__. tasks: []
Before importing projects as program. tasks: []
After importing projects as program. tasks: []
Iterating over tasks: [] in program
Appending 'process1' in program
> tasks in program: [<Task(process1)>]
Appending 'process2' in program
> tasks in program: [<Task(process1)>, <Task(process2)>]
After importing projects as __main__. tasks: []
Iterating over tasks: [] in __main__

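What I don't understand:

  1. Why is the main (program.py) file executed twice? I thought circular imports could not cause this, because Python caches imported modules.
    (I took the idea from the circular imports used in Flask applications, i.e. app.py imports routes, models, etc., all of which import app and use it to define functionality, and then app.py imports them back so that the functionality is added (as flask only runs app.py).)
  2. Why is the tasks list empty after the processes have been appended?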
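
A minimal reproduction of both symptoms, as a sketch: the file run from the command line is cached in `sys.modules` under the name `__main__`, so a later `import program` does not find it in the cache and executes the file a second time as a separate module with its own `tasks` list. The two file bodies below are hypothetical, stripped-down stand-ins for program.py and a project module.

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical stand-in for program.py: it owns `tasks` and imports a helper.
PROGRAM_PY = '''\
tasks = []
print(f"executing as {__name__}")
import helper

if __name__ == "__main__":
    import program  # the second copy, already cached under the name "program"
    print(f"same list object: {program.tasks is tasks}")
    print(f"__main__.tasks={tasks} program.tasks={program.tasks}")
'''

# Hypothetical stand-in for a project module that registers itself.
HELPER_PY = '''\
import program
program.tasks.append("job")
'''


def run_demo():
    """Run the stand-in program.py as a script and return its output lines."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "program.py"), "w") as f:
            f.write(PROGRAM_PY)
        with open(os.path.join(tmp, "helper.py"), "w") as f:
            f.write(HELPER_PY)
        result = subprocess.run(
            [sys.executable, "program.py"],
            cwd=tmp, capture_output=True, text=True, check=True,
        )
        return result.stdout.splitlines()


if __name__ == "__main__":
    for line in run_demo():
        print(line)
```

The helper appends to `program.tasks`, while the loop in the script iterates over `__main__.tasks`, which is a different list and stays empty.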

For comparison, here is how a Flask-based app does its circular imports, next to mine.

Sample Flask program that uses circular imports

Flask app structure

(venv) $ echo $FLASK_APP
mgflask.py

(venv) $ tree
.
├── app
│   ├── models
│   │   ├── __init__.py
│   │   ├── post.py
│   │   └── user.py
│   ├── templates/
│   ├── forms.py
│   ├── __init__.py
│   └── routes.py
├── config.py
└── mgflask.py

(venv) $ cat mgflask.py 
#!/usr/bin/env python

from app import app

# ...

(venv) $ cat app/__init__.py 
from flask import Flask
from config import Config
# ... # config imports

app = Flask(__name__) # <---
# ... # config setup

from . import routes, models, errors # <---

(venv) $ cat app/routes.py 

from flask import render_template, flash, redirect, url_for, request
# ... # import extensions
from . import app, db # <---
from .forms import ... 
from .models import ...

@app.route('/')
def index():
    return render_template('index.html', title='Home')

(venv) $ flask run
 * Serving Flask app "mgflask.py" (lazy loading)
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: ???-???-???

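I resolved this by restructuring my application as follows:

  1. I moved the Task class, the tasks list, and the register_task decorator function into projects/__init__.py, and at the bottom of that __init__.py file I import the projects defined in the directory.
  2. In the program.py file, I just do from projects import tasks, and everything works as expected.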
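
A condensed sketch of that restructured layout, for reference. The file bodies are abbreviated from the listings above, and the single `process1` project that only prints its result is a simplification made for this example. The `run_restructured_program` helper is hypothetical and exists only to write the files to a scratch directory and run them.

```python
import os
import subprocess
import sys
import tempfile

# projects/__init__.py: owns the registry, then imports the projects.
PROJECTS_INIT = '''\
class Task:
    def __init__(self, func, args=None, kwargs=None):
        self.func = func
        self.args = args if args else []
        self.kwargs = kwargs if kwargs else {}

    def run(self):
        self.func(*self.args, **self.kwargs)


tasks = []


def register_task(args=None, kwargs=None):
    def registerer(func):
        tasks.append(Task(func, args, kwargs))
        return func
    return registerer


# Imported last, so register_task already exists when process1 asks for it.
from . import process1
'''

# projects/process1.py: registers itself with the package-level registry.
PROCESS1 = '''\
from . import register_task


@register_task(kwargs={"replace_with": "cleaned"})
def process1(replace_with):
    print("raw data".replace("raw", replace_with))
'''

# program.py: no longer imported by anyone, so it is executed only once.
PROGRAM = '''\
from projects import tasks

for task in tasks:
    task.run()
'''


def run_restructured_program():
    """Write the three files to a scratch dir and return program.py's stdout."""
    with tempfile.TemporaryDirectory() as tmp:
        os.mkdir(os.path.join(tmp, "projects"))
        files = {
            "program.py": PROGRAM,
            os.path.join("projects", "__init__.py"): PROJECTS_INIT,
            os.path.join("projects", "process1.py"): PROCESS1,
        }
        for relative, content in files.items():
            with open(os.path.join(tmp, relative), "w") as f:
                f.write(content)
        result = subprocess.run(
            [sys.executable, "program.py"],
            cwd=tmp, capture_output=True, text=True, check=True,
        )
        return result.stdout.strip()


if __name__ == "__main__":
    print(run_restructured_program())
```

The circular import between `projects` and `projects.process1` still exists, but it stays inside one package that is imported under a single name, so there is only one `tasks` list.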

The only question left is: what is the difference between running prog.py and prog/ (which contains __main__.py)? (This was the first iteration of my question here...)