拆分 avro 文件并上传到 REST

Split an avro file and upload to REST

我创建了一些 avro 文件。我可以使用以下命令将它们转换为json,只是为了检查文件是否正常

java -jar avro-tools-1.8.2.jar tojson FileName.avro>outputfilename.json

现在,我有一些大的 avro 文件和我正在尝试上传的 REST API,它有大小限制,因此我正在尝试使用流分块上传它。

下面的示例只是从原始文件中分块读取并复制到另一个 avro 文件,完美地创建了文件

using System;
using System.IO;

class Test
{

    public static void Main()
    {
        // Specify a file to read from and to create.
        string pathSource = @"D:\BDS\AVRO\filename.avro";
        string pathNew = @"D:\BDS\AVRO\test\filenamenew.avro";

        try
        {

            using (FileStream fsSource = new FileStream(pathSource,
                FileMode.Open, FileAccess.Read))
            {
                byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
                long numBytesToRead = (int)fsSource.Length;
                int numBytesRead = 0;
                using (FileStream fsNew = new FileStream(pathNew,
                    FileMode.Append, FileAccess.Write))
                {

                    // Read the source file into a byte array.
                    //byte[] bytes = new byte[fsSource.Length];
                    //int numBytesToRead = (int)fsSource.Length;
                    //int numBytesRead = 0;
                    while (numBytesToRead > 0)
                    {

                        int bytesRead = fsSource.Read(buffer, 0, buffer.Length);
                        byte[] actualbytes = new byte[bytesRead];

                        Array.Copy(buffer, actualbytes, bytesRead);
                        // Read may return anything from 0 to numBytesToRead.


                        // Break when the end of the file is reached.
                        if (bytesRead == 0)
                            break;

                        numBytesRead += bytesRead;
                        numBytesToRead -= bytesRead;



                        fsNew.Write(actualbytes, 0, actualbytes.Length);
                    }

                }
            }

                // Write the byte array to the other FileStream.


        }
        catch (FileNotFoundException ioEx)
        {
            Console.WriteLine(ioEx.Message);
        }
    }
}

我怎么知道这会创建一个好的 avro。因为之前转换为 json 的命令再次起作用,即

java -jar avro-tools-1.8.2.jar tojson filenamenew.avro>outputfilename.json

然而,当我使用相同的代码,而不是复制到另一个文件,只是调用 rest api,文件被上传但是从服务器下载相同的文件并且 运行 上面转换为 json 的命令说 - "Not a Data file".

所以,很明显有些东西被破坏了,我正在努力弄清楚是什么。

这是片段

 string filenamefullyqualified = path + filename;
            Stream stream = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);


            long? position = 0;

            byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
            long numBytesToRead = stream.Length;
            int numBytesRead = 0;



            do
            {

                var content = new MultipartFormDataContent();
                int bytesRead = stream.Read(buffer, 0, buffer.Length);
                byte[] actualbytes = new byte[bytesRead];

                Array.Copy(buffer, actualbytes, bytesRead);

                if (bytesRead == 0)
                    break;

                //Append Data
                url = String.Format("https://{0}.dfs.core.windows.net/raw/datawarehouse/{1}/{2}/{3}/{4}/{5}?action=append&position={6}", datalakeName, filename.Substring(0, filename.IndexOf("_")), year, month, day, filename, position.ToString());
                numBytesRead += bytesRead;
                numBytesToRead -= bytesRead;

                ByteArrayContent byteContent = new ByteArrayContent(actualbytes);
                content.Add(byteContent);


                method = new HttpMethod("PATCH");

                request = new HttpRequestMessage(method, url)
                {
                    Content = content
                };


                request.Headers.Add("Authorization", "Bearer " + accesstoken);



                var response = await client.SendAsync(request);
                response.EnsureSuccessStatusCode();

                position = position + request.Content.Headers.ContentLength;

                Array.Clear(buffer, 0, buffer.Length);




            } while (numBytesToRead > 0);
            stream.Close();

我浏览了论坛主题,但没有遇到任何涉及拆分 avro 文件的内容。

我有预感我的 "content" 的 http 请求不正确。我缺少什么?

如果您需要更多详细信息,我很乐意提供。

我现在找到问题了。问题是因为 MultipartFormDataContent。当上传一个 avro 文件时,它会添加额外的文本,如内容类型等,同时删除许多行(我不知道为什么)。

因此,解决方案是将内容作为 "ByteArrayContent" 本身上传,而不是像我之前那样将其添加到 MultipartFormDataContent。

这是片段,几乎与问题中的片段相似,除了我不再使用 MultipartFormDataContent

            string filenamefullyqualified = path + filename;
            Stream stream = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);
            //content.Add(CreateFileContent(fs, path, filename, "text/plain"));


            long? position = 0;

            byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
            long numBytesToRead = stream.Length;
            int numBytesRead = 0;


            //while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
            //{
            do
            {

                //var content = new MultipartFormDataContent();

                int bytesRead = stream.Read(buffer, 0, buffer.Length);
                byte[] actualbytes = new byte[bytesRead];

                Array.Copy(buffer, actualbytes, bytesRead);

                if (bytesRead == 0)
                    break;

                //Append Data
                url = String.Format("https://{0}.dfs.core.windows.net/raw/datawarehouse/{1}/{2}/{3}/{4}/{5}?action=append&position={6}", datalakeName, filename.Substring(0, filename.IndexOf("_")), year, month, day, filename, position.ToString());
                numBytesRead += bytesRead;
                numBytesToRead -= bytesRead;

                ByteArrayContent byteContent = new ByteArrayContent(actualbytes);
                //byteContent.Headers.ContentType= new MediaTypeHeaderValue("text/plain");
                //content.Add(byteContent);


                method = new HttpMethod("PATCH");

                //request = new HttpRequestMessage(method, url)
                //{
                //    Content = content
                //};


                request = new HttpRequestMessage(method, url)
                {
                    Content = byteContent
                };

                request.Headers.Add("Authorization", "Bearer " + accesstoken);



                var response = await client.SendAsync(request);
                response.EnsureSuccessStatusCode();

                position = position + request.Content.Headers.ContentLength;

                Array.Clear(buffer, 0, buffer.Length);




            } while (numBytesToRead > 0);
            stream.Close();

但是按记录流将无法在事务中将AVRO 文件作为一个整体来处理。例如,如果某些记录失败,我们可能会部分成功。

如果我们有一个小工具可以根据记录的阈值数量拆分 AVRO 文件,那就太好了。

spark-based 按分区拆分技术允许将数据集拆分为 pre-defined 个文件;但是,它不允许根据记录数进行拆分。即,我不想要包含超过 500 条记录的 AVRO 文件。

因此我们必须根据应用程序可以处理的舒适堆大小以及 two-phase 提交来设计批处理逻辑,以处理事务