Java stream parallel Stream 如何将多个函数映射到stream map
Java stream parallelStream How to map multple functions to stream map
我有这段代码适用于 Arraylist
数组列表中的每个元素都需要通过 ArithmeticBBB 和 ArithmeticCCC 进行计算
并创建 2 个返回到流中的元素
我正在使用 stream = stream.map(a::work);在循环 。
或者使用 flatMap
class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n+" 2" + " ,Thread ID:" +Thread.currentThread().getId();
return ss;
}
}
class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n+" 3" + " ,Thread ID:" +Thread.currentThread().getId();
return ss;
}
}
public class ArithmeticManagerStream {
private List<String> integerList = null;
private List<String> resultArray = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<ArithmeticAction> actions) throws
InterruptedException {
Stream<String> stream = integerList.parallelStream();
for (final ArithmeticManagerStream.ArithmeticAction a : actions) {
stream = stream.flatMap(str -> actions.stream().map(b -> b.arithmetic(str)));
}
return resultArray = stream.collect(Collectors.toList());
}
public interface ArithmeticAction {
String arithmetic(String n);
}
public static void main(String[] args) {
List<ArithmeticAction> actions = new ArrayList();
actions.add(new ArithmeticBBB());
actions.add(new ArithmeticCCC());
List<String> intData = new ArrayList<>();
intData.add("1");
intData.add("2");
intData.add("3");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
try {
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
for(String i : result) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
目前,如果您运行这个程序,您将看到结果连接到字符串。
1 2 ,Thread ID:12 2 ,Thread ID:12
1 2 ,Thread ID:12 3 ,Thread ID:12
1 3 ,Thread ID:12 2 ,Thread ID:12
1 3 ,Thread ID:12 3 ,Thread ID:12
2 2 ,Thread ID:1 2 ,Thread ID:1
2 2 ,Thread ID:1 3 ,Thread ID:1
2 3 ,Thread ID:1 2 ,Thread ID:1
2 3 ,Thread ID:1 3 ,Thread ID:1
3 2 ,Thread ID:13 2 ,Thread ID:13
3 2 ,Thread ID:13 3 ,Thread ID:13
3 3 ,Thread ID:13 2 ,Thread ID:13
3 3 ,Thread ID:13 3 ,Thread ID:13
我需要的是每个元素都在其自己的元素中,例如:
1 2 ,Thread ID:12
1 2 ,Thread ID:12
1 3 ,Thread ID:12
1 3 ,Thread ID:12
2 2 ,Thread ID:1
2 2 ,Thread ID:1
2 3 ,Thread ID:1
2 3 ,Thread ID:1
3 2 ,Thread ID:13
3 2 ,Thread ID:13
3 3 ,Thread ID:13
3 3 ,Thread ID:13
显然,由于错别字和其他问题的数量,已发布问题中的代码并未从实际 运行ning 代码中删除。所以不可能说出实际问题是什么。
但是,下面的代码 运行:
public class ParallelStreams {
public static void main(String[] args) {
List<String> strList = List.of("a1", "a2", "a3", "a4", "a5");
List<IWork> w = List.of(new Work1(), new Work2());
Stream<String> stream = strList.parallelStream();
for (final IWork a : w) {
stream = stream.map(a::work);
}
List<String> finalArray = stream.collect(Collectors.toList());
for (String s : finalArray) {
System.out.println(s);
}
}
}
interface IWork {
String work(String s);
}
class Work1 implements IWork {
@Override
public String work(String s) {
return s + " * " + Thread.currentThread().getId();
}
}
class Work2 implements IWork {
@Override
public String work(String s) {
return s + " ! " + Thread.currentThread().getId();
}
}
并产生以下输出:
a1 * 13 ! 13
a2 * 13 ! 13
a3 * 1 ! 1
a4 * 1 ! 1
a5 * 1 ! 1
为了更清楚地说明哪个 IWork 实现是 运行ning,我添加了一个“*”或“!”来代替原来的 space实现。
首先,感谢您将其更改为可运行代码,以便我们了解发生了什么。
你让它有点难以理解,因为数据只是个位数,而你的操作只是个位数,线程 ID 也可以是个位数。所以我将数据更改为“A”、“B”和“C”以使其更容易。
在您的 for
循环中,您在 actions
周围循环,但实际上并未将 action
用于循环中的任何内容。所以它本质上只是一个循环,它运行的次数与你的动作一样多,但在每次迭代中都没有区别。由于调用了循环变量 a
,因此更难捕捉到这一点。试着给它起个更有意义的名字。
真正令人困惑的是,您正在循环 actions
,然后在循环内流式传输操作。所以这平方应用的操作数。我不敢相信这就是你要找的东西,但你似乎并不对你输出的那个方面感到不安,只是因为它是为每个输入合并的。
另外,不清楚您对线程本身的关心程度。你在输出中有它们,但为什么呢?为什么要使用 ParallelStream()
?
您还通过将 Stream 视为变量并逐步移动它来使其变得更加复杂。在我发布的第一个答案中,我发现将它们组合在一起会导致整个流 运行 通过单个线程,所以我没有在那个答案中这样做,因为我不确定线程有多重要给你(他们不应该)。
最后,如果转储 for
循环,然后将其全部合并到一个 Stream 语句中,它会完美运行:
class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n + " 2" + " ,Thread ID:" + Thread.currentThread().getId();
return ss;
}
}
class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n + " 3" + " ,Thread ID:" + Thread.currentThread().getId();
return ss;
}
}
public class ArithmeticManagerStream {
private List<String> integerList = null;
private List<String> resultArray = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<ArithmeticAction> actions) throws InterruptedException {
return integerList.parallelStream()
.flatMap(str -> actions.stream().map(action -> action.arithmetic(str)))
.collect(Collectors.toList());
}
public interface ArithmeticAction {
String arithmetic(String n);
}
public static void main(String[] args) {
List<ArithmeticAction> actions = new ArrayList();
actions.add(new ArithmeticBBB());
actions.add(new ArithmeticCCC());
List<String> intData = new ArrayList<>();
intData.add("A");
intData.add("B");
intData.add("C");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
try {
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
for (String i : result) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这导致:
***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
C 2 ,Thread ID:1
C 3 ,Thread ID:1
这对我来说似乎是正确的。每个源项目通过每个动作,生成它自己的输出,该输出被平面映射回原始流。
最后一点。我一直在寻求简化代码,并注意到您的界面是一个 Functional
结构。所以你实际上可以放弃实现 类 并在加载 actions
数组时用 lambda 声明它们。然后,当然,很明显你的界面基本上是一个 Function<String, String>
。所以我完全放弃了界面,然后清理了代码,使用 Stream 作为最终输出。
结果基本上是微不足道的,我添加了更多的动作只是为了说明一点:
public class ArithmeticManagerStream {
private List<String> integerList = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<Function<String, String>> actions) {
return integerList.parallelStream().flatMap(str -> actions.stream().map(action -> action.apply(str))).collect(Collectors.toList());
}
public static void main(String[] args) {
List<Function<String, String>> actions = new ArrayList();
actions.add(n -> n + " 2" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 3" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 4" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 5" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 6" + " ,Thread ID:" + Thread.currentThread().getId());
List<String> intData = new ArrayList<>();
intData.add("A");
intData.add("B");
intData.add("C");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
result.forEach(System.out::println);
}
}
输出为:
***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
A 4 ,Thread ID:13
A 5 ,Thread ID:13
A 6 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
B 4 ,Thread ID:1
B 5 ,Thread ID:1
B 6 ,Thread ID:1
C 2 ,Thread ID:14
C 3 ,Thread ID:14
C 4 ,Thread ID:14
C 5 ,Thread ID:14
C 6 ,Thread ID:14
我有这段代码适用于 Arraylist 数组列表中的每个元素都需要通过 ArithmeticBBB 和 ArithmeticCCC 进行计算 并创建 2 个返回到流中的元素 我正在使用 stream = stream.map(a::work);在循环 。 或者使用 flatMap
class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n+" 2" + " ,Thread ID:" +Thread.currentThread().getId();
return ss;
}
}
class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n+" 3" + " ,Thread ID:" +Thread.currentThread().getId();
return ss;
}
}
public class ArithmeticManagerStream {
private List<String> integerList = null;
private List<String> resultArray = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<ArithmeticAction> actions) throws
InterruptedException {
Stream<String> stream = integerList.parallelStream();
for (final ArithmeticManagerStream.ArithmeticAction a : actions) {
stream = stream.flatMap(str -> actions.stream().map(b -> b.arithmetic(str)));
}
return resultArray = stream.collect(Collectors.toList());
}
public interface ArithmeticAction {
String arithmetic(String n);
}
public static void main(String[] args) {
List<ArithmeticAction> actions = new ArrayList();
actions.add(new ArithmeticBBB());
actions.add(new ArithmeticCCC());
List<String> intData = new ArrayList<>();
intData.add("1");
intData.add("2");
intData.add("3");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
try {
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
for(String i : result) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
目前,如果您运行这个程序,您将看到结果连接到字符串。
1 2 ,Thread ID:12 2 ,Thread ID:12
1 2 ,Thread ID:12 3 ,Thread ID:12
1 3 ,Thread ID:12 2 ,Thread ID:12
1 3 ,Thread ID:12 3 ,Thread ID:12
2 2 ,Thread ID:1 2 ,Thread ID:1
2 2 ,Thread ID:1 3 ,Thread ID:1
2 3 ,Thread ID:1 2 ,Thread ID:1
2 3 ,Thread ID:1 3 ,Thread ID:1
3 2 ,Thread ID:13 2 ,Thread ID:13
3 2 ,Thread ID:13 3 ,Thread ID:13
3 3 ,Thread ID:13 2 ,Thread ID:13
3 3 ,Thread ID:13 3 ,Thread ID:13
我需要的是每个元素都在其自己的元素中,例如:
1 2 ,Thread ID:12
1 2 ,Thread ID:12
1 3 ,Thread ID:12
1 3 ,Thread ID:12
2 2 ,Thread ID:1
2 2 ,Thread ID:1
2 3 ,Thread ID:1
2 3 ,Thread ID:1
3 2 ,Thread ID:13
3 2 ,Thread ID:13
3 3 ,Thread ID:13
3 3 ,Thread ID:13
显然,由于错别字和其他问题的数量,已发布问题中的代码并未从实际 运行ning 代码中删除。所以不可能说出实际问题是什么。
但是,下面的代码 运行:
public class ParallelStreams {
public static void main(String[] args) {
List<String> strList = List.of("a1", "a2", "a3", "a4", "a5");
List<IWork> w = List.of(new Work1(), new Work2());
Stream<String> stream = strList.parallelStream();
for (final IWork a : w) {
stream = stream.map(a::work);
}
List<String> finalArray = stream.collect(Collectors.toList());
for (String s : finalArray) {
System.out.println(s);
}
}
}
interface IWork {
String work(String s);
}
class Work1 implements IWork {
@Override
public String work(String s) {
return s + " * " + Thread.currentThread().getId();
}
}
class Work2 implements IWork {
@Override
public String work(String s) {
return s + " ! " + Thread.currentThread().getId();
}
}
并产生以下输出:
a1 * 13 ! 13
a2 * 13 ! 13
a3 * 1 ! 1
a4 * 1 ! 1
a5 * 1 ! 1
为了更清楚地说明哪个 IWork 实现是 运行ning,我添加了一个“*”或“!”来代替原来的 space实现。
首先,感谢您将其更改为可运行代码,以便我们了解发生了什么。
你让它有点难以理解,因为数据只是个位数,而你的操作只是个位数,线程 ID 也可以是个位数。所以我将数据更改为“A”、“B”和“C”以使其更容易。
在您的 for
循环中,您在 actions
周围循环,但实际上并未将 action
用于循环中的任何内容。所以它本质上只是一个循环,它运行的次数与你的动作一样多,但在每次迭代中都没有区别。由于调用了循环变量 a
,因此更难捕捉到这一点。试着给它起个更有意义的名字。
真正令人困惑的是,您正在循环 actions
,然后在循环内流式传输操作。所以这平方应用的操作数。我不敢相信这就是你要找的东西,但你似乎并不对你输出的那个方面感到不安,只是因为它是为每个输入合并的。
另外,不清楚您对线程本身的关心程度。你在输出中有它们,但为什么呢?为什么要使用 ParallelStream()
?
您还通过将 Stream 视为变量并逐步移动它来使其变得更加复杂。在我发布的第一个答案中,我发现将它们组合在一起会导致整个流 运行 通过单个线程,所以我没有在那个答案中这样做,因为我不确定线程有多重要给你(他们不应该)。
最后,如果转储 for
循环,然后将其全部合并到一个 Stream 语句中,它会完美运行:
class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n + " 2" + " ,Thread ID:" + Thread.currentThread().getId();
return ss;
}
}
class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n + " 3" + " ,Thread ID:" + Thread.currentThread().getId();
return ss;
}
}
public class ArithmeticManagerStream {
private List<String> integerList = null;
private List<String> resultArray = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<ArithmeticAction> actions) throws InterruptedException {
return integerList.parallelStream()
.flatMap(str -> actions.stream().map(action -> action.arithmetic(str)))
.collect(Collectors.toList());
}
public interface ArithmeticAction {
String arithmetic(String n);
}
public static void main(String[] args) {
List<ArithmeticAction> actions = new ArrayList();
actions.add(new ArithmeticBBB());
actions.add(new ArithmeticCCC());
List<String> intData = new ArrayList<>();
intData.add("A");
intData.add("B");
intData.add("C");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
try {
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
for (String i : result) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这导致:
***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
C 2 ,Thread ID:1
C 3 ,Thread ID:1
这对我来说似乎是正确的。每个源项目通过每个动作,生成它自己的输出,该输出被平面映射回原始流。
最后一点。我一直在寻求简化代码,并注意到您的界面是一个 Functional
结构。所以你实际上可以放弃实现 类 并在加载 actions
数组时用 lambda 声明它们。然后,当然,很明显你的界面基本上是一个 Function<String, String>
。所以我完全放弃了界面,然后清理了代码,使用 Stream 作为最终输出。
结果基本上是微不足道的,我添加了更多的动作只是为了说明一点:
public class ArithmeticManagerStream {
private List<String> integerList = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<Function<String, String>> actions) {
return integerList.parallelStream().flatMap(str -> actions.stream().map(action -> action.apply(str))).collect(Collectors.toList());
}
public static void main(String[] args) {
List<Function<String, String>> actions = new ArrayList();
actions.add(n -> n + " 2" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 3" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 4" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 5" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 6" + " ,Thread ID:" + Thread.currentThread().getId());
List<String> intData = new ArrayList<>();
intData.add("A");
intData.add("B");
intData.add("C");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
result.forEach(System.out::println);
}
}
输出为:
***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
A 4 ,Thread ID:13
A 5 ,Thread ID:13
A 6 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
B 4 ,Thread ID:1
B 5 ,Thread ID:1
B 6 ,Thread ID:1
C 2 ,Thread ID:14
C 3 ,Thread ID:14
C 4 ,Thread ID:14
C 5 ,Thread ID:14
C 6 ,Thread ID:14