如何为 BucketingSink 函数 Flink 设置动态基路径?

How to set dynamic base path for BucketingSink function Flink?

我正在从文件中获取一些 json 记录。我想解析 json,然后基于 json 中的字段,更新存储函数基本路径。

例如:Json 记录中有一个字段名称 'user-id',基于此我想将我的基本路径更新为 BucketingSink("/data/app/users/"+user-id-field-value+"/")

我该怎么做?

代码: 数据流输入 = env.readTextFile("/home/user/Desktop/jsonFile");

    DataStream<String> parsedJson = input.map((inputMsg)->{

        String json="";
        try{

            json=jsonParser.parse(inputMsg).getAsString();

        }catch (Exception e){
            e.printStackTrace();
        }
        return json;

    });

   parsedJson.addSink(new BucketingSink<>(""));

}

使用BucketingSink.setBucketer() method to set a class you create which implements the Bucketer接口,使用user-id字段值作为子桶路径