Write data to JSON file during multiprocessing using Python

I am new to Python. I am writing a Python program that writes to a JSON file if a website is unreachable. The websites to check are stored in the hosts variable, and the check is scheduled to run every 5 seconds. I have used a Pool from multiprocessing to process the websites concurrently, without delay, and afterwards I write the data to the JSON file. But here it writes only one website's data to the JSON file. How can I make it write both entries at the same time?

Sample code:

import os
from multiprocessing import Pool
from datetime import datetime
import time
import json

hosts = ["www.google.com","www.smackcoders.com"]
n = len(hosts)

def write(hosts):
    u = "down"
    name = "stack.json"
    if not os.path.exists(name):
        with open(name, 'w') as f:
            f.write('{}')
    result = [(timestamp, {'monitor.status': u,
                           "monitor.id": "tcp-tcp@"+hosts
                           })]

    with open(name, 'rb+') as f:
        f.seek(-1, os.SEEK_END)
        f.truncate()
        for entry in result:
            _entry = '"{}":{},\n'.format(entry[0], json.dumps(entry[1]))
            _entry = _entry.encode()
            f.write(_entry)
        f.write('}'.encode('ascii'))

def main(hosts):
    p = Pool(processes=n)
    result = p.map(write, hosts)
while True:
    timestamp = datetime.now().strftime("%B %d %Y, %H:%M:%S")
    main(hosts)
    time.sleep(5)

My output:

""March 13 2019, 10:49:03":{"monitor.id": "tcp-tcp@www.smackcoders.com", "monitor.status": "down"},
}

Desired output:

{"March 13 2019, 10:49:03":{"monitor.id": "tcp-tcp@www.smackcoders.com", "monitor.status": "down"},"March 13 2019, 10:49:03":{"monitor.id": "tcp-tcp@www.google.com", "monitor.status": "down"},
}

I made some small changes to your code and implemented a lock, so that only one process writes to the file at a time.

import os
from multiprocessing import Pool,RLock
from datetime import datetime
import time
import json

file_lock = RLock()
hosts = ["www.google.com","www.smackcoders.com"]
n = len(hosts)

def write(hosts):
    u = "down"
    name = "stack.json"
    if not os.path.exists(name):
        with open(name, 'w') as f:
            f.write('{}')
    result = [(timestamp, {'monitor.status': u,
                           "monitor.id": "tcp-tcp@"+hosts
                           })]
    with file_lock:
        with open(name, 'rb+') as f:
            f.seek(-1, os.SEEK_END)
            f.truncate()
            for entry in result:
                _entry = '"{}":{},\n'.format(entry[0], json.dumps(entry[1]))
                _entry = _entry.encode()
                f.write(_entry)
            f.write('}'.encode('ascii'))


def main(hosts):
    # use the pool as a context manager so its workers are cleaned up each cycle
    with Pool(processes=n) as p:
        result = p.map(write, hosts)
while True:
    timestamp = datetime.now().strftime("%B %d %Y, %H:%M:%S")
    main(hosts)
    time.sleep(5)

However, for a long-running process, constantly reading and rewriting a file for logging seems like a poor implementation: the code has to read an ever-growing file and rewrite it completely in every process. Consider writing the log to a database instead.
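For example, here is a minimal sketch of the database idea using the standard-library sqlite3 module. The monitor table and the log_status helper are hypothetical, just to illustrate the append-only pattern:

import sqlite3
from datetime import datetime

def log_status(db_path, host, status):
    # Appends one row per check; the table is created on first use.
    # An INSERT stays cheap no matter how large the log grows,
    # unlike rewriting the whole JSON file on every pass.
    conn = sqlite3.connect(db_path)
    with conn:  # commits the transaction on success
        conn.execute("CREATE TABLE IF NOT EXISTS monitor "
                     "(ts TEXT, host TEXT, status TEXT)")
        conn.execute("INSERT INTO monitor VALUES (?, ?, ?)",
                     (datetime.now().isoformat(), host, status))
    conn.close()

log_status("monitor.db", "www.google.com", "down")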

Here is a different option that uses Thread instead of Pool.

I created a class that overrides Thread so that join() returns the target's return value:

# Thread subclass whose join() returns the target function's return value
class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=None, kwargs=None):
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        super().__init__(group, target, name, args, kwargs)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, *args):
        super().join(*args)
        return self._return
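For example, with a trivial target function (hypothetical, just to show the mechanics):

def add(a, b):
    return a + b

t = ThreadWithReturnValue(target=add, args=(2, 3))
t.start()
print(t.join())  # prints 5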

I changed the code to first get the status of each host and then write the results to your file. I also fixed the way the JSON file is written.

import os
from datetime import datetime
import time
import json
from threading import Thread

hosts = ["www.google.com","www.smackcoders.com"]
filepath = os.path.join(os.getcwd(), "stack.json")
n = len(hosts)


def perform_ping(host_ip):
    """
    You have hardcoded down, this method will ping to check if we get an ICMP response
    """
    response = os.system("ping -c 1 " + host_ip)
    if response == 0:
        return 'UP'
    else:
        return 'DOWN'


def write_result(timestamp, results):
    # u = "down"  Using perform_ping to get the status

    if not os.path.exists(filepath):
        current_file = {}
    else:
        # If the file exists, read the current output
        with open(filepath, 'r') as f_read:
            current_file = json.loads(f_read.read())

    inner_result = []
    for result in results:
        host, status = result
        inner_result.append({'monitor.status': status,
                             "monitor.id": "tcp-tcp@" + host})

    current_file[timestamp] = inner_result

    # writing the file with new input
    with open(filepath, 'w') as f_write:
        f_write.write(json.dumps(current_file))


def main():
    while True:
        thread_list = []
        for host_ip in hosts:
            thread_list.append(ThreadWithReturnValue(target=perform_ping, name=host_ip, args=(host_ip, )))
        results = []
        timestamp = datetime.now().strftime("%B %d %Y, %H:%M:%S")
        for thread in thread_list:
            thread.start()
        for thread in thread_list:
            results.append((thread.name, thread.join()))
        # Pings run in parallel; the result is written once at the end to avoid
        # thread collisions and to limit file reads/writes as the host count grows
        write_result(timestamp, results)
        time.sleep(5)


if __name__ == '__main__':
    main()
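As a side note, the same fan-out-and-collect pattern is available in the standard library without subclassing Thread. A minimal sketch using concurrent.futures, assuming the perform_ping function defined above:

from concurrent.futures import ThreadPoolExecutor

def check_all(hosts):
    # executor.map runs perform_ping concurrently and yields
    # the results in the same order as the input hosts
    with ThreadPoolExecutor(max_workers=len(hosts)) as executor:
        statuses = list(executor.map(perform_ping, hosts))
    return list(zip(hosts, statuses))

# e.g. [('www.google.com', 'UP'), ('www.smackcoders.com', 'UP')]
print(check_all(["www.google.com", "www.smackcoders.com"]))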