Flink `TextInputFormat` does not process GZ-compressed files from an AWS S3 source file system
I followed () and wrote the snippet below to process the .gz log files in a directory using a plain `TextInputFormat`. It works against my local test directory: it scans the directory and automatically opens the contents of the .gz files. However, when I run it with an S3 bucket as the source, it does not process the .gz compressed files, even though the same job still opens the .log files in that bucket. It seems it simply does not decompress the .gz files. How can I get this working on the S3 file system?
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String sourceLogDirPath = params.get("source_log_dir_path", "s3://my-test-bucket-logs/"); // "/Users/my.user/logtest/logs"
    final long checkpointInterval = Long.parseLong(params.get("checkpoint_interval", "60000"));

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().setGlobalJobParameters(params);

    // Enumerate files recursively under the source directory.
    TextInputFormat textInputFormat = new TextInputFormat(new Path(sourceLogDirPath));
    textInputFormat.setNestedFileEnumeration(true);

    // Re-scan the directory for new files every 100 ms.
    DataStream<String> stream = env.readFile(
            textInputFormat, sourceLogDirPath,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

    stream.print();
    env.execute();
}
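For context, Flink's `FileInputFormat` selects a decompressor purely from the file extension, independent of the filesystem scheme, so .gz handling itself should behave the same for local and S3 paths. As a sanity check you can (re-)register the gzip factory yourself before building the input format. This is a minimal sketch using Flink's stock registration API; "gz" is registered by default, so the call should at most replace the existing mapping:

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;

// "gz" -> gzip is registered by default; re-registering it only rules out
// a missing factory (Flink logs a warning if it replaces an existing one).
FileInputFormat.registerInflaterInputStreamFactory(
        "gz", GzipInflaterInputStreamFactory.getInstance());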
These are the Flink library jars on my classpath:
/opt/flink/lib/flink-csv-1.13.2.jar:/opt/flink/lib/flink-json-1.13.2.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.2.jar:/opt/flink/lib/flink-table_2.12-1.13.2.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/sentry_log4j2_deploy.jar:/opt/flink/lib/flink-dist_2.12-1.13.2.jar:::
P.S. I also tried s3a://<bucket>/ with no luck.
Perhaps you can switch the logging to DEBUG and check whether files are being filtered out while the splits are enumerated.
By default, files whose names start with "." or "_" are filtered out.
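To make those decisions visible without digging through DEBUG logs, one option is to wrap the default filter and print every path the source enumerates. This is a minimal sketch (the wrapper and its print statement are mine; `FilePathFilter.createDefaultFilter()` is the filter Flink installs by default, and returning true means the path is excluded):

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.core.fs.Path;

// Wrap the default filter so each enumeration decision is printed.
// The default filter drops paths whose names start with "." or "_".
textInputFormat.setFilesFilter(new FilePathFilter() {
    private final FilePathFilter defaults = FilePathFilter.createDefaultFilter();

    @Override
    public boolean filterPath(Path filePath) {
        boolean excluded = defaults.filterPath(filePath);
        System.out.println("enumerated " + filePath + " -> excluded: " + excluded);
        return excluded;
    }
});

If the .gz paths show up with excluded=false but still produce no records, the problem is downstream of enumeration (i.e. in how the splits are read), not in the filter.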