Pentaho ETL Transformation Using Lambda
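Is it possible to run Pentaho ETL jobs/transformations using AWS Lambda functions?
I have Pentaho ETL jobs running on a schedule on a Windows server, and we are planning to migrate to AWS. I am considering Lambda functions and want to understand whether Pentaho ETL jobs can be scheduled with AWS Lambda.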
Here is a code snippet that I ran successfully in an AWS Lambda function.
AWS Lambda invokes the handleRequest function:
public Integer handleRequest(String input, Context context) {
    parseInput(input);
    return executeKtr(transName);
}
parseInput: this function parses the string argument passed to the Lambda function and extracts the KTR name and its parameter values. The input format is "ktrfilename param1=value1 param2=value2".
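public static void parseInput(String input) {
    // Static fields survive warm Lambda invocations, so drop parameters
    // left over from a previous call (assumes params is a static List<String>).
    params.clear();
    String[] tokens = input.trim().split("\\s+");
    // Accept the name with or without the .ktr extension
    transName = tokens[0].replace(".ktr", "") + ".ktr";
    for (int i = 1; i < tokens.length; i++) {
        params.add(tokens[i]);
    }
}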
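The input format described above can also be parsed without shared static state. The following is only a sketch, not the code from the original function: the class name KtrInput and its fields are hypothetical, and it appends ".ktr" only when the name does not already end with it, which differs slightly from the replace-based approach above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: parses "ktrfilename param1=value1 param2=value2". */
final class KtrInput {
    final String transName;
    final Map<String, String> params;

    private KtrInput(String transName, Map<String, String> params) {
        this.transName = transName;
        this.params = params;
    }

    static KtrInput parse(String input) {
        if (input == null || input.trim().isEmpty()) {
            throw new IllegalArgumentException("Input must start with a KTR file name");
        }
        String[] tokens = input.trim().split("\\s+");
        // Accept the name with or without the .ktr extension
        String name = tokens[0].endsWith(".ktr") ? tokens[0] : tokens[0] + ".ktr";
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 1; i < tokens.length; i++) {
            // Limit 2 keeps any '=' inside the value intact
            String[] kv = tokens[i].split("=", 2);
            if (kv.length != 2 || kv[0].isEmpty()) {
                throw new IllegalArgumentException("Expected name=value but got: " + tokens[i]);
            }
            params.put(kv[0], kv[1]);
        }
        return new KtrInput(name, params);
    }

    public static void main(String[] args) {
        KtrInput in = parse("load_sales region=EU day=2024-01-31");
        // prints: load_sales.ktr {region=EU, day=2024-01-31}
        System.out.println(in.transName + " " + in.params);
    }
}
```

Returning a value object instead of writing to static fields means nothing carries over between invocations when Lambda reuses the same container.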
Executing the KTR: I use a git repository to store all my KTR files and execute the KTR whose name is passed as an argument.
public static Integer executeKtr(String ktrName) {
    try {
        System.out.println("Present Project Directory : " + System.getProperty("user.dir"));
        String transName = ktrName.replace(".ktr", "") + ".ktr";
        String gitURI = awsSSM.getParaValue("kattle-trans-git-url");
        String repoLocalPath = clonePDIrepo.cloneRepo(gitURI);
        String path = new File(repoLocalPath + "/" + transName).getAbsolutePath();
        File ktrFile = new File(path);
        System.out.println("KTR Path: " + path);
        try {
            /*
             * IMPORTANT FOR LAMBDA: the .kettle directory must be created up front,
             * otherwise the code fails in Lambda with an error that the
             * .kettle/kettle.properties file cannot be created.
             *
             * Also set this environment variable on the Lambda function:
             * KETTLE_HOME=/tmp/.kettle
             */
            Files.createDirectories(Paths.get("/tmp/.kettle"));
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("Error creating /tmp/.kettle directory", e);
        }
        if (ktrFile.exists()) {
            KettleEnvironment.init();
            TransMeta metaData = new TransMeta(path);
            Trans trans = new Trans(metaData);
            // Apply the parameters passed to the Lambda function
            trans = parameterSetting(trans);
            trans.execute(null);
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                System.out.println("Error executing transformation");
                throw new RuntimeException("There are errors in running transformations");
            } else {
                System.out.println("Successfully executed transformation");
                return 1;
            }
        } else {
            System.out.println("KTR File:" + path + " not found in repo");
            throw new RuntimeException("KTR File:" + path + " not found in repo");
        }
    } catch (KettleException e) {
        e.printStackTrace();
        // Keep the original exception as the cause so the stack trace is not lost
        throw new RuntimeException(e.getMessage(), e);
    }
}
parameterSetting: if the KTR accepts parameters and they are passed when the AWS Lambda function is invoked, the parameterSetting function sets them on the transformation.
public static Trans parameterSetting(Trans trans) {
    String[] transParams = trans.listParameters();
    for (String param : transParams) {
        for (String p : params) {
            // Limit 2 keeps any '=' inside the value; skip tokens without '='
            String[] kv = p.split("=", 2);
            if (kv.length < 2) {
                continue;
            }
            String name = kv[0];
            String val = kv[1];
            if (name.trim().equals(param.trim())) {
                try {
                    System.out.println("Setting Parameter:" + name + "=" + val);
                    trans.setParameterValue(name, val);
                } catch (UnknownParamException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    trans.activateParameters();
    return trans;
}
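The matching step in parameterSetting can be tried out without the Kettle libraries. The sketch below is an illustration only: ParamMatcher and matchDeclared are hypothetical names, and the declared array stands in for what trans.listParameters() returns. It keeps only the supplied name=value tokens whose name the transformation declares.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: selects supplied parameters that the KTR declares. */
final class ParamMatcher {
    static Map<String, String> matchDeclared(String[] declared, List<String> supplied) {
        Set<String> names = new HashSet<>();
        for (String d : declared) {
            names.add(d.trim());
        }
        Map<String, String> matched = new LinkedHashMap<>();
        for (String token : supplied) {
            String[] kv = token.split("=", 2);
            if (kv.length < 2) {
                continue; // not a name=value token
            }
            String name = kv[0].trim();
            if (names.contains(name)) {
                matched.put(name, kv[1]);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String[] declared = {"region", "day"};
        List<String> supplied = Arrays.asList("region=EU", "other=1", "broken");
        // prints: {region=EU}
        System.out.println(matchDeclared(declared, supplied));
    }
}
```

Each entry of the returned map would then be passed to trans.setParameterValue, followed by a single call to trans.activateParameters.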
Cloning the Git repo:
public class clonePDIrepo {
    /**
     * Clones the given repo to a local folder.
     *
     * @param pathWithPwd Git repo URL with the access token included in the URL, e.g.
     *                    https://token_name:token_value@github.com/ktr-git-repo.git
     * @return the local repository path as a String, or null if cloning fails
     */
    public static String cloneRepo(String pathWithPwd) {
        try {
            /*
             * Create a fresh temp directory to avoid a "folder exists" error. The
             * directory is used later to build absolute paths for files in the repo.
             */
            File pdiLocalPath = Files.createTempDirectory("repodir").toFile();
            Git git = Git.cloneRepository().setURI(pathWithPwd).setDirectory(pdiLocalPath).call();
            System.out.println("Git repository cloned successfully");
            System.out.println("Local Repository Path:" + pdiLocalPath.getAbsolutePath());
            return pdiLocalPath.getAbsolutePath();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}
awsSSM getParaValue: returns the string value of the parameter with the given name from AWS SSM.
public static String getParaValue(String paraName) {
    Region region = Region.US_EAST_1;
    // try-with-resources closes the client even when the call fails
    try (SsmClient ssmClient = SsmClient.builder()
            .region(region)
            .build()) {
        GetParameterRequest parameterRequest = GetParameterRequest.builder()
                .name(paraName)
                .withDecryption(true)
                .build();
        GetParameterResponse parameterResponse = ssmClient.getParameter(parameterRequest);
        System.out.println(paraName + " value retrieved from AWS SSM");
        return parameterResponse.parameter().value();
    } catch (SsmException e) {
        System.err.println(e.getMessage());
        return null;
    }
}
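Assumptions:
- The git repo is created with the KTR files in the root directory of the repo
- The git repository URL is stored in AWS SSM and includes a valid token for cloning the repository
- The input string contains the name of the KTR file
- The environment variable KETTLE_HOME=/tmp/.kettle is configured on the Lambda function
- The Lambda function has the necessary permissions for SSM, S3, and the VPC network
- Appropriate security group rules are in place to allow the network access the KTR files require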
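The KETTLE_HOME assumption can be checked at the start of an invocation, before Kettle is initialised. This is a minimal sketch under stated assumptions: KettleHomeCheck and ensureKettleHome are hypothetical names, and /tmp/.kettle is used only as the fallback value from this post. It creates the directory if it is missing and fails early with a clear message when the variable is unset or the path is not writable.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical preflight check for the KETTLE_HOME assumption. */
final class KettleHomeCheck {
    /** Ensures the directory named by the given KETTLE_HOME value exists and is writable. */
    static Path ensureKettleHome(String kettleHomeValue) throws IOException {
        if (kettleHomeValue == null || kettleHomeValue.isEmpty()) {
            throw new IllegalStateException("KETTLE_HOME is not set on the Lambda function");
        }
        Path dir = Paths.get(kettleHomeValue);
        // No-op when the directory already exists
        Files.createDirectories(dir);
        if (!Files.isWritable(dir)) {
            throw new IllegalStateException("KETTLE_HOME is not writable: " + dir);
        }
        return dir;
    }

    public static void main(String[] args) throws IOException {
        // Falls back to the value used in this post when the variable is absent
        String value = System.getenv().getOrDefault("KETTLE_HOME", "/tmp/.kettle");
        System.out.println("Kettle home ready at " + ensureKettleHome(value));
    }
}
```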
I plan to upload the complete code to git and will update this post with the repository URL.