值已在 kafka 流的范围内定义
value is already defined in scope in kafka streams
我正在尝试使用 kafka 流中的编码部分,如下所示
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("iostatin2");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[","" ) )
.mapValues(value -> value.replace("]","" ) )
.mapValues(value -> value.replaceAll("\":\"", "\":"))
.mapValues(value -> value.replaceAll("\":", "\":\""))
.mapValues(value -> value.replaceAll("\",\"", ",\""))
.mapValues(value -> value.replaceAll(",\"", "\",\""))
.mapValues(value -> value.replaceAll(":\"\{", ":\{"))
.mapValues(value -> value.replaceAll("\}\",", "\},"))
.mapValues(value -> value.replaceAll("\},\{" ,"\}\},\{\{"));
textlines.foreach(new ForeachAction<String, String>() {
@Override
public void apply(String key, String value) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
所以在 foreachaction() 函数中
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
此行值导致错误,例如变量 'value' 已在范围中定义。那么我应该用什么替换那行...plzz 帮帮我..
value 已经在上面的 apply() 方法中用作参数,
所以改变行中的值
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
到任何其他名称,例如 v
textlines.flatMapValues(v -> Arrays.asList(v.split("\},\{")));
我正在尝试使用 kafka 流中的编码部分,如下所示
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("iostatin2");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[","" ) )
.mapValues(value -> value.replace("]","" ) )
.mapValues(value -> value.replaceAll("\":\"", "\":"))
.mapValues(value -> value.replaceAll("\":", "\":\""))
.mapValues(value -> value.replaceAll("\",\"", ",\""))
.mapValues(value -> value.replaceAll(",\"", "\",\""))
.mapValues(value -> value.replaceAll(":\"\{", ":\{"))
.mapValues(value -> value.replaceAll("\}\",", "\},"))
.mapValues(value -> value.replaceAll("\},\{" ,"\}\},\{\{"));
textlines.foreach(new ForeachAction<String, String>() {
@Override
public void apply(String key, String value) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
所以在 foreachaction() 函数中
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
此行值导致错误,例如变量 'value' 已在范围中定义。那么我应该用什么替换那行...plzz 帮帮我..
value 已经在上面的 apply() 方法中用作参数,
所以改变行中的值
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
到任何其他名称,例如 v
textlines.flatMapValues(v -> Arrays.asList(v.split("\},\{")));