使用 MockedStreams 的单元测试 Kafka Streams 失败,简单的 mapValues 转换为大写
Unit test Kafka Streams with MockedStreams fails with simple mapValues to convert in upper case
我正在尝试使用 MockedStreams 库对 Kafka Stream 进行单元测试。
为了测试一个简单的例子,我只想将字符串转换为大写。
我试试下面的代码:
import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes
import org.scalatest.{Matchers, WordSpec}
class mockedStreamsSpec extends WordSpec with Matchers {
val input = Seq(("x", "foo"), ("y", "bar"))
val exp = Seq(("x", "FOO"), ("y", "BAR"))
"Put in upper case " should {
"always return value in upper case" in {
MockedStreams()
.topology {
builder =>
builder.stream[String, String]("topic-in")
.mapValues[String](_.toUpperCase())
.to("topic-out")
}
.input("topic-in", Serdes.String(), Serdes.String(), input)
.output("topic-out", Serdes.String(), Serdes.String(), exp.size) shouldEqual exp
}
}
}
我收到一个错误 java.lang.String
:
[info] mockedStreamsSpec:
[info] Put in upper case
[info] - should always return value in upper case *** FAILED ***
[info] java.lang.ClassCastException: [B cannot be cast to java.lang.String
[info] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
[info] at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
[info] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
[info] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
[info] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
[info] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
[info] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
[info] at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:276)
[info] at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:315)
[info] at com.madewithtea.mockedstreams.MockedStreams$Builder.$anonfun$produce(MockedStreams.scala:110)
这看起来很奇怪,因为 Scala String 和 Java String 应该是相同的。
我从未使用过这个 MockedStreams 库,但是异常“[B cannot be cast to java.lang.String”意味着一个字节数组正在被转换为一个字符串。我建议您尝试使用 builder.stream() 方法的变体(在拓扑内部),它允许您明确指定要使用的 Serdes,因为它接缝了提供给 input() 方法的那些没有被使用。我猜这些只是用于将测试数据序列化到流上。
我正在尝试使用 MockedStreams 库对 Kafka Stream 进行单元测试。
为了测试一个简单的例子,我只想将字符串转换为大写。
我试试下面的代码:
import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes
import org.scalatest.{Matchers, WordSpec}
class mockedStreamsSpec extends WordSpec with Matchers {
val input = Seq(("x", "foo"), ("y", "bar"))
val exp = Seq(("x", "FOO"), ("y", "BAR"))
"Put in upper case " should {
"always return value in upper case" in {
MockedStreams()
.topology {
builder =>
builder.stream[String, String]("topic-in")
.mapValues[String](_.toUpperCase())
.to("topic-out")
}
.input("topic-in", Serdes.String(), Serdes.String(), input)
.output("topic-out", Serdes.String(), Serdes.String(), exp.size) shouldEqual exp
}
}
}
我收到一个错误 java.lang.String
:
[info] mockedStreamsSpec:
[info] Put in upper case
[info] - should always return value in upper case *** FAILED ***
[info] java.lang.ClassCastException: [B cannot be cast to java.lang.String
[info] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
[info] at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
[info] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
[info] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
[info] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
[info] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
[info] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
[info] at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:276)
[info] at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:315)
[info] at com.madewithtea.mockedstreams.MockedStreams$Builder.$anonfun$produce(MockedStreams.scala:110)
这看起来很奇怪,因为 Scala String 和 Java String 应该是相同的。
我从未使用过这个 MockedStreams 库,但是异常“[B cannot be cast to java.lang.String”意味着一个字节数组正在被转换为一个字符串。我建议您尝试使用 builder.stream() 方法的变体(在拓扑内部),它允许您明确指定要使用的 Serdes,因为它接缝了提供给 input() 方法的那些没有被使用。我猜这些只是用于将测试数据序列化到流上。