Flink sink never executes
I have a program that streams cryptocurrency prices into a Flink pipeline and prints the highest bid for each time window.
Main.java
public class Main {
private final static Logger log = LoggerFactory.getLogger(Main.class);
private final static DateFormat dateFormat = new SimpleDateFormat("y-M-d H:m:s");
private final static NumberFormat numberFormat = new DecimalFormat("#0.00");
public static void main(String[] args) throws Exception {
MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.getConfig().setGlobalJobParameters(multipleParameterTool);
streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
streamExecutionEnvironment.addSource(new GdaxSourceFunction())
.name("Gdax Exchange Price Source")
.assignTimestampsAndWatermarks(new WatermarkStrategy<TickerPrice>() {
@Override
public WatermarkGenerator<TickerPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new BoundedOutOfOrdernessGenerator();
}
})
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
.trigger(EventTimeTrigger.create())
.reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
.addSink(new SinkFunction<TickerPrice>() {
@Override
public void invoke(TickerPrice value, Context context) throws Exception {
String dateString = dateFormat.format(context.timestamp());
String valueString = "$" + numberFormat.format(value.getHighestBid());
log.info(dateString + " : " + valueString);
}
}).name("Highest Bid Logger");
streamExecutionEnvironment.execute("Gdax Highest bid window calculator");
}
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public static class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<TickerPrice> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public void onEvent(TickerPrice event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
}
GdaxSourceFunction.java
public class GdaxSourceFunction extends WebSocketClient implements SourceFunction<TickerPrice> {
private static String URL = "wss://ws-feed.gdax.com";
private static Logger log = LoggerFactory.getLogger(GdaxSourceFunction.class);
private static String subscribeMsg = "{\n" +
" \"type\": \"subscribe\",\n" +
" \"product_ids\": [<productIds>],\n" +
" \"channels\": [\n" +
//TODO: uncomment to re-enable order book tracking
//" \"level2\",\n" +
" {\n" +
" \"name\": \"ticker\",\n" +
" \"product_ids\": [<productIds>]\n" +
" }\n"+
" ]\n" +
"}";
SourceContext<TickerPrice> ctx;
@Override
public void run(SourceContext<TickerPrice> ctx) throws Exception {
this.ctx = ctx;
openConnection().get();
while(isOpen()) {
Thread.sleep(10000);
}
}
@Override
public void cancel() {
}
@Override
public void onMessage(String message) {
try {
ObjectNode objectNode = objectMapper.readValue(message, ObjectNode.class);
String type = objectNode.get("type").asText();
if("ticker".equals(type)) {
TickerPrice tickerPrice = new TickerPrice();
String productId = objectNode.get("product_id").asText();
String[] currencies = productId.split("-");
tickerPrice.setFromCurrency(currencies[1]);
tickerPrice.setToCurrency(currencies[0]);
tickerPrice.setHighestBid(objectNode.get("best_bid").asDouble());
tickerPrice.setLowestOffer(objectNode.get("best_ask").asDouble());
tickerPrice.setExchange("gdax");
String time = objectNode.get("time").asText();
Instant instant = Instant.parse(time);
ctx.collectWithTimestamp(tickerPrice, instant.getEpochSecond());
}
//log.info(objectNode.toString());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
@Override
public void onOpen(Session session) {
super.onOpen(session);
//Authenticate and ensure we can properly connect to Gdax Websocket
//construct auth message with list of product ids
StringBuilder productIds = new StringBuilder("");
productIds.append("" +
"\"ETH-USD\",\n" +
"\"ETH-USD\",\n" +
"\"BTC-USD\"");
String subMsg = subscribeMsg.replace("<productIds>", productIds.toString());
try {
userSession.getAsyncRemote().sendText(subMsg).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public String getUrl() {
return URL;
}
}
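But the sink function is never called. I have verified that the reducer is executing (very fast, every 100 ms). If I remove the windowing part and just print the bid of every incoming record, the program works. I have followed all the tutorials on windowing, and I can't see any difference between what I'm doing here and what the tutorials show. I don't know why the Flink sink won't execute in windowed mode.
I copied the BoundedOutOfOrdernessGenerator class directly from this tutorial. It should work for my use case. Within 3600 ms I should see my first record in the log, but I don't. I debugged the program, and the sink function is never executed. If I remove these lines: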
.assignTimestampsAndWatermarks(new WatermarkStrategy<TickerPrice>() {
@Override
public WatermarkGenerator<TickerPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new BoundedOutOfOrdernessGenerator();
}
})
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
.trigger(EventTimeTrigger.create())
.reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
so that the stream creation code looks like this:
streamExecutionEnvironment.addSource(new GdaxSourceFunction())
.name("Gdax Exchange Price Source")
.addSink(new SinkFunction<TickerPrice>() {
@Override
public void invoke(TickerPrice value, Context context) throws Exception {
String dateString = dateFormat.format(context.timestamp());
String valueString = "$" + numberFormat.format(value.getHighestBid());
log.info(dateString + " : " + valueString);
}
}).name("Highest Bid Logger");
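then the sink does execute, but of course the results are not windowed, so they are wrong for my use case. This shows that something is wrong with my windowing logic, but I don't know what.
Versions:
JDK 1.8
Flink 1.11.2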
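I think the problem is that your custom source produces timestamps in seconds, while window durations are always in milliseconds. Flink interprets every timestamp as milliseconds since the epoch, so with second-granularity timestamps, event time advances by only 1 "millisecond" per real second. With a 3500 ms out-of-orderness bound, the watermark then needs roughly an hour of real time to pass the end of the first 100 ms window. Try changing
ctx.collectWithTimestamp(tickerPrice, instant.getEpochSecond());
to
ctx.collectWithTimestamp(tickerPrice, instant.toEpochMilli());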
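To make the unit mismatch concrete, here is a small plain-Java sketch (no Flink dependency) that reproduces the watermark arithmetic of the question's generator for one ticker event. It assumes tumbling windows with offset 0 (windows start at multiples of the size), that an event-time window fires once the watermark reaches its end minus 1, and that ticker times roughly track wall-clock time. The class name, method names, and the sample timestamp are hypothetical.

```java
import java.time.Instant;

public class TimestampUnitsDemo {
    static final long WINDOW_SIZE = 100;           // TumblingEventTimeWindows size, read as ms
    static final long MAX_OUT_OF_ORDERNESS = 3500; // same bound as the question's generator

    // Tumbling windows with offset 0 start at multiples of the window size.
    static long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
    }

    // How far the largest seen timestamp must move past the first event before that
    // event's window can fire. The generator emits max - bound - 1, and the window
    // fires once the watermark reaches windowEnd - 1, i.e. once max >= windowEnd + bound.
    static long advanceUntilFirstFire(long firstTimestamp) {
        long windowEnd = windowStart(firstTimestamp) + WINDOW_SIZE;
        long requiredMaxTimestamp = windowEnd + MAX_OUT_OF_ORDERNESS;
        return requiredMaxTimestamp - firstTimestamp;
    }

    public static void main(String[] args) {
        Instant tick = Instant.parse("2020-11-05T12:00:00.250Z"); // hypothetical ticker time
        long seconds = tick.getEpochSecond(); // grows by 1 per real second
        long millis = tick.toEpochMilli();    // grows by 1000 per real second

        // Flink reads both values as milliseconds, but they advance at different rates.
        System.out.println("epoch seconds: first window fires after ~"
                + advanceUntilFirstFire(seconds) + " s of wall-clock time");
        System.out.println("epoch millis:  first window fires after ~"
                + advanceUntilFirstFire(millis) + " ms of wall-clock time");
    }
}
```

For this sample time the sketch reports 3600 for epoch seconds (an hour of waiting) and 3550 for epoch milliseconds (about 3.5 seconds), which matches the "should fire within a few seconds" expectation only after the fix.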
I would also suggest a few other (mostly unrelated) changes:
streamExecutionEnvironment.addSource(new GdaxSourceFunction())
.name("Gdax Exchange Price Source")
.uid("source")
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<TickerPrice>forBoundedOutOfOrderness(Duration.ofMillis(3500))
)
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
.reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
.uid("window")
.addSink(new SinkFunction<TickerPrice>() { ... })
.uid("sink");
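As a rough model of what the windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100))).reduce(...) part of that pipeline computes once timestamps are in milliseconds, the plain-Java sketch below buckets ticks into 100 ms windows and keeps the highest bid per bucket with the same comparison as the reduce lambda. It ignores watermarks and lateness entirely; `Tick` is a hypothetical stand-in for the question's TickerPrice, and the sample values are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingMaxBidSketch {
    // Hypothetical stand-in for TickerPrice with only the fields used here.
    static final class Tick {
        final long timestampMs;
        final double highestBid;

        Tick(long timestampMs, double highestBid) {
            this.timestampMs = timestampMs;
            this.highestBid = highestBid;
        }
    }

    // Groups ticks by tumbling-window start (offset 0) and keeps the tick with the
    // highest bid in each window, mirroring the question's ReduceFunction.
    static Map<Long, Tick> highestBidPerWindow(List<Tick> ticks, long sizeMs) {
        Map<Long, Tick> result = new TreeMap<>();
        for (Tick t : ticks) {
            long start = t.timestampMs - Math.floorMod(t.timestampMs, sizeMs);
            result.merge(start, t, (a, b) -> a.highestBid > b.highestBid ? a : b);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Tick> ticks = new ArrayList<>(); // Java 8 friendly, no List.of
        ticks.add(new Tick(1_604_577_600_210L, 400.10));
        ticks.add(new Tick(1_604_577_600_250L, 400.30));
        ticks.add(new Tick(1_604_577_600_320L, 400.20));

        Map<Long, Tick> windows = highestBidPerWindow(ticks, 100);
        windows.forEach((start, tick) ->
                System.out.println("[" + start + ", " + (start + 100) + ") -> " + tick.highestBid));
    }
}
```

The first two ticks share the window starting at ...600200 and collapse to the 400.30 bid; the third falls into the next window on its own.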
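Notes on these suggestions:
- Remove BoundedOutOfOrdernessGenerator. There is no need to reimplement the built-in bounded-out-of-orderness watermark generator.
- Remove the window trigger. There doesn't seem to be any need to override the default trigger, and getting it wrong can cause problems.
- Add a UID to every stateful operator. You need these if you want to do a stateful upgrade of the application (restoring from a savepoint) after changing the job topology. (Your current sink isn't stateful, but adding a UID to it won't hurt.)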
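To illustrate why the hand-written generator adds nothing over the built-in one, the sketch below compares a copy of the question's logic with a plain-Java model of what WatermarkStrategy.forBoundedOutOfOrderness builds. The model is based on my reading of Flink's BoundedOutOfOrdernessWatermarks (its running maximum starts low enough that the first watermark is Long.MIN_VALUE); treat it as an approximation rather than the library's actual code. Watermarks are returned instead of emitted, and both class names are hypothetical.

```java
import java.time.Duration;

public class WatermarkComparisonSketch {
    // Same logic as the question's BoundedOutOfOrdernessGenerator: max starts at 0.
    static final class HandRolled {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        HandRolled(long maxOutOfOrderness) { this.maxOutOfOrderness = maxOutOfOrderness; }

        void onEvent(long ts) { currentMaxTimestamp = Math.max(currentMaxTimestamp, ts); }

        long currentWatermark() { return currentMaxTimestamp - maxOutOfOrderness - 1; }
    }

    // Approximate model of the built-in generator (assumption, see lead-in).
    static final class BuiltInModel {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        BuiltInModel(Duration maxOutOfOrderness) {
            this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
            // Chosen so that the first watermark is exactly Long.MIN_VALUE.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        void onEvent(long ts) { maxTimestamp = Math.max(maxTimestamp, ts); }

        long currentWatermark() { return maxTimestamp - outOfOrdernessMillis - 1; }
    }

    public static void main(String[] args) {
        HandRolled handRolled = new HandRolled(3500);
        BuiltInModel builtIn = new BuiltInModel(Duration.ofMillis(3500));
        for (long ts : new long[] {1000, 5000, 4000}) { // out-of-order sample timestamps
            handRolled.onEvent(ts);
            builtIn.onEvent(ts);
        }
        System.out.println(handRolled.currentWatermark() + " vs " + builtIn.currentWatermark());
    }
}
```

Once events have arrived, both produce the same watermark (the largest timestamp minus 3501); they differ only before the first event, which does not affect when windows fire.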