使用 Nessos.Stream 减少获取查询输出并将其写入 .CSV 的时间

Using Nessos.Stream to reduce the time to get and write query output to a `.CSV`

过去,我编写的查询只获取整个查询,将结果存储在内存中,然后将整个序列提供给 .CSV 类型的提供程序。查询示例:

let results = 
    query {
        for row in db.ThisRow do 
        select row
        }
        |> Seq.toList

据我了解,Seq.toList 部分将查询强制为 运行,同时保持查询本身的输出,因为 Seq 将是 lazy. 如果查询结果的数量很少,这没什么大不了的。但是,如果结果数量很大(例如,>100 万行),我最终会得到 System.OutOfMemoryException。因此,一位朋友建议研究使用 Nessos.Stream 库。

我的目标是从查询中提取一行,对该行执行一些操作,然后将该行写入 .CSV 并针对每一行迭代执行此操作,所有行最终都在同一个 .CSV 文件.

所以,我试试

open Nessos.Stream

type StringInt = {
    String: string option
    Int: int
    }

type StringIntCsvType = CsvProvider<Sample = "item_number, num", 
                                    Schema = "item_number (string option), num (int)", 
                                    HasHeaders = true>
let buildRowFromObject (obj: StringInt) = StringIntCsvType.Row(obj.String, 
                                                               obj.Int)

let results = 
    query {
        for row in db.ThisRow do 
        select row
        }
        |> Stream.ofSeq
        |> Stream.groupBy (fun row -> row.ITEMNUMBER)
        |> Stream.map (fun (itemString, seq) -> (itemString, (seq |> Seq.length)))
        |> Stream.map (fun (str, num) -> {String = Some str; 
                                          Int = num})
        |> Stream.map buildRowFromObject
        |> Stream.toSeq

let ThisCsv= new StringIntCsvType(results)
let ThisCsvLoc = "pathToFileLocation"
let ThisCsv.Save(ThisCsvLoc)

虽然这是可行的,因为我正在获取我从数据库中查询的所有行,但这仍然没有按照我的意愿进行,我不确定如何实现。此外,对于更大的查询,我仍然以 System.OutOfMemoryException 结束。

我想我应该可以插入

|> Stream.map (fun x -> new StringIntCsvType(x))

|> Stream.map buildRowFromObject行下,但是x

错误了
Type Constraint Mismatch.  The type
    CsvProvider<...>.Row
is not compatible with type
    Collections.Generic.IEnumerable<CsvProvider<...>.Row>

即使我没有收到此错误,我认为添加该行也会为每一行创建一个 new .CSV,这绝对不是我想要的想要。

我如何编写查询,对查询的每个不同部分进行操作,并将查询的每一行写入同一个 .CSV 文件?我什至接近上面的代码了吗?

您不需要中间的 StringInt 记录类型。您已经从 buildRowFromObject 获得了 CSV 序列,您可以直接将其写入文件。虽然我也是 Nessos Streams 的忠实粉丝,但我不确定它在这里为您增加了多少价值(我可能是错的,也许您在管道中有一些更复杂的 map/reduce 操作)。在这种情况下,在探索其他解决方案之前,我会先选择简单的解决方案。

首先,确保您的 FSI 设置为 64 位并且您也在 exe 中编译为 64 位。其次,正如@krontogiannis 评论的那样,您的操作似乎是在对某些组中的项目进行计数。我认为您不想实际遍历每个组,所以为什么不在数据库端进行计数。你会得到一个元组序列(我希望),其中你有一些 ID 和计数。您可以将其直接提供给 CSVTypeprovider。所以:

type StringIntCsvType = CsvProvider<Sample = "item_number, num", 
                                    Schema = "item_number (string), num (int)", 
                                    HasHeaders = true>

let buildRowFromObject (row:string * int) = StringIntCsvType.Row(row)

let qry = query {
            for row in tbl1 do
            groupBy row.ItemNumber into g
            select (g.Key,g.Count())
            }

let csvout = qry |> Seq.map  buildRowFromObject
(new StringIntCsvType(csvout)).Save(@"C:\tmp\test.csv")

这会在 9 秒内将大约 700 万行写入 ca。 100MB 的 csv 文件。

在更复杂的应用程序中,您可能会将写入文件分解为如下内容:

let writeFile csvout (path:string) =
    use csvtype = new  StringIntCsvType(csvout)
    csvtype.Save(path)
writeFile csvout @"C:\tmp\test2.csv"

手册中还有一条关于处理大量行的评论,不确定它对你有多重要,但你可以在实例化 CsvProvider 时设置 CacheRows=false

编辑

一些如果这可能不相关但这里是:

#load @"..\..\FSLAB\packages\FsLab\FsLab.fsx"

#r "System.Data.dll"
#r "FSharp.Data.TypeProviders.dll"
#r "System.Data.Linq.dll"
#r @"..\packages\Streams.0.4.1\lib\net45\Streams.dll"
#r @"..\packages\FileHelpers.3.1.5\lib\net45\FileHelpers.dll"

open System
open System.Diagnostics
open System.IO
open System.Collections
open System.Collections.Generic
open System.IO.Compression

open System.Data
open System.Data.Linq
open System.Linq
open Microsoft.FSharp.Data.TypeProviders
open FSharp.Linq
open FSharp.Data
open Nessos.Streams
open FileHelpers

[<Literal>]
let connectionString2 = @"Data Source=(LocalDB)\MSSQLLocalDB;AttachDbFilename=C:\Users\...\Documents\test.mdf;Connection Timeout = 60"

type dbSchema = SqlDataConnection<connectionString2,StoredProcedures = true>


type StringIntCsvType = CsvProvider<Sample = "item_number, num", 
                                    Schema = "item_number (string), num (int)", 
                                    HasHeaders = true,CacheRows=false>



let getDbx() = 
    let dbx = dbSchema.GetDataContext()
    dbx.DataContext.ObjectTrackingEnabled <- false  // could impact memory consumption but so far has little effect on time
    dbx.DataContext.CommandTimeout <- 90
    dbx

let dbx = getDbx()

let tbl1 = dbx.MyTable