使用 Apache Nifi 或 Python 脚本迭代 Json 数组
iterate Json Array with Apache Nifi or Python script
我有一个 Json 和一个数组字段,我想迭代数组并将其拆分为新的行或对象,用于数组中包含的每个属性。
我目前正在使用 Apache Nifi,但我也可以使用 python 脚本。
我的输入数据是:
{
"workorder_id" : "99999",
"properties" : [ {
"id" : "11",
"propertyType" : {
"id" : "55834595398",
"name" : "action"
},
"stringValue" : "string01",
"nodeValue" : null
}, {
"id" : "22",
"propertyType" : {
"id" : "55834595419",
"name" : "Tipo"
},
"stringValue" : "string02",
"nodeValue" : null
}, {
"id" : "33",
"propertyType" : {
"id" : "44",
"name" : "Action2"
},
"stringValue" : "string02",
"nodeValue" : null
}, {
"id" : "55",
"propertyType" : {
"id" : "55834595400",
"name" : "Action3"
}
]
}
输出可以是 Json 或 csv。例如在 csv 中:
使用相同的 workorder_id 作为键
workorder_id,id_properties,stringValue_properties
99999,11,string01
99999,22,string02
99999,33,string03
.
.
.
感谢您的帮助
# -*- coding: utf-8 -*-
data = {
"workorder_id" : "99999",
"properties" : [
{
"id" : "11",
"propertyType" : {
"id" : "55834595398",
"name" : "action"
},
"stringValue" : "string01",
"nodeValue" : float('nan')
},
{
"id" : "22",
"propertyType" : {
"id" : "55834595419",
"name" : "Tipo"
},
"stringValue" : "string02",
"nodeValue" : float('nan')
},
{
"id" : "33",
"propertyType" : {
"id" : "44",
"name" : "Action2"
},
"stringValue" : "string03",
"nodeValue" : float('nan')
},
{
"id" : "55",
"propertyType" : {
"id" : "55834595400",
"name" : "Action3"
},
"stringValue" : "string04",
"nodeValue" : float('nan')
}
]
}
for item in data['properties']:
print(data['workorder_id'], item['id'], item['propertyType']['id'], item['propertyType']['name'], item['stringValue'], item['nodeValue'])
#99999 11 55834595398 action string01 nan
#99999 22 55834595419 Tipo string02 nan
#99999 33 44 Action2 string03 nan
#99999 55 55834595400 Action3 string04 nan
您需要将这些空值转换为 None 或其他一些 Python 类型。
假设您将空值切换为 None...
data = {'workorder_id': '99999', 'properties': [{'id': '11', 'propertyType': {'id': '55834595398', 'name': 'action'}, 'stringValue': 'string01', 'nodeValue': None}, {'id': '22', 'propertyType': {'id': '55834595419', 'name': 'Tipo'}, 'stringValue': 'string02', 'nodeValue': None}]}
def _handler(prepend: str, record: dict):
""" relabels keys with a prepended string """
d = {}
for key in record.keys():
new_key = '%s_%s' % (prepend, key)
d.update({new_key: record[key]})
return d
def mapper(data: dict):
""" returns a list of dictionaries. """
records= []
properties = data['properties']
for prop in properties:
_record = {'workorder_id': data['workorder_id'], }
for prop_key in prop.keys():
prop_data = {}
value = prop[prop_key]
if isinstance(value, dict):
prop_data.update(_handler(prop_key, value))
else:
prop_data.update({prop_key: value})
_record.update(_handler('property', prop_data))
records.append(_record)
return records
输出
[{'workorder_id': '99999',
'property_id': '11',
'property_propertyType_id': '55834595398',
'property_propertyType_name': 'action',
'property_stringValue': 'string01',
'property_nodeValue': None},
{'workorder_id': '99999',
'property_id': '22',
'property_propertyType_id': '55834595419',
'property_propertyType_name': 'Tipo',
'property_stringValue': 'string02',
'property_nodeValue': None}]
使用 NiFi
按此顺序:
评估Json路径从 workorder_id ($.workorder_id
)
创建属性
输出将其发送到 $.properties.*
上的拆分 Json
splitjson 的输出将其发送到 evaluatejson,您将在其中提取数组。
id = $.id
propertyType_id = $.propertyType.id
propertyType_name = $.propertyType.name
现在您的每个流都将带有此属性:
workorder_id,id,propertyType_id,propertyType_name
使用此列表使用 AttributestoCSV
workorder_id,id,propertyType_id,propertyType_name
合并内容
putfile(保存你的 csv)
我有一个 Json 和一个数组字段,我想迭代数组并将其拆分为新的行或对象,用于数组中包含的每个属性。
我目前正在使用 Apache Nifi,但我也可以使用 python 脚本。
我的输入数据是:
{
"workorder_id" : "99999",
"properties" : [ {
"id" : "11",
"propertyType" : {
"id" : "55834595398",
"name" : "action"
},
"stringValue" : "string01",
"nodeValue" : null
}, {
"id" : "22",
"propertyType" : {
"id" : "55834595419",
"name" : "Tipo"
},
"stringValue" : "string02",
"nodeValue" : null
}, {
"id" : "33",
"propertyType" : {
"id" : "44",
"name" : "Action2"
},
"stringValue" : "string02",
"nodeValue" : null
}, {
"id" : "55",
"propertyType" : {
"id" : "55834595400",
"name" : "Action3"
}
]
}
输出可以是 Json 或 csv。例如在 csv 中: 使用相同的 workorder_id 作为键
workorder_id,id_properties,stringValue_properties
99999,11,string01
99999,22,string02
99999,33,string03
.
.
.
感谢您的帮助
# -*- coding: utf-8 -*-
data = {
"workorder_id" : "99999",
"properties" : [
{
"id" : "11",
"propertyType" : {
"id" : "55834595398",
"name" : "action"
},
"stringValue" : "string01",
"nodeValue" : float('nan')
},
{
"id" : "22",
"propertyType" : {
"id" : "55834595419",
"name" : "Tipo"
},
"stringValue" : "string02",
"nodeValue" : float('nan')
},
{
"id" : "33",
"propertyType" : {
"id" : "44",
"name" : "Action2"
},
"stringValue" : "string03",
"nodeValue" : float('nan')
},
{
"id" : "55",
"propertyType" : {
"id" : "55834595400",
"name" : "Action3"
},
"stringValue" : "string04",
"nodeValue" : float('nan')
}
]
}
for item in data['properties']:
print(data['workorder_id'], item['id'], item['propertyType']['id'], item['propertyType']['name'], item['stringValue'], item['nodeValue'])
#99999 11 55834595398 action string01 nan
#99999 22 55834595419 Tipo string02 nan
#99999 33 44 Action2 string03 nan
#99999 55 55834595400 Action3 string04 nan
您需要将这些空值转换为 None 或其他一些 Python 类型。
假设您将空值切换为 None...
data = {'workorder_id': '99999', 'properties': [{'id': '11', 'propertyType': {'id': '55834595398', 'name': 'action'}, 'stringValue': 'string01', 'nodeValue': None}, {'id': '22', 'propertyType': {'id': '55834595419', 'name': 'Tipo'}, 'stringValue': 'string02', 'nodeValue': None}]}
def _handler(prepend: str, record: dict):
""" relabels keys with a prepended string """
d = {}
for key in record.keys():
new_key = '%s_%s' % (prepend, key)
d.update({new_key: record[key]})
return d
def mapper(data: dict):
""" returns a list of dictionaries. """
records= []
properties = data['properties']
for prop in properties:
_record = {'workorder_id': data['workorder_id'], }
for prop_key in prop.keys():
prop_data = {}
value = prop[prop_key]
if isinstance(value, dict):
prop_data.update(_handler(prop_key, value))
else:
prop_data.update({prop_key: value})
_record.update(_handler('property', prop_data))
records.append(_record)
return records
输出
[{'workorder_id': '99999',
'property_id': '11',
'property_propertyType_id': '55834595398',
'property_propertyType_name': 'action',
'property_stringValue': 'string01',
'property_nodeValue': None},
{'workorder_id': '99999',
'property_id': '22',
'property_propertyType_id': '55834595419',
'property_propertyType_name': 'Tipo',
'property_stringValue': 'string02',
'property_nodeValue': None}]
使用 NiFi
按此顺序:
评估Json路径从 workorder_id (
创建属性$.workorder_id
)输出将其发送到
上的拆分 Json$.properties.*
splitjson 的输出将其发送到 evaluatejson,您将在其中提取数组。
id = $.id propertyType_id = $.propertyType.id propertyType_name = $.propertyType.name
现在您的每个流都将带有此属性:
workorder_id,id,propertyType_id,propertyType_name
使用此列表使用 AttributestoCSV
workorder_id,id,propertyType_id,propertyType_name
合并内容
putfile(保存你的 csv)