算法 自动采集列表(8.4智能索引推荐之8.5指标采集、预测与异常检测)
优采云 发布时间: 2021-12-27 15:24算法 自动采集列表(8.4智能索引推荐之8.5指标采集、预测与异常检测)
上一篇介绍了《8.4 智能指数推荐》的相关内容,本篇为大家介绍了《8.5 指数采集、预测与异常检测》相关精彩内容的介绍。
8.5 索引采集
、预测和异常检测
数据库指标监控和异常检测技术,通过监控数据库指标,并基于时序预测和异常检测算法,发现异常信息,然后提醒用户采取措施,避免异常情况造成的严重后果。
8.5.1 使用场景
用户操作数据库的某些行为或某些正在运行的服务的变化可能会导致数据库出现异常。如果不及时发现和处理这些异常情况,可能会导致严重的后果。一般情况下,数据库监控指标(指标,如CPU使用率、QPS等)可以反映数据库系统的健康状况。通过监控数据库指标,分析指标数据特征或变化趋势等信息,及时发现数据库异常,及时向运维管理人员推送告警信息,避免损失。
8.5.2 实现原理
图 8-14 异常检测框架
指标采集、预测和异常检测是通过同一个系统实现的,openGauss项目中命名为Anomaly-Detection,其结构如图8-14所示。该工具可以分为两部分:Agent 和 Detector。Agent是一个数据库代理模块,负责采集
数据库索引数据并将数据推送到Detector;Detector是一个数据库异常检测分析模块,主要有3个功能。
(1) 采集
Agent采集
的数据并转储。
(2) 对采集到的数据进行特征分析和异常检测。
(3) 将检测到的异常信息推送给运维管理人员。
1. Agent 模块的组成
Agent 模块负责采集
和发送指标数据。该模块由三个子模块组成:DBSource、MemoryChannel和HttpSink。
(1) DBSource作为数据源,负责定时采集
数据库指标数据,并将数据发送到数据通道MemoryChannel。
(2) MemoryChannel是内存数据通道,本质上是一个FIFO队列,用于数据缓存。HttpSink组件消耗MemoryChannel中的数据,为了防止MemoryChannel中过多的数据造成OOM(out of Memory,内存溢出),设置容量上限,当超过容量上限时,将禁止过多的元素放入队列。
(3) HttpSink是一个数据采集点,该模块周期性地从MemoryChannel获取数据,并以Http(s)的形式转发数据,读取数据后,从MemoryChannel中清除。
2. 检测器模块组成
Detector 模块负责数据检测。该模块由两个子模块组成:Server 和 Monitor。
(1)Server是一个Web服务,为Agent采集的数据提供接收接口,并将数据存储在本地数据库中。为了避免数据的增加,数据库会占用过多的资源。表都设置了行数上限。
(2) Monitor 模块收录
时间序列预测、异常检测等算法。该模块定期从本地数据库中获取数据库指标数据,并根据现有算法对数据进行预测分析。如果算法检测到数据库指标在历史或未来某个时间段或某个时刻发生异常,信息会及时推送给用户。
8.5.3 关键源码分析 1 整体流程分析
智能索引推荐工具的路径是openGauss-server/src/gausskernel/dbmind/tools/anomaly_detection。下面的代码详细展示了程序的入口。
def forecast(args):
…
# 如果没有指定预测方式,则默认使用’auto_arima’算法
if not args.forecast_method:
forecast_alg = get_instance('auto_arima')
else:
forecast_alg = get_instance(args.forecast_method)
# 指标预测功能函数
def forecast_metric(name, train_ts, save_path=None):
…
forecast_alg.fit(timeseries=train_ts)
dates, values = forecast_alg.forecast(
period=TimeString(args.forecast_periods).standard)
date_range = "{start_date}~{end_date}".format(start_date=dates[0],
end_date=dates[-1])
display_table.add_row(
[name, date_range, min(values), max(values), sum(values) / len(values)]
)
# 校验存储路径
if save_path:
if not os.path.exists(os.path.dirname(save_path)):
os.makedirs(os.path.dirname(save_path))
with open(save_path, mode='w') as f:
for date, value in zip(dates, values):
f.write(date + ',' + str(value) + '\n')
# 从本地sqlite中抽取需要的数据
with sqlite_storage.SQLiteStorage(database_path) as db:
if args.metric_name:
timeseries = db.get_timeseries(table=args.metric_name, period=max_rows)
forecast_metric(args.metric_name, timeseries, args.save_path)
else:
# 获取sqlite中所有的表名
tables = db.get_all_tables()
# 从每个表中抽取训练数据进行预测
for table in tables:
timeseries = db.get_timeseries(table=table, period=max_rows)
forecast_metric(table, timeseries)
# 输出结果
print(display_table.get_string())
# 代码远程部署
def deploy(args):
print('Please input the password of {user}@{host}: '.format(user=args.user, host=args.host))
# 格式化代码远程部署指令
command = 'sh start.sh --deploy {host} {user} {project_path}' \
.format(user=args.user,
host=args.host,
project_path=args.project_path)
# 判断指令执行情况
if subprocess.call(shlex.split(command), cwd=SBIN_PATH) == 0:
print("\nExecute successfully.")
else:
print("\nExecute unsuccessfully.")
…
# 展示当前监控的参数
def show_metrics():
…
# 项目总入口
def main():
…
2. 关键代码段分析
(1) 后台线程的实现。
如上所述,这个功能可以分为三个角色:Agent、Monitor和Detector。这三个不同的角色都是驻留在后台的进程,各自执行不同的任务。Daemon 类是负责运行不同业务流程的容器类。下面描述这个类的实现。
class Daemon:
"""
This class implements the function of running a process in the background."""
def __init__(self):
…
def daemon_process(self):
# 注册退出函数
atexit.register(lambda: os.remove(self.pid_file))
signal.signal(signal.SIGTERM, handle_sigterm)
# 启动进程
@staticmethod
def start(self):
try:
self.daemon_process()
except RuntimeError as msg:
abnormal_exit(msg)
self.function(*self.args, **self.kwargs)
# 停止进程
def stop(self):
if not os.path.exists(self.pid_file):
abnormal_exit("Process not running.")
read_pid = read_pid_file(self.pid_file)
if read_pid > 0:
os.kill(read_pid, signal.SIGTERM)
if read_pid_file(self.pid_file) < 0:
os.remove(self.pid_file)
(2) 数据库相关指标的采集过程。
数据库的索引集合架构基于Apache Flume的设计。一个完整的信息采集过程分为三个部分,即Source、Channel和Sink。以上三部分抽象为三个不同的基类,可以派生出不同的集合数据源、缓存管道、数据接收方。
前面提到的DBSource派生自Source,MemoryChannel派生自Channel,HttpSink派生自Sink。下面这段代码来自metric_agent.py,负责采集
指标。这里,上述模块是串联的。
def agent_main():
…
# 初始化通道管理器
cm = ChannelManager()
# 初始化数据源
source = DBSource()
http_sink = HttpSink(interval=params['sink_timer_interval'], url=url, context=context)
source.channel_manager = cm
http_sink.channel_manager = cm
# 获取参数文件里面的功能函数
for task_name, task_func in get_funcs(metric_task):
source.add_task(name=task_name,
interval=params['source_timer_interval'],
task=task_func,
maxsize=params['channel_capacity'])
source.start()
http_sink.start()
(3) 数据存储和监控部分的实现。
Agent 将采集到的指标数据发送到 Detector 服务器,由 Detector 服务器负责存储。*敏*感*词*不断检查存储的数据,以便提前发现异常情况。
这里实现了一种通过 SQLite 进行本地化存储的方法。代码位于 sqlite_storage.py 文件中。实现的类是 SQLiteStorage。该类的主要方法如下:
# 通过时间戳获取最近一段时间的数据
def select_timeseries_by_timestamp(self, table, period):
…
# 通过编号获取最近一段时间的数据
def select_timeseries_by_number(self, table, number):
…
其中,由于不同的索引数据存储在不同的表中,所以上面的参数表也代表了不同索引的名称。
异常检测目前主要支持基于时间序列预测的方法,包括Prophet算法(Facebook开源的工业级时间序列预测算法工具)和ARIMA算法,分别封装成类供Forecaster使用。
上述时序检测的算法类都继承了AlgModel类,该类的结构如下:
class AlgModel(object):
"""
This is the base class for forecasting algorithms.
If we want to use our own forecast algorithm, we should follow some rules.
"""
def __init__(self):
pass
@abstractmethod
def fit(self, timeseries):
pass
@abstractmethod
def forecast(self, period):
pass
def save(self, model_path):
pass
def load(self, model_path):
pass
在 Forecast 类中,通过调用 fit() 方法,可以根据历史时间序列数据进行训练,并使用 predict() 方法预测未来趋势。
获取未来趋势后如何判断是否异常?有很多方法。最简单最基本的方法是通过阈值来判断。在我们的程序中,也默认使用这种方法进行判断。
8.5.4 用法示例
Anomaly-Detection 工具有五种操作模式:启动、停止、预测、show_metrics 和部署。每种模式的说明如表8-12 所示。
表8-12 Anomaly-Detection使用方式及说明
图案名称
操作说明
开始
启动本地或远程服务
停止
停止本地或远程服务
预报
预测未来的变化
show_metrics
输出当前监控的参数
部署
远程部署代码
使用异常检测工具操作模式的示例如下所示。
① 使用启动方式启动本地采集
器服务,代码如下:
python main.py start –role collector
②使用stop方式停止本地采集
器服务,代码如下:
python main.py stop –role collector
③使用启动方式启动远程采集器服务,代码如下:
python main.py start --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector
④ 使用stop方式停止远程采集器服务,代码如下:
python main.py stop --user xxx --host xxx.xxx.xxx.xxx –project-path xxx –role collector
⑤ 显示当前所有的监控参数,代码如下:
python main.py show_metrics
⑥ 预测io_read在接下来的60秒内的最大值、最小值和平均值,代码如下:
python main.py forecast –metric-name io_read –forecast-periods 60S –save-path predict_result
⑦ 将代码部署到远程服务器,代码如下:
python main.py deploy –user xxx –host xxx.xxx.xxx.xxx –project-path xxx
8.5.5 进化路线
Anomaly-Detection作为数据库指标监控和异常检测工具,目前具备数据采集、数据存储、异常检测、消息推送等基础功能。但是,目前存在以下问题。
(1)Agent模块采集的数据过于单一,目前Agent只能采集数据库的资源索引数据,包括IO、磁盘、内存、CPU等,数据的丰富性采集
的索引需要在未来加强。
(2)Monitor内置的算法覆盖不够。Monitor目前只支持两种时间序列预测算法,对于异常检测,只支持基于阈值的简单案例,使用场景有限。
(3) Server 只支持单代理传输数据,目前Server 使用的方案只支持从一个代理接收数据,不支持多个代理同时传输。对于只有一个代理的openGauss 数据库来说暂时够用一个主节点,但是对于分布式部署显然不友好。
因此,针对以上三个问题,未来将丰富Agent,方便采集
数据,包括安全指标、数据库日志等信息。其次,在算法层面,编写更加鲁棒的异常检测算法(即算法的鲁棒性和稳定性),增加异常监控场景。同时,Server 需要改进以支持多代理模式。最后,需要实现故障自动修复功能,并与此功能结合。
感谢您学习第8章AI技术“8.5索引采集、预测与异常检测”的精彩内容。下一篇,我们将开启“8.6 AI Query Time Prediction”的介绍。.
敬请关注。