What is the alternative to the deprecated poll() method in a Kafka consumer in Scala?
I am trying to create a Scala consumer as follows:
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import java.util.ArrayList
import scala.concurrent.duration._

object ScalaConsumer {
  def subscribePartitions() = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    // A consumer is configured with deserializers, not serializers
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)

    // Manually assign specific partitions instead of subscribing to topics
    val partitionList: ArrayList[TopicPartition] = new ArrayList[TopicPartition]()
    val topicPartition1 = new TopicPartition("topicr1p3", 0)
    val topicPartition2 = new TopicPartition("othertopicr1p3", 2)
    partitionList.add(topicPartition1)
    partitionList.add(topicPartition2)
    consumer.assign(partitionList)

    try {
      // poll(long) is the deprecated overload this question is about
      val records: ConsumerRecords[String, String] = consumer.poll(10)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def main(args: Array[String]): Unit = {
  }
}
The example above is incomplete because the poll() method used in
val records: ConsumerRecords[String, String] = consumer.poll(10)
is deprecated.
The message is: Symbol poll is deprecated.
So I used the alternative poll overload marked in the image below:
In code, that is:
val records: ConsumerRecords[String, String] = consumer.poll(Duration(1000, "millis"))
But this time the error message is: Cannot resolve overloaded method 'poll'
Contents of build.sbt:
name := "KafkaScalaImplementation"
version := "0.1"
scalaVersion := "2.12.10"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.4.0"
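I am now a bit confused about what I should do to use poll() here. Is there a new way to call the poll method?
Could anyone tell me how to fix the error properly?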
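The following worked for me. The non-deprecated overload is poll(java.time.Duration), so the timeout has to be a java.time.Duration rather than the scala.concurrent.duration.Duration that the question's import brings into scope: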
import java.time.Duration
val records:ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(1000))
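For context, here is a minimal sketch of how that call might sit in the consumer from the question. It assumes the broker address, topic names, and partitions from the question, kafka-clients 2.4.0 on the classpath, and Scala 2.12. The object name ScalaConsumerSketch, the fixed number of poll iterations, and the enable.auto.commit setting are illustrative assumptions, not part of the original code.

```scala
import java.time.Duration
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical object name, used only for this sketch
object ScalaConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Assumption: no group.id is configured, so offset commits are switched off
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // Same manual partition assignment as in the question
      consumer.assign(Arrays.asList(
        new TopicPartition("topicr1p3", 0),
        new TopicPartition("othertopicr1p3", 2)))

      // Assumption: poll a fixed number of times, purely for illustration
      for (_ <- 1 to 10) {
        // poll(java.time.Duration) is the non-deprecated overload
        val records: ConsumerRecords[String, String] =
          consumer.poll(Duration.ofMillis(1000))
        for (record <- records.asScala)
          println(s"${record.topic}-${record.partition}@${record.offset}: ${record.key} -> ${record.value}")
      }
    } finally {
      // Release network resources even if polling throws
      consumer.close()
    }
  }
}
```

Running this needs a reachable broker at localhost:9092 and existing topics, so treat it as a starting point rather than a tested program.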
According to the Java documentation for the Duration class, you have the following options:
static Duration of(long amount, TemporalUnit unit)
// Obtains a Duration representing an amount in the specified unit.
static Duration ofDays(long days)
// Obtains a Duration representing a number of standard 24 hour days.
static Duration ofHours(long hours)
// Obtains a Duration representing a number of standard hours.
static Duration ofMillis(long millis)
// Obtains a Duration representing a number of milliseconds.
static Duration ofMinutes(long minutes)
// Obtains a Duration representing a number of standard minutes.
static Duration ofNanos(long nanos)
// Obtains a Duration representing a number of nanoseconds.
static Duration ofSeconds(long seconds)
// Obtains a Duration representing a number of seconds.
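Any of these factories yields a java.time.Duration that can be passed to poll. If the timeout already exists as a Scala FiniteDuration (as with the scala.concurrent.duration._ import in the question), one option is to convert it by hand. The sketch below assumes Scala 2.12 and the standard library only; the object DurationConversion and the helper toJava are hypothetical names introduced for illustration.

```scala
import java.time.{Duration => JDuration}
import scala.concurrent.duration._

// Hypothetical helper object, used only for this sketch
object DurationConversion {
  // Convert a Scala FiniteDuration into the java.time.Duration that poll expects
  def toJava(d: FiniteDuration): JDuration = JDuration.ofNanos(d.toNanos)

  def main(args: Array[String]): Unit = {
    val timeout: JDuration = toJava(1000.millis)
    println(timeout.toMillis) // prints 1000

    // Different factories can describe the same length of time
    println(JDuration.ofSeconds(1) == JDuration.ofMillis(1000)) // prints true
  }
}
```

With such a helper, the call from the question could be written as consumer.poll(DurationConversion.toJava(1000.millis)), although Duration.ofMillis(1000) is shorter when the value is a literal.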