资讯内容采集系统 dataCollection

优采云 发布时间: 2022-11-18 07:38

  资讯内容采集系统 dataCollection

  项目概况

  舆情系统中的数据采集是关键部分。虽然这部分的核心技术是由爬虫技术的框架构建的,但是靠一两个爬虫来抓取互联网海量数据是肯定不行的,尤其是抓取大量网站的情况下,每天大量的网站状态和样式变化后,爬虫程序可以快速响应和维护。

  一旦分布式爬虫规模变大,就会出现很多问题,都是技术上的挑战,也是很多门槛,比如:

  1、检测到你是爬虫并封掉你的IP

  2 如何识别返回给你的脏数据?

  3 对手被你爬死了,调度规则怎么设计?

  4 要求你每天爬取10000w的数据,你的机器带宽有限。如何以分布式方式提高效率?

  5 数据爬回来的时候,要清理吗?对方的脏数据会不会把原来的数据弄脏?

  6对方部分数据未更新。是否必须重新下载未更新的数据?如何鉴别?如何优化你的规则?

  7数据太多,一个库放不下,要分库吗?

  8 对方的数据是用JavaScript渲染的,那怎么抓?你想使用 PhantomJS 吗?

  9对方返回的数据是加密的,如何解密?

  10 对方有验证码,怎么破解?

  11 对方有一个APP,如何获取他们的数据接口?

  12 爬回来的数据怎么显示?如何形象化?如何使用?如何发挥价值?

  

  13 等等...

  在*敏*感*词*的互联网数据采集中,需要构建一个完整的数据采集体系。否则,你的项目开发效率和数据采集效率都会很低。同时,也会出现很多意想不到的问题。

  开源技术栈整体架构

  (这是最早的系统架构图)

  数据处理流程

  (这是最早的系统设计图)

  资源管理

  来源,信息来源的简称。

  我们需要管理采集类型、内容、平台、地域等各种属性,我们开发了三代信息源管理平台。

  生成产品形态

  二代产品形态

  三代产品形态

  

  网站画像

  采用模拟浏览器请求技术实现深度和广度爬虫算法,一般分为3个环节,整个站点1)全站扫描,2)数据存储,3)特征分析。

  数据抓取

  数据存储

  低代码开发

  分布式 采集

  爬虫管理

  采集分类

  反爬策略

  采集日志数据解析

  数据存储异步调用

  数据通过kafka中间件以消息的形式发送到存储子系统。

  更多内容

  【数据处理】技术架构文档

  汇总:Elasticsearch及ELK使用(四):从数据库采集及写入数据库

  ELK 通常从文本文件中采集数据,然后将其写入 Elasticsearch。此外,还可以与数据库进行交互,包括两种方案:1

  从数据库 1.1 采集环境

  已经有一个MySQL数据库(版本MySQL 8.0.18)。

  MySQL上有schema=dbtest,还有一个收录人员数据的表,如下所示:

  1.2 配置 logstash 首先确保 filebeat(数据库采集不需要 filebeat)和 logstash 已停止。首先将 Mysql 客户端驱动程序文件 mysql-connector-java-8.0.18.jar 上传到 logsash 机器上的 /tmp 目录。在配置目录中,复制 logstash-sample.conf 示例文件以获取文件 logstash_mysql.conf。编辑 config/logstash_mysql.conf 如下

  input {

jdbc {

jdbc_driver_library => "/tmp/mysql-connector-java-8.0.18.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_connection_string => "jdbc:mysql://192.168.43.201/dbtest?serverTimezone=UTC"

jdbc_user => "root"

jdbc_password => "Pwd_1234"

schedule => "* * * * *" #每分钟执行

clean_run => true

jdbc_default_timezone => "Asia/Shanghai"

statement => "select * from person"

}

}

output {

elasticsearch {

hosts => ["http://192.168.43.201:9200"]

index => "person-%{+YYYY.MM.dd}"

#user => "elastic"

#password => "changeme"

}

}

  附表*

  5 *

  1-3 * 从一月到三月,每分钟一分钟一分钟一分钟。0 * *

  * *

  它在一天中每个小时的第 0 分钟执行。

  0 6 * * * 美国/芝加哥

  每天早上 6 点(UTC/GMT -5)执行。

  1.3 启动 开始日志。

  ./logstash -f ../config/logstash_mysql.conf

  开始需要更长的时间。

  1.4 检查结果

  当FileBeat,Logstash,Elasticsearch,Kibana都启动时。日志会自动采集并存储在 Elasticsearch 中。您可以通过 Kibana 查看结果:

  

  注意:此索引的运行状况为*敏*感*词*,因为 logstash 在为 Elasticsearch 创建索引时指定 number_of_replicas=1,除了主数据之外,这至少需要一个副本。

  ElasticSearch 要求副本分布在不同的节点节点上,以安全地避免由于单节点故障而导致数据丢失。因为这个环境 elasticsearch 只有一个节点,所以所有健康都是*敏*感*词*的。如果在“设置”中设置索引设置,请将“number_of_replicas”更改为“0”。然后健康变成绿色。

  4.4 搜索结果(文档)。

  在 kibana 的“开发工具”中通过 RESTfull API 进行模糊搜索

  GET /person-2020.06.25/_search

{

"sort": [

{

"@timestamp": {

"order": "asc"

}

}

]

}

  搜索结果如下图所示,表中的数据内容在消息集中。

  注意:由于 logsash 配置为每分钟从数据库中采集一次,因此此处存在大量重复的搜索结果。

  正确的做法是:

  2 采集后写入数据库

  这里以生产环境为例,文本采集通过 filebeat 实现,采集数据库通过 logstash 写入数据库。

  2.1 采集源文件节拍配置

  以采集文本数据为例,filebeat配置如下:

  #=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

- type: log

enabled: true

fields:

servicename: termfrontbpu

logtype: MONITOR_STAT

paths:

- /app/appgess1/bin/log/*bpu.log

include_lines: [".*MARKER=MONITOR_STAT.*"]

encoding: gbk

(省略其他内容....)

output.logstash:

<p>

# The Logstash hosts

hosts: ["20.200.54.51:56565"]

</p>

  2.2 日志存储安装插件

  既然有

  官方logstac插件中没有output-JDBC插件,即写入数据库的插件,需要先安装第三方插件。2.2.1 在线安装 logstash-output-jdbc

  插件,以确保您有互联网连接。 cd logstash-6.8.3 (logstash-6.8.3 解压缩安装目录)./bin/logstash-plugin install logstash-output-jdbc 将自动从互联网下载并安装。验证结果:./bin/logstash-plugin list 你可以看到列表中有更多的logstash-output-jdbc 插件 2.2.2 在线安装 logstash-output-jdbc 插件 通过复制一个完成的插件作为基础进行在线安装。CD logstash-6.8.3 (logstash-6.8.3 解压安装目录) vim gemfile 文件。在末尾添加一行。

  gem "logstash-output-jdbc"

  复制文件 logstash-output-jdbc-5.4.0.gemspec 中的 vendor/bundle/jruby/2.5.0/

  规范目录复制 bundle/jruby/2.5.0/gems 目录中的整个目录 logstash-output-jdbc-5.4.0 以验证结果:./bin/ logstash-plugin list 你可以看到列表添加了 logstash-output-jdbc 插件 2.3 logstash 配置

  数据库中的表定义字段 elapsedmisec 的类型为 int,所有其他字段的类型为 varchar。

  logstash.conf 配置文件的全文如下:

  # Sample Logstash configuration for creating a simple

# Beats -> Logstash -> Elasticsearch pipeline.

input {

beats {

port => 56565

}

}

filter {

dissect {

mapping => { "message" => "%{nothings}MONITOR_STAT, RecvReqDateTime=%{recvreqdatetime}, UpstreamSysCode=%{upstreamsyscode}, ExchCode=%{exchcode}, ExchCodeDesc=%{exchcodedesc}, SysTraceNo=%{systraceno}, SeqNo=%{seqno}, ElapsedMiSec=%{elapsedmisec}, RspCode=%{rspcode}, RspCodeDesc=%{rspcodedesc}, OtherInfo=%{otherinfo}" }

}

dissect {

convert_datatype => {"elapsedmisec" => "int"}

}

}

output {

if [fields][logtype] == "MONITOR_STAT" {

jdbc {

driver_jar_path => "/app/appgess/elk_logcollect/logstash-6.8.3_monitor/lib/postgresql-42.2.5.jar"

driver_class => "org.postgresql.Driver"

connection_string => "jdbc:postgresql://20.200.35.17:5432/common"

username => "goldcomm"

password => "goldcomm"

statement => ["insert into traffic_statistics_details(hostname, servicename, recvreqdatetime, upstreamsyscode, exchcode, exchcodedesc, systraceno, seqno, elapsedmisec, rspcode, rspcodedesc, otherinfo) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "[beat][hostname]", "[fields][servicename]", "recvreqdatetime", "upstreamsyscode", "exchcode", "exchcodedesc", "systraceno", "seqno", "elapsedmisec", "rspcode", "rspcodedesc", "otherinfo" ]

}

}

}

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线