文章采集调用( AI技术——指标采集、预测与异常检测的相关内容)

优采云 发布时间: 2022-04-07 20:23

  文章采集调用(

AI技术——指标采集、预测与异常检测的相关内容)

  

  高斯松鼠*敏*感*词*

  学习探索和分享前沿数据库知识和技术,构建数据库技术交流圈

  关注之前的图文,我们分享了AI技术的相关内容——智能索引推荐,本文将详细介绍AI技术的相关内容——索引采集,预测和异常检测。8.5指标采集,Prediction and Anomaly Detection 数据库指标监控和异常检测技术,通过监控数据库指标,并基于时序预测和异常检测等算法,发现异常信息,然后进行提醒用户采取措施避免异常情况产生严重后果。8.5.1 使用场景 用户操作数据库的某些行为或某些正在运行的业务的变化可能会导致数据库异常。如果这些异常没有及时发现和处理,导致严重后果。通常,数据库监控指标(指标,如 CPU 使用率、QPS 等)可以反映数据库系统的健康状况。通过监控数据库指标,分析指标数据特征或变化趋势,及时发现数据库异常情况,及时向运维管理人员推送告警信息,避免损失。8.5.2 实现原理 并及时将告警信息推送给运维管理人员,避免损失。8.5.2 实现原理 并及时将告警信息推送给运维管理人员,避免损失。8.5.2 实现原理

  

  图 1 Anomaly-Detection 框架

  指标采集,预测和异常检测由同一个系统实现,在openGauss项目中命名为Anomaly-Detection,其结构如图1所示。该工具可分为Agent和Detector两部分. Agent是一个数据库代理模块,负责采集数据库指标数据并将数据推送到Detector;Detector是一个数据库异常检测分析模块,主要有3个功能。(1) 采集并转储 Agent 采集 的数据。(2) 对采集到的数据进行特征分析和异常检测。(3) 将检测到的异常信息推送到1. Agent模块的组成 Agent模块负责采集和指标数据的发送。该模块由三个子模块组成:DBSource、MemoryChannel 和 HttpSink。(1) DBSource作为数据源,负责定时采集数据库指标数据,并将数据发送到数据通道MemoryChannel。(2) MemoryChannel是内存数据通道,本质上是数据的FIFO队列缓存,HttpSink组件消费MemoryChannel Data中的数据,为了防止MemoryChannel中数据过多导致OOM(out of Memory,内存溢出),设置了容量上限,当容量上限为超过,过多的元素将被禁止入队列。(3)HttpSink为数据采集点,该模块周期性的从MemoryChannel获取数据,并以Http(s)的形式转发数据。之后读取数据,它从 MemoryChannel 中清除。2. Detector模块由Detector模块组成,负责数据检测,该模块由Server和Monitor两个子模块组成。(1)Server是一个Web服务,就是Agent采集接收到的数据提供了一个接收接口,将数据存储在本地数据库中。为了防止数据库由于随着数据的增加,我们对数据库中每张表的行数设置了上限。(2) Monitor模块包括时序预测和异常检测等算法。该模块定期从本地数据库,并基于现有算法对数据进行预测和分析。

  def forecast(args):

# 如果没有指定预测方式,则默认使用’auto_arima’算法

if not args.forecast_method:

forecast_alg = get_instance('auto_arima')

else:

forecast_alg = get_instance(args.forecast_method)

# 指标预测功能函数

def forecast_metric(name, train_ts, save_path=None):

forecast_alg.fit(timeseries=train_ts)

dates, values = forecast_alg.forecast(

period=TimeString(args.forecast_periods).standard)

date_range = "{start_date}~{end_date}".format(start_date=dates[0],

end_date=dates[-1])

display_table.add_row(

[name, date_range, min(values), max(values), sum(values) / len(values)]

)

# 校验存储路径

if save_path:

if not os.path.exists(os.path.dirname(save_path)):

os.makedirs(os.path.dirname(save_path))

with open(save_path, mode='w') as f:

for date, value in zip(dates, values):

f.write(date + ',' + str(value) + '\n')

# 从本地sqlite中抽取需要的数据

with sqlite_storage.SQLiteStorage(database_path) as db:

if args.metric_name:

timeseries = db.get_timeseries(table=args.metric_name, period=max_rows)

forecast_metric(args.metric_name, timeseries, args.save_path)

else:

# 获取sqlite中所有的表名

tables = db.get_all_tables()

# 从每个表中抽取训练数据进行预测

for table in tables:

timeseries = db.get_timeseries(table=table, period=max_rows)

forecast_metric(table, timeseries)

# 输出结果

print(display_table.get_string())

# 代码远程部署

def deploy(args):

print('Please input the password of {user}@{host}: '.format(user=args.user, host=args.host))

# 格式化代码远程部署指令

command = 'sh start.sh --deploy {host} {user} {project_path}' \

.format(user=args.user,

host=args.host,

project_path=args.project_path)

# 判断指令执行情况

if subprocess.call(shlex.split(command), cwd=SBIN_PATH) == 0:

print("\nExecute successfully.")

else:

print("\nExecute unsuccessfully.")

# 展示当前监控的参数

def show_metrics():

# 项目总入口

def main():

  2. 关键代码段分析(1) 后台线程的实现。前面说过,这个功能可以分为三个角色:Agent、Monitor 和 Detector,这三个不同的角色都是常见的进程驻留在后台执行不同的任务,Daemon类是负责运行不同业务流程的容器类,下面介绍这个类的实现。

  class Daemon:

"""

This class implements the function of running a process in the background."""

def __init__(self):

def daemon_process(self):

# 注册退出函数

atexit.register(lambda: os.remove(self.pid_file))

signal.signal(signal.SIGTERM, handle_sigterm)

# 启动进程

@staticmethod

def start(self):

try:

self.daemon_process()

except RuntimeError as msg:

abnormal_exit(msg)

self.function(*self.args, **self.kwargs)

# 停止进程

def stop(self):

if not os.path.exists(self.pid_file):

abnormal_exit("Process not running.")

read_pid = read_pid_file(self.pid_file)

if read_pid > 0:

os.kill(read_pid, signal.SIGTERM)

if read_pid_file(self.pid_file) < 0:

os.remove(self.pid_file)

  (2)数据库相关指标采集流程。数据库指标采集架构参考了Apache Flume的设计。一个完整的信息采集流程分为三个部分,分别是Source , Channel 和 Sink. 以上三部分被抽象成三个不同的基类, 从中可以派生出不同的采集 数据源, 缓存管道和数据接收者. 前面说过DBSource派生自Source, MemoryChannel派生来源于Channel,HttpSink来源于Sink,下面代码来源于metric_agent.py,负责采集指标,上面的模块是串联的。

  def agent_main():

# 初始化通道管理器

cm = ChannelManager()

# 初始化数据源

source = DBSource()

http_sink = HttpSink(interval=params['sink_timer_interval'], url=url, context=context)

source.channel_manager = cm

http_sink.channel_manager = cm

# 获取参数文件里面的功能函数

for task_name, task_func in get_funcs(metric_task):

source.add_task(name=task_name,

interval=params['source_timer_interval'],

task=task_func,

maxsize=params['channel_capacity'])

source.start()

http_sink.start()

  (3)实现数据存储和监控。Agent将采集收到的指标数据发送到Detector服务器,Detector服务器负责存储。Monitor不断检查存储的数据,以便提前发现异常,这里实现了一个通过SQLite进行本地化存储的方法,代码位于sqlite_storage.py文件中,实现的类为SQLiteStorage,该类实现的主要方法如下:

  # 通过时间戳获取最近一段时间的数据

def select_timeseries_by_timestamp(self, table, period):

# 通过编号获取最近一段时间的数据

def select_timeseries_by_number(self, table, number):

  其中,由于不同指标的数据存储在不同的表中,所以上述参数表也代表了不同指标的名称。异常检测目前主要支持基于时间序列预测的方法,包括Prophet算法(Facebook开源的工业级时间序列预测算法工具)和ARIMA算法,封装成类供Forecaster调用。上述时序检测的算法类都继承了AlgModel类,该类的结构如下:

  class AlgModel(object):

"""

This is the base class for forecasting algorithms.

If we want to use our own forecast algorithm, we should follow some rules.

"""

def __init__(self):

pass

@abstractmethod

def fit(self, timeseries):

pass

@abstractmethod

def forecast(self, period):

pass

def save(self, model_path):

pass

def load(self, model_path):

pass

  在 Forecast 类中,通过调用 fit() 方法,可以根据历史时间序列数据进行训练,通过 forecast() 方法预测未来趋势。获取未来趋势后如何判断是否异常?有很多方法。最简单最基本的方法是通过阈值来判断。在我们的程序中,这个方法也默认用于判断。8.5.4 Anomaly-Detection 工具有五种操作模式:启动、停止、预测、show_metrics 和部署。每种模式的说明如表1所示。 表1 Anomaly-Detection使用模式及说明

  模式名称

  阐明

  开始

  启动本地或远程服务

  停止

  停止本地或远程服务

  预报

  未来变化的预测器

  显示指标

  输出当前监控的参数

  部署

  远程部署代码

  Anomaly-Detection 工具的操作模式示例如下所示。① 使用启动方式启动本地采集器服务,代码如下:

  python main.py start –role collector

  ② 使用停止方式停止本地采集器服务,代码如下:

  python main.py stop –role collector

  ③ 使用启动方式启动远程采集器服务,代码如下:

  python main.py start --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector

  ④ 使用停止方式停止远程采集器服务,代码如下:

  python main.py stop --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector

  ⑤ 显示当前所有监控参数,代码如下:

  python main.py show_metrics

  ⑥ 预测接下来60秒io_read的最大值、最小值和平均值,代码如下:

  python main.py forecast –metric-name io_read –forecast-periods 60S –save-path predict_result

  ⑦ 将代码部署到远程服务器,代码如下:

  python main.py deploy –user xxx –host xxx.xxx.xxx.xxx –project-path xxx

  8.5.5 进化路线

  Anomaly-Detection作为数据库指标监控和异常检测工具,目前具备数据采集、数据存储、异常检测、消息推送等基本功能。但是,存在以下问题。(1)Agent模块采集的数据太简单了,目前Agent只能采集数据库的资源索引数据,包括IO、磁盘、内存、CPU等,&lt; @采集未来需要增强。(2)Monitor内置算法覆盖不够,Monitor目前只支持两种时序预测算法,对于异常检测,只支持基于阈值的简单案例,使用场景有限。(3) Server 仅支持 支持单Agent数据传输。目前Server采用的方案只支持从一个Agent接收数据,不支持多个Agent同时传输。这对于只有一个主节点的openGauss数据库来说暂时够用了,但显然不适合分布式部署。不友好。因此,针对上述三个问题,未来将丰富Agent以采集数据,主要包括安全指标、数据库日志等信息。其次,在算法层面,会写出鲁棒性(即算法的鲁棒性和稳定性)。增强异常检测算法,增加异常监控场景。同时,Server 需要改进以支持多 Agent 模式。最后,

  以上内容是对AI技术中的指标采集、预测和异常检测的详细介绍。下一篇将分享“AI查询时间预测”的相关内容,敬请期待!

  - 结尾 -

  

  高斯松鼠*敏*感*词*

  

  汇聚数据库从业者和爱好者,互相帮助解决问题,构建数据库技术交流圈

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线