使用 Apache NiFi 进行复杂的转换和过滤
Complex transformations and filters with Apache NiFi
我有一个JSON数组:
[ {
"account_login" : "some_mail@gmail.com",
"view_id" : 11313231,
"join_id" : "utm_campaign=toyota&utm_content=multiformat_sites&utm_medium=cpc&utm_source=mytarget",
"start_date" : "2020-08-01",
"end_date" : "2020-08-31"
}, {
"account_login" : "another_mail@lab.net",
"view_id" : 19556319183,
"join_id" : "utm_campaign=mazda&utm_content=keywords_social-networks&utm_medium=cpc&utm_source=facebook",
"start_date" : "2020-12-22",
"end_date" : "2020-12-23"
}, {
...
} ]
对于每个 join_id,我接下来需要做以下事情:
- 将字符串拆分为键值对:
utm_campaign, toyota; utm_content, multiformat_sites; etc
- 过滤它们(见下面的 Java 代码);
- 将键转换为另一种格式,转换规则来自数据库中的一张表(见下面的 Java 代码);
我的主要目标是在 NiFi 中复现下面这段 Java 代码的逻辑:
/**
 * Converts a query-string-like list of UTM marks (e.g. {@code "utm_campaign=x&utm_medium=y"})
 * into a map keyed by Google Analytics dimension names.
 */
public class GaUtmFactoryService {
    /**
     * Regex for values that look like an unexpanded macro: one of '{', '[' or '%' followed later
     * by one of '}', ']' or '%'. The backslashes must be doubled in Java source; a single
     * backslash before '[' or ']' is an illegal string escape and does not compile.
     */
    private static final String INVALID_MACRO_FOOTPRINTS = "^.*[{\\[%]+.+[}\\]%].*$";

    // Compiled once: String.split/matches with a multi-character regex recompile it on every call.
    private static final java.util.regex.Pattern INVALID_MACRO_PATTERN =
            java.util.regex.Pattern.compile(INVALID_MACRO_FOOTPRINTS);
    private static final java.util.regex.Pattern PAIR_SEPARATOR =
            java.util.regex.Pattern.compile("\\s*&\\s*");
    private static final java.util.regex.Pattern KEY_VALUE_SEPARATOR =
            java.util.regex.Pattern.compile("\\s*=\\s*");

    /**
     * Parses UTM marks and converts their keys to GA dimension names.
     *
     * @param utmMarks marks separated by '&', each of the form {@code key=value}; may be null
     * @return converted key to value for every valid pair; empty for null or blank input.
     *     If two pairs convert to the same key, the later value wins.
     */
    public Map<String, String> extractUtmMarks(String utmMarks) {
        if (utmMarks == null || utmMarks.isBlank()) {
            return Collections.emptyMap();
        }
        return Arrays.stream(PAIR_SEPARATOR.split(utmMarks))
                .map(s -> KEY_VALUE_SEPARATOR.split(s.trim()))
                .filter(this::isUtmMarksValid)
                .collect(Collectors.toMap(
                        pair -> convertCsUtmMarkToGa(pair[0]),
                        pair -> pair[1],
                        (first, last) -> last));
    }

    /**
     * A pair is kept only if it has exactly a key and a value, the key does not convert to an
     * empty name, the value is not blank, and neither part contains invalid characters.
     */
    private boolean isUtmMarksValid(String[] utmMarks) {
        return utmMarks.length == 2
                && !convertCsUtmMarkToGa(utmMarks[0]).isBlank()
                && !utmMarks[1].isBlank()
                && Arrays.stream(utmMarks).noneMatch(this::isUtmMarkContainsInvalidChars);
    }

    /** True if the mark looks like an unexpanded macro or contains non-ASCII characters. */
    private boolean isUtmMarkContainsInvalidChars(String utmMark) {
        return INVALID_MACRO_PATTERN.matcher(utmMark).matches()
                || !StandardCharsets.US_ASCII.newEncoder().canEncode(utmMark);
    }

    /**
     * Maps a UTM key to its GA dimension name. Returns "" for keys that must be dropped and the
     * key itself for keys with no mapping.
     */
    private String convertCsUtmMarkToGa(String utmMark) {
        switch (utmMark) {
            case "utm_medium":
                return "ga:medium";
            case "utm_campaign":
                return "ga:campaign";
            case "utm_source":
                return "ga:source";
            case "utm_content":
                return "ga:adContent";
            case "utm_term":
                return "ga:keyword";
            case "utm_target":
            case "utm_a":
                return "";
            default:
                // Previously referenced an undefined variable (rowUtmMarks); pass the raw key through.
                return utmMark;
        }
    }
}
外部用法:
/**
 * Example call: converts a sample join_id string with GaUtmFactoryService.
 *
 * @return the converted marks
 */
public Map<String, String> getConvertedMarks() {
    final String sampleMarks =
            "utm_campaign=toyota&utm_content=multiformat_sites&utm_medium=cpc&utm_source=facebook";
    // Expected entries (iteration order of the returned map is not guaranteed):
    // {ga:campaign=toyota, ga:adContent=multiformat_sites, ga:medium=cpc, ga:source=facebook}
    return new GaUtmFactoryService().extractUtmMarks(sampleMarks);
}
NiFi 能做到这一点吗?或者,如果这很难实现,我是否应该为此任务创建一个提供若干端点的 REST 微服务?
更新
我使用了 EvaluateJsonPath 和 SplitJson。现在每个 JSON 文件都有一个属性:utm.marks = utm_campaign=toyota&utm_content=multiformat_sites&utm_medium=cpc&utm_source=mytarget
我需要拆分这个属性,得到类似下面的结果:
campaign.key = ga:campaign
campaign.value = toyota
content.key = ga:content
content.value = multiformat_sites
等等
对于此转换,ExecuteGroovyScript 中的脚本可以这样写:
import groovy.json.*
// ExecuteGroovyScript body: replaces each record's join_id string
// ("utm_x=value&utm_y=value") with a map keyed by GA dimension names,
// using the same rules as GaUtmFactoryService.extractUtmMarks.

// get file from session
def ff = session.get()
if (!ff) return

// GA names for known UTM keys; '' marks keys that must be dropped.
// Keys not listed are kept unchanged (like the Java `default` branch).
def utmToGa = [
    'utm_medium'  : 'ga:medium',
    'utm_campaign': 'ga:campaign',
    'utm_source'  : 'ga:source',
    'utm_content' : 'ga:adContent',
    'utm_term'    : 'ga:keyword',
    'utm_target'  : '',
    'utm_a'       : '',
]
// Single-quoted strings are not interpolated. In a double-quoted string, "\s" is
// handled as a Groovy string escape and never reaches the regex engine as \s.
def invalidMacro = ~'^.*[{\\[%]+.+[}\\]%].*$'

// True if a key or value looks like an unexpanded macro or is not pure ASCII.
def hasInvalidChars = { String mark ->
    mark ==~ invalidMacro || !java.nio.charset.StandardCharsets.US_ASCII.newEncoder().canEncode(mark)
}

// Splits, validates and converts one join_id string; null/blank gives an empty map.
def convertUtmMarks = { String marks ->
    if (!marks?.trim()) return [:]
    marks.split('\\s*&\\s*')
        .collect { it.trim().split('\\s*=\\s*') }
        .findAll { kv ->
            kv.length == 2 &&
                utmToGa.getOrDefault(kv[0], kv[0]).trim() &&
                kv[1].trim() &&
                !kv.any(hasInvalidChars)
        }
        // later duplicates overwrite earlier ones, as in the Java merge function
        .collectEntries { kv -> [(utmToGa.getOrDefault(kv[0], kv[0])): kv[1]] }
}

// read stream, convert to reader, parse to list/objects
def data = ff.read().withReader('UTF-8') { r -> new JsonSlurper().parse(r) }

// Accept a top-level array as well as a single object (e.g. after SplitJson).
def records = data instanceof List ? data : [data]
records.each { rec ->
    if (rec instanceof Map && rec.containsKey('join_id')) {
        rec.join_id = convertUtmMarks(rec.join_id?.toString())
    }
}

// write back to file
ff.write('UTF-8') { w -> new JsonBuilder(data).writeTo(w) }
// transfer to success
REL_SUCCESS << ff
基于上述答案、针对单个 JSON 对象(而非数组)的解决方案:
import groovy.json.*
// ExecuteGroovyScript body for a FlowFile holding ONE JSON object: replaces its
// join_id string ("utm_x=value&utm_y=value") with a map keyed by GA dimension
// names, using the same rules as GaUtmFactoryService.extractUtmMarks.

// get file from session
def ff = session.get()
if (!ff) return

// GA names for known UTM keys; '' marks keys that must be dropped.
// Keys not listed are kept unchanged (like the Java `default` branch).
def utmToGa = [
    'utm_medium'  : 'ga:medium',
    'utm_campaign': 'ga:campaign',
    'utm_source'  : 'ga:source',
    'utm_content' : 'ga:adContent',
    'utm_term'    : 'ga:keyword',
    'utm_target'  : '',
    'utm_a'       : '',
]
// Single-quoted strings are not interpolated. In a double-quoted string, "\s" is
// handled as a Groovy string escape and never reaches the regex engine as \s.
def invalidMacro = ~'^.*[{\\[%]+.+[}\\]%].*$'

// True if a key or value looks like an unexpanded macro or is not pure ASCII.
def hasInvalidChars = { String mark ->
    mark ==~ invalidMacro || !java.nio.charset.StandardCharsets.US_ASCII.newEncoder().canEncode(mark)
}

// Splits, validates and converts one join_id string; null/blank gives an empty map.
def convertUtmMarks = { String marks ->
    if (!marks?.trim()) return [:]
    marks.split('\\s*&\\s*')
        .collect { it.trim().split('\\s*=\\s*') }
        .findAll { kv ->
            kv.length == 2 &&
                utmToGa.getOrDefault(kv[0], kv[0]).trim() &&
                kv[1].trim() &&
                !kv.any(hasInvalidChars)
        }
        // later duplicates overwrite earlier ones, as in the Java merge function
        .collectEntries { kv -> [(utmToGa.getOrDefault(kv[0], kv[0])): kv[1]] }
}

// read stream, convert to reader, parse to an object
def data = ff.read().withReader('UTF-8') { r -> new JsonSlurper().parse(r) }

// Leave the content untouched if there is no join_id (previously an NPE).
if (data instanceof Map && data.containsKey('join_id')) {
    data.join_id = convertUtmMarks(data.join_id?.toString())
}

// write back to file
ff.write('UTF-8') { w -> new JsonBuilder(data).writeTo(w) }
// transfer to success
REL_SUCCESS << ff
我有一个JSON数组:
[ {
"account_login" : "some_mail@gmail.com",
"view_id" : 11313231,
"join_id" : "utm_campaign=toyota&utm_content=multiformat_sites&utm_medium=cpc&utm_source=mytarget",
"start_date" : "2020-08-01",
"end_date" : "2020-08-31"
}, {
"account_login" : "another_mail@lab.net",
"view_id" : 19556319183,
"join_id" : "utm_campaign=mazda&utm_content=keywords_social-networks&utm_medium=cpc&utm_source=facebook",
"start_date" : "2020-12-22",
"end_date" : "2020-12-23"
}, {
...
} ]
对于每个 join_id,我接下来需要做以下事情:
- 将字符串拆分为键值对:
utm_campaign, toyota; utm_content, multiformat_sites; etc
- 过滤它们(见下面的 Java 代码);
- 将键转换为另一种格式,转换规则来自数据库中的一张表(见下面的 Java 代码);
我的主要目标是在 NiFi 中复现下面这段 Java 代码的逻辑:
/**
 * Converts a query-string-like list of UTM marks (e.g. {@code "utm_campaign=x&utm_medium=y"})
 * into a map keyed by Google Analytics dimension names.
 */
public class GaUtmFactoryService {
    /**
     * Regex for values that look like an unexpanded macro: one of '{', '[' or '%' followed later
     * by one of '}', ']' or '%'. The backslashes must be doubled in Java source; a single
     * backslash before '[' or ']' is an illegal string escape and does not compile.
     */
    private static final String INVALID_MACRO_FOOTPRINTS = "^.*[{\\[%]+.+[}\\]%].*$";

    // Compiled once: String.split/matches with a multi-character regex recompile it on every call.
    private static final java.util.regex.Pattern INVALID_MACRO_PATTERN =
            java.util.regex.Pattern.compile(INVALID_MACRO_FOOTPRINTS);
    private static final java.util.regex.Pattern PAIR_SEPARATOR =
            java.util.regex.Pattern.compile("\\s*&\\s*");
    private static final java.util.regex.Pattern KEY_VALUE_SEPARATOR =
            java.util.regex.Pattern.compile("\\s*=\\s*");

    /**
     * Parses UTM marks and converts their keys to GA dimension names.
     *
     * @param utmMarks marks separated by '&', each of the form {@code key=value}; may be null
     * @return converted key to value for every valid pair; empty for null or blank input.
     *     If two pairs convert to the same key, the later value wins.
     */
    public Map<String, String> extractUtmMarks(String utmMarks) {
        if (utmMarks == null || utmMarks.isBlank()) {
            return Collections.emptyMap();
        }
        return Arrays.stream(PAIR_SEPARATOR.split(utmMarks))
                .map(s -> KEY_VALUE_SEPARATOR.split(s.trim()))
                .filter(this::isUtmMarksValid)
                .collect(Collectors.toMap(
                        pair -> convertCsUtmMarkToGa(pair[0]),
                        pair -> pair[1],
                        (first, last) -> last));
    }

    /**
     * A pair is kept only if it has exactly a key and a value, the key does not convert to an
     * empty name, the value is not blank, and neither part contains invalid characters.
     */
    private boolean isUtmMarksValid(String[] utmMarks) {
        return utmMarks.length == 2
                && !convertCsUtmMarkToGa(utmMarks[0]).isBlank()
                && !utmMarks[1].isBlank()
                && Arrays.stream(utmMarks).noneMatch(this::isUtmMarkContainsInvalidChars);
    }

    /** True if the mark looks like an unexpanded macro or contains non-ASCII characters. */
    private boolean isUtmMarkContainsInvalidChars(String utmMark) {
        return INVALID_MACRO_PATTERN.matcher(utmMark).matches()
                || !StandardCharsets.US_ASCII.newEncoder().canEncode(utmMark);
    }

    /**
     * Maps a UTM key to its GA dimension name. Returns "" for keys that must be dropped and the
     * key itself for keys with no mapping.
     */
    private String convertCsUtmMarkToGa(String utmMark) {
        switch (utmMark) {
            case "utm_medium":
                return "ga:medium";
            case "utm_campaign":
                return "ga:campaign";
            case "utm_source":
                return "ga:source";
            case "utm_content":
                return "ga:adContent";
            case "utm_term":
                return "ga:keyword";
            case "utm_target":
            case "utm_a":
                return "";
            default:
                // Previously referenced an undefined variable (rowUtmMarks); pass the raw key through.
                return utmMark;
        }
    }
}
外部用法:
/**
 * Example call: converts a sample join_id string with GaUtmFactoryService.
 *
 * @return the converted marks
 */
public Map<String, String> getConvertedMarks() {
    final String sampleMarks =
            "utm_campaign=toyota&utm_content=multiformat_sites&utm_medium=cpc&utm_source=facebook";
    // Expected entries (iteration order of the returned map is not guaranteed):
    // {ga:campaign=toyota, ga:adContent=multiformat_sites, ga:medium=cpc, ga:source=facebook}
    return new GaUtmFactoryService().extractUtmMarks(sampleMarks);
}
NiFi 能做到这一点吗?或者,如果这很难实现,我是否应该为此任务创建一个提供若干端点的 REST 微服务?
更新
我使用了 EvaluateJsonPath 和 SplitJson。现在每个 JSON 文件都有一个属性:utm.marks = utm_campaign=toyota&utm_content=multiformat_sites&utm_medium=cpc&utm_source=mytarget
我需要拆分这个属性,得到类似下面的结果:
campaign.key = ga:campaign
campaign.value = toyota
content.key = ga:content
content.value = multiformat_sites
等等
对于此转换,ExecuteGroovyScript 中的脚本可以这样写:
import groovy.json.*
// ExecuteGroovyScript body: replaces each record's join_id string
// ("utm_x=value&utm_y=value") with a map keyed by GA dimension names,
// using the same rules as GaUtmFactoryService.extractUtmMarks.

// get file from session
def ff = session.get()
if (!ff) return

// GA names for known UTM keys; '' marks keys that must be dropped.
// Keys not listed are kept unchanged (like the Java `default` branch).
def utmToGa = [
    'utm_medium'  : 'ga:medium',
    'utm_campaign': 'ga:campaign',
    'utm_source'  : 'ga:source',
    'utm_content' : 'ga:adContent',
    'utm_term'    : 'ga:keyword',
    'utm_target'  : '',
    'utm_a'       : '',
]
// Single-quoted strings are not interpolated. In a double-quoted string, "\s" is
// handled as a Groovy string escape and never reaches the regex engine as \s.
def invalidMacro = ~'^.*[{\\[%]+.+[}\\]%].*$'

// True if a key or value looks like an unexpanded macro or is not pure ASCII.
def hasInvalidChars = { String mark ->
    mark ==~ invalidMacro || !java.nio.charset.StandardCharsets.US_ASCII.newEncoder().canEncode(mark)
}

// Splits, validates and converts one join_id string; null/blank gives an empty map.
def convertUtmMarks = { String marks ->
    if (!marks?.trim()) return [:]
    marks.split('\\s*&\\s*')
        .collect { it.trim().split('\\s*=\\s*') }
        .findAll { kv ->
            kv.length == 2 &&
                utmToGa.getOrDefault(kv[0], kv[0]).trim() &&
                kv[1].trim() &&
                !kv.any(hasInvalidChars)
        }
        // later duplicates overwrite earlier ones, as in the Java merge function
        .collectEntries { kv -> [(utmToGa.getOrDefault(kv[0], kv[0])): kv[1]] }
}

// read stream, convert to reader, parse to list/objects
def data = ff.read().withReader('UTF-8') { r -> new JsonSlurper().parse(r) }

// Accept a top-level array as well as a single object (e.g. after SplitJson).
def records = data instanceof List ? data : [data]
records.each { rec ->
    if (rec instanceof Map && rec.containsKey('join_id')) {
        rec.join_id = convertUtmMarks(rec.join_id?.toString())
    }
}

// write back to file
ff.write('UTF-8') { w -> new JsonBuilder(data).writeTo(w) }
// transfer to success
REL_SUCCESS << ff
基于上述答案、针对单个 JSON 对象(而非数组)的解决方案:
import groovy.json.*
// ExecuteGroovyScript body for a FlowFile holding ONE JSON object: replaces its
// join_id string ("utm_x=value&utm_y=value") with a map keyed by GA dimension
// names, using the same rules as GaUtmFactoryService.extractUtmMarks.

// get file from session
def ff = session.get()
if (!ff) return

// GA names for known UTM keys; '' marks keys that must be dropped.
// Keys not listed are kept unchanged (like the Java `default` branch).
def utmToGa = [
    'utm_medium'  : 'ga:medium',
    'utm_campaign': 'ga:campaign',
    'utm_source'  : 'ga:source',
    'utm_content' : 'ga:adContent',
    'utm_term'    : 'ga:keyword',
    'utm_target'  : '',
    'utm_a'       : '',
]
// Single-quoted strings are not interpolated. In a double-quoted string, "\s" is
// handled as a Groovy string escape and never reaches the regex engine as \s.
def invalidMacro = ~'^.*[{\\[%]+.+[}\\]%].*$'

// True if a key or value looks like an unexpanded macro or is not pure ASCII.
def hasInvalidChars = { String mark ->
    mark ==~ invalidMacro || !java.nio.charset.StandardCharsets.US_ASCII.newEncoder().canEncode(mark)
}

// Splits, validates and converts one join_id string; null/blank gives an empty map.
def convertUtmMarks = { String marks ->
    if (!marks?.trim()) return [:]
    marks.split('\\s*&\\s*')
        .collect { it.trim().split('\\s*=\\s*') }
        .findAll { kv ->
            kv.length == 2 &&
                utmToGa.getOrDefault(kv[0], kv[0]).trim() &&
                kv[1].trim() &&
                !kv.any(hasInvalidChars)
        }
        // later duplicates overwrite earlier ones, as in the Java merge function
        .collectEntries { kv -> [(utmToGa.getOrDefault(kv[0], kv[0])): kv[1]] }
}

// read stream, convert to reader, parse to an object
def data = ff.read().withReader('UTF-8') { r -> new JsonSlurper().parse(r) }

// Leave the content untouched if there is no join_id (previously an NPE).
if (data instanceof Map && data.containsKey('join_id')) {
    data.join_id = convertUtmMarks(data.join_id?.toString())
}

// write back to file
ff.write('UTF-8') { w -> new JsonBuilder(data).writeTo(w) }
// transfer to success
REL_SUCCESS << ff