使用akka流下载指定文件?
Download specified file with akka streams?
对于下面的代码,我需要客户端将文件名发送给服务器,然后服务器才能回复客户端指定文件的内容。如果我不指定文件名,它可以工作,只是在服务器部分硬编码,但客户端如何告诉服务器它需要指定的文件?
Server.scala
package com.tst
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Future
object Server extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind("127.0.0.1", 9989)
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
var fileName = ""
// NOTE: here, the fileName = item can not affect the latter runWith,
// I want to find solution
val go = Flow[ByteString].via(connection.flow).map(_.utf8String).map {
item => println(item); fileName = item
}.runWith(FileIO.fromPath(Paths.get(fileName)), Sink.ignore)
}
}
Client.scala
package com.tst
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
object Client extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val connection = Tcp().outgoingConnection("127.0.0.1", 9989)
val f = Source(List("D:/testinput.txt")).map(ByteString(_)).via(connection).
runWith(FileIO.toPath(Paths.get("D:/testoutput.txt")))
f.foreach {
_ =>
println("done")
system.terminate()
}
}
build.sbt
name := "streamtest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.20"
)
在服务器端,您可以将 handleWith
方法与 Flow
一起用于 Connection
,后者接收文件名并生成文件行:
import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
val fileNameToContentsFlow : Flow[ByteString, ByteString, _] =
Flow[ByteString]
.map(_.utf8String)
.take(1L)
.map(fileName => Paths.get(fileName))
.flatMapConcat( filePath => FileIO.fromPath(filePath) )
注意:我添加了 .take(1L)
以便客户端每个连接只能访问 1 个文件。这可以修改为每个连接处理多个文件,但随后需要在每个单独文件的内容之间插入适当的分隔符。
您的服务器代码将稍作修改
connections runForeach { connection =>
connection.handleWith(fileNameToContentsFlow)
}
对于下面的代码,我需要客户端将文件名发送给服务器,然后服务器才能回复客户端指定文件的内容。如果我不指定文件名,它可以工作,只是在服务器部分硬编码,但客户端如何告诉服务器它需要指定的文件?
Server.scala
package com.tst
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Future
object Server extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind("127.0.0.1", 9989)
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
var fileName = ""
// NOTE: here, the fileName = item can not affect the latter runWith,
// I want to find solution
val go = Flow[ByteString].via(connection.flow).map(_.utf8String).map {
item => println(item); fileName = item
}.runWith(FileIO.fromPath(Paths.get(fileName)), Sink.ignore)
}
}
Client.scala
package com.tst
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
object Client extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val connection = Tcp().outgoingConnection("127.0.0.1", 9989)
val f = Source(List("D:/testinput.txt")).map(ByteString(_)).via(connection).
runWith(FileIO.toPath(Paths.get("D:/testoutput.txt")))
f.foreach {
_ =>
println("done")
system.terminate()
}
}
build.sbt
name := "streamtest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.4.20"
)
在服务器端,您可以将 handleWith
方法与 Flow
一起用于 Connection
,后者接收文件名并生成文件行:
import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
val fileNameToContentsFlow : Flow[ByteString, ByteString, _] =
Flow[ByteString]
.map(_.utf8String)
.take(1L)
.map(fileName => Paths.get(fileName))
.flatMapConcat( filePath => FileIO.fromPath(filePath) )
注意:我添加了 .take(1L)
以便客户端每个连接只能访问 1 个文件。这可以修改为每个连接处理多个文件,但随后需要在每个单独文件的内容之间插入适当的分隔符。
您的服务器代码将稍作修改
connections runForeach { connection =>
connection.handleWith(fileNameToContentsFlow)
}