ZIO 监视文件系统事件
ZIO watch file system events
帮助我如何在 ZIO 上组织目录扫描。这是我的版本,但它不跟踪所有文件创建事件(遗漏一些事件)。
object Main extends App {
val program = for {
stream <- ZIO.succeed(waitEvents)
_ <- stream.run(ZSink.foreach(k => putStrLn(k.map(e => (e.kind(), e.context())).mkString("\n"))))
} yield ()
val managedWatchService = ZManaged.make {
for {
watchService <- FileSystem.default.newWatchService
path = Path("c:/temp")
_ <- path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE
)
} yield watchService
}(_.close.orDie)
val lookKey = ZManaged.make {
managedWatchService.use(watchService => watchService.take)
}(_.reset)
val waitEvents = ZStream.fromEffect {
lookKey.use(key => key.pollEvents)
}.repeat(Schedule.forever)
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
program
.provideLayer(Console.live ++ Blocking.live ++ Clock.live)
.exitCode
}
感谢您的建议。
每次轮询事件时,您都在强制 WatchService
关闭并重新创建。由于这可能涉及一些系统句柄,因此它可能相当慢,因此您可能会丢失其间发生的文件事件。您更有可能希望生成 WatchService
一次,然后重复轮询。我建议改为这样:
object Main extends App {
val managedWatchService = ZManaged.make {
for {
watchService <- FileSystem.default.newWatchService
path = Path("c:/temp")
_ <- path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE
)
} yield watchService
}(_.close.orDie)
// Convert ZManaged[R, E, ZStream[R, E, A]] into ZStream[R, E, A]
val waitEvents = ZStream.unwrapManaged(
managedWatchService.mapM(_.take).map { key =>
// Use simple effect composition instead of a managed for readability.
ZStream.repeatEffect(key.pollEvents <* key.reset)
// Optional: Flatten the `List` of values that is returned
.flattenIterables
}
)
val program = waitEvents
.map(e => (e.kind(), e.context()).toString)
.foreach(putStrLn).unit
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
program
.provideLayer(Console.live ++ Blocking.live ++ Clock.live)
.exitCode
}
另请注意,在使用 ZManaged
时,您可能不想这样做
ZManaged.make(otherManaged.use(doSomething))(tearDown)
因为你会导致终结器乱序执行。 ZManaged
已经可以通过正常的 flatMap
组合来处理拆解的顺序。
otherManaged.flatMap { other => ZManaged.make(doSomething(other))(tearDown) }
帮助我如何在 ZIO 上组织目录扫描。这是我的版本,但它不跟踪所有文件创建事件(遗漏一些事件)。
object Main extends App {
val program = for {
stream <- ZIO.succeed(waitEvents)
_ <- stream.run(ZSink.foreach(k => putStrLn(k.map(e => (e.kind(), e.context())).mkString("\n"))))
} yield ()
val managedWatchService = ZManaged.make {
for {
watchService <- FileSystem.default.newWatchService
path = Path("c:/temp")
_ <- path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE
)
} yield watchService
}(_.close.orDie)
val lookKey = ZManaged.make {
managedWatchService.use(watchService => watchService.take)
}(_.reset)
val waitEvents = ZStream.fromEffect {
lookKey.use(key => key.pollEvents)
}.repeat(Schedule.forever)
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
program
.provideLayer(Console.live ++ Blocking.live ++ Clock.live)
.exitCode
}
感谢您的建议。
每次轮询事件时,您都在强制 WatchService
关闭并重新创建。由于这可能涉及一些系统句柄,因此它可能相当慢,因此您可能会丢失其间发生的文件事件。您更有可能希望生成 WatchService
一次,然后重复轮询。我建议改为这样:
object Main extends App {
val managedWatchService = ZManaged.make {
for {
watchService <- FileSystem.default.newWatchService
path = Path("c:/temp")
_ <- path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE
)
} yield watchService
}(_.close.orDie)
// Convert ZManaged[R, E, ZStream[R, E, A]] into ZStream[R, E, A]
val waitEvents = ZStream.unwrapManaged(
managedWatchService.mapM(_.take).map { key =>
// Use simple effect composition instead of a managed for readability.
ZStream.repeatEffect(key.pollEvents <* key.reset)
// Optional: Flatten the `List` of values that is returned
.flattenIterables
}
)
val program = waitEvents
.map(e => (e.kind(), e.context()).toString)
.foreach(putStrLn).unit
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
program
.provideLayer(Console.live ++ Blocking.live ++ Clock.live)
.exitCode
}
另请注意,在使用 ZManaged
时,您可能不想这样做
ZManaged.make(otherManaged.use(doSomething))(tearDown)
因为你会导致终结器乱序执行。 ZManaged
已经可以通过正常的 flatMap
组合来处理拆解的顺序。
otherManaged.flatMap { other => ZManaged.make(doSomething(other))(tearDown) }