Dataflow: Could not initialize class org.xerial.snappy.Snappy

My pipeline reads data from GCS via Pub/Sub and then sinks it to Redis. It seemed to run fine in Dataflow at first, but after running for two days I found the following exception in my pipeline:


java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
        org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:97)
        org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:89)
        org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
        org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
        org.apache.beam.runners.core.construction.WindowingStrategyTranslation.toProto(WindowingStrategyTranslation.java:216)
        org.apache.beam.runners.core.construction.WindowingStrategyTranslation.toProto(WindowingStrategyTranslation.java:294)
        org.apache.beam.runners.core.construction.WindowingStrategyTranslation.toMessageProto(WindowingStrategyTranslation.java:280)
        org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction.apply(RegisterNodeFunction.java:205)
        org.apache.beam.runners.dataflow.worker.graph.RegisterNodeFunction.apply(RegisterNodeFunction.java:97)
        java.util.function.Function.lambda$andThen(Function.java:88)
        org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction.apply(CreateRegisterFnOperationFunction.java:207)
        org.apache.beam.runners.dataflow.worker.graph.CreateRegisterFnOperationFunction.apply(CreateRegisterFnOperationFunction.java:74)
        java.util.function.Function.lambda$andThen(Function.java:88)
        java.util.function.Function.lambda$andThen(Function.java:88)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1172)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$000(StreamingDataflowWorker.java:149)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:1028)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Is this an issue with Dataflow, or with my pipeline?

According to Google support, this problem is caused by running out of memory. It can be mitigated with the following solutions:

  • Reduce the memory requirements of your pipeline.
  • Use a VM with a higher memory allocation.
  • Use Streaming Autoscaling (not supported in the Apache Beam SDK for Python). [3]
  • Use Streaming Engine [4]. This moves pipeline execution out of the worker VMs and into the Cloud Dataflow service backend, reducing the consumption of CPU, memory, and Persistent Disk storage resources on the worker VMs.
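As a sketch of the second and fourth options, a Java Beam pipeline can be launched with a larger worker machine type and with Streaming Engine enabled via the `--workerMachineType` and `--enableStreamingEngine` pipeline options. The project, region, and main-class names below are placeholders, not taken from the original post:

```shell
# Hypothetical launch command; substitute your own project, region, and main class.
mvn compile exec:java \
  -Dexec.mainClass=com.example.MyRedisPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=my-gcp-project \
    --region=us-central1 \
    --workerMachineType=n1-highmem-4 \
    --enableStreamingEngine"
```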

So I added --maxNumWorkers=15 --autoscalingAlgorithm=THROUGHPUT_BASED when launching the Dataflow job. It now works well.
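For reference, the full launch command presumably looked something like the following. Only the two autoscaling flags come from the original post; everything else (project, region, main class) is a placeholder:

```shell
# Hypothetical launch command; only the last two flags are from the post.
mvn compile exec:java \
  -Dexec.mainClass=com.example.MyRedisPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=my-gcp-project \
    --region=us-central1 \
    --autoscalingAlgorithm=THROUGHPUT_BASED \
    --maxNumWorkers=15"
```

With THROUGHPUT_BASED autoscaling, Dataflow adds workers up to `--maxNumWorkers` when the backlog grows, which spreads the memory load across more VMs instead of exhausting a fixed pool.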