Read Files And Insert Contents Into Database Asynchronously In VB Or C#

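I need to read about 10 files, each roughly 150 MB in size. The contents of these tab-delimited text files need to be formatted and written to a DataTable, which is then inserted into a database. My code must finish executing before control is handed back to the code that instantiates my class.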

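To speed this process up, I want to take advantage of asynchronous programming (inside my class). I have found an approach that works, but the minimalist in me would prefer to replace a one-line Async function with a lambda expression or something similar. I am writing the code in VB.NET, but I don't mind help in the form of C#.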

Imports System.Data
Imports System.IO
Imports System.Text
Imports System.Collections.Generic
Imports System.Threading.Tasks
Imports System.Linq

''' <summary>
''' This BO reads all the text files from the appropriate directory and
''' inserts the rows into a table. Once a file is read, it is deleted from the directory.
''' </summary>
Public NotInheritable Class FileProcessing

    Public Sub Execute()

        Dim dctColumnMappings As Dictionary(Of String, String) = DictionaryMappings()
        Dim strDirectory As String = "C:\Documents\Files"
        TaskWaiterAsync(strDirectory, dctColumnMappings).Wait()
    End Sub

    ''' <summary>
    ''' Function that is solely used to prevent control from being passed to the code that
    ''' instantiates FileProcessing.
    ''' </summary>
    Private Async Function TaskWaiterAsync(ByVal pstrDirectory As String,
                                           ByVal pdctColumnMappings As Dictionary(Of String, String)) As Task

        Await InsertAsync(pstrDirectory, pdctColumnMappings)
    End Function

    ''' <summary>
    ''' Function that asynchronously inserts a DataTable that contains the contents of a file.
    ''' The DataTables are inserted in the order they are returned based on the
    ''' asynchronous function CreateDataTableAsync.
    ''' </summary>
    Private Async Function InsertAsync(ByVal pstrDirectory As String,
                                       ByVal pdctColumnMappings As Dictionary(Of String, String)) As Task

        Dim lstFiles As List(Of String) = Directory.GetFiles(pstrDirectory).ToList()
        Dim vntTasksQuery As IEnumerable(Of Task(Of DataTable)) = 
            From strFilePath In lstFiles Select CreateDataTableAsync(strFilePath, 
                                             pdctColumnMappings)
        Dim lstTasks As List(Of Task(Of DataTable)) = vntTasksQuery.ToList()

        While lstTasks.Count > 0
            Dim vntFinishedTask As Task(Of DataTable) = Await Task.WhenAny(lstTasks)
            lstTasks.Remove(vntFinishedTask)
            Dim DataFile As DataTable = Await vntFinishedTask
            ' User-created class that allows database interaction.
            Dim vntSql As New SqlCon("FileDB")
            vntSql.DataTableInsert("tblFiles", DataFile)
        End While
    End Function

    ''' <summary>
    ''' Function that returns a dictionary mapping the appropriate subset of names of the
    ''' columns from the file to the names of the columns in tblFiles.
    ''' </summary>
    Private Function DictionaryMappings() As Dictionary(Of String, String)

        Dim dctColumnMappings As New Dictionary(Of String, String)(68, StringComparer.CurrentCultureIgnoreCase)
        ' Code that populates the dictionary.
        Return dctColumnMappings
    End Function

    ''' <summary>
    ''' Asynchronous function that reads from a file and returns a task containing a
    ''' DataTable with the necessary values to be inserted into tblFiles.
    ''' </summary>
    Private Async Function CreateDataTableAsync(ByVal pstrFilePath As String,
                                                ByVal pdctColumnMappings As Dictionary(Of String, String)) As Task(Of DataTable)

        Dim vntTask As New Task(Of DataTable)(
            Function()
                Dim DataFile As New DataTable()
                ' Code that populates DataFile with the necessary columns.

                ' DeleteOnClose removes the file once the reader is disposed.
                Using SR As New StreamReader(
                    New FileStream(pstrFilePath,
                                   FileMode.Open,
                                   FileAccess.Read,
                                   FileShare.None,
                                   8192,
                                   FileOptions.SequentialScan Or FileOptions.DeleteOnClose),
                    Encoding.UTF8, True, 8192, False)
                    ' Code that reads the file and writes the necessary
                    ' formatted values into DataFile.
                End Using
                Return DataFile
            End Function)
        vntTask.Start()
        Dim FinishedDataFile As DataTable = Await vntTask
        Return FinishedDataFile
    End Function
End Class

The method I would like to replace with a lambda expression is TaskWaiterAsync. I tried:

Dim vntTask As Task = New Task(Async Sub()
                                   Await InsertAsync(strDirectory, dctColumnMappings)
                               End Sub)
vntTask.Start()
vntTask.Wait()

The above returns control to the code that instantiates FileProcessing before the work has completed. Any help would be appreciated. Thanks.

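The code looks reasonable, although there is a hard-coded string!

In the Execute method, instead of calling TaskWaiterAsync(strDirectory, dctColumnMappings).Wait(), can you not call the InsertAsync method directly?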

Public Sub Execute()

    Dim dctColumnMappings As Dictionary(Of String, String) = DictionaryMappings()
    Dim strDirectory As String = "C:\Documents\Files"
    InsertAsync(strDirectory, dctColumnMappings).Wait()
End Sub 

And then simply delete the TaskWaiterAsync method?
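
As for why the lambda attempt in the question returns early: an Async Sub lambda reaches the Task constructor as a plain Action, so the task counts as finished when the lambda hits its first Await, not when the awaited work is done. Below is a minimal sketch of the difference. SlowWorkAsync is a hypothetical stand-in for InsertAsync, and Task.Run with an Async Function lambda is just one way to get a task that covers the whole operation.

```vbnet
Imports System
Imports System.Threading.Tasks

Module LambdaWaitDemo

    ' Hypothetical stand-in for InsertAsync: asynchronous work that takes a while.
    Private Async Function SlowWorkAsync() As Task
        Await Task.Delay(500)
        Console.WriteLine("work finished")
    End Function

    Sub Main()
        ' Async Sub lambda: the Task constructor only sees an Action, so this task
        ' is marked complete as soon as the lambda reaches its first Await.
        Dim vntEarlyTask As New Task(Async Sub()
                                         Await SlowWorkAsync()
                                     End Sub)
        vntEarlyTask.Start()
        vntEarlyTask.Wait()
        Console.WriteLine("Wait() on the Async Sub task returned")

        ' Async Function lambda: Task.Run returns a task that completes only
        ' when the asynchronous work inside the lambda has completed.
        Task.Run(Async Function()
                     Await SlowWorkAsync()
                 End Function).Wait()
        Console.WriteLine("Wait() on the Task.Run task returned")
    End Sub
End Module
```

The same pattern would apply with InsertAsync in place of SlowWorkAsync, although calling InsertAsync(...).Wait() directly, as shown above, does the same job with less ceremony.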

I have loaded the code into Visual Studio. Does this help? Again, call InsertAsync directly:

Public Sub Execute()

    Dim dctColumnMappings As Dictionary(Of String, String) = DictionaryMappings()
    Dim strDirectory As String = "C:\Documents\Files"

    InsertAsync(strDirectory, dctColumnMappings)
End Sub

Then wait for all the files to be read asynchronously into DataTables before doing the synchronous inserts into the database.

Private Sub InsertAsync(ByVal pstrDirectory As String, ByVal pdctColumnMappings As Dictionary(Of String, String)) 

    Dim lstFiles As List(Of String) = Directory.GetFiles(pstrDirectory).ToList()
    'TODO: if no files then return, nothing to do


    ' Materialize the query so each file is read exactly once; enumerating a
    ' lazy query a second time would call CreateDataTableAsync again per file.
    Dim lstTasks As List(Of Task(Of DataTable)) =
            (From strFilePath In lstFiles
             Select CreateDataTableAsync(strFilePath, pdctColumnMappings)).ToList()

    ' Create a single task that represents all tasks and block until they all complete.
    Dim overall As Task(Of DataTable()) = Task.WhenAll(lstTasks)
    overall.Wait()

    For Each DataFile As DataTable In overall.Result
        Dim vntSql As New SqlCon("FileDB")
        vntSql.DataTableInsert("tblFiles", DataFile)
    Next

End Sub

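Instead of using asynchronous programming, I opted for parallel processing. This process runs on a server with many cores and threads, and it does not need to respond to users while the job is running. I changed InsertAsync (named InsertFile below) so that it inserts the DataTable into the table right after populating it.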

Dim vntFiles As IEnumerable(Of String) = Directory.GetFiles(strDirectory)
Parallel.ForEach(vntFiles, Sub(strFilePath) InsertFile(strFilePath, dctColumnMappings))
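
Parallel.ForEach blocks until every iteration has finished, so control does not return to the caller before all files are processed. If holding several 150 MB files in memory at once turns out to be a concern, the degree of parallelism can be capped. The following is only a sketch: the file names are made up, the limit of 4 is an arbitrary assumption, and InsertFile is reduced to a hypothetical stub that just prints the path.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks

Module ParallelInsertDemo

    ' Hypothetical stub standing in for the real InsertFile, which would read
    ' the file into a DataTable and insert it into the database.
    Private Sub InsertFile(ByVal strFilePath As String)
        Console.WriteLine("processing " & strFilePath)
    End Sub

    Sub Main()
        ' Made-up file names; the real code would use Directory.GetFiles.
        Dim vntFiles As IEnumerable(Of String) = {"a.txt", "b.txt", "c.txt"}

        ' Assumed limit: at most 4 files are processed at the same time.
        Dim vntOptions As New ParallelOptions() With {.MaxDegreeOfParallelism = 4}

        ' Returns only after InsertFile has run for every file.
        Parallel.ForEach(vntFiles, vntOptions, Sub(strFilePath) InsertFile(strFilePath))
        Console.WriteLine("all files processed")
    End Sub
End Module
```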