Databricks to_avro works only if schema is registered without specified event name and namespace

I'm using Databricks Runtime 10.0 with Spark 3.2.0 and Scala 2.12. I also depend on io.confluent:kafka-schema-registry-client:6.2.0, from which I use CachedSchemaRegistryClient to register schemas in the schema registry, like this:


import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

@transient lazy val reg = new CachedSchemaRegistryClient(schemaRegistryUrl, schemaRegistryCacheCapacity, null)

def registerSchema(name: String, schema: Schema): Int = {
  reg.register(name, new AvroSchema(schema))
}

Now, this works as expected in Spark:

val dataframe = ...
val schema = toAvroType(dataframe.schema)
schemaRegistry.registerSchema("some_name", schema)

display(dataframe
  .select(struct(/*some fields*/).alias("body"))
  .select(to_avro('body, lit("some_name"), schemaRegistryUrl).as("body")))

and I can deserialize it as well. However, as soon as I make the following change to specify a proper schema name and namespace:

val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")

Spark fails with:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 853.0 failed 4 times, most recent failure: Lost task 2.3 in stage 853.0 (TID 21433) (10.206.5.9 executor driver): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:272)

This happens even though in both cases I can see the schema correctly registered in the schema registry. Any idea why the second case doesn't work?

These are the lines of code that throw the exception (186-192):

String requestUrl = buildRequestUrl(baseUrl, path);
try {
  return sendHttpRequest(requestUrl, method, requestBodyData, requestProperties, responseFormat);
} catch (IOException e) {
  baseUrls.fail(baseUrl);
  if (i == n-1) throw e; // Raise the exception since we have no more urls to try
}

Line 191 is the one with the comment:

if (i == n-1) throw e; // Raise the exception since we have no more urls to try

So the worker/driver nodes cannot reach the schema registry client at all.

Refer to the Spark 3.2.0 code here.

From https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html:

"如果to_avro的默认输出模式匹配目标主题的模式...否则,您必须在to_avro函数中提供目标主题的模式:"

 to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))

So the three-argument to_avro works because the output Avro schema converted from the DataFrame is a record type whose record name is topLevelRecord, with no namespace by default, and that matches the schema registered for the subject.
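
A quick way to see the difference is to print the full name of the schema that toAvroType generates (a minimal sketch, reusing the dataframe from the question):

import org.apache.spark.sql.avro.SchemaConverters.toAvroType

// Default conversion: the record name is "topLevelRecord" with no namespace
val defaultSchema = toAvroType(dataframe.schema)
println(defaultSchema.getFullName) // prints "topLevelRecord"

// With an explicit name and namespace the full name changes, so the
// lookup under the subject no longer finds a matching schema
val namedSchema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")
println(namedSchema.getFullName) // prints "com.some.namespace.some_name"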

With a name and namespace specified it no longer matches, so the schema itself must also be passed to to_avro.
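
Here is a minimal sketch of the fix, reusing the dataframe, schemaRegistry helper, and schemaRegistryUrl from the question; the fourth argument is the schema's JSON representation (schema.toString):

val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")
schemaRegistry.registerSchema("some_name", schema)

display(dataframe
  .select(struct(/*some fields*/).alias("body"))
  // Passing the explicitly named schema as the fourth argument stops
  // to_avro from looking up the default topLevelRecord schema
  .select(to_avro('body, lit("some_name"), schemaRegistryUrl, schema.toString).as("body")))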