如何创建一个新的 DistributedFunction

How to create a new DistributedFunction

这种新的编程范式对我来说很陌生。我想用给定 class 中定义的 DistributedFunction 替换 .map() 中的匿名函数。但是我不确定如何创建新函数。

我有以下管道:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(e -> {
    Gson gson = new Gson();

    KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(), 
    KafkaMessage.class);

    byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

    try {
       kafkaMessage.setData(new String(encodedData, "utf-8"));
    } catch (Exception e1) {
       // TODO Auto-generated catch block
       e1.printStackTrace();
    }

    return kafkaMessage;             
  })
 .map(m -> m.getData())
 .drainTo(Sinks.logger());

根据一些 Jet 示例,我得出以下结论:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
 .map(KafkaHelper::decodeKafkaMessage)
 .map(m -> m.getData())
 .drainTo(Sinks.logger());

KafkaHelper class:

public final class KafkaHelper implements Serializable {

    private static final long serialVersionUID = -3556269069192202060L;

    public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {

        Gson gson = new Gson();

        KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);

        byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

        try {
            kafkaMessage.setData(new String(encodedData, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }

         return kafkaMessage;            
    }   

}

这种方法是否遵循 specification/requirement 将 DistributedFunction 传递给 .map()?如果是,为什么?如果不是,我应该对其进行哪些更改?

是的,在您的两个示例中,您都在创建 DistributedFunction 的实例并将其传递给 map()。 Java 8 有一个规则,第一个示例中的 lambda 函数和第二个示例中的方法引用用于创建 DistributedFunction 的合成子类型,该子类型实现其单一抽象方法("SAM") 使用您提供的代码。

您的 KafkaHelper 不必是 Serializable 因为您从不实例化它。您还可以将静态方法 decodeKafkaMessage 放在任何其他 class 中,因为它不依赖于 class 实例。