如何通过执行内部联接并将其带入配置单元来从 hbase table 检索数据
How do I retrieve the data from a hbase table by doing a inner join and bringing it into hive
我有两个 Hbase table'hbaseTable
'、'hbaseTable1
' 和 Hive table 'hiveTable
'
我的查询看起来像:
'insert overwrite hiveTable select col1, h2.col2, col3 from hbaseTable h1,hbaseTable2 h2 where h1.col=h2.col2';
我需要在 hbase 中进行内部连接并将数据带到配置单元中。我们使用带有 java 的配置单元,性能很差。
所以计划通过使用 spark 来改变方法。即,用 java 激发
如何使用 SPARK 从我的 JAVA 代码连接到 hbase。
现在我的 spark 代码应该在 hbase 中进行连接并通过上述查询将数据引入配置单元。
请提供示例代码。
如果您使用 spark 加载 hbase 数据,那么为什么要在 hive 中加载它?
您可以使用类似于 hive 的 spark sql,因此 sql。
您可以在根本不使用配置单元的情况下查询数据。
例如:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.util.Arrays;
public class SparkHbaseHive {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "test");
JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Spark-Hbase").setMaster("local[3]"));
JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
SQLContext sqlContext = new SQLContext(jsc);
JavaRDD<Table1Bean> rowJavaRDD =
source.map((Function<Tuple2<ImmutableBytesWritable, Result>, Table1Bean>) object -> {
Table1Bean table1Bean = new Table1Bean();
table1Bean.setRowKey(Bytes.toString(object._1().get()));
table1Bean.setColumn1(Bytes.toString(object._2().getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"))));
return table1Bean;
});
DataFrame df = sqlContext.createDataFrame(rowJavaRDD, Table1Bean.class);
//similarly create df2
//use df.join() and then register as joinedtable or register two tables and join
//execute sql queries
//Example of sql query on df
df.registerTempTable("table1");
Arrays.stream(sqlContext.sql("select * from table1").collect()).forEach(row -> System.out.println(row.getString(0) + "," + row.getString(1)));
}
}
public class Table1Bean {
private String rowKey;
private String column1;
public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getColumn1() {
return column1;
}
public void setColumn1(String column1) {
this.column1 = column1;
}
}
如果您出于某些原因需要使用配置单元,请使用 HiveContext 从配置单元读取数据并使用 saveAsTable 保存数据。
如有疑问,请告诉我。
我有两个 Hbase table'hbaseTable
'、'hbaseTable1
' 和 Hive table 'hiveTable
'
我的查询看起来像:
'insert overwrite hiveTable select col1, h2.col2, col3 from hbaseTable h1,hbaseTable2 h2 where h1.col=h2.col2';
我需要在 hbase 中进行内部连接并将数据带到配置单元中。我们使用带有 java 的配置单元,性能很差。 所以计划通过使用 spark 来改变方法。即,用 java 激发 如何使用 SPARK 从我的 JAVA 代码连接到 hbase。
现在我的 spark 代码应该在 hbase 中进行连接并通过上述查询将数据引入配置单元。
请提供示例代码。
如果您使用 spark 加载 hbase 数据,那么为什么要在 hive 中加载它? 您可以使用类似于 hive 的 spark sql,因此 sql。 您可以在根本不使用配置单元的情况下查询数据。 例如:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.util.Arrays;
public class SparkHbaseHive {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "test");
JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Spark-Hbase").setMaster("local[3]"));
JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
SQLContext sqlContext = new SQLContext(jsc);
JavaRDD<Table1Bean> rowJavaRDD =
source.map((Function<Tuple2<ImmutableBytesWritable, Result>, Table1Bean>) object -> {
Table1Bean table1Bean = new Table1Bean();
table1Bean.setRowKey(Bytes.toString(object._1().get()));
table1Bean.setColumn1(Bytes.toString(object._2().getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"))));
return table1Bean;
});
DataFrame df = sqlContext.createDataFrame(rowJavaRDD, Table1Bean.class);
//similarly create df2
//use df.join() and then register as joinedtable or register two tables and join
//execute sql queries
//Example of sql query on df
df.registerTempTable("table1");
Arrays.stream(sqlContext.sql("select * from table1").collect()).forEach(row -> System.out.println(row.getString(0) + "," + row.getString(1)));
}
}
public class Table1Bean {
private String rowKey;
private String column1;
public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getColumn1() {
return column1;
}
public void setColumn1(String column1) {
this.column1 = column1;
}
}
如果您出于某些原因需要使用配置单元,请使用 HiveContext 从配置单元读取数据并使用 saveAsTable 保存数据。 如有疑问,请告诉我。