如何为 BucketingSink 函数 Flink 设置动态基路径?
How to set dynamic base path for BucketingSink function Flink?
我正在从文件中获取一些 json 记录。我想解析 json,然后基于 json 中的字段,更新存储函数基本路径。
例如:Json 记录中有一个字段名称 'user-id',基于此我想将我的基本路径更新为 BucketingSink("/data/app/users/"+user-id-field-value+"/")
我该怎么做?
代码:
数据流输入 = env.readTextFile("/home/user/Desktop/jsonFile");
DataStream<String> parsedJson = input.map((inputMsg)->{
String json="";
try{
json=jsonParser.parse(inputMsg).getAsString();
}catch (Exception e){
e.printStackTrace();
}
return json;
});
parsedJson.addSink(new BucketingSink<>(""));
}
使用BucketingSink.setBucketer() method to set a class you create which implements the Bucketer接口,使用user-id
字段值作为子桶路径
我正在从文件中获取一些 json 记录。我想解析 json,然后基于 json 中的字段,更新存储函数基本路径。
例如:Json 记录中有一个字段名称 'user-id',基于此我想将我的基本路径更新为 BucketingSink("/data/app/users/"+user-id-field-value+"/")
我该怎么做?
代码: 数据流输入 = env.readTextFile("/home/user/Desktop/jsonFile");
DataStream<String> parsedJson = input.map((inputMsg)->{
String json="";
try{
json=jsonParser.parse(inputMsg).getAsString();
}catch (Exception e){
e.printStackTrace();
}
return json;
});
parsedJson.addSink(new BucketingSink<>(""));
}
使用BucketingSink.setBucketer() method to set a class you create which implements the Bucketer接口,使用user-id
字段值作为子桶路径