实时文章采集( 实时数仓:日志数据采集1.的使用流程和使用方法)
优采云 发布时间: 2022-04-11 10:23实时文章采集(
实时数仓:日志数据采集1.的使用流程和使用方法)
Flink 实时数仓项目——日志数据采集
前言
实时数仓项目的数据源有两种,一种是用户行为日志数据,一种是业务数据库中的业务数据。我们需要对这两部分数据执行采集,首先执行用户行为日志数据采集。
一、日志数据采集 1.模拟日志*敏*感*词*的使用
由于日志数据是用户通过点击等操作生成的,因此无法获取真实数据。这里,通过模拟生成日志数据的方法来生成日志数据。
主要流程:运行生成日志数据的springboot程序,然后通过接口接受数据,将数据放到磁盘上,形成日志数据。
2.使用流
1)将日志数据生成对应的程序文件上传到/opt/module/rt_applog目录:
[atguigu@hadoop102 ~]$ cd /opt/module/gmall-flink/rt_applog/
[atguigu@hadoop102 rt_applog]$ ll
总用量 45652
-rw-rw-r-- 1 atguigu atguigu 952 3 月 6 14:54 application.yml
-rw-rw-r-- 1 atguigu atguigu 15642393 12 月 29 14:54 gmall2020-mock-log-2020-12-18.jar
-rw-rw-r-- 1 atguigu atguigu 31094068 2 月 5 15:29 gmall-logger-0.0.1-SNAPSHOT.jar
2) 根据实际需要修改application.yml文件:
配置说明:
mock.date:生成数据的日期
mock.url:生成的数据发送到的地址
3)运行jar包:java -jar gmall2020-mock-log-2020-12-18.jar
注意:zookeeper从3.5开始,AdminServer的端口改成8080,什么意思,你懂的!
3.创建日志采集springboot程序
1)在IDEA中创建一个日志采集模块,依赖选择如下:
创建最终结果如下:
2)Kafka 配置
因为要发送日志数据到Kafka,所以需要在application.propeties中对Kafka进行如下配置:
# 应用名称
spring.application.name=gmall2022-logger
# 应用服务 WEB 访问端口
server.port=8081
#============== kafka ===================
# 指定 kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=hadoop102:9092
# 指定消息 key 和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
注意:要修改web应用的访问端口,否则会和zookeeper的端口冲突
3)日志丢弃配置
因为日志数据还需要放在磁盘上,所以需要配置磁盘放置,在Resources中添加logback.xml文件。文件内容如下:
%msg%n
${LOG_HOME}/app.log
${LOG_HOME}/app.%d{yyyy-MM-dd}.log
%msg%n
上面指定的位置是服务器的目录/opt/module/gmall-flink/rt_applog/logs,同时在控制台打印。
4)创建一个controller包,写一个LoggerController类来处理日志数据,把数据放到磁盘上的同时发送给Kafka
@RestController //相当于@Controller+@ResponseBody
@Slf4j
public class LoggerController {
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("applog")
public String getLogger(@RequestParam("param")String jsonStr){
//落盘
log.info(jsonStr);
//写入Kafka的ods_base_log主题
kafkaTemplate.send("ods_base_log",jsonStr);
return "success";
}
}
4.Nginx 配置
因为有多个服务器,为了提高性能,可以配置 Nginx 进行负载均衡。
流程:模拟生成数据的脚本会将数据发送到nginx中指定的地址,然后nginx会进行负载均衡,将数据转发到配置的多台服务器上。
在nginx的conf目录下,修改nginx.conf如下:
http {
# 启动省略
upstream logcluster{
server hadoop102:8081 weight=1;
server hadoop103:8081 weight=1;
server hadoop104:8081 weight=1;
}
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
#root html;
#index index.html index.htm;
# 代理的服务器集群 命名随意, 但是不能出现下划线
proxy_pass http://logcluster;
proxy_connect_timeout 10;
}
# 其他省略
}
创建了一个名为 logcluster 的代理。当nginx在localhost的80端口*敏*感*词*请求时,nginx会依次将请求发送到hadoop102:8081、hadoop103:8081、hadoop104:8081
5.日志采集过程测试
1)先修改模拟日志生成的配置,配置向hadoop102发送请求:
# 外部配置打开
#logging.config=./logback.xml
#业务日期
mock.date=2020-07-13
#模拟数据发送模式mock.type=http
#http 模式下,发送的地址
mock.url=http://hadoop102/applog
=
2)在hadoop102上启动配置好的Nginx:/opt/module/nginx/sbin/nginx
3)在hadoop102上运行kafka consumer,观察是否可以消费topic ods_base_log的数据
4)分别分发springboot程序处理日志采集的jar包,在三台服务器上运行:java -jar gmall2022-logger-0.0.1 -SNAPSHOT.jar
5)在hadoop102上运行生成日志数据的jar包:java -jar gmall2020-mock-log-2020-12-18.jar
最后,可以观察到Kafka可以消费数据。