解决方案:将爬虫采集的数据存放到数据库

优采云 发布时间: 2022-10-27 03:31

  解决方案:将爬虫采集的数据存放到数据库

  首先,我们要有一个本地数据库,我用的是mysql数据库;

  爬虫数据源豆瓣电影排行榜

  从抓包工具可以看出,返回的响应数据是json格式,所以,废话不多说,直接上代码

  import requests

url = 'https://movie.douban.com/j/chart/top_list?'

headers = {

'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.47'

}

param = {

'type': '24',

'interval_id': '100:90',

'action': '',

'start':1,

'limit': '20'

}

# 获取json格式响应数据

response = requests.get(url=url, params=param, headers=headers)

list_data = response.json()

  由于获取到的数据是json格式,所以我们要处理一下:

   # 遍历所需键值

for item in list_data:

Id = item['id']

name = item['title']

temp = item['regions']

where = temp[0]

year = item['release_date']

  这里我只提取了一些元素,由于上面的代码只能提取单个电影数据,我们可以改进一下

  import requests

url = 'https://movie.douban.com/j/chart/top_list?'

headers = {

'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.47'

}

# 遍历不同电影

for index in range(10):

param = {

'type': '24',

'interval_id': '100:90',

'action': '',

'start':index,

'limit': '20'

<p>

}

# 获取json格式响应数据

response = requests.get(url=url, params=param, headers=headers)

list_data = response.json()

# 遍历所需键值

for item in list_data:

Id = item[&#39;id&#39;]

name = item[&#39;title&#39;]

temp = item[&#39;regions&#39;]

where = temp[0]

year = item[&#39;release_date&#39;]</p>

  至此,爬虫部分就完成了,一个很简单的爬虫。下定决心;

  接下来我们连接本地数据库,记得导入第三方库pymysql:

   con = pymysql.connect(

host="127.0.0.1",

port=3306,

user=&#39;root&#39;,

password=&#39;mysql123&#39;,

database="person",

charset=&#39;utf8&#39;

)

  然后是熟悉的增删改查操作(注:创建表的过程就不一一列举了,如有需要请按效果图创建)

   cur = con.cursor()

try:

# 增加

add_sql = "insert into movies values(&#39;{}&#39;,&#39;{}&#39;,&#39;{}&#39;,&#39;{}&#39;)".format(Id, name, where, year)

cur.execute(add_sql)

con.commit()

  爬虫数据添加到数据表的方式不唯一,我这里使用的是format()函数

  完整代码如下:

  import pymysql

import requests

url = &#39;https://movie.douban.com/j/chart/top_list?&#39;

headers = {

&#39;user-agent&#39;: &#39;Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.47&#39;

}

# 遍历不同电影

for index in range(10):

param = {

&#39;type&#39;: &#39;24&#39;,

&#39;interval_id&#39;: &#39;100:90&#39;,

&#39;action&#39;: &#39;&#39;,

&#39;start&#39;:index,

<p>

&#39;limit&#39;: &#39;20&#39;

}

# 获取json格式响应数据

response = requests.get(url=url, params=param, headers=headers)

list_data = response.json()

# 遍历所需键值

for item in list_data:

Id = item[&#39;id&#39;]

name = item[&#39;title&#39;]

temp = item[&#39;regions&#39;]

where = temp[0]

year = item[&#39;release_date&#39;]

# 使用数据库存储

con = pymysql.connect(

host="127.0.0.1",

port=3306,

user=&#39;root&#39;,

password=&#39;mysql123&#39;,

database="person",

charset=&#39;utf8&#39;

)

# 创建游标对象

cur = con.cursor()

try:

# 增加

add_sql = "insert into movies values(&#39;{}&#39;,&#39;{}&#39;,&#39;{}&#39;,&#39;{}&#39;)".format(Id, name, where, year)

cur.execute(add_sql)

con.commit()

except Exception as e:

print(e)

con.rollback()

finally:

cur.close() # 关闭游标对象

con.close() # 关闭连接对象

print(&#39;程序结束&#39;)</p>

  效果图如下:

  总结:

  1. 获取数据

  2.处理数据

  3.连接数据库

  4.添加到数据库

  总结:openGauss数据库源码解析系列文章——AI技术(四):指标采集、预测与异常

  高斯松鼠*敏*感*词*

  学习探索和分享前沿数据库知识和技术,构建数据库技术交流圈

  在最后的图文中,我们分享了相关的精彩内容。本文将详细介绍AI技术——指标采集、预测和异常检测相关内容。8.5 指标采集,Prediction and Anomaly Detection 数据库指标监控和异常检测技术,通过监控数据库指标,基于时序预测和异常检测等算法,发现异常信息,然后提醒用户采取措施避免异常情况造成的严重后果。8.5.1 使用场景 用户操作数据库的某些行为或某些正在运行的服务的变化可能会导致数据库异常。如果不及时发现和处理这些异常,可能会产生严重的后果。通常,数据库监控指标(指标,如 CPU 使用率、QPS 等)可以反映数据库系统的健康状况。通过监控数据库指标,分析指标数据特征或变化趋势,及时发现数据库异常情况,及时向运维管理人员推送告警信息,避免损失。8.5.2 实现原理

  图 1 Anomaly-Detection 框架

  指标采集,预测和异常检测由同一个系统实现,在openGauss项目中命名为Anomaly-Detection,其结构如图1所示。该工具可分为Agent和Detector两部分. Agent是一个数据库代理模块,负责采集数据库指标数据并将数据推送到Detector;Detector是一个数据库异常检测分析模块,主要有3个功能。(1)在Agent端采集并转储采集的数据。(2) 对采集到的数据进行特征分析和异常检测。(3)将检测到的异常信息推送给运维管理人员。1. Agent模块的组成 Agent模块负责采集和指标数据的发送。该模块由三个子模块组成:DBSource、MemoryChannel 和 HttpSink。(1) DBSource作为数据源,负责定期采集数据库指标数据,并将数据发送到数据通道MemoryChannel。(2) MemoryChannel是内存数据通道,本质上是一个FIFO队列,用于数据缓存。HttpSink 组件使用 MemoryChannel 中的数据。为了防止MemoryChannel中数据过多导致OOM(out of Memory,内存溢出),设置了容量的上限。当超过容量上限时,将禁止将过多的元素放入队列。(3) HttpSink 是数据汇聚点。该模块定期从 MemoryChannel 获取数据,并以 Http(s) 的形式转发数据。读取数据后,它从 MemoryChannel 中清除。2.Detector模块由Detector模块组成,负责数据检测。该模块由服务器和监控两个子模块组成。(1)Server是Agent采集的web服务,接收到的数据提供接收接口,将数据存储在本地数据库中。为了防止数据库因为数据的增加而占用过多的资源,我们对数据库中每张表的行数设置了一个上限。(2) Monitor模块包括时间序列预测和异常检测等算法。该模块定期从本地数据库中获取数据库指标数据,并根据现有算法对数据进行预测和分析。如果算法检测到数据库指标在历史或未来某个时间段或时间出现异常,会及时将信息推送给用户。8.5.3 关键源码分析 1. 整体流程分析智能索引推荐工具的路径为openGauss-server/src/gausskernel/dbmind/tools/anomaly_detection。下面的代码详细展示了程序的入口。

  def forecast(args): … # 如果没有指定预测方式,则默认使用’auto_arima’算法 if not args.forecast_method: forecast_alg = get_instance('auto_arima') else: forecast_alg = get_instance(args.forecast_method) # 指标预测功能函数 def forecast_metric(name, train_ts, save_path=None): … forecast_alg.fit(timeseries=train_ts) dates, values = forecast_alg.forecast( period=TimeString(args.forecast_periods).standard) date_range = "{start_date}~{end_date}".format(start_date=dates[0], end_date=dates[-1]) display_table.add_row( [name, date_range, min(values), max(values), sum(values) / len(values)] )# 校验存储路径 if save_path: if not os.path.exists(os.path.dirname(save_path)): os.makedirs(os.path.dirname(save_path)) with open(save_path, mode='w') as f: for date, value in zip(dates, values): f.write(date + ',' + str(value) + '\n') # 从本地sqlite中抽取需要的数据 with sqlite_storage.SQLiteStorage(database_path) as db: if args.metric_name: timeseries = db.get_timeseries(table=args.metric_name, period=max_rows) forecast_metric(args.metric_name, timeseries, args.save_path) else:# 获取sqlite中所有的表名 tables = db.get_all_tables() # 从每个表中抽取训练数据进行预测for table in tables: timeseries = db.get_timeseries(table=table, period=max_rows) forecast_metric(table, timeseries)# 输出结果 print(display_table.get_string()) # 代码远程部署def deploy(args): print('Please input the password of {user}@{host}: '.format(user=args.user, host=args.host))# 格式化代码远程部署指令 command = 'sh start.sh --deploy {host} {user} {project_path}' \ .format(user=args.user, host=args.host, project_path=args.project_path) # 判断指令执行情况if subprocess.call(shlex.split(command), cwd=SBIN_PATH) == 0: print("\nExecute successfully.") else: print("\nExecute unsuccessfully.")… # 展示当前监控的参数def show_metrics():… # 项目总入口def main(): …

  2、关键代码段分析(1)后台线程的实现。如前所述,该功能可分为三个角色:Agent、Monitor 和 Detector。这三个不同的角色是驻留在后台并执行不同任务的进程。Daemon 类是负责运行不同业务流程的容器类。下面描述这个类的实现。

  class Daemon: """ This class implements the function of running a process in the background.""" def __init__(self): …def daemon_process(self): # 注册退出函数 atexit.register(lambda: os.remove(self.pid_file)) signal.signal(signal.SIGTERM, handle_sigterm)# 启动进程 @staticmethod def start(self): try: self.daemon_process() except RuntimeError as msg: abnormal_exit(msg) self.function(*self.args, **self.kwargs) # 停止进程 def stop(self): if not os.path.exists(self.pid_file): abnormal_exit("Process not running.") read_pid = read_pid_file(self.pid_file) if read_pid > 0: os.kill(read_pid, signal.SIGTERM) if read_pid_file(self.pid_file) < 0: os.remove(self.pid_file)

  (2) 数据库相关指标采集流程。数据库的索引采集架构参考了Apache Flume的设计。将一个完整的信息采集流程分为三个部分,即Source、Channel和Sink。以上三部分被抽象为三个不同的基类,从中可以派生出不同的采集数据源、缓存管道和数据*敏*感*词*。上面提到的DBSource是从Source派生的,MemoryChannel是从Channel派生的,HttpSink是从Sink派生的。以下代码来自 metric_agent.py,负责 采集 指标,上面的模块是连接在一起的。

  def agent_main():… # 初始化通道管理器cm = ChannelManager()# 初始化数据源 source = DBSource() http_sink = HttpSink(interval=params['sink_timer_interval'], url=url, context=context) source.channel_manager = cm http_sink.channel_manager = cm # 获取参数文件里面的功能函数 for task_name, task_func in get_funcs(metric_task): source.add_task(name=task_name, interval=params['source_timer_interval'], task=task_func, maxsize=params['channel_capacity']) source.start() http_sink.start()

  (3)数据存储和监控部分的实现。Agent将采集收到的指标数据发送给Detector服务器,Detector服务器负责存储。Monitor 不断检查存储的数据,以便提前发现异常情况。这里实现了一种通过SQLite本地化存储的方法。代码位于 sqlite_storage.py 文件中,实现类为 SQLiteStorage。该类实现的主要方法如下:

  # 通过时间戳获取最近一段时间的数据def select_timeseries_by_timestamp(self, table, period):…# 通过编号获取最近一段时间的数据def select_timeseries_by_number(self, table, number): …

  其中,由于不同指标的数据存储在不同的表中,所以上述参数表也代表了不同指标的名称。异常检测目前主要支持基于时间序列预测的方法,包括Prophet算法(Facebook开源的工业级时间序列预测算法工具)和ARIMA算法,封装成类供Forecaster调用。上述时序检测的算法类都继承了AlgModel类,该类的结构如下:

  class AlgModel(object): """ This is the base class for forecasting algorithms. If we want to use our own forecast algorithm, we should follow some rules. """ def __init__(self): pass @abstractmethod def fit(self, timeseries): pass @abstractmethod def forecast(self, period): pass def save(self, model_path): pass def load(self, model_path): pass

  在 Forecast 类中,通过调用 fit() 方法,可以根据历史时间序列数据进行训练,通过 forecast() 方法预测未来趋势。获取未来趋势后如何判断是否异常?有很多方法。最简单最基本的方法是通过阈值来判断。在我们的程序中,这个方法也默认用于判断。8.5.4 使用示例 Anomaly-Detection 工具有五种操作模式:启动、停止、预测、show_metrics 和部署。每种模式的说明如表1所示。 表1 Anomaly-Detection使用模式及说明

  

  模式名称

  阐明

  开始

  启动本地或远程服务

  停止

  停止本地或远程服务

  预报

  未来变化的预测器

  显示指标

  输出当前监控的参数

  部署

  远程部署代码

  Anomaly-Detection 工具的操作模式示例如下所示。① 使用启动方式启动本地采集器服务,代码如下:

  python main.py start –role collector

  ② 使用停止方式停止本地采集器服务,代码如下:

  python main.py stop –role collector

  ③ 使用启动方式启动远程采集器服务,代码如下:

  

  python main.py start --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector

  ④ 使用停止方式停止远程采集器服务,代码如下:

  python main.py stop --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector

  ⑤ 显示当前所有监控参数,代码如下:

  python main.py show_metrics

  ⑥ 预测接下来60秒io_read的最大值、最小值和平均值,代码如下:

  python main.py forecast –metric-name io_read –forecast-periods 60S –save-path predict_result

  ⑦ 将代码部署到远程服务器,代码如下:

  python main.py deploy –user xxx –host xxx.xxx.xxx.xxx –project-path xxx

  8.5.5 演进路线

  Anomaly-Detection作为数据库指标监控和异常检测工具,目前具备数据采集、数据存储、异常检测、消息推送等基本功能。但是,存在以下问题。(1) Agent模块采集的数据过于简单。目前Agent只能采集数据库的资源指标数据,包括IO、磁盘、内存、CPU等,未来需要增强采集的索引丰富度。(2) Monitor 内置算法的覆盖范围不够。Monitor目前只支持两种时序预测算法,对于异常检测,只支持简单的基于阈值的情况,使用场景有限。(3) Server 只支持单个 Agent 传输数据。目前Server采用的方案只支持从一个Agent接收数据,不支持多个Agent同时传输。这对于只有一个master节点的openGauss数据库来说暂时够用了,但是对于分布式部署显然不友好。因此,针对以上三个问题,将首先丰富Agent,方便数据的采集,主要包括安全指标、数据库日志等信息。其次,在算法层面,编写了鲁棒性(即算法的鲁棒性和稳定性)更强的异常检测算法,并增加了异常监控场景。同时,Server 需要改进以支持多 Agent 模式。最后,

  以上内容是对AI技术中的指标采集、预测和异常检测的详细介绍。下一篇将分享“AI查询时间预测”的相关内容,敬请期待!

  - 结尾 -

  高斯松鼠*敏*感*词*

  汇聚数据库从业者和爱好者,互相帮助解决问题,构建数据库技术交流圈

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线