资讯内容采集系统 dataCollection
优采云 发布时间: 2022-11-18 07:38资讯内容采集系统 dataCollection
项目概况
舆情系统中的数据采集是关键部分。虽然这部分的核心技术是由爬虫技术的框架构建的,但是靠一两个爬虫来抓取互联网海量数据是肯定不行的,尤其是抓取大量网站的情况下,每天大量的网站状态和样式变化后,爬虫程序可以快速响应和维护。
一旦分布式爬虫规模变大,就会出现很多问题,都是技术上的挑战,也是很多门槛,比如:
1、检测到你是爬虫并封掉你的IP
2 如何识别返回给你的脏数据?
3 对手被你爬死了,调度规则怎么设计?
4 要求你每天爬取10000w的数据,你的机器带宽有限。如何以分布式方式提高效率?
5 数据爬回来的时候,要清理吗?对方的脏数据会不会把原来的数据弄脏?
6对方部分数据未更新。是否必须重新下载未更新的数据?如何鉴别?如何优化你的规则?
7数据太多,一个库放不下,要分库吗?
8 对方的数据是用JavaScript渲染的,那怎么抓?你想使用 PhantomJS 吗?
9对方返回的数据是加密的,如何解密?
10 对方有验证码,怎么破解?
11 对方有一个APP,如何获取他们的数据接口?
12 爬回来的数据怎么显示?如何形象化?如何使用?如何发挥价值?
13 等等...
在*敏*感*词*的互联网数据采集中,需要构建一个完整的数据采集体系。否则,你的项目开发效率和数据采集效率都会很低。同时,也会出现很多意想不到的问题。
开源技术栈整体架构
(这是最早的系统架构图)
数据处理流程
(这是最早的系统设计图)
资源管理
来源,信息来源的简称。
我们需要管理采集类型、内容、平台、地域等各种属性,我们开发了三代信息源管理平台。
生成产品形态
二代产品形态
三代产品形态
网站画像
采用模拟浏览器请求技术实现深度和广度爬虫算法,一般分为3个环节,整个站点1)全站扫描,2)数据存储,3)特征分析。
数据抓取
数据存储
低代码开发
分布式 采集
爬虫管理
采集分类
反爬策略
采集日志数据解析
数据存储异步调用
数据通过kafka中间件以消息的形式发送到存储子系统。
更多内容
【数据处理】技术架构文档
汇总:Elasticsearch及ELK使用(四):从数据库采集及写入数据库
ELK 通常从文本文件中采集数据,然后将其写入 Elasticsearch。此外,还可以与数据库进行交互,包括两种方案:1
从数据库 1.1 采集环境
已经有一个MySQL数据库(版本MySQL 8.0.18)。
MySQL上有schema=dbtest,还有一个收录人员数据的表,如下所示:
1.2 配置 logstash 首先确保 filebeat(数据库采集不需要 filebeat)和 logstash 已停止。首先将 Mysql 客户端驱动程序文件 mysql-connector-java-8.0.18.jar 上传到 logsash 机器上的 /tmp 目录。在配置目录中,复制 logstash-sample.conf 示例文件以获取文件 logstash_mysql.conf。编辑 config/logstash_mysql.conf 如下
input {
jdbc {
jdbc_driver_library => "/tmp/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.43.201/dbtest?serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "Pwd_1234"
schedule => "* * * * *" #每分钟执行
clean_run => true
jdbc_default_timezone => "Asia/Shanghai"
statement => "select * from person"
}
}
output {
elasticsearch {
hosts => ["http://192.168.43.201:9200"]
index => "person-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
附表*
5 *
1-3 * 从一月到三月,每分钟一分钟一分钟一分钟。0 * *
* *
它在一天中每个小时的第 0 分钟执行。
0 6 * * * 美国/芝加哥
每天早上 6 点(UTC/GMT -5)执行。
1.3 启动 开始日志。
./logstash -f ../config/logstash_mysql.conf
开始需要更长的时间。
1.4 检查结果
当FileBeat,Logstash,Elasticsearch,Kibana都启动时。日志会自动采集并存储在 Elasticsearch 中。您可以通过 Kibana 查看结果:
注意:此索引的运行状况为*敏*感*词*,因为 logstash 在为 Elasticsearch 创建索引时指定 number_of_replicas=1,除了主数据之外,这至少需要一个副本。
ElasticSearch 要求副本分布在不同的节点节点上,以安全地避免由于单节点故障而导致数据丢失。因为这个环境 elasticsearch 只有一个节点,所以所有健康都是*敏*感*词*的。如果在“设置”中设置索引设置,请将“number_of_replicas”更改为“0”。然后健康变成绿色。
4.4 搜索结果(文档)。
在 kibana 的“开发工具”中通过 RESTfull API 进行模糊搜索
GET /person-2020.06.25/_search
{
"sort": [
{
"@timestamp": {
"order": "asc"
}
}
]
}
搜索结果如下图所示,表中的数据内容在消息集中。
注意:由于 logsash 配置为每分钟从数据库中采集一次,因此此处存在大量重复的搜索结果。
正确的做法是:
2 采集后写入数据库
这里以生产环境为例,文本采集通过 filebeat 实现,采集数据库通过 logstash 写入数据库。
2.1 采集源文件节拍配置
以采集文本数据为例,filebeat配置如下:
#=========================== Filebeat prospectors =============================
filebeat.prospectors:
# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.
- type: log
enabled: true
fields:
servicename: termfrontbpu
logtype: MONITOR_STAT
paths:
- /app/appgess1/bin/log/*bpu.log
include_lines: [".*MARKER=MONITOR_STAT.*"]
encoding: gbk
(省略其他内容....)
output.logstash:
<p>
# The Logstash hosts
hosts: ["20.200.54.51:56565"]
</p>
2.2 日志存储安装插件
既然有
官方logstac插件中没有output-JDBC插件,即写入数据库的插件,需要先安装第三方插件。2.2.1 在线安装 logstash-output-jdbc
插件,以确保您有互联网连接。 cd logstash-6.8.3 (logstash-6.8.3 解压缩安装目录)./bin/logstash-plugin install logstash-output-jdbc 将自动从互联网下载并安装。验证结果:./bin/logstash-plugin list 你可以看到列表中有更多的logstash-output-jdbc 插件 2.2.2 在线安装 logstash-output-jdbc 插件 通过复制一个完成的插件作为基础进行在线安装。CD logstash-6.8.3 (logstash-6.8.3 解压安装目录) vim gemfile 文件。在末尾添加一行。
gem "logstash-output-jdbc"
复制文件 logstash-output-jdbc-5.4.0.gemspec 中的 vendor/bundle/jruby/2.5.0/
规范目录复制 bundle/jruby/2.5.0/gems 目录中的整个目录 logstash-output-jdbc-5.4.0 以验证结果:./bin/ logstash-plugin list 你可以看到列表添加了 logstash-output-jdbc 插件 2.3 logstash 配置
数据库中的表定义字段 elapsedmisec 的类型为 int,所有其他字段的类型为 varchar。
logstash.conf 配置文件的全文如下:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 56565
}
}
filter {
dissect {
mapping => { "message" => "%{nothings}MONITOR_STAT, RecvReqDateTime=%{recvreqdatetime}, UpstreamSysCode=%{upstreamsyscode}, ExchCode=%{exchcode}, ExchCodeDesc=%{exchcodedesc}, SysTraceNo=%{systraceno}, SeqNo=%{seqno}, ElapsedMiSec=%{elapsedmisec}, RspCode=%{rspcode}, RspCodeDesc=%{rspcodedesc}, OtherInfo=%{otherinfo}" }
}
dissect {
convert_datatype => {"elapsedmisec" => "int"}
}
}
output {
if [fields][logtype] == "MONITOR_STAT" {
jdbc {
driver_jar_path => "/app/appgess/elk_logcollect/logstash-6.8.3_monitor/lib/postgresql-42.2.5.jar"
driver_class => "org.postgresql.Driver"
connection_string => "jdbc:postgresql://20.200.35.17:5432/common"
username => "goldcomm"
password => "goldcomm"
statement => ["insert into traffic_statistics_details(hostname, servicename, recvreqdatetime, upstreamsyscode, exchcode, exchcodedesc, systraceno, seqno, elapsedmisec, rspcode, rspcodedesc, otherinfo) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "[beat][hostname]", "[fields][servicename]", "recvreqdatetime", "upstreamsyscode", "exchcode", "exchcodedesc", "systraceno", "seqno", "elapsedmisec", "rspcode", "rspcodedesc", "otherinfo" ]
}
}
}