动态创建蝗虫任务?
Create locust tasks dynamically?
基本上我需要生成 30 个用户并为他们分配 50 个不同的任务,我需要他们并行 运行。所以我正在尝试生成 50 个任务,如下所示:
class UserSimulation(HttpUser):
host = os.environ['BASE_URL']
# time in seconds the simulated user will wait before proceeding to the next task
wait_time = between(1, 2)
for item_id in range(1, 51):
@task(1)
def view_items_with_different_item_ids(self, item_id=item_id):
self.client.get(
url=f"/my-url/item24_00{item_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
由于显而易见的原因,这种方法不允许我动态创建 50 个任务,因为只保存了最后一个任务。有什么解决方法吗?
要按照您尝试的方式进行操作,请尝试 creating different functions programmatically。我不知道如何对它们使用装饰器,但如果没有别的,你可以在创建它们时将这些函数添加到 Locust 任务列表中。只需创建一个任务列表 tasks = []
,然后在创建函数时 tasks.append(view_items_with_different_item_ids_1)
。
但是,根据您对所需内容的描述,我不确定是否有必要这样做。如果你只需要 30 个用户去调用 50 次,你只需要一个任务,你可以在那里循环调用。
class UserSimulation(HttpUser):
host = os.environ['BASE_URL']
# time in seconds the simulated user will wait before proceeding to the next task
wait_time = between(1, 2)
@task(1)
def view_items_with_different_item_ids(self):
for item_id in range(1, 51):
self.client.get(
url=f"/my-url/item24_00{item_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
如果您需要随机数而不是连续数,但需要确保每个数都被调用一次:
import random
item_ids = list(range(1,51))
random.shuffle(item_ids)
@task(1)
def view_items_with_different_item_ids(self):
for item_id in item_ids:
self.client.get(
url=f"/my-url/item24_00{item_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
如果你只是想一直拉一个随机数而不关心重复:
import random
item_ids = list(range(1,51))
@task(1)
def view_items_with_different_item_ids(self):
random_id = random.choice(item_ids)
self.client.get(
url=f"/my-url/item24_00{random_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
基本上我需要生成 30 个用户并为他们分配 50 个不同的任务,我需要他们并行 运行。所以我正在尝试生成 50 个任务,如下所示:
class UserSimulation(HttpUser):
host = os.environ['BASE_URL']
# time in seconds the simulated user will wait before proceeding to the next task
wait_time = between(1, 2)
for item_id in range(1, 51):
@task(1)
def view_items_with_different_item_ids(self, item_id=item_id):
self.client.get(
url=f"/my-url/item24_00{item_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
由于显而易见的原因,这种方法不允许我动态创建 50 个任务,因为只保存了最后一个任务。有什么解决方法吗?
要按照您尝试的方式进行操作,请尝试 creating different functions programmatically。我不知道如何对它们使用装饰器,但如果没有别的,你可以在创建它们时将这些函数添加到 Locust 任务列表中。只需创建一个任务列表 tasks = []
,然后在创建函数时 tasks.append(view_items_with_different_item_ids_1)
。
但是,根据您对所需内容的描述,我不确定是否有必要这样做。如果你只需要 30 个用户去调用 50 次,你只需要一个任务,你可以在那里循环调用。
class UserSimulation(HttpUser):
host = os.environ['BASE_URL']
# time in seconds the simulated user will wait before proceeding to the next task
wait_time = between(1, 2)
@task(1)
def view_items_with_different_item_ids(self):
for item_id in range(1, 51):
self.client.get(
url=f"/my-url/item24_00{item_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
如果您需要随机数而不是连续数,但需要确保每个数都被调用一次:
import random
item_ids = list(range(1,51))
random.shuffle(item_ids)
@task(1)
def view_items_with_different_item_ids(self):
for item_id in item_ids:
self.client.get(
url=f"/my-url/item24_00{item_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])
如果你只是想一直拉一个随机数而不关心重复:
import random
item_ids = list(range(1,51))
@task(1)
def view_items_with_different_item_ids(self):
random_id = random.choice(item_ids)
self.client.get(
url=f"/my-url/item24_00{random_id}",
verify=False,
auth=(os.environ['USERNAME'], os.environ['PASSWORD'])