如何使用并行处理 post a json API?
how to post a json API with Parallel processing?
我有这样的代码:
import requests
from multiprocessing import Pool
import json
import time
from datetime import datetime as dt
BASE_URL = 'http://localhost:3000/test'
with open('data.json', 'r') as f:
list_dict = json.load(f)
def resource_post(post_data):
stuff_got = []
timestamp = dt.now().strftime('%Y%m%d %H:%M:%S:%f')[:-3] # timestamp
post_data["timestamp"] = timestamp
response = requests.post(BASE_URL, json=post_data)
stuff_got.append(response.json())
print(stuff_got) # print hasil response
df = pd.DataFrame.from_dict(stuff_got)
df.to_csv("output.txt", sep='|', mode='a', index=False, header=False)
#time.sleep(0.1) #delay
return stuff_got
if __name__ == '__main__':
start=time.time()
pool = Pool()
with pool as p:
p.map(resource_post,list_dict)
p.close()
p.join()
elapsed = (time.time() - start)
print("\n","time elapsed is :", elapsed)
在文件 data.json 中:
[{"first_name":"John","last_name":"Swen"},{"first_name":"Ricard","last_name":"Candra"}]
在BASE_URL中有这样的数据:
{
"body": {
"first_name": "Sebastian",
"last_name": "Eschweiler"
}
我想 post 来自 data.json 的字典,格式为:
{
"body": {
"first_name":"John",
"last_name":"Swen"
}
}
上面的每个数据都会被循环,并会从服务器生成响应 {"responseCode": "0006", "responseMessage": "success", "first_name": "John"}。我会将响应输入 output.txt 文件,格式为:
0006 | success | John.
因此如何制作 data.json 中的每个词典将 post 以格式发送到服务器:
{
"body": {
"first_name":"John",
"last_name":"Swen"
}
}
我想这就是您所追求的。
- 使用共享
requests.Session()
让事情变得更快。
- 不要读取顶层的输入数据;它也会在所有多处理子进程中单独读取(无论如何 Windows)。
- 您根本没有检查错误响应。
- 您不需要 Pandas 将类 CSV 数据写入文件。
- 将数据写入文件现在是多处理主机完成的任务,以避免例如。两个进程相互干扰写入。
Pool.imap_unordered()
更快但无序。如果您需要订购,请使用 imap()
。
import json
import time
from datetime import datetime as dt
from multiprocessing import Pool
import requests
BASE_URL = 'http://localhost:3000/test'
sess = requests.Session()
def resource_post(data_from_json):
post_data = {
"timestamp": dt.now().strftime('%Y%m%d %H:%M:%S:%f')[:-3],
"body": data_from_json,
}
response = sess.post(BASE_URL, json=post_data)
response.raise_for_status()
return response.json()
def main():
with open('data.json', 'r') as f:
list_dict = json.load(f)
start = time.time()
with open("output.txt", "a") as outf:
with Pool() as p:
for resp in p.imap_unordered(resource_post, list_dict):
print(resp["responseCode"], resp["responseMessage"], resp["first_name"], sep="|", file=outf)
elapsed = (time.time() - start)
print("\n", "time elapsed is :", elapsed)
if __name__ == '__main__':
main()
我有这样的代码:
import requests
from multiprocessing import Pool
import json
import time
from datetime import datetime as dt
BASE_URL = 'http://localhost:3000/test'
with open('data.json', 'r') as f:
list_dict = json.load(f)
def resource_post(post_data):
stuff_got = []
timestamp = dt.now().strftime('%Y%m%d %H:%M:%S:%f')[:-3] # timestamp
post_data["timestamp"] = timestamp
response = requests.post(BASE_URL, json=post_data)
stuff_got.append(response.json())
print(stuff_got) # print hasil response
df = pd.DataFrame.from_dict(stuff_got)
df.to_csv("output.txt", sep='|', mode='a', index=False, header=False)
#time.sleep(0.1) #delay
return stuff_got
if __name__ == '__main__':
start=time.time()
pool = Pool()
with pool as p:
p.map(resource_post,list_dict)
p.close()
p.join()
elapsed = (time.time() - start)
print("\n","time elapsed is :", elapsed)
在文件 data.json 中:
[{"first_name":"John","last_name":"Swen"},{"first_name":"Ricard","last_name":"Candra"}]
在BASE_URL中有这样的数据:
{
"body": {
"first_name": "Sebastian",
"last_name": "Eschweiler"
}
我想 post 来自 data.json 的字典,格式为:
{
"body": {
"first_name":"John",
"last_name":"Swen"
}
}
上面的每个数据都会被循环,并会从服务器生成响应 {"responseCode": "0006", "responseMessage": "success", "first_name": "John"}。我会将响应输入 output.txt 文件,格式为:
0006 | success | John.
因此如何制作 data.json 中的每个词典将 post 以格式发送到服务器:
{
"body": {
"first_name":"John",
"last_name":"Swen"
}
}
我想这就是您所追求的。
- 使用共享
requests.Session()
让事情变得更快。 - 不要读取顶层的输入数据;它也会在所有多处理子进程中单独读取(无论如何 Windows)。
- 您根本没有检查错误响应。
- 您不需要 Pandas 将类 CSV 数据写入文件。
- 将数据写入文件现在是多处理主机完成的任务,以避免例如。两个进程相互干扰写入。
Pool.imap_unordered()
更快但无序。如果您需要订购,请使用imap()
。
import json
import time
from datetime import datetime as dt
from multiprocessing import Pool
import requests
BASE_URL = 'http://localhost:3000/test'
sess = requests.Session()
def resource_post(data_from_json):
post_data = {
"timestamp": dt.now().strftime('%Y%m%d %H:%M:%S:%f')[:-3],
"body": data_from_json,
}
response = sess.post(BASE_URL, json=post_data)
response.raise_for_status()
return response.json()
def main():
with open('data.json', 'r') as f:
list_dict = json.load(f)
start = time.time()
with open("output.txt", "a") as outf:
with Pool() as p:
for resp in p.imap_unordered(resource_post, list_dict):
print(resp["responseCode"], resp["responseMessage"], resp["first_name"], sep="|", file=outf)
elapsed = (time.time() - start)
print("\n", "time elapsed is :", elapsed)
if __name__ == '__main__':
main()