通用解决方案:maxwell+kafka+Spark Streaming构建MySQL Bin
优采云 发布时间: 2022-11-03 03:01通用解决方案:maxwell+kafka+Spark Streaming构建MySQL Bin
需求分析说明
根据业务场景,需要对日志进行实时处理,进行实时图表展示(Highchart等)。如果频繁提取数据库,会对数据库服务器造成很大的压力,相应的web服务也会受到很大的影响。因此,提取数据库日志不仅可以大大减轻数据库服务的压力,还可以解决实时处理和实时显示图表的需求。本篇博客 MySQL Binlog log 采集 提供解决方案为例
1.部署安装maxwell采集器
1)首先检查mysql是否开启了binlog
2) 下载麦克斯韦
组件下载地址:
解压 tar -zxvf maxwell-1.17.1.tar.gz
3)授权mysql(只对maxwell库操作)
其中 user01 是数据库用户名 666666 是数据库密码
GRANT ALL on maxwell.* to 'user01'@'%' 由 '666666' 标识;
将 *.* 上的 SELECT、REPLICATION CLIENT、REPLICATION SLAVE 授予 'user01'@'%';
4)执行maxwell命令行(注:maxwell默认将监控的mysql binlog日志发送到名为maxwell topic的kafka topic)
具体demo如下:
bin/maxwell --user='user01'--password='666666'--host='127.0.0.1'--include_dbs=db1 --include_tables=table1,table2--producer=kafka--kafka.bootstrap.servers =d1:9092,d2:9092,d3:9092 --kafka_topic 测试
注意:--user为数据库用户名--password数据库密码--host表示安装mysql的服务器地址(可以与安装maxwell的服务器不同)--include_dbs表示过滤特定数据库--include_tables意思是过滤特定库 下面的具体表格--kafka.bootstrap.servers代表kafka的IP地址和端口号--kafka_topic kafka代表kafka对应的topic
2、kafka的相关配置(注:d1、d2、d3为各个服务器的主机名,kafka中配置文件的端口号要与命令行中给出的端口号一致)
1)启动kafka命令行(这里作为后台进程运行)
nohup bin/kafka-server-start.sh 配置/server.properties &
2)创建kafka主题作为测试主题
bin/kafka-topics.sh --zookeeper d1:2181,d2:2181,d3:2181 --create --topic test --partitions 20 --replication-factor 1
3)启动消费者窗口
bin/kafka-console-consumer.sh --bootstrap-server d1:9092,d2:9092,d3:9092 --topic 测试
三、Spark Streaming结合kafka
注意:这个demo的spark版本是2.2.1,kafka版本是0.10.0。请注意spark版本对应kafka版本。详细请参考spark官方说明网站
package com.baison.realTimeCalculation
import java.lang
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Durations, StreamingContext}
import scala.util.Try
object IposRealTime {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("IposRealTime")
.set("spark.streaming.blockInterval", "50")//生成block的间隔
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//用kryo序列化
.set("spark.streaming.backpressure.enabled","true") //数据的反压机制
.set("spark.task.maxFailures","10")//task最大失败次数
.set("spark.streaming.kafka.maxRetries","5") //kafka的最大重试次数
.set("spark.streaming.stopGracefullyOnShutdown","true")//程序优雅关闭
.set("spark.io.compression.codec","snappy") //压缩模式
<p>
.set("spark.rdd.compress","true") //压缩RDD的分区
.registerKryoClasses(Array(classOf[EveryWeekForm],classOf[HotGoodsForm],classOf[MemberFlowForm],
classOf[TodayYeJiForm]))
val ssc=new StreamingContext(conf,Durations.seconds(2))
//kafka的配置
val kafkaParam=Map[String,Object](
Constants.KAFKA_METADATA_BROKER_LIST->ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST),
"key.deserializer"->classOf[StringDeserializer],
"value.deserializer"->classOf[StringDeserializer],
Constants.KAFKA_GROUP_ID->ConfigurationManager.getProperty(Constants.KAFKA_GROUP_ID),
Constants.KAFKA_AUTO_OFFSET_RESET->ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET),//从该topic最新位置开始读取数据
"enable.auto.commit"->(false:lang.Boolean),
Constants.SESSION_TIMEOUT_MS->ConfigurationManager.getProperty(Constants.SESSION_TIMEOUT_MS) //最大程度的确保Spark集群和kafka连接的稳定性
)
val topics=List(ConfigurationManager.getProperty(Constants.KAFKA_TOPICS)).toSet
val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam)).repartition(50)
ssc.checkpoint(Constants.SPARK_CHECKPOINT_DATA)
//此处进行处理数据操作
ssc.start()
ssc.awaitTermination()
}
</p>
如有错误请指正,不胜感激。
最佳实践:ELK(elasticsearch+logstash+kibana)日志采集系统
文章目录
一、安装环境
系统版本:分 6.5
JDK:1.8.0_181
弹性搜索-6.4.2
日志-6.4.2
木花-6.4.2
其次,安装 JDK 2.1 并下载 JDK:
此环境下载 64 位 tar .gz 包,并将安装包复制到安装服务器/home/ 目录
[root@localhost ~]# 光盘 /首页/
[root@localhost local]# tar -xzvf JDK-8u181-linux-x64.tar.gz
2.2. 配置环境变量
[root@localhost本地]# vim /etc/profile
将以下内容添加到文件末尾
JAVA_HOME=/home/jdk1.8.0_181
JRE_HOME=/home/jdk1.8.0_181/jre
CLASSPATH=.:$JAVA_HOME/lib:/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export JRE_HOME
ulimit -u 4096
[root@localhost本地]# 源 /etc/profile
2.3. 配置限制相关参数
修改限制.conf
vi /etc/security/limits.conf
添加以下内容
* soft nproc 65536
* hard nproc 65536
* soft nofile 65536
* hard nofile 65536
修改 90-nproc.conf 配置文件。
vi /etc/security/limits.d/90-nproc.conf
#修改如下内容:
软
NPRC 1024 修改为 Soft NPRC 4096
修改配置 sysctl.conf
vi /etc/sysctl.conf
#添加下面配置:
vm.max_map_count=655360
#并执行命令:
系统CTL -p
2.4. 创建一个运行 ELK 的用户
[root@localhost本地]# 组添加麋鹿
[root@localhost local]# useradd -g elk elk
[root@localhost本地]# passwd elk – 更改 elk 用户密码
创建 ELK 运行目录
[root@localhost本地]# MKDIR /home/elk
[root@localhost local]# chown -R elk:elk /home/elk
以上所有操作均由根用户完成
第三,安装 Elasticsearch以下由麋鹿用户
操作,麋鹿用户以麋鹿用户身份登录服务器
下载 ELK 安装包,上传到服务器并解压。
解压缩命令:tar -xzvf 软件包名称
配置弹性搜索
vi conf/elasticsearch.yml
修改如下:
cluster.name: mycluster
node.name: node-1
node.master: true #指定了该节点可能成为 master 节点,还可以是数据节点
node.data: true
network.host: 192.168.31.86
http.port: 9200
transport.tcp.port: 9300
discovery.zen.ping.unicast.hosts: ["172.18.96.32", "172.18.96.33","172.18.96.35","172.18.96.36"]
#修改bootstrap.system_call_filter为false,注意要在Memory下面:
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
修改 jvm.options 文件中的以下内容以设置使用的最大和最小内存量
-Xms1g
-Xmx1g
向防火墙配置添加了端口
苏根
vi /etc/sysconfig/iptables
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9200 -j ACCEPT
<p>
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9300 -j ACCEPT
</p>
服务IP表重新启动
保存以退出
启动弹性搜索
./elasticsearch -d & --后台启动
检查启动是否成功
使用浏览器访问::9200
安装了 Elasticsearch。
四、安装日志库
logstash 是负责采集和过滤日志的 ELK
按如下方式编写配置文件:
解释:
logstash 配置文件必须收录三件事:
input{}:该模块负责采集日志,可以由生成日志的业务系统从文件中读取、从 Redis 读取或开放端口直接写入 logstash
filter{}:该模块负责过滤采集到的日志,并根据过滤定义日志的显示字段。
output{}:该模块负责将过滤后的日志输出到 ElasticSearch 或文件、redis 等。
该环境从文件中读取日志,业务系统生成的日志格式如下:
[2016-11-05 00:00:03,731 INFO] [http-nio-8094-exec-10] [filter.LogRequestFilter] - /merchant/get-supply-detail.shtml, IP: 121.35.185.117, [device-dpi = 414*736, version = 3.6, device-os = iOS8.4.1, timestamp = 1478275204, bundle = APYQ9WATKK98V2EC, device-network = WiFi, token = 393E38694471483CB3686EC77BABB496, device-model = iPhone, device-cpu = , sequence = 1478275204980, device-uuid = C52FF568-A447-4AFE-8AE8-4C9A54CED10C, sign = 0966a15c090fa6725d8e3a14e9ef98dc, request = {
"supply-id" : 192
}]
[2016-11-05 00:00:03,731 DEBUG] [http-nio-8094-exec-10] [filter.ValidateRequestFilter] - Unsigned: bundle=APYQ9WATKK98V2EC&device-cpu=&device-dpi=414*736&device-model=iPhone&device-network=WiFi&device-os=iOS8.4.1&device-uuid=C52FF568-A447-4AFE-8AE8-4C9A54CED10C&request={
"supply-id" : 192
输出
直接输出到 Elasticsearch
此环境需要处理来自两组业务系统的日志
type:代表类型,其实这个类型被推送到 Elasticsearch,方便后续的 kibana 分类搜索,一般直接命名业务系统的项目名称路径
:读取文件的路径
这意味着,当日志中报告错误时,错误的换行符归因于上一条消息的内容
start_position => “开始”是指从文件头部读取