解决方案:使用Flume+Kafka+SparkStreaming进行实时日志分析

优采云 发布时间: 2022-11-17 09:44

  解决方案:使用Flume+Kafka+SparkStreaming进行实时日志分析

  每个公司都想进行数据分析或数据挖掘。采集日志和 ETL 是第一步。今天讲一下如何实时采集日志(准实时,每分钟分析一次),处理日志,存储处理后的记录。保存在Hive中,附上完整的实战代码

  一、总体结构

  想一想,正常情况下我们是如何采集和分析日志的呢?

  首先,业务日志会通过Nginx(或者其他方式,我们使用Nginx来写日志)每分钟写入一次磁盘。现在如果我们要使用Spark来分析日志,我们需要先将磁盘中的文件上传到HDFS。然后Spark对其进行处理,最后存储到Hive表中,如图:

  我们之前使用这种方法每天分析一次日志,但是这样有几个缺点:

  首先,我们的日志通过Nginx每分钟保存为一个文件,所以一天的文件很多,不利于后续的分析任务,所以我们要先合并一天所有的日志文件

  合并后需要将文件从磁盘传输到hdfs,但是我们的日志服务器不在hadoop集群中,所以没有办法直接传输到hdfs,需要先将文件从日志服务器传输到Hadoop集群所在服务器,然后再上传到Hdfs

  最后,也是最重要的一点,延迟一天的数据分析已经不能满足我们新的业务需求。滞后时间最好控制在一小时以内

  可以看出,我们之前采集分析日志的方法比较原创,比较耗时。大量时间浪费在网络传输上。如果日志量很大,有可能会丢失数据,所以我们在此基础上进行了改进。建筑学:

  整个过程就是Flume会实时监控写入日志的磁盘。只要有新的日志写入,Flume就会将日志以消息的形式传递给Kafka,然后Spark Streaming会实时消费消息传递给Hive。

  那么Flume是什么,它为什么能监控一个磁盘文件呢?总之,Flume是一个开源的采集、聚合、移动大量日志文件的框架,所以非常适合这种实时采集和投递日志的场景

  Kafka 是一个消息系统。Flume采集的日志可以移动到Kafka消息队列中,然后可以多地消费,保证不丢数据。

  通过这种架构,采集到的日志可以被Flume发现并及时发送给Kafka。通过Kafka,我们可以在各个地方使用日志。同样的日志可以存储在Hdfs中,离线分析,实时计算。安全性可以得到保证,实时性要求基本可以满足

  整个流程已经清楚了,下面有突破,我们开始实施整个系统

  2. 实战 2.1 安装Kafka

  下载并安装 Kafka 并在此处发送一些基本命令:Kafka Installation and Introduction

  安装后,创建一个名为 launcher_click 的新主题:

  bin/kafka-topics.sh --create --zookeeper hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181 --replication-factor 2 --partitions 2 --topic launcher_click

  看看这个话题:

  bin/kafka-topics.sh --describe --zookeeper hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181 --topic launcher_click

  2.2 安装水槽

  1.下载解压

  下载链接:

  注意进入下载地址页面,使用清华的地址,否则会很慢

  wget http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

tar -xvf apache-flume-1.7.0-bin.tar.gz

  2.修改配置文件

  进入flume目录,修改conf/flume-env.sh

  export JAVA_HOME=/data/install/jdk

export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"

  添加配置文件:conf/flume_launcherclick.conf

  # logser可以看做是flume服务的名称,每个flume都由sources、channels和sinks三部分组成

# sources可以看做是数据源头、channels是中间转存的渠道、sinks是数据后面的去向

logser.sources = src_launcherclick

logser.sinks = kfk_launcherclick

logser.channels = ch_launcherclick

<p>

# source

# 源头类型是TAILDIR,就可以实时监控以追加形式写入文件的日志

logser.sources.src_launcherclick.type = TAILDIR

# positionFile记录所有监控的文件信息

logser.sources.src_launcherclick.positionFile = /data/install/flume/position/launcherclick/taildir_position.json

# 监控的文件组

logser.sources.src_launcherclick.filegroups = f1

# 文件组包含的具体文件,也就是我们监控的文件

logser.sources.src_launcherclick.filegroups.f1 = /data/launcher/stat_app/.*

# interceptor

# 写kafka的topic即可

logser.sources.src_launcherclick.interceptors = i1 i2

logser.sources.src_launcherclick.interceptors.i1.type=static

logser.sources.src_launcherclick.interceptors.i1.key = type

logser.sources.src_launcherclick.interceptors.i1.value = launcher_click

logser.sources.src_launcherclick.interceptors.i2.type=static

logser.sources.src_launcherclick.interceptors.i2.key = topic

logser.sources.src_launcherclick.interceptors.i2.value = launcher_click

# channel

logser.channels.ch_launcherclick.type = memory

logser.channels.ch_launcherclick.capacity = 10000

logser.channels.ch_launcherclick.transactionCapacity = 1000

# kfk sink

# 指定sink类型是Kafka,说明日志最后要发送到Kafka

logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink

# Kafka broker

logser.sinks.kfk_launcherclick.brokerList = 10.0.0.80:9092,10.0.0.140:9092

# Bind the source and sink to the channel

logser.sources.src_launcherclick.channels = ch_launcherclick

logser.sinks.kfk_launcherclick.channel = ch_launcherclick</p>

  3.开始

  nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume_launcherclick.conf --name logser -Dflume.root.logger=INFO,console >> logs/flume_launcherclick.log &

  这个时候Kafka和Flume都已经启动了。从配置我们可以看出Flume的监控文件是/data/launcher/stat_app/.*,所以只要这个目录下的文件内容增加,就会发送给kafka。可以自己添加一些 把测试日志放到这个目录下的文件中,然后打开一个Kafka Consumer,看看Kafka有没有收到消息。这里我们将看到完成SparkStreaming后的测试结果

  2.3 Spark流式编程

  SparkStreaming是Spark用来处理实时流的,可以实时到秒级。我们这里不需要这样的实时。日志分析程序每分钟执行一次。主要代码如下:

   def main(args: Array[String]) {

<p>

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sparkConf = new SparkConf().setAppName("LauncherStreaming")

//每60秒一个批次

val ssc = new StreamingContext(sparkConf, Seconds(60))

// 从Kafka中读取数据

val kafkaStream = KafkaUtils.createStream(

ssc,

"hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181", // Kafka集群使用的zookeeper

"launcher-streaming", // 该消费者使用的group.id

Map[String, Int]("launcher_click" -> 0, "launcher_click" -> 1), // 日志在Kafka中的topic及其分区

StorageLevel.MEMORY_AND_DISK_SER).map(_._2) // 获取日志内容

kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {

val result = rdd.map(log => parseLog(log)) // 分析处理原始日志

.filter(t => StringUtils.isNotBlank(t._1) && StringUtils.isNotBlank(t._2))

// 存入hdfs

result.saveAsHadoopFile(HDFS_DIR, classOf[String], classOf[String], classOf[LauncherMultipleTextOutputFormat[String, String]])

})

ssc.start()

// 等待实时流

ssc.awaitTermination()

}</p>

  内容有限,完整代码访问我的github:

  然后打包上传到master上运行:

  nohup /data/install/spark-2.0.0-bin-hadoop2.7/bin/spark-submit --master spark://hxf:7077 --executor-memory 1G --total-executor-cores 4 --class com.analysis.main.LauncherStreaming --jars /home/hadoop/jar/kafka-clients-0.10.0.0.jar,/home/hadoop/jar/metrics-core-2.2.0.jar,/home/hadoop/jar/zkclient-0.3.jar,/home/hadoop/jar/spark-streaming-kafka-0-8_2.11-2.0.0.jar,/home/hadoop/jar/kafka_2.11-0.8.2.1.jar /home/hadoop/jar/SparkLearning.jar >> /home/hadoop/logs/LauncherDM.log &

  然后开始测试,将日志写入Flume监控目录/data/launcher/stat_app/.*。原创日志内容类似如下:

  118.120.102.3|1495608541.238|UEsDBBQACAgIACB2uEoAAAAAAAAAAAAAAAABAAAAMGWUbW7bMAyGb6NfnUFRFEWhJ+gBdgBZVjpjjp04brMAO*yY2DKa9Y+B1+DnQ1LCztoITgK4wPGHfNUhmKGUPOn3DyP*zdOxSWM3T33XXMqy9OP7xXTZiTC1xlL0HgMEi+BfHoooBEGKr3fPpYy5jMse4Xzupus4TKkrs4kZOhI51CgWWKxsUQBRPMDr1*w5Hcuc0LiUEFBwdXQxAARXHb3+QXlOfzya0uZWOGwlEwBDwLD5oJBVFHsEEPF2U0EUToyr8k4tg9v8AkRrIcKmxGsU2eqQIM45dKuKFICo5oveEqOjh2JAIITImyIJqBk3JS4qh7Wby*TroxnL9ZKHXrsyWeBQoMXaEgXUKh6mOQ1l7NLc*Hwz8aDpAtndLFJEetkVc6S9V*bg+RFiKMvnTv6ahuGUTmWexqEfi3Elezx0botJrCCQn5jfCzWaqaUOqNpFYO23ckYl5GOlx4rLQuUllh27SsjZyLQTUn4K+3uVczlOi+7uuMzTYLoibeIspk71DtKuJC+7T5qXPg9lLddaZs6+Lolnj7ANW0dBGKOn72m3cbQJI2Kq4*C6Xhz9E5Pzeeg*i2l1IAJtpReILNq6DY4peFjHeO5vffPZd2UyejEJ28Puo0sI*2*5ojvhfNcquWomFMVp02Pz++M6Nach3e6XR5wOlrdSg4T7RkgtQAuC6HYl2sc62i6dUq*om+HWjvdHAPSk8hYkegHraxC8PwPons73XZeozDfXmaRzzzaD2XI4fX0QX*8BUEsHCKeft*敏*感*词*8AgAAmQQAAA==

  检查HDFS对应目录是否有内容:

  分析后存储在HDFS中的日志内容如下:

  99000945863664;864698037273329|119.176.140.248|1495594615129|2017-05-24 10:56:55|xiaomi|redmi4x|com.jingdong.app.mall&0ae359b6&1495534579412&1;com.autonavi.minimap&279f562f&1495534597934,1495534616627&2;com.android.contacts&91586932&1495538267103,1495540527138,1495576834653,1495583404117,1495591231535&5

  SparkStreaming任务状态如下:

  可以看出确实是每分钟执行一次

  参考

  %E9%9B%86%E7%BE%A4%E5%8F%8A%E9%A1%B9%E7%9B%AE%E5%AE%9E%E6%88%98/

  解决方案:优化手机端移动端百度网站seo排名好的方式是什么?

  企业快速获取百度移动端SEO优化排名主要有四种技巧,有需要的可以参考这些内容。

  1.百度移动端优化排名域名选择

  无论是优化移动端网站还是PC端,网站的域名都很重要,域名的选择要简洁明了。移动端网站优化选择较短的域名,这样会让人更容易记忆,输入简单,让人的思想更中心化,便于搜索引擎抓取。

  

  2.排行优化移动端与PC适配转换

  现在有很多seo移动端网站做了响应式,可以实现seo百度移动端和网站端的跳转,避免二次建站,很方便,但是对于这种网站还需要做相应的导航跳转链接,让用户在seo优化后的手机端和电脑端来回切换,保证安全正常运行。&gt; 也有点帮助。

  3. 网站移动优化页面简洁

  因为seo排名移动端的网站受限于屏幕大小,所以移动端优化的网站移动端页面一定要更加简洁,能够直接展示最重要的内容以吸引用户浏览。同时,百度移动端排名网站简洁明了的内容也可以提高移动端关键词优化网站的访问速度。

  

  4. 移动关键词 SEO优化网站结构

  对于seo mobile网站,网站必须设置成树状结构,树状结构一般分为三层:首页,频道,文章页面,理想网站 结构要扁平化,从首页到内容页的层级尽量减少,这样搜索引擎会比较容易处理。

  以上就是为大家总结的百度移动端优化排名技巧。以上移动端网站排名优化的内容相信大家都知道。优化百度移动端排名可以帮助企业获得更多流量。促进利益转化,提升网站品牌形象。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线