对由 KV 对组成的 PCollection 进行组合时,如何在 CombineFn 的子类中访问键(key)?
How can I access the key in a subclass of CombineFn when combining a PCollection of KV pairs?
我正在使用 CombineFn
的子类来实现 CombinePerKeyExample,而不是使用 SerializableFunction
的实现
package me.examples;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import java.util.HashSet;
import java.util.Set;
/**
 * Combine function that gathers the {@code String} inputs it receives into a set
 * (dropping duplicates) and emits them as one comma-separated string.
 *
 * <p>The order of the joined values is the iteration order of the underlying
 * {@link HashSet}.
 */
public class ConcatWordsCombineFn extends CombineFn<String, ConcatWordsCombineFn.Accumulator, String> {

    /** Mutable accumulator; declares {@code AvroCoder} as its default coder. */
    @DefaultCoder(AvroCoder.class)
    public static class Accumulator {
        // Distinct input values collected so far.
        HashSet<String> plays;
    }

    /** Builds an accumulator whose {@code plays} set is initialized and empty. */
    private static Accumulator emptyAccumulator() {
        Accumulator fresh = new Accumulator();
        fresh.plays = new HashSet<>();
        return fresh;
    }

    /**
     * Creates the starting accumulator.
     *
     * @return a new accumulator with an empty set
     */
    @Override
    public Accumulator createAccumulator() {
        return emptyAccumulator();
    }

    /**
     * Records one input value in the given accumulator.
     *
     * @param accumulator the accumulator to update in place
     * @param input the value to add
     * @return the same accumulator instance that was passed in
     */
    @Override
    public Accumulator addInput(Accumulator accumulator, String input) {
        accumulator.plays.add(input);
        return accumulator;
    }

    /**
     * Unions the sets of all given accumulators into a newly created accumulator.
     *
     * @param accumulators the partial accumulators to merge
     * @return a new accumulator containing every value from the inputs
     */
    @Override
    public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
        Accumulator merged = emptyAccumulator();
        accumulators.forEach(partial -> merged.plays.addAll(partial.plays));
        return merged;
    }

    /**
     * Produces the final output for an accumulator.
     *
     * @param accumulator the fully merged accumulator
     * @return the accumulated values joined with {@code ","}
     */
    @Override
    public String extractOutput(Accumulator accumulator) {
        // how to access the key here ?
        HashSet<String> collected = accumulator.plays;
        return String.join(",", collected);
    }
}
管道由ReadFromBigQuery
、ExtractAllPlaysOfWords
(代码如下)和WriteToBigQuery
组成
package me.examples;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
 * Composite transform from {@code TableRow}s to {@code TableRow}s, built from three
 * steps: a {@code ParDo} producing string key/value pairs, a per-key combine using
 * {@link ConcatWordsCombineFn}, and a {@code ParDo} that formats the result as rows.
 */
public class PlaysForWord extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {

    /**
     * Wires the three steps together.
     *
     * @param input the rows to process
     * @return the rows produced by the final formatting step
     */
    @Override
    public PCollection<TableRow> expand(PCollection<TableRow> input) {
        // Key/value extraction is delegated to ExtractLargeWordsFn (defined elsewhere).
        PCollection<KV<String, String>> keyedWords =
                input.apply("ExtractLargeWords", ParDo.of(new ExtractLargeWordsFn()));

        // Collapse all values sharing a key into a single string per key.
        PCollection<KV<String, String>> playsPerWord =
                keyedWords.apply("CombinePlays", Combine.perKey(new ConcatWordsCombineFn()));

        // Explicitly set the coder for the combined (String, String) pairs.
        playsPerWord.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        // Row formatting is delegated to FormatShakespeareOutputFn (defined elsewhere).
        return playsPerWord.apply("FormatToRow", ParDo.of(new FormatShakespeareOutputFn()));
    }
}
我想访问 ConcatWordsCombineFn
中的键(key),以便根据键来进行最终累加。例如,如果键以 a
开头,则可以使用 ,
连接单词,否则使用 ;
。
看编程指南时
If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a KeyedCombineFn to access the key within the combining strategy.
我在 org.apache.beam.sdk.transforms.Combine
中找不到 KeyedCombineFn
我正在使用 Apache Beam 2.12.0,并以 Google Dataflow 作为运行器(runner)。
我认为没有内置(built-in)的办法可以解决这个问题。一个直接的变通办法(我知道并不完美)是将您的字符串包装到另一个 KV 中:KV<String, KV<String, String>>
,其中两个键相同。
我正在使用 CombineFn
的子类来实现 CombinePerKeyExample,而不是使用 SerializableFunction 的实现
package me.examples;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import java.util.HashSet;
import java.util.Set;
/**
 * Combine function that gathers the {@code String} inputs it receives into a set
 * (dropping duplicates) and emits them as one comma-separated string.
 *
 * <p>The order of the joined values is the iteration order of the underlying
 * {@link HashSet}.
 */
public class ConcatWordsCombineFn extends CombineFn<String, ConcatWordsCombineFn.Accumulator, String> {

    /** Mutable accumulator; declares {@code AvroCoder} as its default coder. */
    @DefaultCoder(AvroCoder.class)
    public static class Accumulator {
        // Distinct input values collected so far.
        HashSet<String> plays;
    }

    /** Builds an accumulator whose {@code plays} set is initialized and empty. */
    private static Accumulator emptyAccumulator() {
        Accumulator fresh = new Accumulator();
        fresh.plays = new HashSet<>();
        return fresh;
    }

    /**
     * Creates the starting accumulator.
     *
     * @return a new accumulator with an empty set
     */
    @Override
    public Accumulator createAccumulator() {
        return emptyAccumulator();
    }

    /**
     * Records one input value in the given accumulator.
     *
     * @param accumulator the accumulator to update in place
     * @param input the value to add
     * @return the same accumulator instance that was passed in
     */
    @Override
    public Accumulator addInput(Accumulator accumulator, String input) {
        accumulator.plays.add(input);
        return accumulator;
    }

    /**
     * Unions the sets of all given accumulators into a newly created accumulator.
     *
     * @param accumulators the partial accumulators to merge
     * @return a new accumulator containing every value from the inputs
     */
    @Override
    public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
        Accumulator merged = emptyAccumulator();
        accumulators.forEach(partial -> merged.plays.addAll(partial.plays));
        return merged;
    }

    /**
     * Produces the final output for an accumulator.
     *
     * @param accumulator the fully merged accumulator
     * @return the accumulated values joined with {@code ","}
     */
    @Override
    public String extractOutput(Accumulator accumulator) {
        // how to access the key here ?
        HashSet<String> collected = accumulator.plays;
        return String.join(",", collected);
    }
}
管道由ReadFromBigQuery
、ExtractAllPlaysOfWords
(代码如下)和WriteToBigQuery 组成
package me.examples;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
 * Composite transform from {@code TableRow}s to {@code TableRow}s, built from three
 * steps: a {@code ParDo} producing string key/value pairs, a per-key combine using
 * {@link ConcatWordsCombineFn}, and a {@code ParDo} that formats the result as rows.
 */
public class PlaysForWord extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {

    /**
     * Wires the three steps together.
     *
     * @param input the rows to process
     * @return the rows produced by the final formatting step
     */
    @Override
    public PCollection<TableRow> expand(PCollection<TableRow> input) {
        // Key/value extraction is delegated to ExtractLargeWordsFn (defined elsewhere).
        PCollection<KV<String, String>> keyedWords =
                input.apply("ExtractLargeWords", ParDo.of(new ExtractLargeWordsFn()));

        // Collapse all values sharing a key into a single string per key.
        PCollection<KV<String, String>> playsPerWord =
                keyedWords.apply("CombinePlays", Combine.perKey(new ConcatWordsCombineFn()));

        // Explicitly set the coder for the combined (String, String) pairs.
        playsPerWord.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        // Row formatting is delegated to FormatShakespeareOutputFn (defined elsewhere).
        return playsPerWord.apply("FormatToRow", ParDo.of(new FormatShakespeareOutputFn()));
    }
}
我想访问 ConcatWordsCombineFn
中的键(key),以便根据键来进行最终累加。例如,如果键以 a
开头,则可以使用 ,
连接单词,否则使用 ;
。
看编程指南时
If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a KeyedCombineFn to access the key within the combining strategy.
我在 org.apache.beam.sdk.transforms.Combine
中找不到 KeyedCombineFn
我正在使用 Apache Beam 2.12.0,并以 Google Dataflow 作为运行器(runner)。
我认为没有内置(built-in)的办法可以解决这个问题。一个直接的变通办法(我知道并不完美)是将您的字符串包装到另一个 KV 中:KV<String, KV<String, String>>
,其中两个键相同。