BY ZZB
如今分布式系统已经成为开发web的标配,且当前大数据系统通常都在集群中运行,如此多的系统集成在一起,日志管理变得非常繁杂艰难。因此一套同一的日志管理 方案迫在眉睫。网上有很多日志统一方案,大部分是通过filebeat或者logstash采集本地日志,再输出到ES中,通过索引的方式获取不同系统,不同级别的日志。 这种方案的确可靠有效,但是多了一个采集层的工作,而如果需要采集的日志给其他监控系统消费时则需要配多个采集器,并且在yarn中无法定位日志在哪台机器的 什么位置的情况下,这种方案也显得比较无力。
本项目是通过扩展slf4j这个java世界通用的日志框架,将日志按照一定的格式直接输出到kafka,然后不同的系统可以定义不同的消费者从kafka中获取日志, 做具体业务。
本项目通过扩展Slf4j的Appender,将日志像输出到文件或者控制台一样简单的配置,就可以输出到kafka中。 项目提供了KafkaAppender和KafkaConcurrentAppender两种方案,均是异步输出到kafka中。
appender | 插入顺序 | 性能消耗 | 插入速度 |
---|---|---|---|
KafkaAppender | 保证顺序 | 较大 | 较慢 |
KafkaConcurrentAppender | 不保证顺序 | 小 | 快 |
当前已支持log4j1.2.X和logback
在maven中定义log4j依赖
<dependency>
<artifactId>slf4j-kafka-over-log4j12</artifactId>
<groupId>com.tuandai.log</groupId>
<version>1.7.5</version>
</dependency>
在需要用到日志的地方定义Slf4jLogger
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static Logger logger = LoggerFactory.getLogger(Main.class);
定义log4j.properties
#控制台输出Appender
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
#增强日志格式转换器
log4j.appender.myConsoleAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout
#控制台输出日志格式
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %m%n
#kafka输出Appender
log4j.appender.kafkaAppender=com.tuandai.log.log4j12.appender.KafkaAppender
#增强日志格式转换器
log4j.appender.kafkaAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout
#kafka输出日志格式
log4j.appender.kafkaAppender.layout.ConversionPattern={"@timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}","beat": { "hostname": "%host_name","name": "%sys_user", "version": "5.2.2" }, "fields": { "app_id": "%app_id" }, "input_type": "log", "message": "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %encode_message%n", "offset": 0, "source": "", "type": "log" }
#输出日志级别, 应用 控制台输出Appender 和 kafka输出Appender
log4j.rootLogger=DEBUG, myConsoleAppender, kafkaAppender
#关闭kafka输出日志[重要!因为kafka提供的javaapi使用了log4j做日志,如果不屏蔽掉会出现循环依赖,造成栈溢出]
log4j.logger.org.apache.kafka=OFF
定义sl4j-kafka.properties
#kafka-support
kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092
kafka.topic=log-log4j-test
kafka.appId=app
在maven中定义log4j依赖
<dependency>
<artifactId>slf4j-kafka-over-logback</artifactId>
<groupId>com.tuandai.log</groupId>
<version>1.7.5</version>
</dependency>
在需要用到日志的地方定义Slf4jLogger
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static Logger logger = LoggerFactory.getLogger(Main.class);
定义logback.xml
<?xml version="1.0" encoding="UTF-8" ?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %pid --- [%15.15t] %-40.40logger{39} : %m%n" />
<!--控制台打印-->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="com.tuandai.log.logback.layout.ExtendedPatternLayoutEncoder">
<pattern>${LOG_PATTERN}</pattern>
<immediateFlush>true</immediateFlush>
</encoder>
</appender>
<property name="KAFKA_PATTERN" value='{"@timestamp": "%d{yyyy-MM-dd"T"HH:mm:ss.SSS"Z"}","beat": { "hostname": "${HOSTNAME}","name": "%sys_user", "version": "5.2.2" }, "fields": { "app_id": "%app_id" }, "input_type": "log", "message": "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %pid --- [%15.15t] %-40.40logger{39} : %encode_message%n", "offset": 0, "source": "", "type": "log" }' />
<!--输出到kafka-->
<appender name="KAFKA" class="com.tuandai.log.logback.appender.KafkaConcurrentAppender">
<encoder class="com.tuandai.log.logback.layout.ExtendedPatternLayoutEncoder">
<pattern>${KAFKA_PATTERN}</pattern>
</encoder>
</appender>
<!--日志级别-->
<root level="DEBUG">
<appender-ref ref="STDOUT" />
<appender-ref ref="KAFKA" />
</root>
</configuration>
定义sl4j-kafka.properties
#kafka-support
kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092
kafka.topic=log-log4j-test
kafka.appId=app
当spark运行的时候,会有driver和executor两种角色,其中driver负责提交spark任务和收集executor执行结果,而executor是由yarn分配到集群中的某些机器来运行被driver提交的任务。 在我的理解看来,driver做了驱动和对结果集的聚合,executor就负责执行具体任务产出结果集。 通过我们运行spark-submit的后日志可以发现,提交程序后不会有程序的日志输出,具体的日志在各个节点中的yarn工作目录下。如/yarns/nm/userXX/。 节点任务的日志中包含了spark节点的启动日志和spark具体业务日志。 具体的我自己没有透彻前就不误人子弟了,说到这里就够了,有兴趣可以详细研究下spark-yarn工作方式。 以上说的这些只是想表述一个问题,spark-yarn下的日志的配置不是在运行起任务的时候指定的,而是在提交任务是就必须指定了。 所以在spark-yarn使用和普通的业务有所不同。
进入正题
将log4j.properties 和 slf4j-kafka.properties 拷贝到spark提交目录下,内容参照上 如
[cdh]$ pwd
/home/datapipe/spark-jobs/spark-job
[cdh]$ ls
spark-job.jar log4j.properties slf4j-kafka.properties
将编译好的slf4j-kafka-over-log4j12和slf4j-kafka-producer的jar包放到该目录下
[cdh]$ ls
spark-job.jar log4j.properties slf4j-kafka.properties
slf4j-kafka-over-log4j12-1.7.5.jar slf4j-kafka-producer-1.7.5.jar
编写启动脚本
[cdh]$ touch startup.sh
[cdh]$ chmod +x startup.sh
脚本内容
1 nohup spark-submit --class com.tuandai.XXXMainFunctionClass \
2 --master yarn --deploy-mode cluster --executor-memory 512m --driver-memory 512m --num-executors 1 --name XXX \
3 --jars slf4j-kafka-producer-1.7.5.jar,slf4j-kafka-over-log4j12-1.7.5.jar \
4 --files "log4j-kafka.properties,slf4j-kafka.properties" \
5 --conf "spark.driver.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \
6 --conf "spark.executor.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \
7 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \
8 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \
9 spark-job.jar &
10 echo "spark job started"
逐行讲解下
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型