Elixir:我可以使用 Stream.resource 来逐步读取大数据文件吗?
Elixir: can I use Stream.resource to progressively read a large data file?
我知道如何使用 Stream.resource() 从
一个文件并将它们放在一个列表中。
str = Stream.resource(fn -> File.open!("./data/fidap011.mtx") end,
fn file ->
case IO.read(file, :line) do
data when is_binary(data) -> {[data], file}
_ -> {:halt, file}
end
end,
fn file -> File.close(file) end)
str |> Enum.take(5)
但是我如何从同一个流中获取接下来的 5 行?
如果我再次输入:
str |> Enum.take(5)
我只得到相同的前 5 行。
我是不是漏掉了什么明显的东西?
最终,我希望从我的流中读取足够的数据来生成一些进程
那个处理那个数据。当其中一些过程完成时,我希望
从同一个流中读取更多信息,从而处理下一组数据,等等。
Stream.chunk() 应该在这里发挥作用吗?
但是没有一个例子,我似乎无法凭直觉理解。
编辑 - 几个设计迭代之后!
出于我的目的,不使用 Stream 更容易。
相反,我简单地使用
创建了一个文件 pointer/process
{:ok, fp} = File.open( "data/fidap011.mtx" )
然后我实际上将该 fp 传递给 30000 个不同的派生进程
只要他们喜欢,他们就可以毫不费力地阅读它。
这些进程中的每一个都通过读取其新状态来更改其状态
文件中的变量。在下面的模块中 oR
和 vR
是两个
"router" 接收消息的进程 - 代码是稀疏的一部分
矩阵/向量乘数。
defmodule M_Cells do
@moduledoc """
Provides matrix related code
Each cell process serves for that row & col
"""
defp get_next_state( fp ) do
case IO.read( fp, :line ) do
data when is_binary(data) ->
[rs,cs,vs] = String.split( data )
r = String.to_integer(rs)
c = String.to_integer(cs)
v = String.to_float(vs)
{r,c,v}
_ ->
File.close( fp )
:fail
end
end
defp loop(fp, r,c,v, oR,vR) do
# Maintains state of Matrix Cell, row, col, value
# receives msgs and responds
receive do
:start ->
send vR, { :multiply, c, self() } # get values for operands via router vR
loop(fp, r,c,v, oR,vR)
{ :multiply, w } -> # handle request to multiply by w and relay to router oR
send oR, { :sum, r, v*w }
case get_next_state( fp ) do # read line from file and fill in rcv
{r1,c1,v1} ->
send vR, { :multiply, c1, self() }
loop(fp, r1,c1,v1, oR,vR)
_ -> ## error or end of file etc
##IO.puts(":kill rcv: #{r},#{c},#{v}")
Process.exit( self(), :kill )
end
end
end
# Launch each matrix cell using iteration by tail recursion
def launch(_fp, _oR,_vR, result, 0) do
result |> Enum.reverse # reverse is cosmetic, not substantive
end
def launch(fp, oR,vR, result, count) do
#IO.inspect count
case get_next_state( fp ) do
{r,c,v} ->
pid = spawn fn -> loop( fp, r,c,v, oR,vR) end
launch( fp, oR,vR, [pid|result], count-1 )
_ -> ## error or end of file etc, skip to count 0
launch( fp, oR,vR, result, 0 )
end
end
end
尽情享受吧!
附带说明一下,从文件创建流是一项常见任务。这已经被解决了,所以你可以直接使用File.stream!/3
to create the stream, no need to use Stream.resource/3
。
关于您最初的问题:是的,您是对的,Stream.chunk_every/2
是前往此处的方式。它将懒惰地将流分成提供大小的块:
File.stream!("./data/fidap011.mtx") |> Stream.chunk_every(5) |> Enum.each(fn chunk ->
# do something with chunk
end)
我最近遇到了和作者一样的需求。我想创建一个自定义的 GenStage 制作人,百老汇可以将其用作自定义制作人。我使用了以下解决方案。
{:ok, fp} = File.open("enormousFile.csv")
fstream = IO.stream(fp, :line)
fstream
|> Enum.take(10)
我最初尝试的另一个变体是:
File.stream!(opts.filename)
|> Stream.drop(offset_from_start)
|> Enum.take(10)
它们都按预期工作。我没有对它们进行基准测试,但 IO.stream 更适合我的用例,因为我不必在每次百老汇消费者需要数据时都坚持并增加偏移量。
我知道这是一个老问题,但在这里添加这个答案以防其他人遇到这个问题。
我知道如何使用 Stream.resource() 从 一个文件并将它们放在一个列表中。
str = Stream.resource(fn -> File.open!("./data/fidap011.mtx") end,
fn file ->
case IO.read(file, :line) do
data when is_binary(data) -> {[data], file}
_ -> {:halt, file}
end
end,
fn file -> File.close(file) end)
str |> Enum.take(5)
但是我如何从同一个流中获取接下来的 5 行? 如果我再次输入:
str |> Enum.take(5)
我只得到相同的前 5 行。
我是不是漏掉了什么明显的东西?
最终,我希望从我的流中读取足够的数据来生成一些进程 那个处理那个数据。当其中一些过程完成时,我希望 从同一个流中读取更多信息,从而处理下一组数据,等等。 Stream.chunk() 应该在这里发挥作用吗? 但是没有一个例子,我似乎无法凭直觉理解。
编辑 - 几个设计迭代之后!
出于我的目的,不使用 Stream 更容易。 相反,我简单地使用
创建了一个文件 pointer/process{:ok, fp} = File.open( "data/fidap011.mtx" )
然后我实际上将该 fp 传递给 30000 个不同的派生进程
只要他们喜欢,他们就可以毫不费力地阅读它。
这些进程中的每一个都通过读取其新状态来更改其状态
文件中的变量。在下面的模块中 oR
和 vR
是两个
"router" 接收消息的进程 - 代码是稀疏的一部分
矩阵/向量乘数。
defmodule M_Cells do
@moduledoc """
Provides matrix related code
Each cell process serves for that row & col
"""
defp get_next_state( fp ) do
case IO.read( fp, :line ) do
data when is_binary(data) ->
[rs,cs,vs] = String.split( data )
r = String.to_integer(rs)
c = String.to_integer(cs)
v = String.to_float(vs)
{r,c,v}
_ ->
File.close( fp )
:fail
end
end
defp loop(fp, r,c,v, oR,vR) do
# Maintains state of Matrix Cell, row, col, value
# receives msgs and responds
receive do
:start ->
send vR, { :multiply, c, self() } # get values for operands via router vR
loop(fp, r,c,v, oR,vR)
{ :multiply, w } -> # handle request to multiply by w and relay to router oR
send oR, { :sum, r, v*w }
case get_next_state( fp ) do # read line from file and fill in rcv
{r1,c1,v1} ->
send vR, { :multiply, c1, self() }
loop(fp, r1,c1,v1, oR,vR)
_ -> ## error or end of file etc
##IO.puts(":kill rcv: #{r},#{c},#{v}")
Process.exit( self(), :kill )
end
end
end
# Launch each matrix cell using iteration by tail recursion
def launch(_fp, _oR,_vR, result, 0) do
result |> Enum.reverse # reverse is cosmetic, not substantive
end
def launch(fp, oR,vR, result, count) do
#IO.inspect count
case get_next_state( fp ) do
{r,c,v} ->
pid = spawn fn -> loop( fp, r,c,v, oR,vR) end
launch( fp, oR,vR, [pid|result], count-1 )
_ -> ## error or end of file etc, skip to count 0
launch( fp, oR,vR, result, 0 )
end
end
end
尽情享受吧!
附带说明一下,从文件创建流是一项常见任务。这已经被解决了,所以你可以直接使用File.stream!/3
to create the stream, no need to use Stream.resource/3
。
关于您最初的问题:是的,您是对的,Stream.chunk_every/2
是前往此处的方式。它将懒惰地将流分成提供大小的块:
File.stream!("./data/fidap011.mtx") |> Stream.chunk_every(5) |> Enum.each(fn chunk ->
# do something with chunk
end)
我最近遇到了和作者一样的需求。我想创建一个自定义的 GenStage 制作人,百老汇可以将其用作自定义制作人。我使用了以下解决方案。
{:ok, fp} = File.open("enormousFile.csv")
fstream = IO.stream(fp, :line)
fstream
|> Enum.take(10)
我最初尝试的另一个变体是:
File.stream!(opts.filename)
|> Stream.drop(offset_from_start)
|> Enum.take(10)
它们都按预期工作。我没有对它们进行基准测试,但 IO.stream 更适合我的用例,因为我不必在每次百老汇消费者需要数据时都坚持并增加偏移量。
我知道这是一个老问题,但在这里添加这个答案以防其他人遇到这个问题。