Spark to Hbase Table 没有显示完整的数据记录
Spark to Hbase Table is not showing complete data records
我有一个包含 30k 条记录的日志文件,我从 Kafka 发布它并通过 spark 将其保存到 HBase 中。在 30K 条记录中,我在 HBase 中只能看到 4K 条记录table。
- 我已尝试将流保存在 MySQL 中,它正在正确保存 MySql 中的所有记录。
- 但是在 HBase 中,如果我在 Kafka 主题中发布一个包含 100 条记录的文件,它会在 HBase 中保存 36 条记录table,而如果我发布 30K 条记录,Hbase 只会显示 4k 条记录。
- 此外,HBase 中的记录(行)不是像 1..3..10..17 这样的顺序。
final Job newAPIJobConfiguration1 = Job.getInstance(config); newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "logs"); newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
HTable hTable = new HTable(config, "country");
lines.foreachRDD((rdd,time)->
{
// Get the singleton instance of SparkSession
SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate();
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD rowRDD = rdd.map(line -> {
String[] logLine = line.split(" +");
Log record = new Log();
record.setTime((logLine[0]));
record.setTime_taken((logLine[1]));
record.setIp(logLine[2]);
return record;
});
saveToHBase(rowRDD, newAPIJobConfiguration1.getConfiguration());
});
ssc.start();
ssc.awaitTermination();
}
//6. saveToHBase method - insert data into HBase
public static void saveToHBase(JavaRDD rowRDD, Configuration conf) throws IOException {
// create Key, Value pair to store in HBase
JavaPairRDD hbasePuts = rowRDD.mapToPair(
new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Log row) throws Exception {
Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
//put.addColumn(Bytes.toBytes("sparkaf"), Bytes.toBytes("message"), Bytes.toBytes(row.getMessage()));
put.addImmutable(Bytes.toBytes("time"), Bytes.toBytes("col1"), Bytes.toBytes(row.getTime()));
put.addImmutable(Bytes.toBytes("time_taken"), Bytes.toBytes("col2"), Bytes.toBytes(row.getTime_taken()));
put.addImmutable(Bytes.toBytes("ip"), Bytes.toBytes("col3"), Bytes.toBytes(row.getIp()));
return new Tuple2(new ImmutableBytesWritable(), put);
}
});
// save to HBase- Spark built-in API method
//hbasePuts.saveAsNewAPIHadoopDataset(conf);
hbasePuts.saveAsNewAPIHadoopDataset(conf);
由于HBase通过rowkey唯一存储记录,所以很可能你正在覆盖记录。
您正在使用当前时间(以毫秒为单位)作为行键,使用相同行键创建的任何记录都将覆盖旧记录。
Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
因此,如果在 1 毫秒内创建了 100 个 Put,那么 HBase 中只会显示 100 个,因为同一行被覆盖了 99 次。
HBase 中的 4k 行键很可能是加载数据所花费的 4k 独特毫秒(4 秒)。
我建议使用不同的行键设计。另外,作为旁注,在 HBase 中使用单调递增的行键通常不是一个好主意:
Further Information
我有一个包含 30k 条记录的日志文件,我从 Kafka 发布它并通过 spark 将其保存到 HBase 中。在 30K 条记录中,我在 HBase 中只能看到 4K 条记录table。
- 我已尝试将流保存在 MySQL 中,它正在正确保存 MySql 中的所有记录。
- 但是在 HBase 中,如果我在 Kafka 主题中发布一个包含 100 条记录的文件,它会在 HBase 中保存 36 条记录table,而如果我发布 30K 条记录,Hbase 只会显示 4k 条记录。
- 此外,HBase 中的记录(行)不是像 1..3..10..17 这样的顺序。
final Job newAPIJobConfiguration1 = Job.getInstance(config); newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "logs"); newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); HTable hTable = new HTable(config, "country"); lines.foreachRDD((rdd,time)-> { // Get the singleton instance of SparkSession SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate(); // Convert RDD[String] to RDD[case class] to DataFrame JavaRDD rowRDD = rdd.map(line -> { String[] logLine = line.split(" +"); Log record = new Log(); record.setTime((logLine[0])); record.setTime_taken((logLine[1])); record.setIp(logLine[2]); return record; }); saveToHBase(rowRDD, newAPIJobConfiguration1.getConfiguration()); }); ssc.start(); ssc.awaitTermination(); } //6. saveToHBase method - insert data into HBase public static void saveToHBase(JavaRDD rowRDD, Configuration conf) throws IOException { // create Key, Value pair to store in HBase JavaPairRDD hbasePuts = rowRDD.mapToPair( new PairFunction() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Log row) throws Exception { Put put = new Put(Bytes.toBytes(System.currentTimeMillis())); //put.addColumn(Bytes.toBytes("sparkaf"), Bytes.toBytes("message"), Bytes.toBytes(row.getMessage())); put.addImmutable(Bytes.toBytes("time"), Bytes.toBytes("col1"), Bytes.toBytes(row.getTime())); put.addImmutable(Bytes.toBytes("time_taken"), Bytes.toBytes("col2"), Bytes.toBytes(row.getTime_taken())); put.addImmutable(Bytes.toBytes("ip"), Bytes.toBytes("col3"), Bytes.toBytes(row.getIp())); return new Tuple2(new ImmutableBytesWritable(), put); } }); // save to HBase- Spark built-in API method //hbasePuts.saveAsNewAPIHadoopDataset(conf); hbasePuts.saveAsNewAPIHadoopDataset(conf);
由于HBase通过rowkey唯一存储记录,所以很可能你正在覆盖记录。
您正在使用当前时间(以毫秒为单位)作为行键,使用相同行键创建的任何记录都将覆盖旧记录。
Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));
因此,如果在 1 毫秒内创建了 100 个 Put,那么 HBase 中只会显示 100 个,因为同一行被覆盖了 99 次。
HBase 中的 4k 行键很可能是加载数据所花费的 4k 独特毫秒(4 秒)。
我建议使用不同的行键设计。另外,作为旁注,在 HBase 中使用单调递增的行键通常不是一个好主意: Further Information