如何创建一个新的 DistributedFunction
How to create a new DistributedFunction
这种新的编程范式对我来说很陌生。我想用给定 class 中定义的 DistributedFunction
替换 .map()
中的匿名函数。但是我不确定如何创建新函数。
我有以下管道:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(e -> {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(),
KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return kafkaMessage;
})
.map(m -> m.getData())
.drainTo(Sinks.logger());
根据一些 Jet 示例,我得出以下结论:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(KafkaHelper::decodeKafkaMessage)
.map(m -> m.getData())
.drainTo(Sinks.logger());
KafkaHelper class:
public final class KafkaHelper implements Serializable {
private static final long serialVersionUID = -3556269069192202060L;
public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
return kafkaMessage;
}
}
这种方法是否遵循 specification/requirement 将 DistributedFunction
传递给 .map()
?如果是,为什么?如果不是,我应该对其进行哪些更改?
是的,在您的两个示例中,您都在创建 DistributedFunction
的实例并将其传递给 map()
。 Java 8 有一个规则,第一个示例中的 lambda 函数和第二个示例中的方法引用用于创建 DistributedFunction
的合成子类型,该子类型实现其单一抽象方法("SAM") 使用您提供的代码。
您的 KafkaHelper
不必是 Serializable
因为您从不实例化它。您还可以将静态方法 decodeKafkaMessage
放在任何其他 class 中,因为它不依赖于 class 实例。
这种新的编程范式对我来说很陌生。我想用给定 class 中定义的 DistributedFunction
替换 .map()
中的匿名函数。但是我不确定如何创建新函数。
我有以下管道:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(e -> {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(),
KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return kafkaMessage;
})
.map(m -> m.getData())
.drainTo(Sinks.logger());
根据一些 Jet 示例,我得出以下结论:
p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(KafkaHelper::decodeKafkaMessage)
.map(m -> m.getData())
.drainTo(Sinks.logger());
KafkaHelper class:
public final class KafkaHelper implements Serializable {
private static final long serialVersionUID = -3556269069192202060L;
public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {
Gson gson = new Gson();
KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);
byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());
try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
return kafkaMessage;
}
}
这种方法是否遵循 specification/requirement 将 DistributedFunction
传递给 .map()
?如果是,为什么?如果不是,我应该对其进行哪些更改?
是的,在您的两个示例中,您都在创建 DistributedFunction
的实例并将其传递给 map()
。 Java 8 有一个规则,第一个示例中的 lambda 函数和第二个示例中的方法引用用于创建 DistributedFunction
的合成子类型,该子类型实现其单一抽象方法("SAM") 使用您提供的代码。
您的 KafkaHelper
不必是 Serializable
因为您从不实例化它。您还可以将静态方法 decodeKafkaMessage
放在任何其他 class 中,因为它不依赖于 class 实例。