Java stream parallel Stream 如何将多个函数映射到stream map

Java stream parallelStream How to map multple functions to stream map

我有这段代码适用于 Arraylist 数组列表中的每个元素都需要通过 ArithmeticBBB 和 ArithmeticCCC 进行计算 并创建 2 个返回到流中的元素 我正在使用 stream = stream.map(a::work);在循环 。 或者使用 flatMap

class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
        @Override
        public String arithmetic(String n) {
            String ss  = n+" 2" + " ,Thread ID:" +Thread.currentThread().getId();
            return ss;
        }
    }
    class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
        @Override
        public String arithmetic(String n) {
            String ss  = n+" 3" + " ,Thread ID:" +Thread.currentThread().getId();
            return ss;
        }
    }
    
    public class ArithmeticManagerStream {
        private List<String> integerList = null;
        private List<String> resultArray = null;
    
    
        public ArithmeticManagerStream(List<String> dataFromUser) {
            this.integerList =  dataFromUser;
        }
    
        public List<String> invokerActions(List<ArithmeticAction> actions) throws
                InterruptedException {
    
            Stream<String> stream = integerList.parallelStream();
            for (final ArithmeticManagerStream.ArithmeticAction a : actions) {
                stream = stream.flatMap(str -> actions.stream().map(b -> b.arithmetic(str)));
             }
            return resultArray = stream.collect(Collectors.toList());
        }
        public interface ArithmeticAction {
            String arithmetic(String n);
        }
    
        public static void main(String[] args) {
            List<ArithmeticAction> actions = new ArrayList();
            actions.add(new ArithmeticBBB());
            actions.add(new ArithmeticCCC());
            List<String> intData = new ArrayList<>();
            intData.add("1");
            intData.add("2");
            intData.add("3");
    
            ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
    
            try {
                List<String> result = arithmeticManagerStream.invokerActions(actions);
                System.out.println("***********************************************");
                for(String i : result) {
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

目前,如果您运行这个程序,您将看到结果连接到字符串。

1 2 ,Thread ID:12 2 ,Thread ID:12
1 2 ,Thread ID:12 3 ,Thread ID:12
1 3 ,Thread ID:12 2 ,Thread ID:12
1 3 ,Thread ID:12 3 ,Thread ID:12
2 2 ,Thread ID:1 2 ,Thread ID:1
2 2 ,Thread ID:1 3 ,Thread ID:1
2 3 ,Thread ID:1 2 ,Thread ID:1
2 3 ,Thread ID:1 3 ,Thread ID:1
3 2 ,Thread ID:13 2 ,Thread ID:13
3 2 ,Thread ID:13 3 ,Thread ID:13
3 3 ,Thread ID:13 2 ,Thread ID:13
3 3 ,Thread ID:13 3 ,Thread ID:13

我需要的是每个元素都在其自己的元素中,例如:

1 2 ,Thread ID:12 
1 2 ,Thread ID:12  
1 3 ,Thread ID:12  
1 3 ,Thread ID:12 
2 2 ,Thread ID:1  
2 2 ,Thread ID:1 
2 3 ,Thread ID:1 
2 3 ,Thread ID:1 
3 2 ,Thread ID:13 
3 2 ,Thread ID:13  
3 3 ,Thread ID:13  
3 3 ,Thread ID:13 

显然,由于错别字和其他问题的数量,已发布问题中的代码并未从实际 运行ning 代码中删除。所以不可能说出实际问题是什么。

但是,下面的代码 运行:

public class ParallelStreams {

    public static void main(String[] args) {

        List<String> strList = List.of("a1", "a2", "a3", "a4", "a5");
        List<IWork> w = List.of(new Work1(), new Work2());

        Stream<String> stream = strList.parallelStream();
        for (final IWork a : w) {
            stream = stream.map(a::work);
        }
        List<String> finalArray = stream.collect(Collectors.toList());

        for (String s : finalArray) {
            System.out.println(s);
        }
    }
}


interface IWork {
    String work(String s);
}

class Work1 implements IWork {
    @Override
    public String work(String s) {
        return s + " * " + Thread.currentThread().getId();
    }
}

class Work2 implements IWork {
    @Override
    public String work(String s) {
        return s + " ! " + Thread.currentThread().getId();
    }
}

并产生以下输出:

a1 * 13 ! 13
a2 * 13 ! 13
a3 * 1 ! 1
a4 * 1 ! 1
a5 * 1 ! 1

为了更清楚地说明哪个 IWork 实现是 运行ning,我添加了一个“*”或“!”来代替原来的 space实现。

首先,感谢您将其更改为可运行代码,以便我们了解发生了什么。

你让它有点难以理解,因为数据只是个位数,而你的操作只是个位数,线程 ID 也可以是个位数。所以我将数据更改为“A”、“B”和“C”以使其更容易。

在您的 for 循环中,您在 actions 周围循环,但实际上并未将 action 用于循环中的任何内容。所以它本质上只是一个循环,它运行的次数与你的动作一样多,但在每次迭代中都没有区别。由于调用了循环变量 a,因此更难捕捉到这一点。试着给它起个更有意义的名字。

真正令人困惑的是,您正在循环 actions,然后在循环内流式传输操作。所以这平方应用的操作数。我不敢相信这就是你要找的东西,但你似乎并不对你输出的那个方面感到不安,只是因为它是为每个输入合并的。

另外,不清楚您对线程本身的关心程度。你在输出中有它们,但为什么呢?为什么要使用 ParallelStream()

您还通过将 Stream 视为变量并逐步移动它来使其变得更加复杂。在我发布的第一个答案中,我发现将它们组合在一起会导致整个流 运行 通过单个线程,所以我没有在那个答案中这样做,因为我不确定线程​​有多重要给你(他们不应该)。

最后,如果转储 for 循环,然后将其全部合并到一个 Stream 语句中,它会完美运行:

class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
    @Override
    public String arithmetic(String n) {
        String ss = n + " 2" + " ,Thread ID:" + Thread.currentThread().getId();
        return ss;
    }
}

class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
    @Override
    public String arithmetic(String n) {
        String ss = n + " 3" + " ,Thread ID:" + Thread.currentThread().getId();
        return ss;
    }
}

public class ArithmeticManagerStream {
    private List<String> integerList = null;
    private List<String> resultArray = null;


    public ArithmeticManagerStream(List<String> dataFromUser) {
        this.integerList = dataFromUser;
    }

    public List<String> invokerActions(List<ArithmeticAction> actions) throws InterruptedException {
        return integerList.parallelStream()
                          .flatMap(str -> actions.stream().map(action -> action.arithmetic(str)))
                          .collect(Collectors.toList());
    }

    public interface ArithmeticAction {
        String arithmetic(String n);
    }

    public static void main(String[] args) {
        List<ArithmeticAction> actions = new ArrayList();
        actions.add(new ArithmeticBBB());
        actions.add(new ArithmeticCCC());
        List<String> intData = new ArrayList<>();
        intData.add("A");
        intData.add("B");
        intData.add("C");

        ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);

        try {
            List<String> result = arithmeticManagerStream.invokerActions(actions);
            System.out.println("***********************************************");
            for (String i : result) {
                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这导致:

***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
C 2 ,Thread ID:1
C 3 ,Thread ID:1

这对我来说似乎是正确的。每个源项目通过每个动作,生成它自己的输出,该输出被平面映射回原始流。

最后一点。我一直在寻求简化代码,并注意到您的界面是一个 Functional 结构。所以你实际上可以放弃实现 类 并在加载 actions 数组时用 lambda 声明它们。然后,当然,很明显你的界面基本上是一个 Function<String, String>。所以我完全放弃了界面,然后清理了代码,使用 Stream 作为最终输出。

结果基本上是微不足道的,我添加了更多的动作只是为了说明一点:

public class ArithmeticManagerStream {
    private List<String> integerList = null;

    public ArithmeticManagerStream(List<String> dataFromUser) {
        this.integerList = dataFromUser;
    }

    public List<String> invokerActions(List<Function<String, String>> actions) {
        return integerList.parallelStream().flatMap(str -> actions.stream().map(action -> action.apply(str))).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Function<String, String>> actions = new ArrayList();
        actions.add(n -> n + " 2" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 3" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 4" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 5" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 6" + " ,Thread ID:" + Thread.currentThread().getId());
        List<String> intData = new ArrayList<>();
        intData.add("A");
        intData.add("B");
        intData.add("C");

        ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
        List<String> result = arithmeticManagerStream.invokerActions(actions);
        System.out.println("***********************************************");
        result.forEach(System.out::println);
    }
}

输出为:

***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
A 4 ,Thread ID:13
A 5 ,Thread ID:13
A 6 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
B 4 ,Thread ID:1
B 5 ,Thread ID:1
B 6 ,Thread ID:1
C 2 ,Thread ID:14
C 3 ,Thread ID:14
C 4 ,Thread ID:14
C 5 ,Thread ID:14
C 6 ,Thread ID:14