Spark to Hbase Table 没有显示完整的数据记录

Spark to Hbase Table is not showing complete data records

我有一个包含 30k 条记录的日志文件,我从 Kafka 发布它并通过 spark 将其保存到 HBase 中。在 30K 条记录中,我在 HBase 中只能看到 4K 条记录table。

  1. 我已尝试将流保存在 MySQL 中,它正在正确保存 MySql 中的所有记录。
  2. 但是在 HBase 中,如果我在 Kafka 主题中发布一个包含 100 条记录的文件,它会在 HBase 中保存 36 条记录table,而如果我发布 30K 条记录,Hbase 只会显示 4k 条记录。
  3. 此外,HBase 中的记录(行)不是像 1..3..10..17 这样的顺序。 final Job newAPIJobConfiguration1 = Job.getInstance(config); newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "logs"); newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); HTable hTable = new HTable(config, "country"); lines.foreachRDD((rdd,time)-> { // Get the singleton instance of SparkSession SparkSession spark = SparkSession.builder().config(rdd.context().getConf()).getOrCreate(); // Convert RDD[String] to RDD[case class] to DataFrame JavaRDD rowRDD = rdd.map(line -> { String[] logLine = line.split(" +"); Log record = new Log(); record.setTime((logLine[0])); record.setTime_taken((logLine[1])); record.setIp(logLine[2]); return record; }); saveToHBase(rowRDD, newAPIJobConfiguration1.getConfiguration()); }); ssc.start(); ssc.awaitTermination(); } //6. saveToHBase method - insert data into HBase public static void saveToHBase(JavaRDD rowRDD, Configuration conf) throws IOException { // create Key, Value pair to store in HBase JavaPairRDD hbasePuts = rowRDD.mapToPair( new PairFunction() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Log row) throws Exception { Put put = new Put(Bytes.toBytes(System.currentTimeMillis())); //put.addColumn(Bytes.toBytes("sparkaf"), Bytes.toBytes("message"), Bytes.toBytes(row.getMessage())); put.addImmutable(Bytes.toBytes("time"), Bytes.toBytes("col1"), Bytes.toBytes(row.getTime())); put.addImmutable(Bytes.toBytes("time_taken"), Bytes.toBytes("col2"), Bytes.toBytes(row.getTime_taken())); put.addImmutable(Bytes.toBytes("ip"), Bytes.toBytes("col3"), Bytes.toBytes(row.getIp())); return new Tuple2(new ImmutableBytesWritable(), put); } }); // save to HBase- Spark built-in API method //hbasePuts.saveAsNewAPIHadoopDataset(conf); hbasePuts.saveAsNewAPIHadoopDataset(conf);

由于HBase通过rowkey唯一存储记录,所以很可能你正在覆盖记录。

您正在使用当前时间(以毫秒为单位)作为行键,使用相同行键创建的任何记录都将覆盖旧记录。

Put put = new Put(Bytes.toBytes(System.currentTimeMillis()));

因此,如果在 1 毫秒内创建了 100 个 Put,那么 HBase 中只会显示 100 个,因为同一行被覆盖了 99 次。

HBase 中的 4k 行键很可能是加载数据所花费的 4k 独特毫秒(4 秒)。

我建议使用不同的行键设计。另外,作为旁注,在 HBase 中使用单调递增的行键通常不是一个好主意: Further Information