How to load a CSV file into Apache Arrow vectors and save an arrow file to disk
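I am currently playing with Apache Arrow's Java API (though I use it from Scala for the code samples) to get some familiarity with this tool.
As an exercise, I chose to load a CSV file into Arrow vectors and then save them to an Arrow file. The first part seemed easy enough, and I tried it like this: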
    val csvLines: Stream[Array[String]] = <open stream from CSV parser>
    // There are other types of allocator, but things work with this one...
    val allocator = new RootAllocator(Int.MaxValue)

    // Initialize the vectors
    val vectors = initVectors(csvLines.head, allocator)
    // Put their mutators into an array for easy access
    val mutators = vectors.map(_.getMutator)

    // Work on the data, zipping it with its index
    Stream.from(0)
      .zip(csvLines.tail) // Work on the tail (head contains the headers)
      .foreach(rowTup =>  // rowTup = (index, csvRow as an Array[String])
        Range(0, rowTup._2.size) // Iterate on each column...
          .foreach(columnNumber =>
            writeToMutator(
              mutators(columnNumber),        // get that column's mutator
              idx = rowTup._1,               // pass the current row number
              data = rowTup._2(columnNumber) // pass the entry of the current column
            )
          )
      )
With initVectors() and writeToMutator() defined as:
    def initVectors(
        columns: Array[String],
        alloc: RootAllocator): Array[NullableVarCharVector] = {
      // Initialize a vector for each column
      val vectors = columns.map(colName =>
        new NullableVarCharVector(colName, alloc))
      // 1 << 12 = 4096 bytes, for 1024 values initially. This is arbitrary
      vectors.foreach(_.allocateNew(1 << 12, 1024))
      vectors
    }

    def writeToMutator(
        mutator: NullableVarCharVector#Mutator,
        idx: Int,
        data: String): Unit = {
      // The CSV may contain null values
      if (data != null) {
        // Encode as UTF-8
        val bytes = data.getBytes(java.nio.charset.StandardCharsets.UTF_8)
        mutator.setSafe(idx, bytes, 0, bytes.length)
      } else {
        // Only mark the slot as null when there is no value to write
        mutator.setNull(idx)
      }
    }
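A note on the initial allocation size in initVectors: in Scala, as in Java, `^` on integers is bitwise XOR rather than exponentiation, so an expression such as `2^12` evaluates to 14, not 4096. The small sketch below shows the difference and two ways to get the intended power of two. The object and value names are hypothetical, introduced only for illustration.

```scala
object VectorSizing {
  // `^` is bitwise XOR on Int: 2 (binary 0010) XOR 12 (binary 1100) = 14 (binary 1110)
  val xorResult: Int = 2 ^ 12

  // A left shift gives the intended power of two: 1 << 12 = 4096
  val bufferBytes: Int = 1 << 12

  // math.pow also works, but it returns a Double that has to be converted back
  val viaPow: Int = math.pow(2, 12).toInt
}
```

Since the vectors are written with setSafe, an undersized initial allocation should only cost extra reallocations, which is probably why the loading code still appears to work.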
(I am currently not concerned with using the correct types, and store everything as strings, or VarChar in Arrow's terms.)
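The `<open stream from CSV parser>` placeholder can be filled by any CSV library. As a rough sketch only, assuming a simple file with no quoted fields or embedded commas, and assuming that an empty field should be treated as null, something like the following would produce rows in the shape the loop expects. `NaiveCsv` and its methods are hypothetical names introduced here for illustration; they are not part of Arrow.

```scala
import scala.io.Source

object NaiveCsv {
  // Split one line on commas; the -1 limit keeps trailing empty fields.
  // Assumption: no quoting or escaping, and an empty field stands for null.
  def parseLine(line: String): Array[String] =
    line.split(",", -1).map(field => if (field.isEmpty) null else field)

  // Lazily parse every line; the first element is the header row.
  def parse(lines: Iterator[String]): Iterator[Array[String]] =
    lines.map(parseLine)

  // Convenience wrapper around a Source. The caller owns the Source
  // and should close it once the rows have been consumed.
  def fromSource(source: Source): Iterator[Array[String]] =
    parse(source.getLines())
}
```

Calling `.toStream` on the returned iterator gives a `Stream[Array[String]]` like `csvLines`. A real CSV parser is the better choice for anything involving quotes or escapes.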
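So at this point I have a collection of NullableVarCharVector and can read from and write to them. Everything is great so far. For the next step, though, I was left wondering how to actually wrap them together and serialize them to an Arrow file. I stumbled on an AbstractFieldWriter abstract class, but how to use its implementations is unclear.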
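So, the question is mainly:
- What is the way (the best one? there seem to be several) to save a bunch of vectors to an Arrow file?
- Are there other ways of loading CSV columns into Arrow vectors?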
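Edited to add: the metadata description page provides a good general overview of the topic.
The API's test classes seem to contain a few things that could help; I'll post a reply with a sample once I've tried it out.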
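Looking at TestArrowFile.java and BaseFileTest.java I found:
- How to write a single Arrow file to disk
- An alternative way of filling vectors, as my first attempt was preventing me from assembling a single Arrow file (or at least from doing so in a straightforward way).
So, filling vectors now looks like: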
    // Open stream of rows
    val csvLines: Stream[Array[String]] = <open stream from CSV parser>
    // Define a parent to hold the vectors
    val parent = MapVector.empty("parent", allocator)
    // Create a new writer. VarCharWriterImpl would probably do as well?
    val writer = new ComplexWriterImpl("root", parent)

    // Initialise a writer for each column, using the header as the name
    val rootWriter = writer.rootAsMap()
    val writers = csvLines.head.map(colName =>
      rootWriter.varChar(colName))

    Stream.from(0)
      .zip(csvLines.tail)   // Zip the rows with their index
      .foreach( rowTup => { // Iterate on each (index, row) tuple
        val (idx, row) = rowTup
        Range(0, row.size)  // Iterate on each field of the row
          .foreach(column =>
            Option(row(column)) // row(column) may be null,
              .foreach(str =>   // use the option as a null check
                write(writers(column), idx, allocator, str)
              )
          )
      }
    )

    toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file
With write and toFile defined as:
    def write(writer: VarCharWriter, idx: Int,
        allocator: BufferAllocator, data: String): Unit = {
      // Set the position to the correct index
      writer.setPosition(idx)
      // Encode as UTF-8
      val bytes = data.getBytes(java.nio.charset.StandardCharsets.UTF_8)
      // Apparently the allocator is required again to build a new buffer
      val varchar = allocator.buffer(bytes.length)
      varchar.setBytes(0, bytes) // reuse the bytes encoded above
      writer.writeVarChar(0, bytes.length, varchar)
    }

    import java.io.FileOutputStream

    def toFile(parent: FieldVector, fName: String): Unit = {
      // Extract a schema from the parent: that's the part I struggled with in the original question
      val rootSchema = new VectorSchemaRoot(parent)
      val stream = new FileOutputStream(fName)
      try {
        val fileWriter = new ArrowFileWriter(
          rootSchema,
          null, // We don't use dictionary encoding.
          stream.getChannel)
        // Write everything to file...
        fileWriter.start()
        fileWriter.writeBatch()
        fileWriter.end()
      } finally {
        stream.close() // Close the file even if writing fails
      }
    }
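One detail worth checking in write: Arrow stores VarChar data as UTF-8, while `String.getBytes()` called without an argument uses the JVM's default charset, which depends on the platform. The buffer length must also be the encoded byte count, not the number of characters. The following minimal sketch illustrates both points using only the JDK; `Utf8Bytes` is a hypothetical helper name, not something from Arrow.

```scala
import java.nio.charset.StandardCharsets

object Utf8Bytes {
  // Encode explicitly as UTF-8 instead of relying on the JVM default charset
  def encode(data: String): Array[Byte] =
    data.getBytes(StandardCharsets.UTF_8)

  // Decode with the same charset so the round trip is lossless
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```

For example, `"caf\u00e9"` has 4 characters but encodes to 5 bytes, so sizing a buffer from `data.length` would truncate it.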
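With the above, I could save a CSV to an Arrow file. I checked that everything went well by reading the file back and converting it to CSV again, and the content is unchanged.
Note that ComplexWriterImpl allows writing columns of different types, which will come in handy to avoid storing numeric columns as strings.
(I am now playing with the reading side of things; that will probably deserve a question of its own.)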
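As an illustration of how a column's type might be chosen before picking a writer for it, here is a naive sketch that inspects the raw strings of one column. It is not tied to any Arrow API: `ColumnTypes`, `infer`, and the three case objects are hypothetical names, and treating a column as numeric only when every non-null value parses is an assumption.

```scala
import scala.util.Try

object ColumnTypes {
  sealed trait ColumnType
  case object IntegerColumn extends ColumnType
  case object FloatColumn extends ColumnType
  case object StringColumn extends ColumnType

  // Pick the narrowest type that fits every non-null value of the column.
  // Columns that are empty or entirely null fall back to StringColumn.
  def infer(values: Seq[String]): ColumnType = {
    val present = values.filter(_ != null)
    if (present.isEmpty) StringColumn
    else if (present.forall(v => Try(v.toLong).isSuccess)) IntegerColumn
    else if (present.forall(v => Try(v.toDouble).isSuccess)) FloatColumn
    else StringColumn
  }
}
```

This needs a full pass over the column (or a sample of it) before any vector is filled, so it trades some loading time for a more compact file.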