博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume实时采集日志到Kafka(极简版)
阅读量:2225 次
发布时间:2019-05-09

本文共 2867 字,大约阅读时间需要 9 分钟。

背景:

由于项目采用微服务架构,业务日志文件数量较多,我做了个简单的日志监控服务,先在此分享下日志采集的简单步骤,没有任何花里胡哨 ~(ps:一切数据到了 kafka就非常好解决了!)


一、Flume安装

Flume的安装使用可以说非常简单,直接进官网:

在这里插入图片描述

最新是1.9.0版本,我们选择1.8.0版本下载。
在这里插入图片描述
在这里插入图片描述
然后在Linux下解压:
在这里插入图片描述
在这里插入图片描述
配置用户环境变量:(如果有root权限可以配置在/etc/profile

vim ~/.bashrc

在末尾添加路径:

FLUME_HOME=/home/dev/flume/flume-1.8.0PATH=$PATH:$FLUME_HOME/bin

source使其生效:

source ~/.bashrc

二、确定日志采集策略

需要根据我们的日志类型来确定最佳策略。

我们项目的业务日志是通过Logback生成,日志形式如下所示:

在这里插入图片描述
根据日期规则进行滚动:
当天的日志文件名为stdout.log,过了凌晨12点即把stdout.log改名为stdout.log.昨天日期.log,然后日志重新输出在新的stdout.log。所以我们只需要采集名为stdout.log的文件即可。

一般Flume采集日志source有两种方式:

1.Exec类型的Source

可以将命令产生的输出作为源,如:

a1.sources.r1.type = execa1.sources.r1.command = ping 10.3.1.227 //此处输入命令

2.Spooling Directory类型的 Source

将指定的文件加入到“自动搜集 ”目录中。flume会持续监听这个目录,把文件当做source来处理。注意:一旦文件被放到“自动收集”目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件,如果有,flume也会报错。

a1.sources.r1.type = spooldira1.sources.r1.spoolDir = /home/work/data

向指定的文件目录下传送一个日志文件,发现flume的控制台打印相关的信息;此外,会发现被处理的文件,会追加一个后缀:completed,表示已处理完。

确定采集姿势:

如果采用spooldir的方式来监控log文件夹,flume会采集log数据,但是滚动生成stdout.log.昨天日期.log,flume就会把stdout.log.昨天日期.log文件当作新文件,又重新读取一遍,导致重复

所以使用exec命令行的方式,通过tail -F *.log命令比较好!

注意: -F根据文件名进行追踪,并保持重试,即该文件被删除或改名后,如果再次创建相同的文件名,会继续追踪。 而-f根据文件的nodeid即文件描述符进行追踪,当文件改名或被删除,追踪停止 。


三、添加Flume-Sink:Kafka的配置

网上关于这种配置文件五花八门,都是对应Flume不同版本的产物,无法判断对错,所以我建议大家直接去官网查看对应版本Flume的配置写法!

在这里插入图片描述

查看文档根据自己的版本来!此处我的是1.8.0!
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
附上我的配置(在flume的conf目录下添加kafka-producer.conf):
在这里插入图片描述
文字版如下:

pro.sources = s1pro.channels = c1pro.sinks = k1pro.sources.s1.type = execpro.sources.s1.command = tail -F /home/dev/log/moercredit/stdout.logpro.channels.c1.type = memorypro.channels.c1.capacity = 1000pro.channels.c1.transactionCapacity = 100pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinkpro.sinks.k1.kafka.topic = moercredit_log_testpro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092pro.sinks.k1.kafka.flumeBatchSize = 20pro.sinks.k1.kafka.producer.acks = 1pro.sinks.k1.kafka.producer.linger.ms = 1pro.sinks.k1.kafka.producer.compression.type = snappypro.sources.s1.channels = c1pro.sinks.k1.channel = c1

注:上面的 cdh1、2、3为主机名,此处配置 IP:端口也行


四、启动并测试

在conf目录下执行命令:

nohup flume-ng agent -n pro -c ./ -f kafka-producer.conf &

注:

-n : agent的name
-c:conf目录路径
-f:配置文件路径

上面命令会将日志打印在目录下的nohup.out文件里,如果不需要,可以执行如下命令:

nohup flume-ng agent -n pro -c ./ -f kafka-producer.conf >/dev/null 2>&1 &

扩展:

Linux下还有一个特殊的文件/dev/null,或称空设备,是一个特殊的设备文件;它就像一个无底洞,所有重定向到它的信息都会消失得无影无踪。这一点非常有用,当我们不需要回显程序的所有信息时,就可以将输出重定向到/dev/null。
如果想要正常输出和错误信息都不显示,则要把标准输出和标准错误都重定向到/dev/null, 例如: # ls 1>/dev/null
2>/dev/null 还有一种做法是将错误重定向到标准输出,然后再重定向到 /dev/null,例如: # ls/dev/null 2>&1
注意:此处的顺序不能更改,否则达不到想要的效果,此时先将标准输出重定向到
/dev/null,然后将标准错误重定向到标准输出,由于标准输出已经重定向到了/dev/null,因此标准错误也会重定向到/dev/null,于是一切静悄悄:-)
由于使用nohup时,会自动将输出写入nohup.out文件中,如果文件很大的话,nohup.out就会不停的增大,这是我们不希望看到的,因此,可以利用/dev/null来解决这个问题。
nohup ./program >/dev/null 2>log &
如果错误信息也不想要的话: nohup ./program /dev/null 2>&1 &

启动后我们通过Kafka Tool工具查看日志topic里面有没有数据:

在这里插入图片描述
一切 OK!


转载地址:http://hrlfb.baihongyu.com/

你可能感兴趣的文章
Oracle知识点连载(二)
查看>>
Oracle知识点连载(三)
查看>>
Oracle知识点连载(五)
查看>>
关于三元运算符的类型转换问题
查看>>
笔记本怎么设置WIfi热点
查看>>
如何实现字符串的反转及替换?
查看>>
Java面试题全集(上)
查看>>
Java面试题全集(中)
查看>>
值传递和引用传递
查看>>
什么情况下用+运算符进行字符串连接比调用StringBuilder对象的append方法连接字符串性能更好?
查看>>
怎么根据Comparable方法中的compareTo方法的返回值的正负 判断升序 还是 降序?
查看>>
理解事务的4种隔离级别
查看>>
常用正则匹配符号
查看>>
建议42: 让工具类不可实例化
查看>>
Java 异步机制与同步机制的区别
查看>>
hibernate的对象三种状态说明
查看>>
什么是N+1查询?
查看>>
Spring 接管 Hibernate 配置 延迟加载
查看>>
找出不在预定数组中的自然数
查看>>
String常见面试题
查看>>