Spark Streaming - HBase 批量加载
Spark Streaming - HBase Bulk Load
我目前正在使用 Python 将 CSV 数据批量加载到 HBase table,并且我目前在使用 saveAsNewAPIHadoopFile
[=15 编写适当的 HFile 时遇到了问题=]
我的代码目前如下所示:
def csv_to_key_value(row):
cols = row.split(",")
result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
(cols[0], [cols[0], "f2", "c2", cols[2]]),
(cols[0], [cols[0], "f3", "c3", cols[3]]))
return result
def bulk_load(rdd):
conf = {#Ommitted to simplify}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
.flatMap(csv_to_key_value)
if not load_rdd.isEmpty():
load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
"org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
conf=conf,
keyConverter=keyConv,
valueConverter=valueConv)
else:
print("Nothing to process")
当我运行这段代码时,我得到以下错误:
java.io.IOException: Added a key not lexically larger than previous. Current cell = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0, lastCell = /f1:c1/1453891407212/Minimum/vlen=1/seqid=0 at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
由于报错表明是key的问题,我从我的RDD中抓取了元素,它们如下(为了便于阅读而格式化)
[(u'1', [u'1', 'f1', 'c1', u'A']),
(u'1', [u'1', 'f2', 'c2', u'1A']),
(u'1', [u'1', 'f3', 'c3', u'10']),
(u'2', [u'2', 'f1', 'c1', u'B']),
(u'2', [u'2', 'f2', 'c2', u'2B']),
(u'2', [u'2', 'f3', 'c3', u'9']),
。 . .
(u'9', [u'9', 'f1', 'c1', u'I']),
(u'9', [u'9', 'f2', 'c2', u'3C']),
(u'9', [u'9', 'f3', 'c3', u'2']),
(u'10', [u'10', 'f1', 'c1', u'J']),
(u'10', [u'10', 'f2', 'c2', u'1A']),
(u'10', [u'10', 'f3', 'c3', u'1'])]
这与我的 CSV 完全匹配,顺序正确。据我了解,在 HBase 中,一个键由 {row, family, timestamp} 定义。行和族的组合是唯一的,并且对于我数据中的所有条目都是单调递增的,而且我无法控制时间戳(这是我能想象的唯一问题)
谁能告诉我如何解决 avoid/prevent 此类问题?
好吧,这只是我的一个愚蠢错误,我觉得有点愚蠢。按字典顺序,顺序应为 1、10、2、3 ... 8、9。在加载前保证正确排序的最简单方法是:
rdd.sortByKey(true);
我希望至少可以让一个人摆脱我的头痛。
我目前正在使用 Python 将 CSV 数据批量加载到 HBase table,并且我目前在使用 saveAsNewAPIHadoopFile
[=15 编写适当的 HFile 时遇到了问题=]
我的代码目前如下所示:
def csv_to_key_value(row):
cols = row.split(",")
result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
(cols[0], [cols[0], "f2", "c2", cols[2]]),
(cols[0], [cols[0], "f3", "c3", cols[3]]))
return result
def bulk_load(rdd):
conf = {#Ommitted to simplify}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
.flatMap(csv_to_key_value)
if not load_rdd.isEmpty():
load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
"org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
conf=conf,
keyConverter=keyConv,
valueConverter=valueConv)
else:
print("Nothing to process")
当我运行这段代码时,我得到以下错误:
java.io.IOException: Added a key not lexically larger than previous. Current cell = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0, lastCell = /f1:c1/1453891407212/Minimum/vlen=1/seqid=0 at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
由于报错表明是key的问题,我从我的RDD中抓取了元素,它们如下(为了便于阅读而格式化)
[(u'1', [u'1', 'f1', 'c1', u'A']),
(u'1', [u'1', 'f2', 'c2', u'1A']),
(u'1', [u'1', 'f3', 'c3', u'10']),
(u'2', [u'2', 'f1', 'c1', u'B']),
(u'2', [u'2', 'f2', 'c2', u'2B']),
(u'2', [u'2', 'f3', 'c3', u'9']),
。 . .
(u'9', [u'9', 'f1', 'c1', u'I']),
(u'9', [u'9', 'f2', 'c2', u'3C']),
(u'9', [u'9', 'f3', 'c3', u'2']),
(u'10', [u'10', 'f1', 'c1', u'J']),
(u'10', [u'10', 'f2', 'c2', u'1A']),
(u'10', [u'10', 'f3', 'c3', u'1'])]
这与我的 CSV 完全匹配,顺序正确。据我了解,在 HBase 中,一个键由 {row, family, timestamp} 定义。行和族的组合是唯一的,并且对于我数据中的所有条目都是单调递增的,而且我无法控制时间戳(这是我能想象的唯一问题)
谁能告诉我如何解决 avoid/prevent 此类问题?
好吧,这只是我的一个愚蠢错误,我觉得有点愚蠢。按字典顺序,顺序应为 1、10、2、3 ... 8、9。在加载前保证正确排序的最简单方法是:
rdd.sortByKey(true);
我希望至少可以让一个人摆脱我的头痛。