知道在 Apache Spark 的 Java ForkJoinPool 中哪个线程转到哪个处理器吗?

Know which thread goes to which processor in a Java ForkJoinPool in Apache Spark?

目标:当我分叉一个线程时,要知道它将落在哪个处理器上。那可能吗?不管潜在的方法是否有效,这个狭窄的问题是否有好的答案?谢谢

(现在我需要为每个线程制作一个 classes 的副本,在该线程中写入它并稍后将它们全部合并。使用 synchronized 方法不是可能是因为我的 Java 专家老板认为这是个坏主意,经过大量讨论后我同意了。如果我知道每个线程将落在哪个处理器上,我只需要制作尽可能多的副本 class 因为有处理器。)

我们使用 Apache Spark 来让我们的工作分布在一个集群中,但在我们的应用程序中,运行 一个大的执行器是有意义的,然后在每台机器上做一些我们自己的多线程群集。

我可以节省大量的深度复制 如果我能知道一个线程被发送到哪个处理器,这可能吗? 我加入了我们的代码,但它可能更多一道概念题:

当我深入到 compute() 的 "do task" 部分时,我能知道它 运行 正在使用哪个处理器吗?

public class TholdExecutor extends RecursiveTask<TholdDropEvaluation> {

    final static Logger logger = LoggerFactory.getLogger(TholdExecutor.class);

    private List<TholdDropResult> partitionOfN = new ArrayList<>();
    private int coreCount;
    private int desiredPartitionSize; // will be updated by whatever is passed into the constructor per-chromosome
    private TholdDropEvaluation localDropEvaluation; // this DropEvaluation
    private TholdDropResult mSubI_DR;


    public TholdExecutor(List<TholdDropResult> subsetOfN, int cores, int partSize, TholdDropEvaluation passedDropEvaluation, TholdDropResult mDrCopy) {
        partitionOfN = subsetOfN;
        coreCount = cores;
        desiredPartitionSize = partSize;

        // the TholdDropEvaluation needs to be a copy for each thread? It can't be the same one passed to threads ... so ...
        TholdDropEvaluation localDropEvaluation = makeDECopy(passedDropEvaluation); // THIS NEEDS TO BE A DEEP COPY OF THE DROP EVAL!!! NOT THE ORIGINAL!!

        // we never modify the TholdDropResult that is passed in, we just need to read it all on the same JVM/worker, so
        mSubI_DR = mDrCopy; // this is purely a reference and can point to the passed in value (by reference, right?)

    }

    // this makes a deep copy of the TholdDropEvaluation for each thread, we copy the SharingRun's startIndex and endIndex only,
    // as LEG events will be calculated during the subsequent dropComparison. The constructor for TholdDropEvaluation must set
    // LEG events to zero.
    private void makeDECopy(TholdDropEvaluation passedDropEvaluation) {
        TholdDropEvaluation tholdDropEvaluation = new TholdDropEvaluation();

        // iterate through the SharingRuns in the SharingRunList from the TholdDropEval that was passed in
        for (SharingRun sr : passedDropEvaluation.getSharingRunList()) {
            SharingRun ourSharingRun = new SharingRun();
            ourSharingRun.startIndex = sr.startIndex;
            ourSharingRun.endIndex = sr.endIndex;

            tholdDropEvaluation.addSharingRun(ourSharingRun);
        }
        return tholdDropEvaluation
    }

    @Override
    protected TholdDropEvaluation compute() {

        int simsToDo = partitionOfN.size();
        UUID tag = UUID.randomUUID();

        long computeStartTime = System.nanoTime();

        if (simsToDo <= desiredPartitionSize) {
            logger.debug("IN MULTI-THREAD compute() --- UUID {}:Evaluating partitionOfN sublist length", tag, simsToDo);

            // job within size limit, do the task and return the completed TholdDropEvaluation
            // iterate through each TholdDropResult in the sub-partition and do the dropComparison to the refernce mSubI_DR,
            // writing to the copy of the DropEval in tholdDropEvaluation
            for (TholdDropResult currentResult : partitionOfN) {

                mSubI_DR.dropComparison(currentResult, localDropEvaluation);

            }
        } else {

            // job too large, subdivide and call this recursively
            int half = simsToDo / 2;
            logger.info("Splitting UUID = {}, half is {} and simsToDo is {}", tag, half, simsToDo );
            TholdExecutor nextExec = new TholdExecutor(partitionOfN.subList(0, half), coreCount, desiredPartitionSize, tholdDropEvaluation, mSubI_DR);
            TholdExecutor futureExec = new TholdExecutor(partitionOfN.subList(half, simsToDo), coreCount, desiredPartitionSize, tholdDropEvaluation, mSubI_DR);
            nextExec.fork();
            TholdDropEvaluation futureEval = futureExec.compute();
            TholdDropEvaluation nextEval = nextExec.join();
            tholdDropEvaluation.merge(futureEval);
            tholdDropEvaluation.merge(nextEval);
        }

        logger.info("{} Compute time is {} ns",tag, System.nanoTime() - computeStartTime);

        // NOTE: this was inside the else block in Rob's example, but don't we want it outside the block so it's returned
        // whether
        return tholdDropEvaluation;
    }
}

即使您能弄清楚线程 运行 最初 的位置,也没有理由假设它会在 processor/core 的其余部分中存在它的生命。对于任何大到值得产生线程成本的任务,很可能它不会,因此您需要完全控制它 运行 的位置以提供该级别的保证 运行ce。

据我所知,Java 中没有控制从线程到处理器内核的映射的标准机制。通常称为 "thread affinity" 或 "processor affinity"。例如,在 Windows 和 Linux 上,您可以使用以下方式控制它:

因此理论上您可以编写一些 C 和 JNI 代码,使您能够在您关心的 Java 主机上对其进行足够的抽象以使其工作。

这感觉像是对您似乎面临的实际问题的错误解决方案,因为您最终从 OS 调度程序中撤回了选项,这可能不允许它做出最明智的调度决策,从而导致总共 运行 增加时间。除非你将不寻常的工作负载 and modelling/querying 处理器 information/topology 降低到 NUMA 和共享缓存的级别,否则它应该更好地找出位置运行 个线程来处理大多数工作负载。您的 JVM 通常 运行 除了您在调用 main() 之后显式创建的线程之外还有大量其他线程。此外,关于您 运行 今天(甚至明天)可能决定自己对线程亲和性执行的 JVM,我不想做出任何承诺。

话虽如此,但潜在的问题似乎是您希望每个线程拥有一个对象实例。通常,这比预测线程将 运行 的位置然后在任何时间点手动找出 N 个处理器和 M 个线程之间的映射要容易得多。通常你会使用 "thread local storage" (TLS) 来解决这个问题。

大多数语言以一种或另一种形式提供这个概念。在 Java 中,这是通过 ThreadLocal class 提供的。给出的链接文档中有一个示例:

 public class ThreadId {
     // Atomic integer containing the next thread ID to be assigned
     private static final AtomicInteger nextId = new AtomicInteger(0);

     // Thread local variable containing each thread's ID
     private static final ThreadLocal<Integer> threadId =
         new ThreadLocal<Integer>() {
             @Override protected Integer initialValue() {
                 return nextId.getAndIncrement();
         }
     };

     // Returns the current thread's unique ID, assigning it if necessary
     public static int get() {
         return threadId.get();
     }
 }

基本上有两件事是你关心的:

  1. 当你调用get()它returns属于当前线程的值(对象)
  2. 如果您在当前没有任何内容的线程中调用 get,它将调用您实现的 initialValue(),这允许您构造或获取新对象。

因此,在您的场景中,您可能希望从只读全局版本深度复制某些本地状态的初始版本。

最后一点注意:如果你的目标是分而治之;在很多线程上做一些工作,然后将它们的所有结果合并到一个答案中,合并部分通常称为缩减。在那种情况下,您可能正在寻找 MapReduce,这可能是最著名的使用归约的并行形式。