使用 MockedStreams 的单元测试 Kafka Streams 失败,简单的 mapValues 转换为大写

Unit test Kafka Streams with MockedStreams fails with simple mapValues to convert in upper case

我正在尝试使用 MockedStreams 库对 Kafka Stream 进行单元测试。

为了测试一个简单的例子,我只想将字符串转换为大写

我试试下面的代码:

import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes
import org.scalatest.{Matchers, WordSpec}

class mockedStreamsSpec extends WordSpec with Matchers {

  val input = Seq(("x", "foo"), ("y", "bar"))
  val exp = Seq(("x", "FOO"), ("y", "BAR"))

  "Put in upper case " should {
    "always return value in upper case" in {
      MockedStreams()
        .topology {
          builder =>
            builder.stream[String, String]("topic-in")
              .mapValues[String](_.toUpperCase())
              .to("topic-out")
        }
        .input("topic-in", Serdes.String(), Serdes.String(), input)
        .output("topic-out", Serdes.String(), Serdes.String(), exp.size) shouldEqual exp
    }
  }
}

我收到一个错误 java.lang.String :

[info] mockedStreamsSpec:
[info] Put in upper case
[info] - should always return value in upper case *** FAILED ***
  [info]   java.lang.ClassCastException: [B cannot be cast to java.lang.String
  [info]   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
[info]   at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
[info]   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
[info]   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
[info]   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
[info]   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
[info]   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
[info]   at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:276)
[info]   at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:315)
[info]   at com.madewithtea.mockedstreams.MockedStreams$Builder.$anonfun$produce(MockedStreams.scala:110)

这看起来很奇怪,因为 Scala String 和 Java String 应该是相同的。

我从未使用过这个 MockedStreams 库,但是异常“[B cannot be cast to java.lang.String”意味着一个字节数组正在被转换为一个字符串。我建议您尝试使用 builder.stream() 方法的变体(在拓扑内部),它允许您明确指定要使用的 Serdes,因为它接缝了提供给 input() 方法的那些没有被使用。我猜这些只是用于将测试数据序列化到流上。