Flink streaming example that generates its own data
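Earlier I asked about a simple example for Flink. That gave me some good examples!
However, I would like to ask for a more "streamy" example, where we generate an input value every second. Ideally the value would be random, but even the same value each time would be fine.
The objective is to get a stream that "moves" with no/minimal external touch.
Hence my question:
How can I show Flink actually streaming data without external dependencies?
I found how to show this by generating data externally and writing it to Kafka, or by listening to a public source, but I am trying to solve it with minimal dependencies (like starting with GenerateFlowFile in Nifi).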
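Here's an example. It was constructed as an example of how to make your sources and sinks pluggable. The idea is that in development you might use a random source and print the results, in tests you might use a hardwired list of input events and collect the results in a list, and in production you'd use the real sources and sinks.
Here's the job: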
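import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */
public class TestableStreamingJob {
    private final SourceFunction<Long> source;
    private final SinkFunction<Long> sink;

    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source is injected, so declare its output type explicitly.
        DataStream<Long> longStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));

        longStream
                .map(new IncrementMapFunction())
                .addSink(sink);

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }

    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test.
    // Declared static so the function does not hold a reference to the enclosing job.
    public static class IncrementMapFunction implements MapFunction<Long, Long> {
        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }
}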
And here's the RandomLongSource:
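import java.util.Random;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RandomLongSource extends RichParallelSourceFunction<Long> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // Emits random values as fast as possible until the job is cancelled.
        while (!cancelled) {
            Long nextLong = random.nextLong();
            // Emit under the checkpoint lock so emission and checkpoints do not interleave.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}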
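The source above emits values as fast as it can, while the question asked for one value per second. Here is a minimal sketch of the pacing loop in plain Java so that it runs without Flink on the classpath. Note the assumptions: `PacedRandomEmitter` and `intervalMillis` are hypothetical names made up for illustration, and `Consumer<Long>` merely stands in for Flink's `SourceContext<Long>`.

```java
import java.util.Random;
import java.util.function.Consumer;

/**
 * Plain-Java sketch of a paced emit loop (hypothetical helper, not Flink API).
 * Consumer<Long> stands in for Flink's SourceContext<Long>.
 */
final class PacedRandomEmitter {
    private final long intervalMillis;
    private final Random random = new Random();
    // volatile so that a cancel() from another thread is seen by the loop
    private volatile boolean cancelled = false;

    PacedRandomEmitter(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Emits one random long, then waits intervalMillis, until cancelled. */
    void run(Consumer<Long> out) {
        while (!cancelled) {
            out.accept(random.nextLong());
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop emitting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        cancelled = true;
    }

    public static void main(String[] args) {
        // One value per second; stop after three values.
        PacedRandomEmitter emitter = new PacedRandomEmitter(1000L);
        int[] seen = {0};
        emitter.run(value -> {
            System.out.println(value);
            if (++seen[0] == 3) {
                emitter.cancel();
            }
        });
    }
}
```

In `RandomLongSource.run` the equivalent change would probably be a `Thread.sleep(1000)` after `ctx.collect(nextLong)`, placed outside the `synchronized` block so the checkpoint lock is not held while the source sleeps.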