如何逐步加载协处理器
How to Load Coprocessor Step by Step
任何人都可以解释如何通过shell.i加载区域协处理器无法提前获得有关加载和部署Coprocessor.Thanks的正确信息
请按照以下步骤操作:
第 1 步: 创建接口并扩展 org.apache.hadoop.hbase.ipc.CoprocessorProtocol
第 2 步:在调用协处理器后在接口中定义要执行的方法
步骤 3: 创建 HTable
的实例
第 4 步:使用所有必需的参数调用 HTable.coprocessorExec()
方法
请看下面的例子:
在示例中,我们试图获取注册号在我们感兴趣的某个范围内的学生列表。
正在创建接口协议:
public interface CoprocessorTestProtocol extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol{
List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException;
}
样本学生 Class:
public class Student implements Serializable{
byte[] registrationNumber;
String name;
public void setRegistrationNumber(byte[] registrationNumber){
this.registrationNumber = registrationNumber;
}
public byte[] getRegistrationNumber(){
return this.registrationNumber;
}
public void setName(String name){
this.name = name;
}
public int getName(){
return this.name;
}
public String toString(){
return "Student[ registration number = " + Bytes.toInt(this.getRegistrationNumber()) + " name = " + this.getName() + " ]"
}
}
模型Class:【从HBase获取数据的业务逻辑写在哪里】
public class MyModel extends org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor implements CoprocessorTestProtocol{
@Override
List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber){
Scan scan = new Scan();
scan.setStartRow(startRegistrationNumber);
scan.setStopRow(endRegistrationNumber);
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);
List<KeyValue> currentTempObj = new ArrayList<KeyValue>();
List<Student> studentList = new ArrayList<Student>();
try{
Boolean hasNext = false;
Student student;
do{
currentTempObj.clear();
hasNext = scanner.next(currentTempObj);
if(!currentTempObj.isEmpty()){
student = new Student();
for(KeyValue keyValue: currentTempObj){
bytes[] qualifier = keyValue.getQualifier();
if(Arrays.equals(qualifier, Bytes.toBytes("registrationNumber")))
student.setRegistrationNumber(keyValue.getValue());
else if(Arrays.equals(qualifier, Bytes.toBytes("name")))
student.setName(Bytes.toString(keyValue.getValue()));
}
StudentList.add(student);
}
}while(hasNext);
}catch (Exception e){
// catch the exception the way you want
}
finally{
scanner.close();
}
}
}
Client class: [调用协处理器的地方]
public class MyClient{
if (args.length < 2) {
System.out.println("Usage : startRegistrationNumber endRegistrationNumber");
return;
}
public List<Student> displayStudentInfo(int startRegistrationNumber, int endRegistrationNumber){
final byte[] startKey=Bytes.toBytes(startRegistrationNumber);
final byte[] endKey=Bytes.toBytes(endRegistrationNumber);
String zkPeers = SystemInfo.getHBaseZkConnectString();
Configuration configuration=HBaseConfiguration.create();
configuration.set(HConstants.ZOOKEEPER_QUORUM, zkPeers);
HTableInterface table = new HTable(configuration, TABLE_NAME);
Map<byte[],List<Student>> allRegionOutput;
allRegionOutput = table.coprocessorExec(CoprocessorTestProtocol.class, startKey,endKey,
new Batch.Call<CoprocessorTestProtocol, List<Student>>() {
public List<Student> call(CoprocessorTestProtocol instance)throws IOException{
return instance.getStudentList(startKey, endKey);
}
});
table.close();
List<Student> anotherList = new ArrayList<Student>();
for (List<Student> studentData: allRegionOutput.values()){
anotherList.addAll(studentData);
}
return anotherList;
}
public static void main(String args){
if (args.length < 2) {
System.out.println("Usage : startRegistrationNumber endRegistrationNumber");
return;
}
int startRegistrationNumber = args[0];
int endRegistrationNumber = args[1];
for (Student student : displayStudentInfo(startRegistrationNumber, endRegistrationNumber)){
System.out.println(student);
}
}
}
请注意:请仔细查看示例中的Scanner.next(Object)
方法。此 returns 布尔值并将当前对象存储在 Object
参数
中
任何人都可以解释如何通过shell.i加载区域协处理器无法提前获得有关加载和部署Coprocessor.Thanks的正确信息
请按照以下步骤操作:
第 1 步: 创建接口并扩展 org.apache.hadoop.hbase.ipc.CoprocessorProtocol
第 2 步:在调用协处理器后在接口中定义要执行的方法
步骤 3: 创建 HTable
第 4 步:使用所有必需的参数调用 HTable.coprocessorExec()
方法
请看下面的例子:
在示例中,我们试图获取注册号在我们感兴趣的某个范围内的学生列表。
正在创建接口协议:
public interface CoprocessorTestProtocol extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol{
List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException;
}
样本学生 Class:
public class Student implements Serializable{
byte[] registrationNumber;
String name;
public void setRegistrationNumber(byte[] registrationNumber){
this.registrationNumber = registrationNumber;
}
public byte[] getRegistrationNumber(){
return this.registrationNumber;
}
public void setName(String name){
this.name = name;
}
public int getName(){
return this.name;
}
public String toString(){
return "Student[ registration number = " + Bytes.toInt(this.getRegistrationNumber()) + " name = " + this.getName() + " ]"
}
}
模型Class:【从HBase获取数据的业务逻辑写在哪里】
public class MyModel extends org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor implements CoprocessorTestProtocol{
@Override
List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber){
Scan scan = new Scan();
scan.setStartRow(startRegistrationNumber);
scan.setStopRow(endRegistrationNumber);
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);
List<KeyValue> currentTempObj = new ArrayList<KeyValue>();
List<Student> studentList = new ArrayList<Student>();
try{
Boolean hasNext = false;
Student student;
do{
currentTempObj.clear();
hasNext = scanner.next(currentTempObj);
if(!currentTempObj.isEmpty()){
student = new Student();
for(KeyValue keyValue: currentTempObj){
bytes[] qualifier = keyValue.getQualifier();
if(Arrays.equals(qualifier, Bytes.toBytes("registrationNumber")))
student.setRegistrationNumber(keyValue.getValue());
else if(Arrays.equals(qualifier, Bytes.toBytes("name")))
student.setName(Bytes.toString(keyValue.getValue()));
}
StudentList.add(student);
}
}while(hasNext);
}catch (Exception e){
// catch the exception the way you want
}
finally{
scanner.close();
}
}
}
Client class: [调用协处理器的地方]
public class MyClient{
if (args.length < 2) {
System.out.println("Usage : startRegistrationNumber endRegistrationNumber");
return;
}
public List<Student> displayStudentInfo(int startRegistrationNumber, int endRegistrationNumber){
final byte[] startKey=Bytes.toBytes(startRegistrationNumber);
final byte[] endKey=Bytes.toBytes(endRegistrationNumber);
String zkPeers = SystemInfo.getHBaseZkConnectString();
Configuration configuration=HBaseConfiguration.create();
configuration.set(HConstants.ZOOKEEPER_QUORUM, zkPeers);
HTableInterface table = new HTable(configuration, TABLE_NAME);
Map<byte[],List<Student>> allRegionOutput;
allRegionOutput = table.coprocessorExec(CoprocessorTestProtocol.class, startKey,endKey,
new Batch.Call<CoprocessorTestProtocol, List<Student>>() {
public List<Student> call(CoprocessorTestProtocol instance)throws IOException{
return instance.getStudentList(startKey, endKey);
}
});
table.close();
List<Student> anotherList = new ArrayList<Student>();
for (List<Student> studentData: allRegionOutput.values()){
anotherList.addAll(studentData);
}
return anotherList;
}
public static void main(String args){
if (args.length < 2) {
System.out.println("Usage : startRegistrationNumber endRegistrationNumber");
return;
}
int startRegistrationNumber = args[0];
int endRegistrationNumber = args[1];
for (Student student : displayStudentInfo(startRegistrationNumber, endRegistrationNumber)){
System.out.println(student);
}
}
}
请注意:请仔细查看示例中的Scanner.next(Object)
方法。此 returns 布尔值并将当前对象存储在 Object
参数