How to get similar elements from two streams and collect the pairs thus formed without losing the order?

For example:
The inputs are infinite streams, which are available as finite lists on each pass of a map-reduce:

 list 1:  List<String> : {"a1_5", "c1_91", "b1_43", "b1_76", "a1_68"}
 list 2:  List<String> : {"c2_3", "b2_19", "c2_29", "a2_45", "b2_53"}

My output should be an infinite stream made up of List instances like this one:

List<String> : {"a1_5,a2_45", "c1_91,c2_3", "b1_43,b2_19", "b1_76,b2_53", "a1_68,a2_45"}

Or the output could be:

List<String> : {"c1_91,c2_3", "b1_43,b2_19", "a1_5,a2_45", "b1_76,b2_53"}

Assuming both lists have the same size, you can pair them by index like this:

public static void main(String[] args) {

    List<String> list1 = new ArrayList<String>();
    list1.add("a1_5");
    list1.add("c1_91");
    list1.add("b1_43");
    list1.add("b1_76");
    list1.add("a1_68");

    List<String> list2 = new ArrayList<String>();
    list2.add("c2_3");
    list2.add("b2_19");
    list2.add("c2_29");
    list2.add("a2_45");
    list2.add("b2_53");

    List<String> result = new ArrayList<String>();

    for (int i = 0; i < list1.size(); i++) {
        result.add(list1.get(i) + "," + list2.get(i));
    }

    //Printing the results
    for (String a : result) {
        System.out.println(a);
    }
}
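
Since the question mentions streams, the same index-based pairing can also be written with IntStream. This is only a sketch: the class name ZipByIndex and the helper zip are made up for illustration, and pairing stops at the shorter list, which is my assumption. Like the loop above, it pairs by position, not by similarity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ZipByIndex {

    // Hypothetical helper: joins the elements found at the same index of both lists.
    static List<String> zip(List<String> list1, List<String> list2) {
        // Assumption: stop at the shorter list instead of padding with null
        int n = Math.min(list1.size(), list2.size());
        return IntStream.range(0, n)
                .mapToObj(i -> list1.get(i) + "," + list2.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
        List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
        // Prints [a1_5,c2_3, c1_91,b2_19, b1_43,c2_29, b1_76,a2_45, a1_68,b2_53]
        System.out.println(zip(list1, list2));
    }
}
```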

If the lists may have different sizes, I would handle that with some basic code:

public static void main(String[] args) {

    List<String> list1 = new ArrayList<String>();
    list1.add("a1_5");
    list1.add("c1_91");
    list1.add("b1_43");
    list1.add("b1_76");
    list1.add("a1_68");
    //New instance
    list1.add("a2");

    List<String> list2 = new ArrayList<String>();
    list2.add("c2_3");
    list2.add("b2_19");
    list2.add("c2_29");
    list2.add("a2_45");
    list2.add("b2_53");     

    List<String> result = new ArrayList<String>();
    int aux = 0;
    if (list1.size() >= list2.size()) {
        aux = list1.size();
    } else {
        aux = list2.size();
    }

    for (int i = 0; i < aux; i++) {
        // Use >= so the check also holds when the sizes differ by more than one
        if (i >= list1.size()) {
            result.add(null + "," + list2.get(i));
        } else if (i >= list2.size()) {
            result.add(list1.get(i) + "," + null);
        } else {
            result.add(list1.get(i) + "," + list2.get(i));
        }
    }

    //Printing the results
    for (String a : result) {
        System.out.println(a);
    }
}

If the question is about Java 8 streams, it can be solved with a fairly complex custom Spliterator, as follows:

public static <T,K,R> Stream<R> pairs(Stream<T> a, Stream<T> b, 
                 Function<T, K> keyExtractor, BiFunction<T, T, R> merger) {
    Map<K, Queue<T>> aMap = new HashMap<>();
    Map<K, Queue<T>> bMap = new HashMap<>();
    Spliterator<T> aSpltr = a.spliterator();
    Spliterator<T> bSpltr = b.spliterator();

    Spliterator<R> res = new Spliterators.AbstractSpliterator<R>(Math.min(
            aSpltr.estimateSize(), bSpltr.estimateSize()), Spliterator.ORDERED) {
        T at, bt;
        boolean hasBuffered = false;
        R buf;

        @Override
        public boolean tryAdvance(Consumer<? super R> action) {
            if(hasBuffered) {
                hasBuffered = false;
                action.accept(buf);
                return true;
            }
            while(true) {
                if(!aSpltr.tryAdvance(t -> at = t) || !bSpltr.tryAdvance(t -> bt = t))
                    return false;
                K ak = keyExtractor.apply(at);
                K bk = keyExtractor.apply(bt);
                Queue<T> bq = bMap.get(ak);
                boolean found = false;
                if(bq != null) {
                    found = true;
                    action.accept(merger.apply(at, bq.poll()));
                    if(bq.isEmpty()) bMap.remove(ak);
                } else {
                    aMap.computeIfAbsent(ak, k -> new ArrayDeque<>()).add(at);
                }
                Queue<T> aq = aMap.get(bk);
                if(aq != null) {
                    if(found) {
                        hasBuffered = true;
                        buf = merger.apply(aq.poll(), bt);
                    } else {
                        found = true;
                        action.accept(merger.apply(aq.poll(), bt));
                    }
                    if(aq.isEmpty()) aMap.remove(bk);
                } else {
                    bMap.computeIfAbsent(bk, k -> new ArrayDeque<>()).add(bt);
                }
                if(found)
                    return true;
            }
        }
    };
    return StreamSupport.stream(res, a.isParallel() || b.isParallel())
              .onClose(a::close).onClose(b::close);
}

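This method accepts two streams (possibly infinite), a key extraction function (in your case it needs to extract the first character) and a merger function (how to combine two elements; in your case, joining them with ","). Here is a usage example: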

List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68"); 
List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53"); 
pairs(list1.stream(), list2.stream(), s -> s.charAt(0), (a, b) -> a+","+b)
    .forEach(System.out::println);

Output:

c1_91,c2_3
b1_43,b2_19
a1_5,a2_45
b1_76,b2_53

An alternative example with truly infinite streams: pairing random numbers from two streams that differ only in the last digit:

pairs(new Random().ints(0, 1000).boxed(), new Random().ints(0, 1000).boxed(),
        i -> i/10, (a, b) -> a+","+b)
    .limit(100)
    .forEach(System.out::println);

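Note that with infinite streams you may get an OutOfMemoryError if the streams contain many unpaired elements, because every element that has not yet found a partner stays buffered.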
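
To make the memory caveat concrete, here is a rough simulation of the buffering strategy used by pairs(...): an element waits in a map until an element with the same key arrives on the other side. The class BufferGrowth and its methods are hypothetical names, and only counts per key are tracked instead of the elements themselves, so treat it as an illustration, not as a measurement.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class BufferGrowth {

    // Returns how many elements are still waiting for a partner after both key
    // sequences have been consumed pairwise.
    static int bufferedAfter(int[] keysA, int[] keysB) {
        Map<Integer, Integer> waitingA = new HashMap<>();
        Map<Integer, Integer> waitingB = new HashMap<>();
        int n = Math.min(keysA.length, keysB.length);
        for (int i = 0; i < n; i++) {
            // Element from A: take a waiting partner from B, otherwise wait itself
            if (!takeOne(waitingB, keysA[i])) waitingA.merge(keysA[i], 1, Integer::sum);
            // Element from B: take a waiting partner from A, otherwise wait itself
            if (!takeOne(waitingA, keysB[i])) waitingB.merge(keysB[i], 1, Integer::sum);
        }
        return total(waitingA) + total(waitingB);
    }

    // Removes one waiting element with the given key, if there is any.
    private static boolean takeOne(Map<Integer, Integer> waiting, int key) {
        Integer count = waiting.get(key);
        if (count == null) return false;
        if (count == 1) waiting.remove(key); else waiting.put(key, count - 1);
        return true;
    }

    private static int total(Map<Integer, Integer> waiting) {
        return waiting.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        int[] zeros = new int[1000];
        int[] ones = new int[1000];
        Arrays.fill(ones, 1);
        System.out.println(bufferedAfter(zeros, ones));  // prints 2000: nothing ever pairs
        System.out.println(bufferedAfter(zeros, zeros)); // prints 0: every element pairs at once
    }
}
```

With keys that never match, the buffer grows by two elements per step, which is the situation that eventually exhausts the heap on an infinite stream.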

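Assuming that you are talking about Java 8 streams, that the lists have the same length, that every element can be paired the way you described, and that you don't mind using an additional library, Javaslang, it can be done like this (it also works for lists of different sizes, though):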

// functional way
static List<String> pairingFun(List<String> list1, List<String> list2,
                               BiPredicate<String, String> isPair) {
    return pairingFun(list1.size(), Stream.empty(), Stream.ofAll(list1), Stream.ofAll(list2).cycle(), isPair)
            .toJavaList();
}

// recursive helper function
static Stream<String> pairingFun(int size, Stream<String> acc, Stream<String> stream1, Stream<String> stream2,
                                 BiPredicate<String, String> isPair) {
    if (stream1.isEmpty()) {
        return acc;
    } else {
        String elem1 = stream1.head();
        Option<String> elem2 = stream2.take(size).find(that -> isPair.test(elem1, that));
        return pairingFun(size,
                elem2.map(elem -> acc.append(elem1 + "," + elem)).getOrElse(acc),
                stream1.tail(),
                elem2.isDefined() ? stream2.dropUntil(that -> isPair.test(elem1, that)).tail() : stream2,
                isPair);
    }
}

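In an ideal world you would not convert back and forth between Java collections and Javaslang collections, but use Javaslang collections only. That would reduce the boilerplate further. However, I suspect you are most likely bound to the APIs of other third-party libraries.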

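Note, however, that the recursive function above may cause a stack overflow if list1 contains too many elements. I therefore suggest the good old imperative way: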

// imperative way
static List<String> pairingImp(List<String> list1, List<String> list2,
                               BiPredicate<String, String> isPair) {
    int size = list1.size();
    List<String> result = new ArrayList<>(size);
    Stream<String> stream = Stream.ofAll(list2).cycle();
    for (String elem1 : list1) {
        Option<String> elem2 = stream.take(size).find(that -> isPair.test(elem1, that));
        if (elem2.isDefined()) {
            result.add(elem1 + "," + elem2.get());
            stream = stream.dropUntil(that -> isPair.test(elem1, that)).tail();
        }
    }
    return result;
}

Here is a test:

import javaslang.collection.Stream;
import javaslang.control.Option;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// ...

public static void main(String[] args) {

    List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
    List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");

    BiPredicate<String, String> isPair = (s1, s2) -> s1.charAt(0) == s2.charAt(0);

    // [a1_5,a2_45, c1_91,c2_3, b1_43,b2_19, b1_76,b2_53, a1_68,a2_45]
    System.out.println(pairingFun(list1, list2, isPair));

    // [a1_5,a2_45, c1_91,c2_3, b1_43,b2_19, b1_76,b2_53, a1_68,a2_45]
    System.out.println(pairingImp(list1, list2, isPair));
}

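Because we iterate over list2 for each element of list1, the runtime is quadratic, i.e. O(n^2). This can be improved by using a map to look up pairing candidates. I think the fastest solution would run in O(n * log n).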
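
The map idea could look roughly like the following plain-Java sketch. It is not part of Javaslang: the class MapPairing and the helper pairingMap are hypothetical names. It groups list2 by key once and then rotates each per-key queue, which is my assumption about how candidates get reused. That is not exactly the cycling order of the functions above, although it produces the same result on the sample data. With a hash map each lookup is expected constant time, so the whole pass should be close to linear.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class MapPairing {

    // Hypothetical helper: pairs every element of list1 with a list2 element of the same key.
    static <K> List<String> pairingMap(List<String> list1, List<String> list2,
                                       Function<String, K> key) {
        // Group the candidates of list2 by key, keeping their original order
        Map<K, Deque<String>> candidates = new HashMap<>();
        for (String elem2 : list2) {
            candidates.computeIfAbsent(key.apply(elem2), k -> new ArrayDeque<>()).addLast(elem2);
        }
        List<String> result = new ArrayList<>(list1.size());
        for (String elem1 : list1) {
            Deque<String> queue = candidates.get(key.apply(elem1));
            if (queue != null) {
                // Assumption: reuse candidates round-robin once they have all been taken
                String elem2 = queue.pollFirst();
                queue.addLast(elem2);
                result.add(elem1 + "," + elem2);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> list1 = Arrays.asList("a1_5", "c1_91", "b1_43", "b1_76", "a1_68");
        List<String> list2 = Arrays.asList("c2_3", "b2_19", "c2_29", "a2_45", "b2_53");
        // Prints [a1_5,a2_45, c1_91,c2_3, b1_43,b2_19, b1_76,b2_53, a1_68,a2_45]
        System.out.println(pairingMap(list1, list2, s -> s.charAt(0)));
    }
}
```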

Disclaimer: I am the creator of Javaslang.

This Channel class is my naive approach to the problem. Following the suggestions of @Daniel Dietrich and @Tagir Valeev, I will use Java 8 streams to do it more cleanly.

The real work is done by the match function.

class Channel{

// Static and initialised so that the static main method can append to it
private static StringBuilder output_string_builder = new StringBuilder();

public static void main(String[] args){
           // list_channel1 and list_channel2 are not defined in this snippet; they are
           // assumed to be mutable List<String> instances filled elsewhere
           Channel ch1 = new Channel(list_channel1);
           Channel ch2 = new Channel(list_channel2);
           int count1 = ch1.getQ().size();
           int count2 = ch2.getQ().size();
           while((ch1.getQ().size()>0 && count1>0) 
                 && (ch2.getQ().size()>0 && count2>0))
           {
                 ch1.match(ch2, output_string_builder);
                 count1--;
                 count2--;
           }
     System.out.println(output_string_builder.toString());

}


private List<String> Rs;
private List<String> Gs;
private List<String> Bs;
private List<String> my_list;

public Channel(List<String> channel_list){
    Rs = new ArrayList<String>();
    Gs = new ArrayList<String>();
    Bs = new ArrayList<String>();
    my_list = channel_list;
    for(String str: channel_list){
        if(str.charAt(0) == 'R'){
            insertR(str);
        }
        if(str.charAt(0) == 'G'){
            insertG(str);
        }
        if(str.charAt(0) == 'B'){
            insertB(str);
        }
    }
}

public List<String> getQ(){
    return my_list;
}

public void match(Channel ch, StringBuilder output){

    if(getQ().size() < 1 || ch.getQ().size() < 1){
        return;
    }

        String str = ch.getQ().get(0);
        if(str.charAt(0) == 'R' && hasR()){
            //self updated
            String my_val = this.getR();
            getQ().remove(my_val);

            //remote channel's data updated
            ch.getQ().remove(str);
            ch.getR();

            //need to do placing of string 1 before 2
            if(str.charAt(1) == '1'){
                output.append(str + "," + my_val + " ");
            }
            else{
                output.append(my_val + "," + str + " ");
            }



        }
        if(str.charAt(0) == 'G' && hasG()){
            //self updated
            String my_val = this.getG();
            getQ().remove(my_val);

            //remote channel's data updated
            ch.getQ().remove(str);
            ch.getG();

            //need to do placing of string 1 before 2
            if(str.charAt(1) == '1'){
                output.append(str + "," + my_val + " ");
            }
            else{
                output.append(my_val + "," + str + " ");
            }



        }
        if(str.charAt(0) == 'B' && hasB()){
            //self updated
            String my_val = this.getB();
            getQ().remove(my_val);

            //remote channel's data updated
            ch.getQ().remove(str);
            ch.getB();

            //need to do placing of string 1 before 2
            if(str.charAt(1) == '1'){
                output.append(str + "," + my_val + " ");
            }
            else{
                output.append(my_val + "," + str + " ");
            }



        }

}

private void insertR(String _string){
    Rs.add(_string);
}
private void insertG(String _string){
    Gs.add(_string);
}
private void insertB(String _string){
    Bs.add(_string);
}

public boolean hasR(){
    if(Rs.size() > 0){
        return true;
    }
    return false;
}
public boolean hasG(){
    if(Gs.size() > 0){
        return true;
    }
    return false;
}
public boolean hasB(){
    if(Bs.size() > 0){
        return true;
    }
    return false;
}

public String getR(){
    if(hasR()){
        return Rs.remove(0);
    }
    return null;

}
public String getG(){
    if(hasG()){
    return Gs.remove(0);
    }
    return null;
}

public String getB(){
    if(hasB()){
    return Bs.remove(0);
    }
    return null;
}


}

The output was tested with infinite data streams that are read into the two lists through a SocketChannel.