将 GeoLocation Twitter4J 写入 Postgres

Write GeoLocation Twitter4J to Postgres

我正在使用 Twitter4J 和 Akka Streams 提取推文。我选择了几个字段,如 userId、tweetId、推文文本等。此推文实体被写入数据库:

class Counter extends StatusAdapter with Databases{
  implicit val system = ActorSystem("TweetsExtractor")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher
  implicit val LoggingAdapter =
    Logging(system, classOf[Counter])

  val overflowStrategy = OverflowStrategy.backpressure
  val bufferSize = 1000
  val statusSource = Source.queue[Status](
    bufferSize,
    overflowStrategy
  )

  val insertFlow: Flow[Status, Tweet, NotUsed] =
    Flow[Status].map(status => Tweet(status.getId, status.getUser.getId, status.getText, status.getLang,
      status.getFavoriteCount, status.getRetweetCount))
  val insertSink: Sink[Tweet, Future[Done]] = Sink.foreach(tweetRepository.create)
  val insertGraph = statusSource via insertFlow to insertSink
  val queueInsert = insertGraph.run()

  override def onStatus(status: Status) = 
    Await.result(queueInsert.offer(status), Duration.Inf)
}

我的目的是添加位置字段。在 Twitter4J 中有一个特定的 GeoLocation 类型,它包含 double 类型的纬度和经度。但是,当我尝试直接通过流程提取纬度和经度时,没有任何内容写入数据库:

Flow[Status].map(status => Tweet(status.getId, status.getUser.getId, status.getText, status.getLang, status.getFavoriteCount, status.getRetweetCount, status.getGeoLocation.getLatitude, status.getGeoLocation.getLongitude))

出现这种行为的原因可能是什么?我该如何解决?

正如对问题的评论所证实的那样,这里发生的事情是大多数推文都没有附加地理位置数据,这使得这些字段为空并导致了不当行为。

几个简单的空值检查应该可以解决问题。