How to Load a Coprocessor Step by Step

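Can anyone explain how to load a region coprocessor through the shell? I have not been able to find proper information about loading and deploying a coprocessor. Thanks in advance.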

Please follow the steps below:

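Step 1: Create an interface that extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol

Step 2: In that interface, define the method you want to execute when the coprocessor is called

Step 3: Create an instance of HTable

Step 4: Call the HTable.coprocessorExec() method with all the required parameters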

Please see the example below:

In this example, we try to get the list of students whose registration numbers fall within a range we are interested in.
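Since the range is passed around as byte arrays, it may help to see how integer registration numbers behave once they are encoded. The sketch below uses only the JDK. `toBytes` is a hypothetical stand-in that imitates the 4-byte big-endian layout of `Bytes.toBytes(int)`, and `compare` imitates the unsigned byte-by-byte ordering of row keys. It assumes the row key is the encoded registration number, and it treats the start of the range as inclusive and the end as exclusive, which is how a Scan interprets its start and stop rows.

```java
import java.nio.ByteBuffer;

final class KeyRangeSketch {
    // Hypothetical stand-in for Bytes.toBytes(int): 4 bytes, big-endian.
    static byte[] toBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    // Start is inclusive, stop is exclusive.
    static boolean inRange(int key, int start, int stop) {
        byte[] k = toBytes(key);
        return compare(k, toBytes(start)) >= 0 && compare(k, toBytes(stop)) < 0;
    }

    public static void main(String[] args) {
        System.out.println(inRange(100, 100, 200)); // true
        System.out.println(inRange(200, 100, 200)); // false
    }
}
```

Under this byte ordering a negative int sorts after every non-negative one, so the sketch only makes sense for non-negative registration numbers.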

Creating the interface protocol:

import java.io.IOException;
import java.util.List;

public interface CoprocessorTestProtocol extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol{
    List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException;
}

Sample Student class:

import java.io.Serializable;

import org.apache.hadoop.hbase.util.Bytes;

public class Student implements Serializable{
    byte[] registrationNumber;
    String name;

    public void setRegistrationNumber(byte[] registrationNumber){
        this.registrationNumber = registrationNumber;
    }

    public byte[] getRegistrationNumber(){
        return this.registrationNumber;
    }

    public void setName(String name){
        this.name = name;
    }

    public String getName(){
        return this.name;
    }

    public String toString(){
        return "Student[ registration number = " + Bytes.toInt(this.getRegistrationNumber()) + " name = " + this.getName() + " ]";
    }
}
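The Student class is marked Serializable, presumably so that instances can be sent from the region server back to the client. As a quick sanity check that a class of this shape survives Java serialization, here is a minimal JDK-only round trip. `Pupil` is a hypothetical class with the same field types as Student, and the sketch says nothing about how HBase itself encodes the response.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationSketch {
    // Hypothetical class with the same field types as Student.
    static final class Pupil implements Serializable {
        private static final long serialVersionUID = 1L;
        final byte[] registrationNumber;
        final String name;

        Pupil(byte[] registrationNumber, String name) {
            this.registrationNumber = registrationNumber;
            this.name = name;
        }
    }

    // Writes the object to a byte array and reads it back as a new instance.
    static Pupil roundTrip(Pupil original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return (Pupil) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Pupil copy = roundTrip(new Pupil(new byte[] {0, 0, 0, 7}, "Ann"));
        System.out.println(copy.name + " " + copy.registrationNumber.length); // Ann 4
    }
}
```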

Model class: [where the business logic for fetching data from HBase is written]

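import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class MyModel extends org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor implements CoprocessorTestProtocol{

    // must be public because it implements an interface method
    @Override
    public List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException{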
        Scan scan = new Scan();
        scan.setStartRow(startRegistrationNumber);
        scan.setStopRow(endRegistrationNumber);

        InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);

        List<KeyValue> currentTempObj = new ArrayList<KeyValue>();
        List<Student> studentList = new ArrayList<Student>();

        try{
            boolean hasNext = false;
            Student student;

            do{
                currentTempObj.clear();
                hasNext = scanner.next(currentTempObj);

                if(!currentTempObj.isEmpty()){
                    student = new Student();
                    for(KeyValue keyValue: currentTempObj){
                        byte[] qualifier = keyValue.getQualifier();
                        if(Arrays.equals(qualifier, Bytes.toBytes("registrationNumber")))
                            student.setRegistrationNumber(keyValue.getValue());
                        else if(Arrays.equals(qualifier, Bytes.toBytes("name")))
                            student.setName(Bytes.toString(keyValue.getValue()));
                    }
                    studentList.add(student);
                }
            }while(hasNext);

        }catch (Exception e){
            // handle the exception the way you want;
            // as written, a failure here returns whatever was collected so far
        }
        finally{
            scanner.close();
        }

        return studentList;
    }
}

Client class: [where the coprocessor is called]

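import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;

public class MyClient{

    // static so that main() can call it; coprocessorExec() is declared to throw Throwable
    public static List<Student> displayStudentInfo(int startRegistrationNumber, int endRegistrationNumber) throws Throwable{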
        final byte[] startKey=Bytes.toBytes(startRegistrationNumber);
        final byte[] endKey=Bytes.toBytes(endRegistrationNumber);


        // SystemInfo and TABLE_NAME are not shown here; they are assumed to be defined elsewhere in your project
        String zkPeers = SystemInfo.getHBaseZkConnectString();
        Configuration configuration=HBaseConfiguration.create();
        configuration.set(HConstants.ZOOKEEPER_QUORUM, zkPeers);

        HTableInterface table = new HTable(configuration, TABLE_NAME);

        Map<byte[],List<Student>> allRegionOutput;

        allRegionOutput = table.coprocessorExec(CoprocessorTestProtocol.class, startKey,endKey,
                new Batch.Call<CoprocessorTestProtocol, List<Student>>() {
            public List<Student> call(CoprocessorTestProtocol instance)throws IOException{
                return instance.getStudentList(startKey, endKey);
            }
        });

        table.close();

        List<Student> anotherList = new ArrayList<Student>();

        for (List<Student> studentData: allRegionOutput.values()){
            anotherList.addAll(studentData);
        }

        return anotherList;
    }

    public static void main(String[] args) throws Throwable{

        if (args.length < 2) {
            System.out.println("Usage : startRegistrationNumber endRegistrationNumber");
            return;
        }

        int startRegistrationNumber = Integer.parseInt(args[0]);
        int endRegistrationNumber = Integer.parseInt(args[1]);

        for (Student student : displayStudentInfo(startRegistrationNumber, endRegistrationNumber)){
            System.out.println(student);
        }
    }
}
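The client gets back one list per region and flattens them into a single list. Here is that merge step on its own, using plain JDK types so it can be run without a cluster. The region names and values are made up, and the `TreeMap` with an unsigned byte comparator is an assumption that imitates a result map keyed by region name. `Arrays.compareUnsigned` needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RegionMergeSketch {
    // Flattens the per-region lists into one list, visiting regions in map order.
    static <T> List<T> merge(Map<byte[], List<T>> perRegion) {
        List<T> merged = new ArrayList<T>();
        for (List<T> part : perRegion.values()) {
            if (part != null) { // skip a region that contributed nothing
                merged.addAll(part);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // byte[] uses identity-based equals/hashCode, so the map is ordered by a comparator.
        Comparator<byte[]> byBytes = Arrays::compareUnsigned;
        Map<byte[], List<String>> perRegion = new TreeMap<byte[], List<String>>(byBytes);
        perRegion.put(new byte[] {2}, Arrays.asList("Carol"));
        perRegion.put(new byte[] {1}, Arrays.asList("Ann", "Bob"));
        System.out.println(merge(perRegion)); // [Ann, Bob, Carol]
    }
}
```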

Please note: look carefully at the scanner.next(currentTempObj) call in the example. It returns a boolean and stores the current row in the argument passed to it.
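To make that concrete, here is a minimal sketch of the same loop against a stand-in scanner. `RowScanner` and `ListScanner` are hypothetical types written for this illustration, not HBase classes. They only imitate the contract described above: `next` fills the supplied list with the current row and returns whether more rows remain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ScannerLoopSketch {
    // Hypothetical stand-in for the scanner contract.
    interface RowScanner {
        boolean next(List<String> out);
    }

    static final class ListScanner implements RowScanner {
        private final Iterator<List<String>> rows;

        ListScanner(List<List<String>> rows) {
            this.rows = rows.iterator();
        }

        @Override
        public boolean next(List<String> out) {
            if (rows.hasNext()) {
                out.addAll(rows.next()); // store the current row in the argument
            }
            return rows.hasNext(); // true only if another row follows
        }
    }

    // Same do/while shape as the endpoint: the last row arrives together with
    // a false return value, so the row is processed before the flag is checked.
    static List<String> drain(RowScanner scanner) {
        List<String> current = new ArrayList<String>();
        List<String> result = new ArrayList<String>();
        boolean hasNext;
        do {
            current.clear();
            hasNext = scanner.next(current);
            if (!current.isEmpty()) {
                result.add(String.join(",", current));
            }
        } while (hasNext);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("r1c1", "r1c2"),
                Arrays.asList("r2c1"));
        System.out.println(drain(new ListScanner(rows))); // [r1c1,r1c2, r2c1]
    }
}
```

Under this contract, a plain `while (scanner.next(current))` loop would skip the last row, which is why the example uses do/while.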