How to get similar elements from two streams and collect the pairs thus formed without losing the order?
For example:
The input is an infinite stream, which is available as a finite list on each map-reduce pass:
list 1: List<String> : {"a1_5", "c1_91", "b1_43", "b1_76", "a1_68"}
list 2: List<String> : {"c2_3", "b2_19", "c2_29", "a2_45", "b2_53"}
My output should be an infinite stream made up of output List instances:
List<String> : {"a1_5,a2_45", "c1_91,c2_3", "b1_43,b2_19", "b1_76,b2_53", "a1_68,a2_45"}
Or the output could be:
List<String> : {"c1_91,c2_3", "b1_43,b2_19", "a1_5,a2_45", "b1_76,b2_53"}
Assuming both lists have the same size, you can pair the elements by position like this:
import java.util.ArrayList;
import java.util.List;

public static void main(String[] args) {
    List<String> list1 = new ArrayList<>();
    list1.add("a1_5");
    list1.add("c1_91");
    list1.add("b1_43");
    list1.add("b1_76");
    list1.add("a1_68");
    List<String> list2 = new ArrayList<>();
    list2.add("c2_3");
    list2.add("b2_19");
    list2.add("c2_29");
    list2.add("a2_45");
    list2.add("b2_53");
    List<String> result = new ArrayList<>();
    // pair the i-th element of list1 with the i-th element of list2
    for (int i = 0; i < list1.size(); i++) {
        result.add(list1.get(i) + "," + list2.get(i));
    }
    // print the results
    for (String a : result) {
        System.out.println(a);
    }
}
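The same positional pairing can also be written with Java 8 streams. This is a minimal sketch; the class name `ZipByIndex` and the method `zip` are illustrative, and like the loop above it pairs elements by index (not by their shared first character) and assumes both lists have the same size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ZipByIndex {
    // Pairs list1.get(i) with list2.get(i); assumes both lists have the same size.
    static List<String> zip(List<String> list1, List<String> list2) {
        return IntStream.range(0, list1.size())
                .mapToObj(i -> list1.get(i) + "," + list2.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
        List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
        zip(list1, list2).forEach(System.out::println);
    }
}
```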
If the lists may have different sizes, I would handle that with some basic code:
import java.util.ArrayList;
import java.util.List;

public static void main(String[] args) {
    List<String> list1 = new ArrayList<>();
    list1.add("a1_5");
    list1.add("c1_91");
    list1.add("b1_43");
    list1.add("b1_76");
    list1.add("a1_68");
    // extra element, so list1 is now longer than list2
    list1.add("a2");
    List<String> list2 = new ArrayList<>();
    list2.add("c2_3");
    list2.add("b2_19");
    list2.add("c2_29");
    list2.add("a2_45");
    list2.add("b2_53");
    List<String> result = new ArrayList<>();
    // iterate up to the size of the longer list
    int aux = Math.max(list1.size(), list2.size());
    for (int i = 0; i < aux; i++) {
        if (i >= list1.size()) {
            // list1 is exhausted
            result.add(null + "," + list2.get(i));
        } else if (i >= list2.size()) {
            // list2 is exhausted
            result.add(list1.get(i) + "," + null);
        } else {
            result.add(list1.get(i) + "," + list2.get(i));
        }
    }
    // print the results
    for (String a : result) {
        System.out.println(a);
    }
}
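For lists of different sizes, a stream-based sketch can pad the shorter list with `null` instead of branching on each index by hand. `ZipPadded`, `zipPadded` and `getOrNull` are hypothetical names chosen for this illustration; the pairing is still positional.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ZipPadded {
    // Returns the element at index i, or null when the list is too short.
    static String getOrNull(List<String> list, int i) {
        return i < list.size() ? list.get(i) : null;
    }

    // Pairs by position up to the longer list's size; a missing element prints as "null".
    static List<String> zipPadded(List<String> list1, List<String> list2) {
        int n = Math.max(list1.size(), list2.size());
        return IntStream.range(0, n)
                .mapToObj(i -> getOrNull(list1, i) + "," + getOrNull(list2, i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68", "a2");
        List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
        // the last line printed is "a2,null"
        zipPadded(list1, list2).forEach(System.out::println);
    }
}
```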
If the question is about Java 8 streams, it can be solved with a rather complex custom Spliterator, like this:
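import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T, K, R> Stream<R> pairs(Stream<T> a, Stream<T> b,
        Function<T, K> keyExtractor, BiFunction<T, T, R> merger) {
    // elements seen so far that are still waiting for a partner, grouped by key
    Map<K, Queue<T>> aMap = new HashMap<>();
    Map<K, Queue<T>> bMap = new HashMap<>();
    Spliterator<T> aSpltr = a.spliterator();
    Spliterator<T> bSpltr = b.spliterator();
    Spliterator<R> res = new Spliterators.AbstractSpliterator<R>(Math.min(
            aSpltr.estimateSize(), bSpltr.estimateSize()), Spliterator.ORDERED) {
        T at, bt;
        // a second pair found in the same step is emitted on the next call
        boolean hasBuffered = false;
        R buf;

        @Override
        public boolean tryAdvance(Consumer<? super R> action) {
            if (hasBuffered) {
                hasBuffered = false;
                action.accept(buf);
                return true;
            }
            while (true) {
                // read one element from each stream; stop when either one is exhausted
                if (!aSpltr.tryAdvance(t -> at = t) || !bSpltr.tryAdvance(t -> bt = t))
                    return false;
                K ak = keyExtractor.apply(at);
                K bk = keyExtractor.apply(bt);
                // pair the new element of a with a waiting element of b, or buffer it
                Queue<T> bq = bMap.get(ak);
                boolean found = false;
                if (bq != null) {
                    found = true;
                    action.accept(merger.apply(at, bq.poll()));
                    if (bq.isEmpty()) bMap.remove(ak);
                } else {
                    aMap.computeIfAbsent(ak, k -> new ArrayDeque<>()).add(at);
                }
                // pair the new element of b with a waiting element of a, or buffer it
                Queue<T> aq = aMap.get(bk);
                if (aq != null) {
                    if (found) {
                        hasBuffered = true;
                        buf = merger.apply(aq.poll(), bt);
                    } else {
                        found = true;
                        action.accept(merger.apply(aq.poll(), bt));
                    }
                    if (aq.isEmpty()) aMap.remove(bk);
                } else {
                    bMap.computeIfAbsent(bk, k -> new ArrayDeque<>()).add(bt);
                }
                if (found)
                    return true;
            }
        }
    };
    return StreamSupport.stream(res, a.isParallel() || b.isParallel())
            .onClose(a::close).onClose(b::close);
}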
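This method accepts two streams (possibly infinite), a key extractor function (in your case it should extract the first character) and a merger function (how to combine two elements; in your case, concatenate them with ","). Here is a usage example: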
List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
pairs(list1.stream(), list2.stream(), s -> s.charAt(0), (a, b) -> a+","+b)
.forEach(System.out::println);
Output:
c1_91,c2_3
b1_43,b2_19
a1_5,a2_45
b1_76,b2_53
An alternative example with truly infinite streams: pair up random numbers from the two streams that differ only in the last digit:
pairs(new Random().ints(0, 1000).boxed(), new Random().ints(0, 1000).boxed(),
i -> i/10, (a, b) -> a+","+b)
.limit(100)
.forEach(System.out::println);
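Note that with infinite streams you may get an OutOfMemoryError if the streams contain many elements that never get paired, because those elements stay buffered in the maps.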
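Assuming you are talking about Java 8 streams, the lists have the same length, every element can be paired the way you described, and you don't mind using the additional library Javaslang, it can be done like this (it also works for lists of different sizes, though):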
// functional way
// Stream and Option are Javaslang types (javaslang.collection.Stream, javaslang.control.Option)
static List<String> pairingFun(List<String> list1, List<String> list2,
        BiPredicate<String, String> isPair) {
    return pairingFun(list1.size(), Stream.empty(), Stream.ofAll(list1), Stream.ofAll(list2).cycle(), isPair)
            .toJavaList();
}

// recursive helper function
static Stream<String> pairingFun(int size, Stream<String> acc, Stream<String> stream1, Stream<String> stream2,
        BiPredicate<String, String> isPair) {
    if (stream1.isEmpty()) {
        return acc;
    } else {
        String elem1 = stream1.head();
        // look for a partner among the next `size` elements of the cycled list2
        Option<String> elem2 = stream2.take(size).find(that -> isPair.test(elem1, that));
        return pairingFun(size,
                elem2.map(elem -> acc.append(elem1 + "," + elem)).getOrElse(acc),
                stream1.tail(),
                // continue searching right after the element that was just paired
                elem2.isDefined() ? stream2.dropUntil(that -> isPair.test(elem1, that)).tail() : stream2,
                isPair);
    }
}
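Ideally you would not convert back and forth between Java collections and Javaslang collections, but use only Javaslang collections. That would reduce the boilerplate further. However, I suspect you are most likely bound to the APIs of other third-party libraries.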
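Note, however, that the recursive function above may cause a stack overflow if list1 contains too many elements. Therefore I suggest the good old imperative way: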
// imperative way
static List<String> pairingImp(List<String> list1, List<String> list2,
        BiPredicate<String, String> isPair) {
    int size = list1.size();
    List<String> result = new ArrayList<>(size);
    // cycle list2 so that the search can wrap around to its beginning
    Stream<String> stream = Stream.ofAll(list2).cycle();
    for (String elem1 : list1) {
        Option<String> elem2 = stream.take(size).find(that -> isPair.test(elem1, that));
        if (elem2.isDefined()) {
            result.add(elem1 + "," + elem2.get());
            // continue right after the element that was just paired
            stream = stream.dropUntil(that -> isPair.test(elem1, that)).tail();
        }
    }
    return result;
}
Here is a test:
import javaslang.collection.Stream;
import javaslang.control.Option;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
// ...
public static void main(String[] args) {
    List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
    List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
    // two elements form a pair if they start with the same character
    BiPredicate<String, String> isPair = (s1, s2) -> s1.charAt(0) == s2.charAt(0);
    // [a1_5,a2_45, c1_91,c2_3, b1_43,b2_19, b1_76,b2_53, a1_68,a2_45]
    System.out.println(pairingFun(list1, list2, isPair));
    // [a1_5,a2_45, c1_91,c2_3, b1_43,b2_19, b1_76,b2_53, a1_68,a2_45]
    System.out.println(pairingImp(list1, list2, isPair));
}
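Because we iterate over list2 for every element of list1, the runtime is quadratic, i.e. O(n^2). This could be improved further by using a map to look up pairing candidates. I think the fastest solution would run in O(n * log n).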
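A minimal sketch of the map-based improvement, assuming a one-to-one pairing in which each element of list2 is used at most once (the cycling versions above may reuse an element, e.g. `a2_45`). `MapPairing` and `pairByKey` are hypothetical names. Indexing list2 by key in a `HashMap` makes each lookup expected constant time under the usual hashing assumptions, so a pass is roughly linear in the combined list size.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class MapPairing {
    // Pairs each element of list1 with the first unused element of list2 that has the same key.
    static <K> List<String> pairByKey(List<String> list1, List<String> list2,
                                      Function<String, K> key) {
        // Index list2 by key; each queue keeps list2's original order.
        Map<K, Deque<String>> candidates = new HashMap<>();
        for (String s : list2) {
            candidates.computeIfAbsent(key.apply(s), k -> new ArrayDeque<>()).add(s);
        }
        List<String> result = new ArrayList<>();
        for (String s : list1) {
            Deque<String> queue = candidates.get(key.apply(s));
            // elements without a remaining partner are skipped
            if (queue != null && !queue.isEmpty()) {
                result.add(s + "," + queue.poll());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
        List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
        // prints [a1_5,a2_45, c1_91,c2_3, b1_43,b2_19, b1_76,b2_53]
        System.out.println(pairByKey(list1, list2, s -> s.charAt(0)));
    }
}
```

With this one-to-one rule `a1_68` stays unpaired, so the result is the same set of four pairs as the question's second acceptable output, only in list1 order.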
Disclaimer: I'm the creator of Javaslang.
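This Channel class is my naive approach to the problem. As suggested by @Daniel Dietrich and @Tagir Valeev, I will redo it more cleanly with Java 8 streams.
The real work happens in the match function.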
import java.util.ArrayList;
import java.util.List;

class Channel {
    // static because it is used from static main(); initialized so that append() does not throw a NullPointerException
    private static StringBuilder output_string_builder = new StringBuilder();

    public static void main(String[] args) {
        // list_channel1 and list_channel2 hold the data read from the two input channels (not shown);
        // they must be mutable lists, because match() removes elements from them
        Channel ch1 = new Channel(list_channel1);
        Channel ch2 = new Channel(list_channel2);
        int count1 = ch1.getQ().size();
        int count2 = ch2.getQ().size();
        while ((ch1.getQ().size() > 0 && count1 > 0)
                && (ch2.getQ().size() > 0 && count2 > 0)) {
            ch1.match(ch2, output_string_builder);
            count1--;
            count2--;
        }
        System.out.println(output_string_builder.toString());
    }

    // per-prefix lists of this channel's elements, in arrival order
    private List<String> Rs;
    private List<String> Gs;
    private List<String> Bs;
    private List<String> my_list;

    public Channel(List<String> channel_list) {
        Rs = new ArrayList<String>();
        Gs = new ArrayList<String>();
        Bs = new ArrayList<String>();
        my_list = channel_list;
        for (String str : channel_list) {
            if (str.charAt(0) == 'R') {
                insertR(str);
            }
            if (str.charAt(0) == 'G') {
                insertG(str);
            }
            if (str.charAt(0) == 'B') {
                insertB(str);
            }
        }
    }

    public List<String> getQ() {
        return my_list;
    }

    // Takes the head of ch's queue and pairs it with this channel's oldest element of the same prefix, if any
    public void match(Channel ch, StringBuilder output) {
        if (getQ().size() < 1 || ch.getQ().size() < 1) {
            return;
        }
        String str = ch.getQ().get(0);
        if (str.charAt(0) == 'R' && hasR()) {
            // update this channel
            String my_val = this.getR();
            getQ().remove(my_val);
            // update the remote channel
            ch.getQ().remove(str);
            ch.getR();
            // place the channel-1 string before the channel-2 string
            if (str.charAt(1) == '1') {
                output.append(str + "," + my_val + " ");
            } else {
                output.append(my_val + "," + str + " ");
            }
        }
        if (str.charAt(0) == 'G' && hasG()) {
            // update this channel
            String my_val = this.getG();
            getQ().remove(my_val);
            // update the remote channel
            ch.getQ().remove(str);
            ch.getG();
            // place the channel-1 string before the channel-2 string
            if (str.charAt(1) == '1') {
                output.append(str + "," + my_val + " ");
            } else {
                output.append(my_val + "," + str + " ");
            }
        }
        if (str.charAt(0) == 'B' && hasB()) {
            // update this channel
            String my_val = this.getB();
            getQ().remove(my_val);
            // update the remote channel
            ch.getQ().remove(str);
            ch.getB();
            // place the channel-1 string before the channel-2 string
            if (str.charAt(1) == '1') {
                output.append(str + "," + my_val + " ");
            } else {
                output.append(my_val + "," + str + " ");
            }
        }
    }

    private void insertR(String _string) {
        Rs.add(_string);
    }

    private void insertG(String _string) {
        Gs.add(_string);
    }

    private void insertB(String _string) {
        Bs.add(_string);
    }

    public boolean hasR() {
        return !Rs.isEmpty();
    }

    public boolean hasG() {
        return !Gs.isEmpty();
    }

    public boolean hasB() {
        return !Bs.isEmpty();
    }

    // remove and return the oldest element with the given prefix, or null if there is none
    public String getR() {
        if (hasR()) {
            return Rs.remove(0);
        }
        return null;
    }

    public String getG() {
        if (hasG()) {
            return Gs.remove(0);
        }
        return null;
    }

    public String getB() {
        if (hasB()) {
            return Bs.remove(0);
        }
        return null;
    }
}
I tested the output with an infinite data stream read into the two lists via a SocketChannel.
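Since the input arrives as one finite batch per pass, the matching state can be carried across passes instead of starting over each time. The sketch below is an assumption about how that could look, not the code that was tested: `BatchPairer` and `pass` are hypothetical names, the key is the first character (generalizing the `Rs`/`Gs`/`Bs` lists to one map per channel), and unmatched elements stay buffered for later batches.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchPairer {
    // Unmatched elements from earlier batches, keyed by their first character.
    private final Map<Character, Deque<String>> pending1 = new HashMap<>();
    private final Map<Character, Deque<String>> pending2 = new HashMap<>();

    // Matches one batch from each channel; leftovers stay buffered for the next call.
    public List<String> pass(List<String> batch1, List<String> batch2) {
        List<String> out = new ArrayList<>();
        for (String s : batch1) {
            match(s, pending2, pending1, out, true);
        }
        for (String s : batch2) {
            match(s, pending1, pending2, out, false);
        }
        return out;
    }

    // Pairs s with the oldest waiting element of the other channel, or buffers s.
    private static void match(String s, Map<Character, Deque<String>> other,
                              Map<Character, Deque<String>> own,
                              List<String> out, boolean sIsFirst) {
        char key = s.charAt(0);
        Deque<String> waiting = other.get(key);
        if (waiting != null && !waiting.isEmpty()) {
            String partner = waiting.poll();
            if (waiting.isEmpty()) other.remove(key);
            // keep channel 1's element on the left, as in the question
            out.add(sIsFirst ? s + "," + partner : partner + "," + s);
        } else {
            own.computeIfAbsent(key, k -> new ArrayDeque<>()).add(s);
        }
    }

    public static void main(String[] args) {
        BatchPairer pairer = new BatchPairer();
        List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
        List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
        // prints [c1_91,c2_3, b1_43,b2_19, a1_5,a2_45, b1_76,b2_53]
        System.out.println(pairer.pass(list1, list2));
        // a1_68 and c2_29 are still buffered; prints [c1_7,c2_29, a1_68,a2_8]
        System.out.println(pairer.pass(Arrays.asList("c1_7"), Arrays.asList("a2_8")));
    }
}
```

As with the Spliterator version, the buffers grow without bound if one channel keeps producing keys that the other channel never matches.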