如何使用并行处理向 JSON API 发送 POST 请求?

How to POST to a JSON API with parallel processing?

我有这样的代码:

import requests
from multiprocessing import Pool
import json
import time
from datetime import datetime as dt

# Endpoint that every record is POSTed to.
BASE_URL = 'http://localhost:3000/test'

# Load the records to send (a JSON list of dicts). The __main__ block uses
# the module-level name `list_dict`.
# NOTE: this runs at import time, so with the "spawn" start method (the
# default on Windows) every worker process re-reads data.json as well.
with open('data.json', 'r') as fh:
    list_dict = json.loads(fh.read())


def resource_post(post_data):
    """POST one record to BASE_URL and append the JSON reply to output.txt.

    A shallow copy of *post_data* gets a ``timestamp`` key
    (``YYYYmmdd HH:MM:SS:mmm``) and is sent as the JSON request body; the
    caller's dict is left untouched.

    Returns:
        A one-element list holding the decoded JSON response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    # The original wrote the row via ``pd`` (pandas) without ever importing
    # it, which raised NameError; the stdlib csv module does the same job.
    import csv

    # Copy so the caller's dict is not mutated.
    payload = dict(post_data)
    payload["timestamp"] = dt.now().strftime('%Y%m%d %H:%M:%S:%f')[:-3]
    response = requests.post(BASE_URL, json=payload)
    # Fail loudly on error statuses instead of logging an error body as data.
    response.raise_for_status()
    stuff_got = [response.json()]
    print(stuff_got)  # print the response
    # One '|'-separated line of the reply's values, no header, no index column
    # (what the intended DataFrame.to_csv(sep='|', header=False, index=False)
    # call would have produced).
    with open("output.txt", "a", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh, delimiter="|", lineterminator="\n")
        for row in stuff_got:
            writer.writerow(row.values())
    return stuff_got

if __name__ == '__main__':
    # Time the whole fan-out, including pool start-up and shutdown.
    t0 = time.time()
    with Pool() as workers:
        workers.map(resource_post, list_dict)
        # Close and join explicitly so the workers exit normally; leaving
        # the with-block calls Pool.terminate(), which would kill them.
        workers.close()
        workers.join()
    elapsed = time.time() - t0
    print("\n", "time elapsed is :", elapsed)

在文件 data.json 中:

[{"first_name":"John","last_name":"Swen"},{"first_name":"Ricard","last_name":"Candra"}]

BASE_URL 接收的数据格式如下:

{
  "body": {
    "first_name": "Sebastian",
    "last_name": "Eschweiler"
  }
}

我想把 data.json 中的每个字典按以下格式 POST 出去:

{
  "body": {
    "first_name":"John",
    "last_name":"Swen"
  }
}

程序会遍历上面的每条数据,服务器会为每条数据返回一个响应,例如 {"responseCode": "0006", "responseMessage": "success", "first_name": "John"}。我想把这些响应写入 output.txt 文件,格式为:

0006 | success | John.

那么,如何让 data.json 中的每个字典都以下面的格式 POST 到服务器:

{
  "body": {
    "first_name":"John",
    "last_name":"Swen"
  }
}

我想这就是您所追求的。

  • 使用共享的 requests.Session() 复用连接,让请求更快。
  • 不要在模块顶层读取输入数据;否则每个多进程子进程也会各自重新读取一遍(至少在 Windows 上如此)。
  • 原代码根本没有检查错误响应。
  • 把类 CSV 数据写入文件并不需要 Pandas。
  • 现在由多进程的主进程负责把数据写入文件,以避免(例如)两个进程在写入时相互干扰。
  • Pool.imap_unordered() 更快,但结果顺序不固定。如果需要保持顺序,请使用 imap()。
import json
import time
from datetime import datetime as dt
from multiprocessing import Pool

import requests

# Endpoint that every record is POSTed to.
BASE_URL = 'http://localhost:3000/test'

# Module-level Session: each worker process ends up with its own copy
# (inherited at fork or created when the module is re-imported under spawn)
# and reuses its HTTP connections across the records that worker handles.
sess = requests.Session()


def resource_post(data_from_json, timeout=30):
    """POST one record, wrapped as ``{"timestamp": ..., "body": record}``.

    Args:
        data_from_json: one dict from data.json; sent unchanged as ``body``.
        timeout: seconds passed to requests as the connect/read timeout.
            Without it, an unresponsive server would block this worker, and
            therefore the pool consumer, forever.

    Returns:
        The decoded JSON reply.

    Raises:
        requests.HTTPError: on a 4xx/5xx status.
        requests.Timeout: if the server does not respond within *timeout*.
    """
    post_data = {
        "timestamp": dt.now().strftime('%Y%m%d %H:%M:%S:%f')[:-3],
        "body": data_from_json,
    }
    response = sess.post(BASE_URL, json=post_data, timeout=timeout)
    response.raise_for_status()
    return response.json()


def main():
    """Post every record from data.json in parallel and log the replies.

    Reads the list of dicts in ``data.json`` and sends each one through
    ``resource_post`` on a process pool. For each reply it appends one
    ``responseCode|responseMessage|first_name`` line to ``output.txt``.
    Only this parent process writes the file, so workers never contend for
    it. Finally it prints the elapsed wall time.
    """
    # JSON is UTF-8 by spec; don't rely on the platform locale encoding
    # (e.g. cp1252 on Windows), which can mis-decode or reject non-ASCII names.
    with open('data.json', 'r', encoding='utf-8') as f:
        list_dict = json.load(f)
    start = time.perf_counter()  # monotonic clock, meant for measuring durations
    with open("output.txt", "a", encoding="utf-8") as outf:
        with Pool() as p:
            # imap_unordered yields each reply as soon as any worker finishes;
            # switch to imap() if the output order must match the input order.
            for resp in p.imap_unordered(resource_post, list_dict):
                print(resp["responseCode"], resp["responseMessage"], resp["first_name"], sep="|", file=outf)
    elapsed = time.perf_counter() - start
    print("\n", "time elapsed is :", elapsed)


# The guard is required for multiprocessing: under the "spawn" start method,
# workers re-import this module and must not start their own pools.
if __name__ == '__main__':
    main()