UDF 原因警告:CachedKafkaConsumer 不是 运行 in UninterruptibleThread (KAFKA-1894)
UDF cause warning: CachedKafkaConsumer is not running in UninterruptibleThread (KAFKA-1894)
在通常的 structured_kafka_wordcount.py 代码中,
当我像下面那样按 udf
将行拆分为单词时,
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))
words = lines.select(
explode(
my_split(lines.value)
)
)
警告会一直显示:
WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in
UninterruptibleThread. It may hang when CachedKafkaConsumer's methods
are interrupted because of KAFKA-1894
另一方面,当我按 pyspark.sql.functions.split
将行拆分为单词时,一切正常。
words = lines.select(
explode(
split(lines.value, ' ')
)
)
为什么会发生这种情况以及如何修复警告?
这是我在实践中尝试执行的代码:
pattern = "(.+) message repeated (\d) times: \[ (.+)\]"
prog = re.compile(pattern)
def _unfold(x):
ret = []
result = prog.match(x)
if result:
log = " ".join((result.group(1), result.group(3)))
times = result.group(2)
for _ in range(int(times)):
ret.append(log)
else:
ret.append(x)
return ret
_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
除了拒绝 Python UDF * 之外,您无法在代码中解决此问题。正如您在异常消息中所读到的那样,UninterruptibleThread
是 Kafka 错误 (KAFKA-1894) 的解决方法,旨在防止中断 KafkaConsumer
.
时的无限循环
它不与 PythonUDFRunner
一起使用(在那里引入特殊情况可能没有意义)。
我个人不会担心,除非您遇到一些相关问题。您的 Python 代码永远不会直接与 KafkaConsumer
交互。如果您遇到任何问题,应该在上游修复 - 在这种情况下,我建议创建一个 JIRA ticket.
* 您的 unfold
函数可以用 SQL 函数重写,但这将是一个 hack。将消息计数添加为整数:
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when
p = "(.+) message repeated (\d) times: \[ (.+)\]"
lines = spark.createDataFrame(
["asd message repeated 3 times: [ 12]", "some other message"], "string"
)
lines_with_count = lines.withColumn(
"message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))
用于explode
exploded = lines_with_count.withColumn(
"i",
expr("explode(split(repeat('1', message_count - 1),''))")
).drop("message_count", "i")
并提取:
exploded.withColumn(
"value",
when(
col("value").rlike(p),
concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
).otherwise(col("value"))).show(4, False)
# +------------------+
# |value |
# +------------------+
# |asd 12 |
# |asd 12 |
# |asd 12 |
# |some other message|
# +------------------+
在通常的 structured_kafka_wordcount.py 代码中,
当我像下面那样按 udf
将行拆分为单词时,
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))
words = lines.select(
explode(
my_split(lines.value)
)
)
警告会一直显示:
WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
另一方面,当我按 pyspark.sql.functions.split
将行拆分为单词时,一切正常。
words = lines.select(
explode(
split(lines.value, ' ')
)
)
为什么会发生这种情况以及如何修复警告?
这是我在实践中尝试执行的代码:
pattern = "(.+) message repeated (\d) times: \[ (.+)\]"
prog = re.compile(pattern)
def _unfold(x):
ret = []
result = prog.match(x)
if result:
log = " ".join((result.group(1), result.group(3)))
times = result.group(2)
for _ in range(int(times)):
ret.append(log)
else:
ret.append(x)
return ret
_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
除了拒绝 Python UDF * 之外,您无法在代码中解决此问题。正如您在异常消息中所读到的那样,UninterruptibleThread
是 Kafka 错误 (KAFKA-1894) 的解决方法,旨在防止中断 KafkaConsumer
.
它不与 PythonUDFRunner
一起使用(在那里引入特殊情况可能没有意义)。
我个人不会担心,除非您遇到一些相关问题。您的 Python 代码永远不会直接与 KafkaConsumer
交互。如果您遇到任何问题,应该在上游修复 - 在这种情况下,我建议创建一个 JIRA ticket.
* 您的 unfold
函数可以用 SQL 函数重写,但这将是一个 hack。将消息计数添加为整数:
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when
p = "(.+) message repeated (\d) times: \[ (.+)\]"
lines = spark.createDataFrame(
["asd message repeated 3 times: [ 12]", "some other message"], "string"
)
lines_with_count = lines.withColumn(
"message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))
用于explode
exploded = lines_with_count.withColumn(
"i",
expr("explode(split(repeat('1', message_count - 1),''))")
).drop("message_count", "i")
并提取:
exploded.withColumn(
"value",
when(
col("value").rlike(p),
concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
).otherwise(col("value"))).show(4, False)
# +------------------+
# |value |
# +------------------+
# |asd 12 |
# |asd 12 |
# |asd 12 |
# |some other message|
# +------------------+