KSQL:我可以在 KSQL UDF 函数中使用线程来加速这个过程吗?

KSQL: Could I use threads in KSQL UDF functions to speed up the process?

我在 3 节点中 运行 独立 ksql-serverKafka 3 节点集群通信。从具有 15 个分区的 Topic 创建了一个 Stream,并且数据在 Stream 中以进行一些扩充。得到一段代码 UDF 来查找 IP2Location.bin 文件,UDF class 看起来像:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

    private IP2Location loc = null;
    private Gson gson = null;

    @Udf(description = "fetches the geoloc of the ipaddress.")
    public synchronized String ip2lookup(String ip) {

        String json = null;
        if (loc != null) {
            IP2LocationResult result = null;
            try {
                result = loc.query(ip);
                System.out.println(result);
                json = gson.toJson(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return json;
        }
        return ip;
    }

    @Override
    public void configure(Map<String, ?> arg0) {

        try {
            String db_path = null;
            String os = System.getProperty("os.name").toLowerCase();

            db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

            loc = new IP2Location(db_path);
            gson = new Gson();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

数据进入 Topic 和进入 Stream 的速度非常快(每秒可能有一百万条记录)。在方法上使用 synchronized 后,每个 ksql-server 节点的速度为每秒 3000 records/messages。以这种速度,你知道,要赶上这个速度需要时间。如果没有 synchronized 方法,我会看到损坏的数据,因为单个 object/method 被多个线程左右使用。

问题 1:KSQL 的 udf 调用 called/invoked 究竟如何?

问题 2:我可以使用线程处理 udf 中的请求吗?

问题 3:Topic/Stream 有 15 个分区,我是否应该启动 ksql-servers 的 15 个节点?

谢谢。

Question1: How exactly the udf call would be called/invoked by KSQL?

不确定你的意思。一旦您的 UDF 可用于 KSQL(参见 https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在 KSQL 语句中调用 UDF 作为 IP2LOOKUP。你也可以在KSQL中运行SHOW FUNCTIONS确认你的UDF可以使用

也许你问的是下一个问题? KSQL 将一次调用您的 UDF。

Question2: Could I use threads handling the requests in udf?

你为什么要这样做?您是否担心使用您当前的 UDF 代码的 KSQL 无法处理传入的数据量?说到这里,您尝试处理的预期数据量,因为您可能正在尝试进行过早优化?

另外,在不了解更多细节的情况下,我不认为你的 UDF 的多线程设置会产生任何优势,因为 UDF 在被调用时仍然一次只能处理一条消息(每个 KSQL 服务器或者,更准确地说,每个流任务,每个 KSQL 服务器可以有很多任务;我提到这个是为了清楚地表明 KSQL 中的 UDF 不会 通过只处理一个跨所有服务器的单个消息;处理当然是分布式的并且是并行发生的)。

Question3: Being the Topic/Stream is of 15 partitions, should I spin-up 15 nodes of ksql-servers?

这取决于您的数据量。您可以根据需要旋转任意数量的 KSQL 服务器。如果数据量不大,单台 KSQL 服务器可能就足够了。如果数据量更高,您可以开始启动额外的 KSQL 服务器,最多 15 个服务器(因为输入主题有 15 个分区)。任何额外的 KSQL 服务器都将处于空闲状态。

在15台KSQL服务器不够用的情况下,您应该将输入主题的分区数从15增加到更高的数量,然后您也可以启动更多的KSQL服务器(从而增加您设置的计算能力)。