groovy 脚本在 nifi executescript 处理器中不工作
groovy script not working in nifi executescript processor
我正在尝试通过 executescript 处理器执行某些操作;里面有一个groovy代码。在代码中,我试图创建一个 scala 脚本,该脚本将在另一个处理器中的 spark 上执行。
// Get flow file
def flowFile = session.get()
if (!flowFile) return
// Create output directory
def userInputDir = flowFile.getAttribute("user.input.path")
def finalFolder = new File(userInputDir + "/" + innerDir)
try
{
if (!finalFolder.exists()) finalFolder.mkdirs()
// Write script
file = "spark.sqlContext.setConf(\"hive.exec.dynamic.partition\", \"true\")\n"
file = file + "spark.sqlContext.setConf(\"hive.exec.dynamic.partition.mode\", \"nonstrict\")\n"
file = file + "import org.apache.spark.sql._"
file = file + "\n"
file = file + "import java.io._"
file = file + "\n"
}
.
.
.
其余的其他步骤是向 script
变量添加一些其他特定于 spark 的命令。脚本很大,所以跳过完整的代码粘贴。
最后,以一个陷阱结束
// Output file path
flowFile = session.putAttribute(flowFile, "generatedScript", scalaFile.getCanonicalPath())
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e)
{
log.info("File: {}\n", finalFolder.file)
session.transfer(flowFile, REL_FAILURE)
}
处理器甚至没有开始执行 groovy 脚本,它失败并显示错误:
groovy.lang.MissingPropertyException: No such property: script for calss: javal.io.File
语句'not even beginning to start'表示前面的队列不为空,处理器抛出错误。我猜这是一个语法问题,但我没有发现脚本中有任何相关的语法问题。我还尝试了 运行 本地机器 groovy shell 中的脚本,那里也有同样的错误,但没有语法问题。
谷歌搜索错误让我得到了在脚本中包含导入的建议,但即使在包含相关导入之后错误还是一样。
有什么线索吗?
您指的是未在任何地方定义的变量 "innerDir"。您是否希望用户向名为 innerDir 的 ExecuteScript 添加用户定义的 属性?如果是这样,脚本中的 innerDir 变量是一个 PropertyValue 对象,因此您需要对其调用 getValue() 以获取 属性:
的实际值
innerDir.value
你也提到了 scalaFile.getCanonicalPath() 但上面没有定义 scalaFile,getCanonicalPath() 不会给你脚本的内容,是这个意思吗?
我修改了上面的部分脚本,假设 innerDir 是一个用户定义的 属性 并且你将文件变量的内容写入 scalaFile 指向的文件;我还通过使用 heredoc 而不是附加到文件变量使其变得更多 Groovy:
// Get flow file
def flowFile = session.get()
if (!flowFile) return
// Create output directory
def userInputDir = flowFile.getAttribute("user.input.path")
def finalFolder = new File(userInputDir + "/" + innerDir?.value ?: '')
try
{
if (!finalFolder.exists()) finalFolder.mkdirs()
// Write script
file =
"""
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
import org.apache.spark.sql._
import java.io._
"""
scalaFile = new File(finalFolder, 'script.scala')
scalaFile.withWriter {w -> w.write(file)}
// Output file path
flowFile = session.putAttribute(flowFile, "generatedScript", scalaFile.canonicalPath)
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
log.info("File: {}\n", finalFolder.file)
session.transfer(flowFile, REL_FAILURE)
}
我正在尝试通过 executescript 处理器执行某些操作;里面有一个groovy代码。在代码中,我试图创建一个 scala 脚本,该脚本将在另一个处理器中的 spark 上执行。
// Get flow file
def flowFile = session.get()
if (!flowFile) return
// Create output directory
def userInputDir = flowFile.getAttribute("user.input.path")
def finalFolder = new File(userInputDir + "/" + innerDir)
try
{
if (!finalFolder.exists()) finalFolder.mkdirs()
// Write script
file = "spark.sqlContext.setConf(\"hive.exec.dynamic.partition\", \"true\")\n"
file = file + "spark.sqlContext.setConf(\"hive.exec.dynamic.partition.mode\", \"nonstrict\")\n"
file = file + "import org.apache.spark.sql._"
file = file + "\n"
file = file + "import java.io._"
file = file + "\n"
}
.
.
.
其余的其他步骤是向 script
变量添加一些其他特定于 spark 的命令。脚本很大,所以跳过完整的代码粘贴。
最后,以一个陷阱结束
// Output file path
flowFile = session.putAttribute(flowFile, "generatedScript", scalaFile.getCanonicalPath())
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e)
{
log.info("File: {}\n", finalFolder.file)
session.transfer(flowFile, REL_FAILURE)
}
处理器甚至没有开始执行 groovy 脚本,它失败并显示错误:
groovy.lang.MissingPropertyException: No such property: script for calss: javal.io.File
语句'not even beginning to start'表示前面的队列不为空,处理器抛出错误。我猜这是一个语法问题,但我没有发现脚本中有任何相关的语法问题。我还尝试了 运行 本地机器 groovy shell 中的脚本,那里也有同样的错误,但没有语法问题。
谷歌搜索错误让我得到了在脚本中包含导入的建议,但即使在包含相关导入之后错误还是一样。
有什么线索吗?
您指的是未在任何地方定义的变量 "innerDir"。您是否希望用户向名为 innerDir 的 ExecuteScript 添加用户定义的 属性?如果是这样,脚本中的 innerDir 变量是一个 PropertyValue 对象,因此您需要对其调用 getValue() 以获取 属性:
的实际值innerDir.value
你也提到了 scalaFile.getCanonicalPath() 但上面没有定义 scalaFile,getCanonicalPath() 不会给你脚本的内容,是这个意思吗?
我修改了上面的部分脚本,假设 innerDir 是一个用户定义的 属性 并且你将文件变量的内容写入 scalaFile 指向的文件;我还通过使用 heredoc 而不是附加到文件变量使其变得更多 Groovy:
// Get flow file
def flowFile = session.get()
if (!flowFile) return
// Create output directory
def userInputDir = flowFile.getAttribute("user.input.path")
def finalFolder = new File(userInputDir + "/" + innerDir?.value ?: '')
try
{
if (!finalFolder.exists()) finalFolder.mkdirs()
// Write script
file =
"""
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
import org.apache.spark.sql._
import java.io._
"""
scalaFile = new File(finalFolder, 'script.scala')
scalaFile.withWriter {w -> w.write(file)}
// Output file path
flowFile = session.putAttribute(flowFile, "generatedScript", scalaFile.canonicalPath)
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
log.info("File: {}\n", finalFolder.file)
session.transfer(flowFile, REL_FAILURE)
}