哪个 Java 文件将 Hadoop HDFS 文件拆分为块
Which Java file does Hadoop HDFS file splitting into Blocks
众所周知,当一个文本文件从本地复制到HDFS时,文件被分割成固定大小的128MB。例如,当我将一个 256 MB 的文本文件复制到 HDFS 时,将有 2 个块 (256/128) 包含 "splitted" 文件。
有人能告诉我 Hadoop 2.7.1 源代码中哪个 java/jar 文件具有将文件拆分为块的功能,哪个 java/jar 文件将块写入数据节点的目录。
帮我追踪这段代码。
我只找到了他们对 FileInputFormat.java 中的块进行逻辑输入拆分的那个,这不是我需要的。我需要要拆分的物理文件的 java 文件。
它不是一个jar/java文件,它具有分割文件的功能。执行此任务的是客户端守护程序。当你从本地加载一个文件时,客户端首先只读取 128MB,它通过询问 namenode 找到一个存储它的地方,它还确保文件被正确复制和复制。此阶段的客户端不知道文件的实际大小,除非它以相同的方式读取所有块。
你说的FileInputFormat.java是hdfs存储文件时没有用到的。当您想 运行 该文件上的任何 mapreduce 任务时使用它。与文件存储无关
将数据写入 DataNode 的代码存在于 2 个文件中:
DFSOutputStream.java
(包:org.apache.hadoop.hdfs
)
客户端写入的数据被分成数据包(通常为 64k 大小)。当一个数据包准备就绪时,数据被排入数据队列,由 DataStreamer
拾取。
DataStreamer
(包:org.apache.hadoop.hdfs
)
它拾取数据队列中的数据包并将它们发送到管道中的数据节点(通常数据管道中有 3 个数据节点,因为复制因子为 3)。
它检索一个新的块 ID 并开始将数据流式传输到数据节点。当写入一个数据块时,它会关闭当前块并获取一个新块来写入下一组数据包。
获取新区块的代码如下:
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
LOG.debug("Allocating new block");
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
}
关闭当前块的代码如下:
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
在endBlock()
方法中,舞台再次设置为:
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
这意味着,一个新的管道被创建用于将下一组数据包写入一个新的块。
编辑:如何检测块结束?
随着DataStreamer
不断将数据附加到块中,它会更新写入的字节数。
/**
* increase bytes of current block by len.
*
* @param len how many bytes to increase to current block
*/
void incBytesCurBlock(long len) {
this.bytesCurBlock += len;
}
它还会不断检查写入的字节数是否等于块大小:
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
在上面的语句中,以下条件检查是否达到块大小:
getStreamer().getBytesCurBlock() == blockSize)
如果遇到块边界,则调用 endBlock()
方法:
/**
* if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
这将确保当前块被关闭并从Name Node
获得一个新块用于写入数据。
块大小由 hdfs-site.xml
文件中的 dfs.blocksize
参数决定(在我的集群中设置为 128 MB = 134217728):
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>The default block size for new files, in bytes.
You can use the following suffix (case insensitive): k(kilo),
m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
size (such as 128k, 512m, 1g, etc.), Or provide complete size
in bytes (such as 134217728 for 128 MB).
</description>
</property>
众所周知,当一个文本文件从本地复制到HDFS时,文件被分割成固定大小的128MB。例如,当我将一个 256 MB 的文本文件复制到 HDFS 时,将有 2 个块 (256/128) 包含 "splitted" 文件。
有人能告诉我 Hadoop 2.7.1 源代码中哪个 java/jar 文件具有将文件拆分为块的功能,哪个 java/jar 文件将块写入数据节点的目录。
帮我追踪这段代码。
我只找到了他们对 FileInputFormat.java 中的块进行逻辑输入拆分的那个,这不是我需要的。我需要要拆分的物理文件的 java 文件。
它不是一个jar/java文件,它具有分割文件的功能。执行此任务的是客户端守护程序。当你从本地加载一个文件时,客户端首先只读取 128MB,它通过询问 namenode 找到一个存储它的地方,它还确保文件被正确复制和复制。此阶段的客户端不知道文件的实际大小,除非它以相同的方式读取所有块。
你说的FileInputFormat.java是hdfs存储文件时没有用到的。当您想 运行 该文件上的任何 mapreduce 任务时使用它。与文件存储无关
将数据写入 DataNode 的代码存在于 2 个文件中:
DFSOutputStream.java
(包:org.apache.hadoop.hdfs
)客户端写入的数据被分成数据包(通常为 64k 大小)。当一个数据包准备就绪时,数据被排入数据队列,由
DataStreamer
拾取。DataStreamer
(包:org.apache.hadoop.hdfs
)它拾取数据队列中的数据包并将它们发送到管道中的数据节点(通常数据管道中有 3 个数据节点,因为复制因子为 3)。
它检索一个新的块 ID 并开始将数据流式传输到数据节点。当写入一个数据块时,它会关闭当前块并获取一个新块来写入下一组数据包。
获取新区块的代码如下:
// get new block from namenode. if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if(LOG.isDebugEnabled()) { LOG.debug("Allocating new block"); } setPipeline(nextBlockOutputStream()); initDataStreaming(); }
关闭当前块的代码如下:
// Is this block full? if (one.isLastPacketInBlock()) { // wait for the close packet has been acked synchronized (dataQueue) { while (!shouldStop() && ackQueue.size() != 0) { dataQueue.wait(1000);// wait for acks to arrive from datanodes } } if (shouldStop()) { continue; } endBlock(); }
在
endBlock()
方法中,舞台再次设置为:stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
这意味着,一个新的管道被创建用于将下一组数据包写入一个新的块。
编辑:如何检测块结束?
随着DataStreamer
不断将数据附加到块中,它会更新写入的字节数。
/**
* increase bytes of current block by len.
*
* @param len how many bytes to increase to current block
*/
void incBytesCurBlock(long len) {
this.bytesCurBlock += len;
}
它还会不断检查写入的字节数是否等于块大小:
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
在上面的语句中,以下条件检查是否达到块大小:
getStreamer().getBytesCurBlock() == blockSize)
如果遇到块边界,则调用 endBlock()
方法:
/**
* if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
这将确保当前块被关闭并从Name Node
获得一个新块用于写入数据。
块大小由 hdfs-site.xml
文件中的 dfs.blocksize
参数决定(在我的集群中设置为 128 MB = 134217728):
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>The default block size for new files, in bytes.
You can use the following suffix (case insensitive): k(kilo),
m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
size (such as 128k, 512m, 1g, etc.), Or provide complete size
in bytes (such as 134217728 for 128 MB).
</description>
</property>