KSQL:我可以在 KSQL UDF 函数中使用线程来加速这个过程吗?
KSQL: Could I use threads in KSQL UDF functions to speed up the process?
我在 3 节点中 运行 独立 ksql-server
与 Kafka
3 节点集群通信。从具有 15 个分区的 Topic
创建了一个 Stream
,并且数据在 Stream 中以进行一些扩充。得到一段代码 UDF
来查找 IP2Location.bin 文件,UDF
class 看起来像:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
数据进入 Topic
和进入 Stream
的速度非常快(每秒可能有一百万条记录)。在方法上使用 synchronized
后,每个 ksql-server
节点的速度为每秒 3000 records/messages。以这种速度,你知道,要赶上这个速度需要时间。如果没有 synchronized
方法,我会看到损坏的数据,因为单个 object/method 被多个线程左右使用。
问题 1:KSQL 的 udf
调用 called/invoked 究竟如何?
问题 2:我可以使用线程处理 udf
中的请求吗?
问题 3:Topic/Stream 有 15 个分区,我是否应该启动 ksql-servers
的 15 个节点?
谢谢。
Question1: How exactly the udf call would be called/invoked by KSQL?
不确定你的意思。一旦您的 UDF 可用于 KSQL(参见 https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在 KSQL 语句中调用 UDF 作为 IP2LOOKUP
。你也可以在KSQL中运行SHOW FUNCTIONS
确认你的UDF可以使用
也许你问的是下一个问题? KSQL 将一次调用您的 UDF。
Question2: Could I use threads handling the requests in udf?
你为什么要这样做?您是否担心使用您当前的 UDF 代码的 KSQL 无法处理传入的数据量?说到这里,是您尝试处理的预期数据量,因为您可能正在尝试进行过早优化?
另外,在不了解更多细节的情况下,我不认为你的 UDF 的多线程设置会产生任何优势,因为 UDF 在被调用时仍然一次只能处理一条消息(每个 KSQL 服务器或者,更准确地说,每个流任务,每个 KSQL 服务器可以有很多任务;我提到这个是为了清楚地表明 KSQL 中的 UDF 不会 通过只处理一个跨所有服务器的单个消息;处理当然是分布式的并且是并行发生的)。
Question3: Being the Topic/Stream is of 15 partitions, should I spin-up 15 nodes of ksql-servers?
这取决于您的数据量。您可以根据需要旋转任意数量的 KSQL 服务器。如果数据量不大,单台 KSQL 服务器可能就足够了。如果数据量更高,您可以开始启动额外的 KSQL 服务器,最多 15 个服务器(因为输入主题有 15 个分区)。任何额外的 KSQL 服务器都将处于空闲状态。
在15台KSQL服务器不够用的情况下,您应该将输入主题的分区数从15增加到更高的数量,然后您也可以启动更多的KSQL服务器(从而增加您设置的计算能力)。
我在 3 节点中 运行 独立 ksql-server
与 Kafka
3 节点集群通信。从具有 15 个分区的 Topic
创建了一个 Stream
,并且数据在 Stream 中以进行一些扩充。得到一段代码 UDF
来查找 IP2Location.bin 文件,UDF
class 看起来像:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
数据进入 Topic
和进入 Stream
的速度非常快(每秒可能有一百万条记录)。在方法上使用 synchronized
后,每个 ksql-server
节点的速度为每秒 3000 records/messages。以这种速度,你知道,要赶上这个速度需要时间。如果没有 synchronized
方法,我会看到损坏的数据,因为单个 object/method 被多个线程左右使用。
问题 1:KSQL 的 udf
调用 called/invoked 究竟如何?
问题 2:我可以使用线程处理 udf
中的请求吗?
问题 3:Topic/Stream 有 15 个分区,我是否应该启动 ksql-servers
的 15 个节点?
谢谢。
Question1: How exactly the udf call would be called/invoked by KSQL?
不确定你的意思。一旦您的 UDF 可用于 KSQL(参见 https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在 KSQL 语句中调用 UDF 作为 IP2LOOKUP
。你也可以在KSQL中运行SHOW FUNCTIONS
确认你的UDF可以使用
也许你问的是下一个问题? KSQL 将一次调用您的 UDF。
Question2: Could I use threads handling the requests in udf?
你为什么要这样做?您是否担心使用您当前的 UDF 代码的 KSQL 无法处理传入的数据量?说到这里,是您尝试处理的预期数据量,因为您可能正在尝试进行过早优化?
另外,在不了解更多细节的情况下,我不认为你的 UDF 的多线程设置会产生任何优势,因为 UDF 在被调用时仍然一次只能处理一条消息(每个 KSQL 服务器或者,更准确地说,每个流任务,每个 KSQL 服务器可以有很多任务;我提到这个是为了清楚地表明 KSQL 中的 UDF 不会 通过只处理一个跨所有服务器的单个消息;处理当然是分布式的并且是并行发生的)。
Question3: Being the Topic/Stream is of 15 partitions, should I spin-up 15 nodes of ksql-servers?
这取决于您的数据量。您可以根据需要旋转任意数量的 KSQL 服务器。如果数据量不大,单台 KSQL 服务器可能就足够了。如果数据量更高,您可以开始启动额外的 KSQL 服务器,最多 15 个服务器(因为输入主题有 15 个分区)。任何额外的 KSQL 服务器都将处于空闲状态。
在15台KSQL服务器不够用的情况下,您应该将输入主题的分区数从15增加到更高的数量,然后您也可以启动更多的KSQL服务器(从而增加您设置的计算能力)。