使用向后兼容的编码器更改编码器以进行转换
Changing Coder for a transform with a backwards compatible coder
我想弄清楚如何将 Coder(StringUtf8Coder) 换成自定义实现。
我已经实现了一个编码器,它增加了处理快速压缩字符串的能力:
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
import com.google.common.base.Charsets;
import org.xerial.snappy.Snappy;
import java.io.IOException;
public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> {
private static String decompressSnappy(byte[] input) throws IOException {
if (input == null) {
throw new CoderException("null input is not accepted");
}
if (Snappy.isValidCompressedBuffer(input)) {
return Snappy.uncompressString(input);
}
return new String(input, Charsets.UTF_8);
}
private static byte[] compressSnappy(String input) throws IOException {
return Snappy.compress(input);
}
public static CompressedByteArrayCoder of() {
return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy);
}
private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) {
super(coder, toFn, fromFn);
}
}
我想找出一种方法来换出 StringUtf8Coder(PubSubIO.Read 的默认值),不会导致数据流管道更新失败。
我想弄清楚如何告诉数据流服务运行器这两个编码器是 "compatible"。
遗憾的是,目前在 Google Cloud Dataflow 服务上更新 运行 管道时无法更改 PCollection 的编码器。在这种情况下,您必须将管道作为新的 Dataflow 作业提交。
有关详细信息,请参阅 Updating an Existing Pipeline,特别是有关兼容性检查的部分。
不过,这是我们将来可能会解决的问题。请查看我们的文档以获取任何更新。
我想弄清楚如何将 Coder(StringUtf8Coder) 换成自定义实现。
我已经实现了一个编码器,它增加了处理快速压缩字符串的能力:
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.DelegateCoder;
import com.google.common.base.Charsets;
import org.xerial.snappy.Snappy;
import java.io.IOException;
public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> {
private static String decompressSnappy(byte[] input) throws IOException {
if (input == null) {
throw new CoderException("null input is not accepted");
}
if (Snappy.isValidCompressedBuffer(input)) {
return Snappy.uncompressString(input);
}
return new String(input, Charsets.UTF_8);
}
private static byte[] compressSnappy(String input) throws IOException {
return Snappy.compress(input);
}
public static CompressedByteArrayCoder of() {
return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy);
}
private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) {
super(coder, toFn, fromFn);
}
}
我想找出一种方法来换出 StringUtf8Coder(PubSubIO.Read 的默认值),不会导致数据流管道更新失败。
我想弄清楚如何告诉数据流服务运行器这两个编码器是 "compatible"。
遗憾的是,目前在 Google Cloud Dataflow 服务上更新 运行 管道时无法更改 PCollection 的编码器。在这种情况下,您必须将管道作为新的 Dataflow 作业提交。
有关详细信息,请参阅 Updating an Existing Pipeline,特别是有关兼容性检查的部分。
不过,这是我们将来可能会解决的问题。请查看我们的文档以获取任何更新。