5 Star 24 Fork 15

改着名儿玩 / slf4j-kafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

slf4j日志输出到Kafka扩展

BY ZZB

背景

如今分布式系统已经成为开发web的标配,且当前大数据系统通常都在集群中运行,如此多的系统集成在一起,日志管理变得非常繁杂艰难。因此一套同一的日志管理 方案迫在眉睫。网上有很多日志统一方案,大部分是通过filebeat或者logstash采集本地日志,再输出到ES中,通过索引的方式获取不同系统,不同级别的日志。 这种方案的确可靠有效,但是多了一个采集层的工作,而如果需要采集的日志给其他监控系统消费时则需要配多个采集器,并且在yarn中无法定位日志在哪台机器的 什么位置的情况下,这种方案也显得比较无力。

本项目是通过扩展slf4j这个java世界通用的日志框架,将日志按照一定的格式直接输出到kafka,然后不同的系统可以定义不同的消费者从kafka中获取日志, 做具体业务。

简介

本项目通过扩展Slf4j的Appender,将日志像输出到文件或者控制台一样简单的配置,就可以输出到kafka中。 项目提供了KafkaAppender和KafkaConcurrentAppender两种方案,均是异步输出到kafka中。

appender 插入顺序 性能消耗 插入速度
KafkaAppender 保证顺序 较大 较慢
KafkaConcurrentAppender 不保证顺序

当前已支持log4j1.2.X和logback

log4j

在maven中定义log4j依赖

<dependency>
    <artifactId>slf4j-kafka-over-log4j12</artifactId>
    <groupId>com.tuandai.log</groupId>
    <version>1.7.5</version>
</dependency>

在需要用到日志的地方定义Slf4jLogger

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static Logger logger = LoggerFactory.getLogger(Main.class);

定义log4j.properties

#控制台输出Appender
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender 
#增强日志格式转换器
log4j.appender.myConsoleAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout
#控制台输出日志格式
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %m%n

#kafka输出Appender
log4j.appender.kafkaAppender=com.tuandai.log.log4j12.appender.KafkaAppender
#增强日志格式转换器
log4j.appender.kafkaAppender.layout=com.tuandai.log.log4j12.layout.ExtendedEnhancedPatternLayout
#kafka输出日志格式
log4j.appender.kafkaAppender.layout.ConversionPattern={"@timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}","beat": { "hostname": "%host_name","name": "%sys_user", "version": "5.2.2" }, "fields": { "app_id": "%app_id" }, "input_type": "log", "message": "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %P --- [%15.15t] %-40.40logger{39} : %encode_message%n", "offset": 0, "source": "", "type": "log" }

#输出日志级别, 应用 控制台输出Appender 和 kafka输出Appender
log4j.rootLogger=DEBUG, myConsoleAppender, kafkaAppender

#关闭kafka输出日志[重要!因为kafka提供的javaapi使用了log4j做日志,如果不屏蔽掉会出现循环依赖,造成栈溢出]
log4j.logger.org.apache.kafka=OFF

定义sl4j-kafka.properties

#kafka-support
kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092
kafka.topic=log-log4j-test
kafka.appId=app

logback

在maven中定义log4j依赖

<dependency>
    <artifactId>slf4j-kafka-over-logback</artifactId>
    <groupId>com.tuandai.log</groupId>
    <version>1.7.5</version>
</dependency>

在需要用到日志的地方定义Slf4jLogger

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static Logger logger = LoggerFactory.getLogger(Main.class);

定义logback.xml

<?xml version="1.0" encoding="UTF-8" ?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %pid --- [%15.15t] %-40.40logger{39} : %m%n" />
    <!--控制台打印-->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="com.tuandai.log.logback.layout.ExtendedPatternLayoutEncoder">
            <pattern>${LOG_PATTERN}</pattern>
            <immediateFlush>true</immediateFlush>
        </encoder>
    </appender>

    <property name="KAFKA_PATTERN" value='{"@timestamp": "%d{yyyy-MM-dd"T"HH:mm:ss.SSS"Z"}","beat": { "hostname": "${HOSTNAME}","name": "%sys_user", "version": "5.2.2" }, "fields": { "app_id": "%app_id" }, "input_type": "log", "message": "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p %pid --- [%15.15t] %-40.40logger{39} : %encode_message%n", "offset": 0, "source": "", "type": "log" }' />
    <!--输出到kafka-->
    <appender name="KAFKA" class="com.tuandai.log.logback.appender.KafkaConcurrentAppender">
        <encoder class="com.tuandai.log.logback.layout.ExtendedPatternLayoutEncoder">
            <pattern>${KAFKA_PATTERN}</pattern>
        </encoder>
    </appender>

    <!--日志级别-->
    <root level="DEBUG">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="KAFKA" />
    </root>
</configuration>

定义sl4j-kafka.properties

#kafka-support
kafka.bootstrapServers=10.100.11.39:9092,10.100.11.40:9092
kafka.topic=log-log4j-test
kafka.appId=app

附:在Spark-yarn 中使用

当spark运行的时候,会有driver和executor两种角色,其中driver负责提交spark任务和收集executor执行结果,而executor是由yarn分配到集群中的某些机器来运行被driver提交的任务。 在我的理解看来,driver做了驱动和对结果集的聚合,executor就负责执行具体任务产出结果集。 通过我们运行spark-submit的后日志可以发现,提交程序后不会有程序的日志输出,具体的日志在各个节点中的yarn工作目录下。如/yarns/nm/userXX/。 节点任务的日志中包含了spark节点的启动日志和spark具体业务日志。 具体的我自己没有透彻前就不误人子弟了,说到这里就够了,有兴趣可以详细研究下spark-yarn工作方式。 以上说的这些只是想表述一个问题,spark-yarn下的日志的配置不是在运行起任务的时候指定的,而是在提交任务是就必须指定了。 所以在spark-yarn使用和普通的业务有所不同。

进入正题

将log4j.properties 和 slf4j-kafka.properties 拷贝到spark提交目录下,内容参照上

[cdh]$ pwd
/home/datapipe/spark-jobs/spark-job
[cdh]$ ls
spark-job.jar log4j.properties slf4j-kafka.properties

将编译好的slf4j-kafka-over-log4j12和slf4j-kafka-producer的jar包放到该目录下

[cdh]$ ls
spark-job.jar log4j.properties slf4j-kafka.properties
slf4j-kafka-over-log4j12-1.7.5.jar slf4j-kafka-producer-1.7.5.jar

编写启动脚本

[cdh]$ touch startup.sh
[cdh]$ chmod +x startup.sh

脚本内容

1  nohup spark-submit --class com.tuandai.XXXMainFunctionClass \
2  --master yarn --deploy-mode cluster --executor-memory 512m --driver-memory 512m --num-executors 1 --name XXX \
3  --jars slf4j-kafka-producer-1.7.5.jar,slf4j-kafka-over-log4j12-1.7.5.jar \
4  --files "log4j-kafka.properties,slf4j-kafka.properties" \
5  --conf "spark.driver.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \
6  --conf "spark.executor.extraClassPath=slf4j-kafka-producer-1.7.5.jar:slf4j-kafka-over-log4j12-1.7.5.jar" \
7  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \
8  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-kafka.properties" \
9  spark-job.jar &
10 echo "spark job started"

逐行讲解下

  1. 提交的任务需要运行的main方法
  2. 使用yarn 集群模式 executor节点内存512m driver节点内存分配512m executor数量1个 任务名
  3. 上传第三方jar包,这里就是slf4j-kafka的支持了
  4. 上传配置文件
  5. driver对第三方jar包生效
  6. executor对第三方jar包生效
  7. driver运行参数,这里是生效上传的配置文件
  8. executor运行参数,这里是生效上传的配置文件
  9. spark任务jar包

空文件

简介

把slf4j日志输出到kafka,目前支持logback,log4j 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/123zhangzhenbin/slf4j-kafka.git
git@gitee.com:123zhangzhenbin/slf4j-kafka.git
123zhangzhenbin
slf4j-kafka
slf4j-kafka
master

搜索帮助