Spark 只写入一个 hbase 区域服务器
Spark write only to one hbase region server
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.PairRDDFunctions
def bulkWriteToHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sinkTableName: String, outRDD: RDD[(ImmutableBytesWritable, Put)]): Unit = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sinkTableName)
val hJob = Job.getInstance(hConf)
hJob.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, sinkTableName)
hJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
outRDD.saveAsNewAPIHadoopDataset(hJob.getConfiguration())
}
我使用这个hbase批量插入发现,每次spark只会从hbase写入一个region server,这成为了瓶颈。
然而,当我使用几乎相同的方法但从 hbase 读取时,它使用多个执行程序进行并行读取。
def bulkReadFromHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sourceTableName: String) = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sourceTableName)
val inputRDD = sparkContext.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
inputRDD
}
can anyone please explain why this could happen? or maybe I have
used the wrong way for spark-hbase bulk I/O ?
虽然您没有提供示例数据或足够的解释,但这主要不是您的代码或配置造成的。
由于 non-optimal rowkey 设计,它正在发生。
您正在编写的数据的键(hbase rowkey)结构不正确(可能单调递增或其他)。因此,写入其中一个区域是 happening.You 可以通过各种方式防止这种情况(rowkey 的各种推荐做法加盐、反相和其他技术等设计)。
作为参考,您可以查看 http://hbase.apache.org/book.html#rowkey.design
以防万一,如果您想知道写入是对所有区域并行完成还是一个一个地完成(问题不清楚),请查看以下内容:
http://hbase.apache.org/book.html#_bulk_load.
Question : I have used the wrong way for spark-hbase bulk I/O ?
不,你的方法不对,不过,你需要先pre-split regions & create table with presplit regions.
例如create 'test_table', 'f1', SPLITS=> ['1', '2', '3', '4', '5', '6', '7', '8', '9']
以上table占据9个区域..
设计好的 rowkey 将以 1-9 开头
你可以像下面这样使用 guava murmur hash。
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
/**
* getMurmurHash.
*
* @param content
* @return HashCode
*/
public static HashCode getMurmurHash(String content) {
final HashFunction hf = Hashing.murmur3_128();
final HashCode hc = hf.newHasher().putString(content, Charsets.UTF_8).hash();
return hc;
}
final long hash = getMurmur128Hash(Bytes.toString(yourrowkey as string)).asLong();
final int prefix = Math.abs((int) hash % 9);
现在将此前缀附加到您的行键
For example
1rowkey1 // will go in to first region
2rowkey2 // will go in to
second region
3rowkey3 // will go in to third region
...
9rowkey9 //
will go in to ninth region
如果您正在执行 pre-split 操作,并且想要手动管理区域拆分,您还可以通过将 hbase.hregion.max.filesize 设置为较高的数字并将拆分策略设置为 ConstantSizeRegionSplitPolicy 来禁用区域拆分。但是,您应该使用 100GB 之类的保护值,这样区域的增长就不会超出区域服务器的能力。您可以考虑禁用自动拆分并依赖 pre-splitting 中的初始区域集,例如,如果您对键前缀使用统一哈希,并且 您可以确保read/write 每个区域的负载及其大小在 table
中的各个区域是均匀的
1) 请确保在将数据加载到 hbase 之前可以预拆分 table table 2) 使用 murmurhash 或其他一些哈希技术设计好的 rowkey,如下所述。确保跨区域统一分配。
另请参阅 http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
Question : can anyone please explain why this could happen?
原因很明显也很简单由于行键不佳table[=75=,将数据热点定位到一个特定原因]...
考虑 java 中的哈希映射,其中包含哈希码为 1234 的元素。那么它会填充一个桶中的所有元素不是吗?如果 hashmap 元素分布在不同的 good hashcode
中,那么它将把元素放在不同的桶中。 hbase也是如此。这里你的哈希码就像你的行键...
此外,
What happens if I already have a table and I want to split the regions
across...
RegionSplitter
class 提供了几个实用程序来帮助选择手动拆分区域而不是让 HBase 自动处理的开发人员管理生命周期。
最有用的实用程序是:
- 创建具有指定数量 pre-split 个区域的 table
- 在现有 table
上执行所有区域的滚动拆分
示例:
$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1
其中-c 10,指定请求的region个数为10,-f指定你想要的列族在table中,以“:”分隔。该工具将创建一个名为“test_table”的 table,包含 10 个区域:
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...
as discussed in comment, you found that my final RDD right before writing into hbase only has 1 partition! which indicates that there
was only one executor holding the entire data... I am still trying to
find out why.
另外,检查
spark.default.parallelism
defaults to the number of all cores on all
machines. The parallelize api has no parent RDD to determine the
number of partitions, so it uses the spark.default.parallelism
.
所以你可以通过重新分区来增加分区
注意:我观察到,在 Mapreduce 中,regions/input 拆分的分区数 = 启动的映射器数。同样,在您的情况下,数据加载到一个特定区域的情况可能相同这就是为什么一位执行者发射的原因。请也验证一下
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.PairRDDFunctions
def bulkWriteToHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sinkTableName: String, outRDD: RDD[(ImmutableBytesWritable, Put)]): Unit = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sinkTableName)
val hJob = Job.getInstance(hConf)
hJob.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, sinkTableName)
hJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
outRDD.saveAsNewAPIHadoopDataset(hJob.getConfiguration())
}
我使用这个hbase批量插入发现,每次spark只会从hbase写入一个region server,这成为了瓶颈。
然而,当我使用几乎相同的方法但从 hbase 读取时,它使用多个执行程序进行并行读取。
def bulkReadFromHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sourceTableName: String) = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sourceTableName)
val inputRDD = sparkContext.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
inputRDD
}
can anyone please explain why this could happen? or maybe I have used the wrong way for spark-hbase bulk I/O ?
虽然您没有提供示例数据或足够的解释,但这主要不是您的代码或配置造成的。 由于 non-optimal rowkey 设计,它正在发生。 您正在编写的数据的键(hbase rowkey)结构不正确(可能单调递增或其他)。因此,写入其中一个区域是 happening.You 可以通过各种方式防止这种情况(rowkey 的各种推荐做法加盐、反相和其他技术等设计)。 作为参考,您可以查看 http://hbase.apache.org/book.html#rowkey.design
以防万一,如果您想知道写入是对所有区域并行完成还是一个一个地完成(问题不清楚),请查看以下内容: http://hbase.apache.org/book.html#_bulk_load.
Question : I have used the wrong way for spark-hbase bulk I/O ?
不,你的方法不对,不过,你需要先pre-split regions & create table with presplit regions.
例如create 'test_table', 'f1', SPLITS=> ['1', '2', '3', '4', '5', '6', '7', '8', '9']
以上table占据9个区域..
设计好的 rowkey 将以 1-9 开头
你可以像下面这样使用 guava murmur hash。
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
/**
* getMurmurHash.
*
* @param content
* @return HashCode
*/
public static HashCode getMurmurHash(String content) {
final HashFunction hf = Hashing.murmur3_128();
final HashCode hc = hf.newHasher().putString(content, Charsets.UTF_8).hash();
return hc;
}
final long hash = getMurmur128Hash(Bytes.toString(yourrowkey as string)).asLong();
final int prefix = Math.abs((int) hash % 9);
现在将此前缀附加到您的行键
For example
1rowkey1 // will go in to first region
2rowkey2 // will go in to second region
3rowkey3 // will go in to third region ... 9rowkey9 // will go in to ninth region
如果您正在执行 pre-split 操作,并且想要手动管理区域拆分,您还可以通过将 hbase.hregion.max.filesize 设置为较高的数字并将拆分策略设置为 ConstantSizeRegionSplitPolicy 来禁用区域拆分。但是,您应该使用 100GB 之类的保护值,这样区域的增长就不会超出区域服务器的能力。您可以考虑禁用自动拆分并依赖 pre-splitting 中的初始区域集,例如,如果您对键前缀使用统一哈希,并且 您可以确保read/write 每个区域的负载及其大小在 table
中的各个区域是均匀的1) 请确保在将数据加载到 hbase 之前可以预拆分 table table 2) 使用 murmurhash 或其他一些哈希技术设计好的 rowkey,如下所述。确保跨区域统一分配。
另请参阅 http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
Question : can anyone please explain why this could happen?
原因很明显也很简单由于行键不佳table[=75=,将数据热点定位到一个特定原因]...
考虑 java 中的哈希映射,其中包含哈希码为 1234 的元素。那么它会填充一个桶中的所有元素不是吗?如果 hashmap 元素分布在不同的 good hashcode
中,那么它将把元素放在不同的桶中。 hbase也是如此。这里你的哈希码就像你的行键...
此外,
What happens if I already have a table and I want to split the regions across...
RegionSplitter
class 提供了几个实用程序来帮助选择手动拆分区域而不是让 HBase 自动处理的开发人员管理生命周期。
最有用的实用程序是:
- 创建具有指定数量 pre-split 个区域的 table
- 在现有 table 上执行所有区域的滚动拆分
示例:
$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1
其中-c 10,指定请求的region个数为10,-f指定你想要的列族在table中,以“:”分隔。该工具将创建一个名为“test_table”的 table,包含 10 个区域:
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...
as discussed in comment, you found that my final RDD right before writing into hbase only has 1 partition! which indicates that there was only one executor holding the entire data... I am still trying to find out why.
另外,检查
spark.default.parallelism
defaults to the number of all cores on all machines. The parallelize api has no parent RDD to determine the number of partitions, so it uses thespark.default.parallelism
.
所以你可以通过重新分区来增加分区