使用 Nessos.Stream 减少获取查询输出并将其写入 .CSV 的时间
Using Nessos.Stream to reduce the time to get and write query output to a `.CSV`
过去,我编写的查询只获取整个查询,将结果存储在内存中,然后将整个序列提供给 .CSV
类型的提供程序。查询示例:
let results =
query {
for row in db.ThisRow do
select row
}
|> Seq.toList
据我了解,Seq.toList
部分将查询强制为 运行,同时保持查询本身的输出,因为 Seq
将是 lazy.
如果查询结果的数量很少,这没什么大不了的。但是,如果结果数量很大(例如,>100 万行),我最终会得到 System.OutOfMemoryException
。因此,一位朋友建议研究使用 Nessos.Stream
库。
我的目标是从查询中提取一行,对该行执行一些操作,然后将该行写入 .CSV
并针对每一行迭代执行此操作,所有行最终都在同一个 .CSV
文件.
所以,我试试
open Nessos.Stream
type StringInt = {
String: string option
Int: int
}
type StringIntCsvType = CsvProvider<Sample = "item_number, num",
Schema = "item_number (string option), num (int)",
HasHeaders = true>
let buildRowFromObject (obj: StringInt) = StringIntCsvType.Row(obj.String,
obj.Int)
let results =
query {
for row in db.ThisRow do
select row
}
|> Stream.ofSeq
|> Stream.groupBy (fun row -> row.ITEMNUMBER)
|> Stream.map (fun (itemString, seq) -> (itemString, (seq |> Seq.length)))
|> Stream.map (fun (str, num) -> {String = Some str;
Int = num})
|> Stream.map buildRowFromObject
|> Stream.toSeq
let ThisCsv= new StringIntCsvType(results)
let ThisCsvLoc = "pathToFileLocation"
let ThisCsv.Save(ThisCsvLoc)
虽然这是可行的,因为我正在获取我从数据库中查询的所有行,但这仍然没有按照我的意愿进行,我不确定如何实现。此外,对于更大的查询,我仍然以 System.OutOfMemoryException
结束。
我想我应该可以插入
|> Stream.map (fun x -> new StringIntCsvType(x))
在|> Stream.map buildRowFromObject
行下,但是x
被
错误了
Type Constraint Mismatch. The type
CsvProvider<...>.Row
is not compatible with type
Collections.Generic.IEnumerable<CsvProvider<...>.Row>
即使我没有收到此错误,我认为添加该行也会为每一行创建一个 new .CSV
,这绝对不是我想要的想要。
我如何编写查询,对查询的每个不同部分进行操作,并将查询的每一行写入同一个 .CSV
文件?我什至接近上面的代码了吗?
您不需要中间的 StringInt 记录类型。您已经从 buildRowFromObject 获得了 CSV 序列,您可以直接将其写入文件。虽然我也是 Nessos Streams 的忠实粉丝,但我不确定它在这里为您增加了多少价值(我可能是错的,也许您在管道中有一些更复杂的 map/reduce 操作)。在这种情况下,在探索其他解决方案之前,我会先选择简单的解决方案。
首先,确保您的 FSI 设置为 64 位并且您也在 exe 中编译为 64 位。其次,正如@krontogiannis 评论的那样,您的操作似乎是在对某些组中的项目进行计数。我认为您不想实际遍历每个组,所以为什么不在数据库端进行计数。你会得到一个元组序列(我希望),其中你有一些 ID 和计数。您可以将其直接提供给 CSVTypeprovider。所以:
type StringIntCsvType = CsvProvider<Sample = "item_number, num",
Schema = "item_number (string), num (int)",
HasHeaders = true>
let buildRowFromObject (row:string * int) = StringIntCsvType.Row(row)
let qry = query {
for row in tbl1 do
groupBy row.ItemNumber into g
select (g.Key,g.Count())
}
let csvout = qry |> Seq.map buildRowFromObject
(new StringIntCsvType(csvout)).Save(@"C:\tmp\test.csv")
这会在 9 秒内将大约 700 万行写入 ca。 100MB 的 csv 文件。
在更复杂的应用程序中,您可能会将写入文件分解为如下内容:
let writeFile csvout (path:string) =
use csvtype = new StringIntCsvType(csvout)
csvtype.Save(path)
writeFile csvout @"C:\tmp\test2.csv"
手册中还有一条关于处理大量行的评论,不确定它对你有多重要,但你可以在实例化 CsvProvider 时设置 CacheRows=false
。
编辑
一些如果这可能不相关但这里是:
#load @"..\..\FSLAB\packages\FsLab\FsLab.fsx"
#r "System.Data.dll"
#r "FSharp.Data.TypeProviders.dll"
#r "System.Data.Linq.dll"
#r @"..\packages\Streams.0.4.1\lib\net45\Streams.dll"
#r @"..\packages\FileHelpers.3.1.5\lib\net45\FileHelpers.dll"
open System
open System.Diagnostics
open System.IO
open System.Collections
open System.Collections.Generic
open System.IO.Compression
open System.Data
open System.Data.Linq
open System.Linq
open Microsoft.FSharp.Data.TypeProviders
open FSharp.Linq
open FSharp.Data
open Nessos.Streams
open FileHelpers
[<Literal>]
let connectionString2 = @"Data Source=(LocalDB)\MSSQLLocalDB;AttachDbFilename=C:\Users\...\Documents\test.mdf;Connection Timeout = 60"
type dbSchema = SqlDataConnection<connectionString2,StoredProcedures = true>
type StringIntCsvType = CsvProvider<Sample = "item_number, num",
Schema = "item_number (string), num (int)",
HasHeaders = true,CacheRows=false>
let getDbx() =
let dbx = dbSchema.GetDataContext()
dbx.DataContext.ObjectTrackingEnabled <- false // could impact memory consumption but so far has little effect on time
dbx.DataContext.CommandTimeout <- 90
dbx
let dbx = getDbx()
let tbl1 = dbx.MyTable
过去,我编写的查询只获取整个查询,将结果存储在内存中,然后将整个序列提供给 .CSV
类型的提供程序。查询示例:
let results =
query {
for row in db.ThisRow do
select row
}
|> Seq.toList
据我了解,Seq.toList
部分将查询强制为 运行,同时保持查询本身的输出,因为 Seq
将是 lazy.
如果查询结果的数量很少,这没什么大不了的。但是,如果结果数量很大(例如,>100 万行),我最终会得到 System.OutOfMemoryException
。因此,一位朋友建议研究使用 Nessos.Stream
库。
我的目标是从查询中提取一行,对该行执行一些操作,然后将该行写入 .CSV
并针对每一行迭代执行此操作,所有行最终都在同一个 .CSV
文件.
所以,我试试
open Nessos.Stream
type StringInt = {
String: string option
Int: int
}
type StringIntCsvType = CsvProvider<Sample = "item_number, num",
Schema = "item_number (string option), num (int)",
HasHeaders = true>
let buildRowFromObject (obj: StringInt) = StringIntCsvType.Row(obj.String,
obj.Int)
let results =
query {
for row in db.ThisRow do
select row
}
|> Stream.ofSeq
|> Stream.groupBy (fun row -> row.ITEMNUMBER)
|> Stream.map (fun (itemString, seq) -> (itemString, (seq |> Seq.length)))
|> Stream.map (fun (str, num) -> {String = Some str;
Int = num})
|> Stream.map buildRowFromObject
|> Stream.toSeq
let ThisCsv= new StringIntCsvType(results)
let ThisCsvLoc = "pathToFileLocation"
let ThisCsv.Save(ThisCsvLoc)
虽然这是可行的,因为我正在获取我从数据库中查询的所有行,但这仍然没有按照我的意愿进行,我不确定如何实现。此外,对于更大的查询,我仍然以 System.OutOfMemoryException
结束。
我想我应该可以插入
|> Stream.map (fun x -> new StringIntCsvType(x))
在|> Stream.map buildRowFromObject
行下,但是x
被
Type Constraint Mismatch. The type
CsvProvider<...>.Row
is not compatible with type
Collections.Generic.IEnumerable<CsvProvider<...>.Row>
即使我没有收到此错误,我认为添加该行也会为每一行创建一个 new .CSV
,这绝对不是我想要的想要。
我如何编写查询,对查询的每个不同部分进行操作,并将查询的每一行写入同一个 .CSV
文件?我什至接近上面的代码了吗?
您不需要中间的 StringInt 记录类型。您已经从 buildRowFromObject 获得了 CSV 序列,您可以直接将其写入文件。虽然我也是 Nessos Streams 的忠实粉丝,但我不确定它在这里为您增加了多少价值(我可能是错的,也许您在管道中有一些更复杂的 map/reduce 操作)。在这种情况下,在探索其他解决方案之前,我会先选择简单的解决方案。
首先,确保您的 FSI 设置为 64 位并且您也在 exe 中编译为 64 位。其次,正如@krontogiannis 评论的那样,您的操作似乎是在对某些组中的项目进行计数。我认为您不想实际遍历每个组,所以为什么不在数据库端进行计数。你会得到一个元组序列(我希望),其中你有一些 ID 和计数。您可以将其直接提供给 CSVTypeprovider。所以:
type StringIntCsvType = CsvProvider<Sample = "item_number, num",
Schema = "item_number (string), num (int)",
HasHeaders = true>
let buildRowFromObject (row:string * int) = StringIntCsvType.Row(row)
let qry = query {
for row in tbl1 do
groupBy row.ItemNumber into g
select (g.Key,g.Count())
}
let csvout = qry |> Seq.map buildRowFromObject
(new StringIntCsvType(csvout)).Save(@"C:\tmp\test.csv")
这会在 9 秒内将大约 700 万行写入 ca。 100MB 的 csv 文件。
在更复杂的应用程序中,您可能会将写入文件分解为如下内容:
let writeFile csvout (path:string) =
use csvtype = new StringIntCsvType(csvout)
csvtype.Save(path)
writeFile csvout @"C:\tmp\test2.csv"
手册中还有一条关于处理大量行的评论,不确定它对你有多重要,但你可以在实例化 CsvProvider 时设置 CacheRows=false
。
编辑
一些如果这可能不相关但这里是:
#load @"..\..\FSLAB\packages\FsLab\FsLab.fsx"
#r "System.Data.dll"
#r "FSharp.Data.TypeProviders.dll"
#r "System.Data.Linq.dll"
#r @"..\packages\Streams.0.4.1\lib\net45\Streams.dll"
#r @"..\packages\FileHelpers.3.1.5\lib\net45\FileHelpers.dll"
open System
open System.Diagnostics
open System.IO
open System.Collections
open System.Collections.Generic
open System.IO.Compression
open System.Data
open System.Data.Linq
open System.Linq
open Microsoft.FSharp.Data.TypeProviders
open FSharp.Linq
open FSharp.Data
open Nessos.Streams
open FileHelpers
[<Literal>]
let connectionString2 = @"Data Source=(LocalDB)\MSSQLLocalDB;AttachDbFilename=C:\Users\...\Documents\test.mdf;Connection Timeout = 60"
type dbSchema = SqlDataConnection<connectionString2,StoredProcedures = true>
type StringIntCsvType = CsvProvider<Sample = "item_number, num",
Schema = "item_number (string), num (int)",
HasHeaders = true,CacheRows=false>
let getDbx() =
let dbx = dbSchema.GetDataContext()
dbx.DataContext.ObjectTrackingEnabled <- false // could impact memory consumption but so far has little effect on time
dbx.DataContext.CommandTimeout <- 90
dbx
let dbx = getDbx()
let tbl1 = dbx.MyTable