拆分 avro 文件并上传到 REST
Split an avro file and upload to REST
我创建了一些 avro 文件。我可以使用以下命令将它们转换为json,只是为了检查文件是否正常
java -jar avro-tools-1.8.2.jar tojson FileName.avro>outputfilename.json
现在,我有一些大的 avro 文件和我正在尝试上传的 REST API,它有大小限制,因此我正在尝试使用流分块上传它。
下面的示例只是从原始文件中分块读取并复制到另一个 avro 文件,完美地创建了文件
using System;
using System.IO;
class Test
{
public static void Main()
{
// Specify a file to read from and to create.
string pathSource = @"D:\BDS\AVRO\filename.avro";
string pathNew = @"D:\BDS\AVRO\test\filenamenew.avro";
try
{
using (FileStream fsSource = new FileStream(pathSource,
FileMode.Open, FileAccess.Read))
{
byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
long numBytesToRead = (int)fsSource.Length;
int numBytesRead = 0;
using (FileStream fsNew = new FileStream(pathNew,
FileMode.Append, FileAccess.Write))
{
// Read the source file into a byte array.
//byte[] bytes = new byte[fsSource.Length];
//int numBytesToRead = (int)fsSource.Length;
//int numBytesRead = 0;
while (numBytesToRead > 0)
{
int bytesRead = fsSource.Read(buffer, 0, buffer.Length);
byte[] actualbytes = new byte[bytesRead];
Array.Copy(buffer, actualbytes, bytesRead);
// Read may return anything from 0 to numBytesToRead.
// Break when the end of the file is reached.
if (bytesRead == 0)
break;
numBytesRead += bytesRead;
numBytesToRead -= bytesRead;
fsNew.Write(actualbytes, 0, actualbytes.Length);
}
}
}
// Write the byte array to the other FileStream.
}
catch (FileNotFoundException ioEx)
{
Console.WriteLine(ioEx.Message);
}
}
}
我怎么知道这会创建一个好的 avro。因为之前转换为 json 的命令再次起作用,即
java -jar avro-tools-1.8.2.jar tojson filenamenew.avro>outputfilename.json
然而,当我使用相同的代码,而不是复制到另一个文件,只是调用 rest api,文件被上传但是从服务器下载相同的文件并且 运行 上面转换为 json 的命令说 - "Not a Data file".
所以,很明显有些东西被破坏了,我正在努力弄清楚是什么。
这是片段
string filenamefullyqualified = path + filename;
Stream stream = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);
long? position = 0;
byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
long numBytesToRead = stream.Length;
int numBytesRead = 0;
do
{
var content = new MultipartFormDataContent();
int bytesRead = stream.Read(buffer, 0, buffer.Length);
byte[] actualbytes = new byte[bytesRead];
Array.Copy(buffer, actualbytes, bytesRead);
if (bytesRead == 0)
break;
//Append Data
url = String.Format("https://{0}.dfs.core.windows.net/raw/datawarehouse/{1}/{2}/{3}/{4}/{5}?action=append&position={6}", datalakeName, filename.Substring(0, filename.IndexOf("_")), year, month, day, filename, position.ToString());
numBytesRead += bytesRead;
numBytesToRead -= bytesRead;
ByteArrayContent byteContent = new ByteArrayContent(actualbytes);
content.Add(byteContent);
method = new HttpMethod("PATCH");
request = new HttpRequestMessage(method, url)
{
Content = content
};
request.Headers.Add("Authorization", "Bearer " + accesstoken);
var response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
position = position + request.Content.Headers.ContentLength;
Array.Clear(buffer, 0, buffer.Length);
} while (numBytesToRead > 0);
stream.Close();
我浏览了论坛主题,但没有遇到任何涉及拆分 avro 文件的内容。
我有预感我的 "content" 的 http 请求不正确。我缺少什么?
如果您需要更多详细信息,我很乐意提供。
我现在找到问题了。问题是因为 MultipartFormDataContent。当上传一个 avro 文件时,它会添加额外的文本,如内容类型等,同时删除许多行(我不知道为什么)。
因此,解决方案是将内容作为 "ByteArrayContent" 本身上传,而不是像我之前那样将其添加到 MultipartFormDataContent。
这是片段,几乎与问题中的片段相似,除了我不再使用 MultipartFormDataContent
string filenamefullyqualified = path + filename;
Stream stream = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);
//content.Add(CreateFileContent(fs, path, filename, "text/plain"));
long? position = 0;
byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
long numBytesToRead = stream.Length;
int numBytesRead = 0;
//while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
//{
do
{
//var content = new MultipartFormDataContent();
int bytesRead = stream.Read(buffer, 0, buffer.Length);
byte[] actualbytes = new byte[bytesRead];
Array.Copy(buffer, actualbytes, bytesRead);
if (bytesRead == 0)
break;
//Append Data
url = String.Format("https://{0}.dfs.core.windows.net/raw/datawarehouse/{1}/{2}/{3}/{4}/{5}?action=append&position={6}", datalakeName, filename.Substring(0, filename.IndexOf("_")), year, month, day, filename, position.ToString());
numBytesRead += bytesRead;
numBytesToRead -= bytesRead;
ByteArrayContent byteContent = new ByteArrayContent(actualbytes);
//byteContent.Headers.ContentType= new MediaTypeHeaderValue("text/plain");
//content.Add(byteContent);
method = new HttpMethod("PATCH");
//request = new HttpRequestMessage(method, url)
//{
// Content = content
//};
request = new HttpRequestMessage(method, url)
{
Content = byteContent
};
request.Headers.Add("Authorization", "Bearer " + accesstoken);
var response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
position = position + request.Content.Headers.ContentLength;
Array.Clear(buffer, 0, buffer.Length);
} while (numBytesToRead > 0);
stream.Close();
但是按记录流将无法在事务中将AVRO 文件作为一个整体来处理。例如,如果某些记录失败,我们可能会部分成功。
如果我们有一个小工具可以根据记录的阈值数量拆分 AVRO 文件,那就太好了。
spark-based 按分区拆分技术允许将数据集拆分为 pre-defined 个文件;但是,它不允许根据记录数进行拆分。即,我不想要包含超过 500 条记录的 AVRO 文件。
因此我们必须根据应用程序可以处理的舒适堆大小以及 two-phase 提交来设计批处理逻辑,以处理事务
我创建了一些 avro 文件。我可以使用以下命令将它们转换为json,只是为了检查文件是否正常
java -jar avro-tools-1.8.2.jar tojson FileName.avro>outputfilename.json
现在,我有一些大的 avro 文件和我正在尝试上传的 REST API,它有大小限制,因此我正在尝试使用流分块上传它。
下面的示例只是从原始文件中分块读取并复制到另一个 avro 文件,完美地创建了文件
using System;
using System.IO;
class Test
{
public static void Main()
{
// Specify a file to read from and to create.
string pathSource = @"D:\BDS\AVRO\filename.avro";
string pathNew = @"D:\BDS\AVRO\test\filenamenew.avro";
try
{
using (FileStream fsSource = new FileStream(pathSource,
FileMode.Open, FileAccess.Read))
{
byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
long numBytesToRead = (int)fsSource.Length;
int numBytesRead = 0;
using (FileStream fsNew = new FileStream(pathNew,
FileMode.Append, FileAccess.Write))
{
// Read the source file into a byte array.
//byte[] bytes = new byte[fsSource.Length];
//int numBytesToRead = (int)fsSource.Length;
//int numBytesRead = 0;
while (numBytesToRead > 0)
{
int bytesRead = fsSource.Read(buffer, 0, buffer.Length);
byte[] actualbytes = new byte[bytesRead];
Array.Copy(buffer, actualbytes, bytesRead);
// Read may return anything from 0 to numBytesToRead.
// Break when the end of the file is reached.
if (bytesRead == 0)
break;
numBytesRead += bytesRead;
numBytesToRead -= bytesRead;
fsNew.Write(actualbytes, 0, actualbytes.Length);
}
}
}
// Write the byte array to the other FileStream.
}
catch (FileNotFoundException ioEx)
{
Console.WriteLine(ioEx.Message);
}
}
}
我怎么知道这会创建一个好的 avro。因为之前转换为 json 的命令再次起作用,即
java -jar avro-tools-1.8.2.jar tojson filenamenew.avro>outputfilename.json
然而,当我使用相同的代码,而不是复制到另一个文件,只是调用 rest api,文件被上传但是从服务器下载相同的文件并且 运行 上面转换为 json 的命令说 - "Not a Data file".
所以,很明显有些东西被破坏了,我正在努力弄清楚是什么。
这是片段
string filenamefullyqualified = path + filename;
Stream stream = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);
long? position = 0;
byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
long numBytesToRead = stream.Length;
int numBytesRead = 0;
do
{
var content = new MultipartFormDataContent();
int bytesRead = stream.Read(buffer, 0, buffer.Length);
byte[] actualbytes = new byte[bytesRead];
Array.Copy(buffer, actualbytes, bytesRead);
if (bytesRead == 0)
break;
//Append Data
url = String.Format("https://{0}.dfs.core.windows.net/raw/datawarehouse/{1}/{2}/{3}/{4}/{5}?action=append&position={6}", datalakeName, filename.Substring(0, filename.IndexOf("_")), year, month, day, filename, position.ToString());
numBytesRead += bytesRead;
numBytesToRead -= bytesRead;
ByteArrayContent byteContent = new ByteArrayContent(actualbytes);
content.Add(byteContent);
method = new HttpMethod("PATCH");
request = new HttpRequestMessage(method, url)
{
Content = content
};
request.Headers.Add("Authorization", "Bearer " + accesstoken);
var response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
position = position + request.Content.Headers.ContentLength;
Array.Clear(buffer, 0, buffer.Length);
} while (numBytesToRead > 0);
stream.Close();
我浏览了论坛主题,但没有遇到任何涉及拆分 avro 文件的内容。
我有预感我的 "content" 的 http 请求不正确。我缺少什么?
如果您需要更多详细信息,我很乐意提供。
我现在找到问题了。问题是因为 MultipartFormDataContent。当上传一个 avro 文件时,它会添加额外的文本,如内容类型等,同时删除许多行(我不知道为什么)。
因此,解决方案是将内容作为 "ByteArrayContent" 本身上传,而不是像我之前那样将其添加到 MultipartFormDataContent。
这是片段,几乎与问题中的片段相似,除了我不再使用 MultipartFormDataContent
string filenamefullyqualified = path + filename;
Stream stream = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);
//content.Add(CreateFileContent(fs, path, filename, "text/plain"));
long? position = 0;
byte[] buffer = new byte[(20 * 1024 * 1024) + 100];
long numBytesToRead = stream.Length;
int numBytesRead = 0;
//while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
//{
do
{
//var content = new MultipartFormDataContent();
int bytesRead = stream.Read(buffer, 0, buffer.Length);
byte[] actualbytes = new byte[bytesRead];
Array.Copy(buffer, actualbytes, bytesRead);
if (bytesRead == 0)
break;
//Append Data
url = String.Format("https://{0}.dfs.core.windows.net/raw/datawarehouse/{1}/{2}/{3}/{4}/{5}?action=append&position={6}", datalakeName, filename.Substring(0, filename.IndexOf("_")), year, month, day, filename, position.ToString());
numBytesRead += bytesRead;
numBytesToRead -= bytesRead;
ByteArrayContent byteContent = new ByteArrayContent(actualbytes);
//byteContent.Headers.ContentType= new MediaTypeHeaderValue("text/plain");
//content.Add(byteContent);
method = new HttpMethod("PATCH");
//request = new HttpRequestMessage(method, url)
//{
// Content = content
//};
request = new HttpRequestMessage(method, url)
{
Content = byteContent
};
request.Headers.Add("Authorization", "Bearer " + accesstoken);
var response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
position = position + request.Content.Headers.ContentLength;
Array.Clear(buffer, 0, buffer.Length);
} while (numBytesToRead > 0);
stream.Close();
但是按记录流将无法在事务中将AVRO 文件作为一个整体来处理。例如,如果某些记录失败,我们可能会部分成功。
如果我们有一个小工具可以根据记录的阈值数量拆分 AVRO 文件,那就太好了。
spark-based 按分区拆分技术允许将数据集拆分为 pre-defined 个文件;但是,它不允许根据记录数进行拆分。即,我不想要包含超过 500 条记录的 AVRO 文件。
因此我们必须根据应用程序可以处理的舒适堆大小以及 two-phase 提交来设计批处理逻辑,以处理事务