通用解决方案:maxwell+kafka+Spark Streaming构建MySQL Bin

优采云 发布时间: 2022-11-03 03:01

  通用解决方案:maxwell+kafka+Spark Streaming构建MySQL Bin

  需求分析说明

  根据业务场景,需要对日志进行实时处理,进行实时图表展示(Highchart等)。如果频繁提取数据库,会对数据库服务器造成很大的压力,相应的web服务也会受到很大的影响。因此,提取数据库日志不仅可以大大减轻数据库服务的压力,还可以解决实时处理和实时显示图表的需求。本篇博客 MySQL Binlog log 采集 提供解决方案为例

  1.部署安装maxwell采集器

  1)首先检查mysql是否开启了binlog

  2) 下载麦克斯韦

  组件下载地址:

  解压 tar -zxvf maxwell-1.17.1.tar.gz

  3)授权mysql(只对maxwell库操作)

  其中 user01 是数据库用户名 666666 是数据库密码

  GRANT ALL on maxwell.* to 'user01'@'%' 由 '666666' 标识;

  将 *.* 上的 SELECT、REPLICATION CLIENT、REPLICATION SLAVE 授予 'user01'@'%';

  4)执行maxwell命令行(注:maxwell默认将监控的mysql binlog日志发送到名为maxwell topic的kafka topic)

  具体demo如下:

  bin/maxwell --user='user01'--password='666666'--host='127.0.0.1'--include_dbs=db1 --include_tables=table1,table2--producer=kafka--kafka.bootstrap.servers =d1:9092,d2:9092,d3:9092 --kafka_topic 测试

  注意:--user为数据库用户名--password数据库密码--host表示安装mysql的服务器地址(可以与安装maxwell的服务器不同)--include_dbs表示过滤特定数据库--include_tables意思是过滤特定库 下面的具体表格--kafka.bootstrap.servers代表kafka的IP地址和端口号--kafka_topic kafka代表kafka对应的topic

  2、kafka的相关配置(注:d1、d2、d3为各个服务器的主机名,kafka中配置文件的端口号要与命令行中给出的端口号一致)

  1)启动kafka命令行(这里作为后台进程运行)

  nohup bin/kafka-server-start.sh 配置/server.properties &

  2)创建kafka主题作为测试主题

  bin/kafka-topics.sh --zookeeper d1:2181,d2:2181,d3:2181 --create --topic test --partitions 20 --replication-factor 1

  3)启动消费者窗口

  bin/kafka-console-consumer.sh --bootstrap-server d1:9092,d2:9092,d3:9092 --topic 测试

  

  三、Spark Streaming结合kafka

  注意:这个demo的spark版本是2.2.1,kafka版本是0.10.0。请注意spark版本对应kafka版本。详细请参考spark官方说明网站

  package com.baison.realTimeCalculation

import java.lang

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.DStream

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import org.apache.spark.streaming.kafka010.KafkaUtils

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.util.Try

object IposRealTime {

def main(args: Array[String]): Unit = {

val conf=new SparkConf().setAppName("IposRealTime")

.set("spark.streaming.blockInterval", "50")//生成block的间隔

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//用kryo序列化

.set("spark.streaming.backpressure.enabled","true") //数据的反压机制

.set("spark.task.maxFailures","10")//task最大失败次数

.set("spark.streaming.kafka.maxRetries","5") //kafka的最大重试次数

.set("spark.streaming.stopGracefullyOnShutdown","true")//程序优雅关闭

.set("spark.io.compression.codec","snappy") //压缩模式

<p>

.set("spark.rdd.compress","true") //压缩RDD的分区

.registerKryoClasses(Array(classOf[EveryWeekForm],classOf[HotGoodsForm],classOf[MemberFlowForm],

classOf[TodayYeJiForm]))

val ssc=new StreamingContext(conf,Durations.seconds(2))

//kafka的配置

val kafkaParam=Map[String,Object](

Constants.KAFKA_METADATA_BROKER_LIST->ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST),

"key.deserializer"->classOf[StringDeserializer],

"value.deserializer"->classOf[StringDeserializer],

Constants.KAFKA_GROUP_ID->ConfigurationManager.getProperty(Constants.KAFKA_GROUP_ID),

Constants.KAFKA_AUTO_OFFSET_RESET->ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET),//从该topic最新位置开始读取数据

"enable.auto.commit"->(false:lang.Boolean),

Constants.SESSION_TIMEOUT_MS->ConfigurationManager.getProperty(Constants.SESSION_TIMEOUT_MS) //最大程度的确保Spark集群和kafka连接的稳定性

)

val topics=List(ConfigurationManager.getProperty(Constants.KAFKA_TOPICS)).toSet

val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam)).repartition(50)

ssc.checkpoint(Constants.SPARK_CHECKPOINT_DATA)

//此处进行处理数据操作

ssc.start()

ssc.awaitTermination()

}

</p>

  如有错误请指正,不胜感激。

  最佳实践:ELK(elasticsearch+logstash+kibana)日志采集系统

  文章目录

  一、安装环境

  系统版本:分 6.5

  JDK:1.8.0_181

  弹性搜索-6.4.2

  日志-6.4.2

  木花-6.4.2

  其次,安装 JDK 2.1 并下载 JDK:

  此环境下载 64 位 tar .gz 包,并将安装包复制到安装服务器/home/ 目录

  [root@localhost ~]# 光盘 /首页/

  [root@localhost local]# tar -xzvf JDK-8u181-linux-x64.tar.gz

  2.2. 配置环境变量

  [root@localhost本地]# vim /etc/profile

  将以下内容添加到文件末尾

  JAVA_HOME=/home/jdk1.8.0_181

JRE_HOME=/home/jdk1.8.0_181/jre

CLASSPATH=.:$JAVA_HOME/lib:/dt.jar:$JAVA_HOME/lib/tools.jar

PATH=$PATH:$JAVA_HOME/bin

export JAVA_HOME

export JRE_HOME

ulimit -u 4096

  [root@localhost本地]# 源 /etc/profile

  2.3. 配置限制相关参数

  修改限制.conf

  vi /etc/security/limits.conf

  添加以下内容

  * soft nproc 65536

* hard nproc 65536

* soft nofile 65536

* hard nofile 65536

  修改 90-nproc.conf 配置文件。

  vi /etc/security/limits.d/90-nproc.conf

  #修改如下内容:

  软

  NPRC 1024 修改为 Soft NPRC 4096

  修改配置 sysctl.conf

  vi /etc/sysctl.conf

  #添加下面配置:

  

  vm.max_map_count=655360

  #并执行命令:

  系统CTL -p

  2.4. 创建一个运行 ELK 的用户

  [root@localhost本地]# 组添加麋鹿

  [root@localhost local]# useradd -g elk elk

  [root@localhost本地]# passwd elk – 更改 elk 用户密码

  创建 ELK 运行目录

  [root@localhost本地]# MKDIR /home/elk

  [root@localhost local]# chown -R elk:elk /home/elk

  以上所有操作均由根用户完成

  第三,安装 Elasticsearch以下由麋鹿用户

  操作,麋鹿用户以麋鹿用户身份登录服务器

  下载 ELK 安装包,上传到服务器并解压。

  解压缩命令:tar -xzvf 软件包名称

  配置弹性搜索

  vi conf/elasticsearch.yml

  修改如下:

  cluster.name: mycluster

node.name: node-1

node.master: true #指定了该节点可能成为 master 节点,还可以是数据节点

node.data: true

network.host: 192.168.31.86

http.port: 9200

transport.tcp.port: 9300

discovery.zen.ping.unicast.hosts: ["172.18.96.32", "172.18.96.33","172.18.96.35","172.18.96.36"]

#修改bootstrap.system_call_filter为false,注意要在Memory下面:

bootstrap.memory_lock: false

bootstrap.system_call_filter: false

  修改 jvm.options 文件中的以下内容以设置使用的最大和最小内存量

  -Xms1g

-Xmx1g

  向防火墙配置添加了端口

  苏根

  vi /etc/sysconfig/iptables

  -A INPUT -m state --state NEW -m tcp -p tcp --dport 9200 -j ACCEPT

<p>

-A INPUT -m state --state NEW -m tcp -p tcp --dport 9300 -j ACCEPT

</p>

  服务IP表重新启动

  保存以退出

  启动弹性搜索

  ./elasticsearch -d & --后台启动

  检查启动是否成功

  使用浏览器访问::9200

  安装了 Elasticsearch。

  四、安装日志库

  logstash 是负责采集和过滤日志的 ELK

  按如下方式编写配置文件:

  解释:

  logstash 配置文件必须收录三件事:

  input{}:该模块负责采集日志,可以由生成日志的业务系统从文件中读取、从 Redis 读取或开放端口直接写入 logstash

  filter{}:该模块负责过滤采集到的日志,并根据过滤定义日志的显示字段。

  output{}:该模块负责将过滤后的日志输出到 ElasticSearch 或文件、redis 等。

  该环境从文件中读取日志,业务系统生成的日志格式如下:

  [2016-11-05 00:00:03,731 INFO] [http-nio-8094-exec-10] [filter.LogRequestFilter] - /merchant/get-supply-detail.shtml, IP: 121.35.185.117, [device-dpi = 414*736, version = 3.6, device-os = iOS8.4.1, timestamp = 1478275204, bundle = APYQ9WATKK98V2EC, device-network = WiFi, token = 393E38694471483CB3686EC77BABB496, device-model = iPhone, device-cpu = , sequence = 1478275204980, device-uuid = C52FF568-A447-4AFE-8AE8-4C9A54CED10C, sign = 0966a15c090fa6725d8e3a14e9ef98dc, request = {

"supply-id" : 192

}]

[2016-11-05 00:00:03,731 DEBUG] [http-nio-8094-exec-10] [filter.ValidateRequestFilter] - Unsigned: bundle=APYQ9WATKK98V2EC&device-cpu=&device-dpi=414*736&device-model=iPhone&device-network=WiFi&device-os=iOS8.4.1&device-uuid=C52FF568-A447-4AFE-8AE8-4C9A54CED10C&request={

"supply-id" : 192

  输出

  直接输出到 Elasticsearch

  此环境需要处理来自两组业务系统的日志

  type:代表类型,其实这个类型被推送到 Elasticsearch,方便后续的 kibana 分类搜索,一般直接命名业务系统的项目名称路径

  :读取文件的路径

  这意味着,当日志中报告错误时,错误的换行符归因于上一条消息的内容

  start_position => “开始”是指从文件头部读取

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线