Output Spark application id in the logs with Log4j
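I have a custom Log4j file for a Spark application. I want to output the Spark application ID along with other attributes such as the message and date, so that the JSON string has the following structure: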
{"name":,"time":,"date":,"level":,"thread":,"message":,"app_id":}
Currently, the structure looks like this:
{"name":,"time":,"date":,"level":,"thread":,"message":}
How can I define such a layout for the Spark driver logs?
My log4j file looks like this:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/'>
  <appender name="Json" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.hadoop.log.Log4Json">
      <param name="ConversionLayout" value=""/>
    </layout>
  </appender>
  <root>
    <level value="INFO"/>
    <appender-ref ref="Json"/>
  </root>
</log4j:configuration>
I doubt that org.apache.hadoop.log.Log4Json can be adapted for this purpose; judging by its javadoc and source code, doing so would be rather cumbersome.
Although it looks like you are using Log4j 1.x, its API is quite flexible, and we can easily define our own layout by extending org.apache.log4j.Layout.
We need a case class that is serialized to JSON, roughly following the target structure:
case class LoggedMessage(name: String,
                         appId: String,
                         thread: String,
                         time: Long,
                         level: String,
                         message: String)
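json4s uses the case-class field names as JSON keys, so LoggedMessage is written with appId instead of the question's app_id, and it has no date field (the logs further down confirm this). If the exact keys matter, one option is to rename the fields and add a date; another is to build the line by hand. The sketch below takes the second route using only the JDK, as a dependency-free alternative to json4s. The object name JsonLine, its helpers, and the UTC ISO-8601 format for date are assumptions, not part of the original answer.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical dependency-free JSON line builder using the question's exact key names.
object JsonLine {
  // Escape characters that may not appear raw inside a JSON string.
  def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append("\\").append('u').append("%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.toString
  }

  // Assumption: "date" is the UTC calendar date of the epoch-millis timestamp.
  private val dateFormat = DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneOffset.UTC)

  def render(name: String, time: Long, level: String, thread: String,
             message: String, appId: String): String = {
    val date = dateFormat.format(Instant.ofEpochMilli(time))
    def str(key: String, value: String) = "\"" + key + "\":\"" + escape(value) + "\""
    Seq(
      str("name", name),
      "\"time\":" + time, // numeric, so not quoted
      str("date", date),
      str("level", level),
      str("thread", thread),
      str("message", message),
      str("app_id", appId)
    ).mkString("{", ",", "}")
  }
}
```

Inside a layout's format method you would pass the same LoggingEvent values that the answer feeds into LoggedMessage.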
The Layout can then be extended as follows. To access the value of "app_id", we use Log4j's Mapped Diagnostic Context (MDC):
import org.apache.log4j.Layout
import org.apache.log4j.spi.LoggingEvent
import org.json4s.{DefaultFormats, Formats}
import org.json4s.native.Serialization.write

class JsonLoggingLayout extends Layout {
  // abstract in Layout: returning false tells the appender that this layout handles throwables,
  // so the appender will not print stack traces; note that format() below does not print them either
  override def ignoresThrowable(): Boolean = false

  // required by the OptionHandler interface that Layout implements; there are no options to activate
  override def activateOptions(): Unit = { /* nothing */ }

  override def format(event: LoggingEvent): String = {
    // we are using json4s for JSON serialization
    implicit val formats: Formats = DefaultFormats

    // retrieve app_id from the Mapped Diagnostic Context
    val appId = event.getMDC("app_id") match {
      case null    => "[no_app]" // messages logged without app_id in the MDC
      case defined => defined.toString
    }

    val message = LoggedMessage(
      "TODO",                   // placeholder for the "name" field
      appId,
      event.getThreadName,      // the thread that logged the event, not the one formatting it
      event.getTimeStamp,
      event.getLevel.toString,
      event.getRenderedMessage) // avoids an NPE if the message object is null
    write(message) + "\n"
  }
}
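As written, the layout returns false from ignoresThrowable, so in Log4j 1.x the WriterAppender that ConsoleAppender extends does not print stack traces itself, and format never writes them: exceptions are silently lost. Returning true would make the appender print the trace as raw lines after the JSON, breaking one-object-per-line output. A third option is to carry the trace inside the JSON: in format you could read it via event.getThrowableInformation (null when nothing was thrown) and its getThrowable, then store it in an extra case-class field. The helper below is a JDK-only sketch of the string-conversion part; the ThrowableText name and the idea of an extra exception field are assumptions, not part of the original answer.

```scala
import java.io.{PrintWriter, StringWriter}

// Hypothetical helper: turns a Throwable's stack trace into one string that can be
// stored in a JSON field instead of being printed as raw, non-JSON lines.
object ThrowableText {
  def render(t: Throwable): String = {
    val sw = new StringWriter()
    val pw = new PrintWriter(sw)
    t.printStackTrace(pw)
    pw.flush()
    sw.toString
  }

  // Same null-handling idea as the app_id lookup: no throwable means no field value.
  def optional(t: Throwable): Option[String] = Option(t).map(render)
}
```

JSON string escaping (newlines and tabs in the trace) is still the serializer's job; json4s handles that when the trace is a String field.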
Finally, once the Spark session has been created, we put the app_id value into the MDC:
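import org.apache.log4j.{Logger, MDC}
import org.apache.spark.sql.SparkSession

val logger = Logger.getLogger(getClass)
// create Spark session (master and other settings supplied e.g. via spark-submit)
val session = SparkSession.builder().getOrCreate()
MDC.put("app_id", session.sparkContext.applicationId)
logger.info("-------- this is info --------")
logger.warn("-------- THIS IS A WARNING --------")
logger.error("-------- !!! ERROR !!! --------")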
This produces the following logs:
{"name":"TODO","appId":"local-1550247707920","thread":"main","time":1550247708149,"level":"INFO","message":"-------- this is info --------"}
{"name":"TODO","appId":"local-1550247707920","thread":"main","time":1550247708150,"level":"WARN","message":"-------- THIS IS A WARNING --------"}
{"name":"TODO","appId":"local-1550247707920","thread":"main","time":1550247708150,"level":"ERROR","message":"-------- !!! ERROR !!! --------"}
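All three lines carry the application ID because they were logged from the thread that called MDC.put. Log4j 1.2 keeps the MDC per thread in an InheritableThreadLocal, so a thread sees app_id only if it is that thread or was created after the put. Threads that already existed, such as those Spark starts while building the session, will typically log "[no_app]", and the value is never shipped to executor JVMs. The sketch below demonstrates the inheritance rule with a plain JDK InheritableThreadLocal standing in for the MDC; it illustrates the mechanism and is not Log4j code, and MdcInheritanceDemo is a hypothetical name.

```scala
// Hypothetical stand-in for the MDC's per-thread storage (not the Log4j class itself).
object MdcInheritanceDemo {
  val appId = new InheritableThreadLocal[String]

  /** Returns (value seen by a thread created before set, value seen by a thread created after set). */
  def run(value: String): (String, String) = {
    var seenByEarly: String = "unset"
    var seenByLate: String = "unset"

    // Created BEFORE the value is set: it copies the parent's value at construction time (null).
    val early = new Thread(() => seenByEarly = appId.get())

    appId.set(value) // analogous to MDC.put("app_id", ...) on the driver's main thread

    // Created AFTER the value is set: it inherits a copy of the value.
    val late = new Thread(() => seenByLate = appId.get())

    // Start order is irrelevant; inheritance already happened when each Thread was constructed.
    early.start(); late.start()
    early.join(); late.join() // join() also makes the writes above visible to this thread

    appId.remove() // clean up the calling thread; children keep their own copies
    (seenByEarly, seenByLate)
  }
}
```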
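Of course, don't forget to reference the implementation in the log4j XML configuration, using the fully qualified name of your layout class:
<appender name="Json" class="org.apache.log4j.ConsoleAppender">
  <layout class="stackoverflow.q54706582.JsonLoggingLayout" />
</appender>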