Consume GCS files based on pattern from Flink
Flink supports the Hadoop FileSystem abstraction, and there is a GCS connector, a library that implements that abstraction on top of Google Cloud Storage.
How do I create a Flink file source using the code in this repo?
To do this, you need to:
- Install and configure the GCS connector on your Flink cluster (see the configuration sketch after the dependency list below).
- Add the Hadoop and Flink dependencies (including the HDFS connector) to your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
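For the first step, the GCS connector jar is typically placed on the classpath of every Flink node (for example in Flink's lib/ directory) and registered in the Hadoop configuration. Below is a minimal core-site.xml sketch, assuming service-account key authentication; the key-file path is a placeholder, and the exact auth property names vary between gcs-connector versions, so treat this as an illustration rather than the exact setup from the original answer:

<!-- core-site.xml (sketch): register the GCS connector with Hadoop.
     The key-file path is a placeholder; auth property names differ
     between gcs-connector versions. -->
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
</property>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/path/to/service-account-key.json</value>
</property>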
Use it to create a data source with a GCS path:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read every file matching the glob pattern through Hadoop's TextInputFormat;
// each record is a (byte offset, line) pair.
DataSet<Tuple2<LongWritable, Text>> input =
    env.createInput(
        HadoopInputs.readHadoopFile(
            new TextInputFormat(), LongWritable.class, Text.class, "gs://bucket/path/some*pattern/"));
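The resulting DataSet holds (offset, line) pairs. A minimal follow-up sketch, assuming you only need the text payload; the MapFunction below is illustrative and not part of the original answer, and it needs one extra import, org.apache.flink.api.common.functions.MapFunction:

// Keep only the line text from each (offset, line) record and print a sample.
DataSet<String> lines =
    input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String map(Tuple2<LongWritable, Text> record) {
            return record.f1.toString();
        }
    });

lines.first(10).print(); // print() triggers execution of the batch job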