实时文章采集( 实时数仓:日志数据采集1.的使用流程和使用方法)

优采云 发布时间: 2022-04-11 10:23

  实时文章采集(

实时数仓:日志数据采集1.的使用流程和使用方法)

  Flink 实时数仓项目——日志数据采集

  前言

  实时数仓项目的数据源有两种,一种是用户行为日志数据,一种是业务数据库中的业务数据。我们需要对这两部分数据执行采集,首先执行用户行为日志数据采集。

  一、日志数据采集 1.模拟日志*敏*感*词*的使用

  由于日志数据是用户通过点击等操作生成的,因此无法获取真实数据。这里,通过模拟生成日志数据的方法来生成日志数据。

  主要流程:运行生成日志数据的springboot程序,然后通过接口接受数据,将数据放到磁盘上,形成日志数据。

  2.使用流

  1)将日志数据生成对应的程序文件上传到/opt/module/rt_applog目录:

  [atguigu@hadoop102 ~]$ cd /opt/module/gmall-flink/rt_applog/

[atguigu@hadoop102 rt_applog]$ ll

总用量 45652

-rw-rw-r-- 1 atguigu atguigu 952 3 月 6 14:54 application.yml

-rw-rw-r-- 1 atguigu atguigu 15642393 12 月 29 14:54 gmall2020-mock-log-2020-12-18.jar

-rw-rw-r-- 1 atguigu atguigu 31094068 2 月 5 15:29 gmall-logger-0.0.1-SNAPSHOT.jar

  2) 根据实际需要修改application.yml文件:

  配置说明:

  mock.date:生成数据的日期

  mock.url:生成的数据发送到的地址

  

  3)运行jar包:java -jar gmall2020-mock-log-2020-12-18.jar

  注意:zookeeper从3.5开始,AdminServer的端口改成8080,什么意思,你懂的!

  3.创建日志采集springboot程序

  1)在IDEA中创建一个日志采集模块,依赖选择如下:

  

  创建最终结果如下:

  

  2)Kafka 配置

  因为要发送日志数据到Kafka,所以需要在application.propeties中对Kafka进行如下配置:

  # 应用名称

spring.application.name=gmall2022-logger

# 应用服务 WEB 访问端口

server.port=8081

#============== kafka ===================

# 指定 kafka 代理地址,可以多个

spring.kafka.bootstrap-servers=hadoop102:9092

# 指定消息 key 和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

  注意:要修改web应用的访问端口,否则会和zookeeper的端口冲突

  3)日志丢弃配置

  因为日志数据还需要放在磁盘上,所以需要配置磁盘放置,在Resources中添加logback.xml文件。文件内容如下:

  

%msg%n

${LOG_HOME}/app.log

${LOG_HOME}/app.%d{yyyy-MM-dd}.log

%msg%n

  上面指定的位置是服务器的目录/opt/module/gmall-flink/rt_applog/logs,同时在控制台打印。

  4)创建一个controller包,写一个LoggerController类来处理日志数据,把数据放到磁盘上的同时发送给Kafka

  @RestController //相当于@Controller+@ResponseBody

@Slf4j

public class LoggerController {

@Autowired

private KafkaTemplate kafkaTemplate;

@RequestMapping("applog")

public String getLogger(@RequestParam("param")String jsonStr){

//落盘

log.info(jsonStr);

//写入Kafka的ods_base_log主题

kafkaTemplate.send("ods_base_log",jsonStr);

return "success";

}

}

  4.Nginx 配置

  因为有多个服务器,为了提高性能,可以配置 Nginx 进行负载均衡。

  流程:模拟生成数据的脚本会将数据发送到nginx中指定的地址,然后nginx会进行负载均衡,将数据转发到配置的多台服务器上。

  在nginx的conf目录下,修改nginx.conf如下:

  http {

# 启动省略

upstream logcluster{

server hadoop102:8081 weight=1;

server hadoop103:8081 weight=1;

server hadoop104:8081 weight=1;

}

server {

listen 80;

server_name localhost;

#charset koi8-r;

#access_log logs/host.access.log main;

location / {

#root html;

#index index.html index.htm;

# 代理的服务器集群 命名随意, 但是不能出现下划线

proxy_pass http://logcluster;

proxy_connect_timeout 10;

}

# 其他省略

}

  创建了一个名为 logcluster 的代理。当nginx在localhost的80端口*敏*感*词*请求时,nginx会依次将请求发送到hadoop102:8081、hadoop103:8081、hadoop104:8081

  5.日志采集过程测试

  1)先修改模拟日志生成的配置,配置向hadoop102发送请求:

  # 外部配置打开

#logging.config=./logback.xml

#业务日期

mock.date=2020-07-13

#模拟数据发送模式mock.type=http

#http 模式下,发送的地址

mock.url=http://hadoop102/applog

=

  2)在hadoop102上启动配置好的Nginx:/opt/module/nginx/sbin/nginx

  3)在hadoop102上运行kafka consumer,观察是否可以消费topic ods_base_log的数据

  4)分别分发springboot程序处理日志采集的jar包,在三台服务器上运行:java -jar gmall2022-logger-0.0.1 -SNAPSHOT.jar

  5)在hadoop102上运行生成日志数据的jar包:java -jar gmall2020-mock-log-2020-12-18.jar

  最后,可以观察到Kafka可以消费数据。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线