Spark Streaming:使用PairRDD.saveAsNewHadoopDataset函数将数据保存到HBase
Spark Streaming: Using PairRDD.saveAsNewHadoopDataset function to save data to HBase
我想在 HBase 数据库中保存 Twitter 流。我现在拥有的是用于接收和转换数据的 Saprk 应用程序。但是我不知道如何将我的 TwitterStream 保存到 HBase 中?
我发现唯一有用的是 PairRDD.saveAsNewAPIHadoopDataset(conf)
方法。但是我应该如何使用它,我必须进行哪些配置才能将 RDD 数据保存到我的 HBase table?
我唯一找到的是 HBase 客户端库,它可以通过 Put 对象将数据插入 table。但这不是 Spark 程序内部的解决方案,是吗(有必要遍历 RDD 内的所有项目!!)?
有人可以在 JAVA 中举个例子吗?我的主要问题似乎是 org.apache.hadoop.conf.Configuration
实例的设置,我必须在 saveAsNewAPIHadoopDataset
...
中提交
这里是一段代码:
JavaReceiverInputDStream<Status> statusDStream = TwitterUtils.createStream(streamingCtx);
JavaPairDStream<Long, String> statusPairDStream = statusDStream.mapToPair(new PairFunction<Status, Long, String>() {
public Tuple2<Long, String> call(Status status) throws Exception {
return new Tuple2<Long, String> (status.getId(), status.getText());
}
});
statusPairDStream.foreachRDD(new Function<JavaPairRDD<Long,String>, Void>() {
public Void call(JavaPairRDD<Long, String> status) throws Exception {
org.apache.hadoop.conf.Configuration conf = new Configuration();
status.saveAsNewAPIHadoopDataset(conf);
// HBase PUT here can't be correct!?
return null;
}
});
First thing is functions are discouraged, if you are using java 8. Pls. use lambda.
下面的代码片段可以解决您的所有问题。
示例片段:
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
....
public static void processYourMessages(final JavaRDD<YourMessage> rdd, final HiveContext sqlContext,
, MyMessageUtil messageutil) throws Exception {
final JavaRDD<Row> yourrdd = rdd.filter(msg -> messageutil.filterType(.....) // create a java rdd
final JavaPairRDD<ImmutableBytesWritable, Put> yourrddPuts = yourrdd.mapToPair(row -> messageutil.getPuts(row));
yourrddPuts.saveAsNewAPIHadoopDataset(conf);
}
conf 如下所示
private Configuration conf = HBaseConfiguration.create();
conf.set(ZOOKEEPER_QUORUM, "comma seperated list of zookeeper quorum");
conf.set("hbase.mapred.outputtable", "your table name");
conf.set("mapreduce.outputformat.class", "org.apache.hadoop.hbase.mapreduce.TableOutputFormat");
MyMessageUtil
有 getPuts
方法,如下所示
public Tuple2<ImmutableBytesWritable, Put> getPuts(Row row) throws Exception {
Put put = ..// prepare your put with all the columns you have.
return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
}
希望对您有所帮助!
我想在 HBase 数据库中保存 Twitter 流。我现在拥有的是用于接收和转换数据的 Saprk 应用程序。但是我不知道如何将我的 TwitterStream 保存到 HBase 中?
我发现唯一有用的是 PairRDD.saveAsNewAPIHadoopDataset(conf)
方法。但是我应该如何使用它,我必须进行哪些配置才能将 RDD 数据保存到我的 HBase table?
我唯一找到的是 HBase 客户端库,它可以通过 Put 对象将数据插入 table。但这不是 Spark 程序内部的解决方案,是吗(有必要遍历 RDD 内的所有项目!!)?
有人可以在 JAVA 中举个例子吗?我的主要问题似乎是 org.apache.hadoop.conf.Configuration
实例的设置,我必须在 saveAsNewAPIHadoopDataset
...
这里是一段代码:
JavaReceiverInputDStream<Status> statusDStream = TwitterUtils.createStream(streamingCtx);
JavaPairDStream<Long, String> statusPairDStream = statusDStream.mapToPair(new PairFunction<Status, Long, String>() {
public Tuple2<Long, String> call(Status status) throws Exception {
return new Tuple2<Long, String> (status.getId(), status.getText());
}
});
statusPairDStream.foreachRDD(new Function<JavaPairRDD<Long,String>, Void>() {
public Void call(JavaPairRDD<Long, String> status) throws Exception {
org.apache.hadoop.conf.Configuration conf = new Configuration();
status.saveAsNewAPIHadoopDataset(conf);
// HBase PUT here can't be correct!?
return null;
}
});
First thing is functions are discouraged, if you are using java 8. Pls. use lambda.
下面的代码片段可以解决您的所有问题。
示例片段:
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
....
public static void processYourMessages(final JavaRDD<YourMessage> rdd, final HiveContext sqlContext,
, MyMessageUtil messageutil) throws Exception {
final JavaRDD<Row> yourrdd = rdd.filter(msg -> messageutil.filterType(.....) // create a java rdd
final JavaPairRDD<ImmutableBytesWritable, Put> yourrddPuts = yourrdd.mapToPair(row -> messageutil.getPuts(row));
yourrddPuts.saveAsNewAPIHadoopDataset(conf);
}
conf 如下所示
private Configuration conf = HBaseConfiguration.create();
conf.set(ZOOKEEPER_QUORUM, "comma seperated list of zookeeper quorum");
conf.set("hbase.mapred.outputtable", "your table name");
conf.set("mapreduce.outputformat.class", "org.apache.hadoop.hbase.mapreduce.TableOutputFormat");
MyMessageUtil
有 getPuts
方法,如下所示
public Tuple2<ImmutableBytesWritable, Put> getPuts(Row row) throws Exception {
Put put = ..// prepare your put with all the columns you have.
return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
}
希望对您有所帮助!