使用akka流下载指定文件?

Download specified file with akka streams?

对于下面的代码,我需要客户端将文件名发送给服务器,然后服务器才能回复客户端指定文件的内容。如果我不指定文件名,它可以工作,只是在服务器部分硬编码,但客户端如何告诉服务器它需要指定的文件?

Server.scala

package com.tst

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Future

object Server extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val connections: Source[IncomingConnection, Future[ServerBinding]] =
    Tcp().bind("127.0.0.1", 9989)
  connections runForeach { connection =>
    println(s"New connection from: ${connection.remoteAddress}")
    var fileName = ""

    // NOTE: here, the fileName = item can not affect the latter runWith,
    // I want to find solution
    val go = Flow[ByteString].via(connection.flow).map(_.utf8String).map {
      item => println(item); fileName = item
    }.runWith(FileIO.fromPath(Paths.get(fileName)), Sink.ignore)
  }
}

Client.scala

package com.tst

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

object Client extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val connection = Tcp().outgoingConnection("127.0.0.1", 9989)
  val f = Source(List("D:/testinput.txt")).map(ByteString(_)).via(connection).
    runWith(FileIO.toPath(Paths.get("D:/testoutput.txt")))
  f.foreach {
    _ =>
      println("done")
      system.terminate()
  }
}

build.sbt

name := "streamtest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.4.20"
)

在服务器端,您可以将 handleWith 方法与 Flow 一起用于 Connection,后者接收文件名并生成文件行:

import akka.stream.scaladsl.FileIO
import java.nio.file.Paths

val fileNameToContentsFlow : Flow[ByteString, ByteString, _] = 
  Flow[ByteString]
    .map(_.utf8String)
    .take(1L)
    .map(fileName => Paths.get(fileName))
    .flatMapConcat( filePath => FileIO.fromPath(filePath) )

注意:我添加了 .take(1L) 以便客户端每个连接只能访问 1 个文件。这可以修改为每个连接处理多个文件,但随后需要在每个单独文件的内容之间插入适当的分隔符。

您的服务器代码将稍作修改

connections runForeach { connection =>

  connection.handleWith(fileNameToContentsFlow)
}