Java,在多线程环境中通过散列统一划分传入的工作
Java, divide incoming work uniformly via hashing in multithreaded evnironments
我已经实现了一个 java 代码来执行传入的任务(如 Runnable
),其中有 n 个线程基于它们的 hashCode 模块 nThreads
。理想情况下,工作应该均匀地分布在这些线程中。
具体来说,我们有一个 dispatchId
作为每个任务的字符串。
这是 java 代码片段:
int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}
重要: 在大多数情况下,dispatchId 是:
String dispatchId = 'SomePrefix' + counter.next()
但是,我担心除以 nThreads 的模数不是一个好的选择,因为 nThreads 应该是质数,以便更均匀地分配 dispatId 键。
关于如何更好地传播工作,还有其他选择吗?
更新 1:
每个Worker都有一个队列:
Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();
worker从中获取任务并执行。可以从其他线程将任务添加到此队列。
更新二:
相同dispatchId
的任务可以多次进来,所以我们需要通过dispatchId
找到他们的线程。
最重要的是,每个工作线程必须按顺序处理其传入的任务。于是,上面的update 1中就有了数据结构Queue。
更新 3:
此外,一些线程可能很忙,而其他线程则空闲。因此,我们需要以某种方式将队列与线程分离,但要为任务执行保持相同 dispatchId
的 FIFO 顺序。
解法:
我已经实现了 Ben Manes 的想法(他在下面的回答),代码可以找到 here.
听起来你需要每个调度 ID 进行 FIFO 排序,所以理想的是将调度队列作为抽象。这可以解释您对散列不提供统一分布的担忧,因为一些调度队列可能比其他调度队列更活跃,并且在工人之间不公平地平衡。通过将队列与工作线程分开,您可以保留 FIFO 语义并均匀分布工作。
提供此抽象的非活动库是 HawtDispatch。它与 Java 6 兼容。
一个非常简单的 Java8 方法是使用 CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher 来实现这个想法,其中注册是显式的。如果您的调度员更加动态,那么您可能需要定期修剪地图。基本思路如下
ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...
public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
return dispatchQueues.compute(queueName, (k, queue) -> {
return (queue == null)
? CompletableFuture.runAsync(task)
: queue.thenRunAsync(task);
});
}
更新 (JDK7)
上述想法的反向移植将用番石榴翻译成类似的东西,
ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...
public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
Lock lock = locks.get(queueName);
lock.lock();
try {
ListenableFuture<?> future = dispatchQueues.get(queueName);
if (future == null) {
future = executor.submit(task);
} else {
final SettableFuture<Void> next = SettableFuture.create();
future.addListener(new Runnable() {
try {
task.run();
} finally {
next.set(null);
}
}, executor);
future = next;
}
dispatchQueues.put(queueName, future);
} finally {
lock.unlock();
}
}
我已经实现了一个 java 代码来执行传入的任务(如 Runnable
),其中有 n 个线程基于它们的 hashCode 模块 nThreads
。理想情况下,工作应该均匀地分布在这些线程中。
具体来说,我们有一个 dispatchId
作为每个任务的字符串。
这是 java 代码片段:
int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}
重要: 在大多数情况下,dispatchId 是:
String dispatchId = 'SomePrefix' + counter.next()
但是,我担心除以 nThreads 的模数不是一个好的选择,因为 nThreads 应该是质数,以便更均匀地分配 dispatId 键。
关于如何更好地传播工作,还有其他选择吗?
更新 1:
每个Worker都有一个队列:
Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();
worker从中获取任务并执行。可以从其他线程将任务添加到此队列。
更新二:
相同dispatchId
的任务可以多次进来,所以我们需要通过dispatchId
找到他们的线程。
最重要的是,每个工作线程必须按顺序处理其传入的任务。于是,上面的update 1中就有了数据结构Queue。
更新 3:
此外,一些线程可能很忙,而其他线程则空闲。因此,我们需要以某种方式将队列与线程分离,但要为任务执行保持相同 dispatchId
的 FIFO 顺序。
解法: 我已经实现了 Ben Manes 的想法(他在下面的回答),代码可以找到 here.
听起来你需要每个调度 ID 进行 FIFO 排序,所以理想的是将调度队列作为抽象。这可以解释您对散列不提供统一分布的担忧,因为一些调度队列可能比其他调度队列更活跃,并且在工人之间不公平地平衡。通过将队列与工作线程分开,您可以保留 FIFO 语义并均匀分布工作。
提供此抽象的非活动库是 HawtDispatch。它与 Java 6 兼容。
一个非常简单的 Java8 方法是使用 CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher 来实现这个想法,其中注册是显式的。如果您的调度员更加动态,那么您可能需要定期修剪地图。基本思路如下
ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...
public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
return dispatchQueues.compute(queueName, (k, queue) -> {
return (queue == null)
? CompletableFuture.runAsync(task)
: queue.thenRunAsync(task);
});
}
更新 (JDK7)
上述想法的反向移植将用番石榴翻译成类似的东西,
ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...
public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
Lock lock = locks.get(queueName);
lock.lock();
try {
ListenableFuture<?> future = dispatchQueues.get(queueName);
if (future == null) {
future = executor.submit(task);
} else {
final SettableFuture<Void> next = SettableFuture.create();
future.addListener(new Runnable() {
try {
task.run();
} finally {
next.set(null);
}
}, executor);
future = next;
}
dispatchQueues.put(queueName, future);
} finally {
lock.unlock();
}
}