优化的解决方案:Kafka+flume实时采集数据
优采云 发布时间: 2022-11-09 08:27优化的解决方案:Kafka+flume实时采集数据
一、模拟日志的生成在
IDEA 的资源文件夹下创建一个新的 log4j.properties 来定义日志格式,其中可以查看 flume 和 log4j 的集成配置
#设置日志格式
log4j.rootCategory=ERROR,console,flume
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
#flume和log4j整合
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.116.10
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
使用 Java 的 log4j 模拟生成日志
import org.apache.log4j.Logger;
public class LoggerGenerator {
private static Logger logger=Logger.getLogger(LoggerGenerator.class.getName());
public static void main(String[] args)throws Exception{
int i=0;
while (1==1){
Thread.sleep(1000);
logger.info("now is "+i);
i++;
}
}
}
具体的依赖和配置过程可以查看Flume远程实时采集windows生成的log4j生成的数据
二、配置水槽
1. 配置
1.1 配置弗鲁梅卡夫卡
我在这里使用Flume版本为1.6,所以写作与官方网站不同。如果启动 flume 时出现错误 .producer.requiredAcks=1, channel=c2, ic=streaming, type=org.apache.flume.sink.kafka.KafkaSink} }
2018-12-29 06:17:26,698 (conf-file-poller-0) [错误 - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:427)] 由于配置过程中出现错误,*敏*感*词* k2 已被删除
org.apache.flume.conf.ConfigurationException: brokerList 必须至少收录一个 Kafka 代理
它应该是由水槽版本的不同配置引起的。
#命名代理
b1.sources = r2
b1.sinks = k2
b1.channels = c2
# 配置监控文件
b1.sources.r2.type =avro
b1.sources.r2.bind=0.0.0.0
b1.sources.r2.port = 41414
#b1.sources.r2.interceptors = i1
#b1.sources.r2.interceptors.i1.type = timestamp
# 配置sink
b1.sinks.k2.type =org.apache.flume.sink.kafka.KafkaSink
b1.sinks.k2.topic = streaming
b1.sinks.k2.brokerList= 192.168.116.10:9092
#b1.sinks.k2.kafka.bootstrap.servers = 192.168.116.10:9092
b1.sinks.k2.producer.requiredAcks = 1
b1.sinks.k2.batchSize = 20
# 配置channel
b1.channels.c2.type = memory
# 将三者串联
b1.sources.r2.channels = c2
b1.sinks.k2.channel = c2
这里的本地配置有点复杂,具体配置可以在官网看到
1.3 启动水槽
你可以在命令行中输入 flume-ng 查看 help 命令,我的启动脚本是
水槽-卡夫卡:flume-ng agent -n b1 -c /usr/local/src/apache-flume-1.6.0-bin/conf -
f /usr/local/src/apache-flume-1.6.0-bin/conf/flumekafka.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a1-c $FLUME_HOME/conf -f $FLUME_HOME/conf/streaming.conf-Dflume.root.logger=INFO,console >> /usr/tmp/flume/1.log & (background startup).
配置 Kafka
1. 启动动物园管理员
在每台机器的 zookeeper 的 bin 目录中输入 ./zkServer.sh start
2. 以后台形式启动卡夫卡
在每台机器上 kafka 安装目录的 bin 目录中,键入:kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &
确保Zookeeper和Kafka都已经启动
3. 创建新主题3.1 创建新主题: kafka-topics.sh --创建 --动物园管理员主:2181,从1:2181
,从2:2181 --复制因子 1 --分区 1 --主题流
如果出现错误
被报告为领导者报告错误:NOT_LEADER_FOR_PARTITION您可以转到 Kafka 安装目录中 logs 文件夹下的服务器.log查看详细错误。
3.2 查看新创建的主题 kafka-topics.sh --描述 --动物园管理员主:2181,从1:2181,从2:2181 --主题流
您还可以按 kafka-topics.sh 查看所有主题 --list --zookeeper master:2181,slave1:2181,slave2:2181
如果要删除主题,则需要将其输入到动物园管理员的bin文件夹中
./zkCli.sh 启动动物园管理员客户端
ls /brokers/topics 查看有多少个主题
rmr /brokers/topics/test 删除测试主题
3.3 打开一个新窗口来启动消费者
kafka-console-consumer.sh --动物园管理员主:2181,从1:2181,从2:2181 --主题流
此时,先启动水槽,然后启动IEDA的日志模拟器
等待生成 20(在水槽中设置 batch20)以查看日志是否已消耗。
查看博客水槽的实际应用
解决方案:IP地址定位技术之基础数据采集
IP地理定位定位技术包括四大关键技术:基础数据采集、硬件系统建设、应用场景划分和定位系统研发。
基础数据采集为IP地理位置定位技术的研究提供基础数据支撑,是IP地址定位的基础工作和关键技术。首先,根据不同的数据采集规律,针对不同数据源的不同数据格式,研究并实现一套自动化智能数据采集技术;其次,对采集数据进行筛选、清洗和挖掘,形成基础数据库,为系统提供基础数据支撑。
基础数据采集的研究内容包括确定数据来源(如Whois开放数据等)、分析数据采集的方法(如
网络爬虫、数据交换、地面采集等)、各种数据采集方法的可行性分析和实现,确定采集数据的属性值(如地理位置、经纬度、载体等)、数据清洗方法、数据正确性验证步骤、底层数据的迭代更新过程等。
为了保证数据质量和数据丰富性,系统针对不同的数据源,通过三种方式获取基础数据,即数据挖掘、数据获取和地面采集。数据挖掘是指通过网络爬虫从APNIC网站、BGP网站、地图网站等特定网页获取IP和地理位置信息。数据采购是指从能够提供基础数据的公司获取数据,如本地服务网站、在线*敏*感*词*网站等;地面采集是指利用自主研发的数据采集软件进行人工现场数据采集。
数据采集技术已经存在于许多开源第三方框架中,例如Scrapy,Nutch,Crawler4j,WebMagic等。数据挖掘算法,如支持向量机SVM、K-Means等,得到了广泛的应用。