技巧:快速上手 Pythond 采集器的最佳实践

优采云 发布时间: 2022-12-07 22:38

  技巧:快速上手 Pythond 采集器的最佳实践

  Pythond 是一组完整的场景,用于定期触发用户定义的 Python 采集脚本。本文以“获取每小时登录用户数”为指标向中心上报。

  1.1. 商务演示简介

  业务流程大致如下:采集数据库中的数据(Python 脚本)->pythond 采集器触发脚本定期上报数据(datakit)->指标可以从中心(web)看到。

  数据库现在有一个名为“客户”的表,其中收录以下字段:

  ● 名称:名称(字符串)。

  ● last_logined_time:登录时间(时间戳)。

  表创建语句如下:

  创建表客户

  (

  idBIGINT(20) 不为空 AUTO_INCREMENT 注释 '增量 ID',

  last_logined_time BIGINT(20) 不为空 默认 0COMMENT '登录时间(时间戳),

  名称VARCHAR(48) 不为空 默认值 ''注释 '名称',

  主键,

  关键idx_last_logined_time(last_logined_time)

  ) 引擎=InnoDB 默认字符集=utf8mb4;

  将测试数据插入上表中:

  插入客户(ID、last_logined_time、名称)值(1、1645600127、“张山”);

  插入客户(ID、last_logined_time、名称)值(2、1645600127、“lisi”);

  插入客户(ID、last_logined_time、姓名)值(3、1645600127、“王武”);

  使用以下 SQL 语句获取每小时登录的用户数:

  从客户中选择 count(1),其中 last_logined_time>=(unix_timestamp()-3600);

  以上数据以指标形式上报中心。

  下面详细介绍了实现此目的的步骤。

  1.2. 前提条件 1.2.1.蟒蛇环境

  蟒

  需要安装,Pythond 采集器目前处于 alpha 阶段,并且与 Python 2.7+ 和 Python 3+ 兼容。不过为了以后的兼容性,强烈建议使用 Python 3,毕竟官方不支持 Python 2。以下演示也使用 Python 3。

  1.2.2. Python 依赖项

  需要安装以下依赖项:

  ● 请求(运营网络,用于报告指标)。

  pymySQL(操作MySQL数据库,用于连接到数据库以获取业务数据)。

  安装方法如下:

  蟒蛇3

  python3 -m pip 安装请求

  python3 -m pip install pymysql

  上面的安装需要安装 pip,如果没有,可以参考以下方法(来自:这里):

  Linux/MacOS

  python3 -m ensurepip --upgrade

  窗户

  py -m ensurepip --upgrade

  1.3. 编写用户自定义脚本

  用户需要继承 DataKitFramework 类,然后重写 run 方法。DataKitFramework 类源代码文件datakit_framework.py,路径为 datakit/python.d/core/datakit_framework.py。

  有关具体用法,请参阅源代码文件 datakit/python.d/core/demo.py。

  根据上述要求,我们在这里编写以下 Python 脚本,命名为 hellopythond.py:

  从 datakit_framework 导入 DataKitFramework

  导入 pymysql

  *敏*感*词*再

  导入日志记录

  类 MysqlConn():

  def __init__(self, logger, config):

self.logger = logger

self.config = config

self.re_errno = re.compile(r'^\((\d+),')

try:

self.conn = pymysql.Connect(**self.config)

self.logger.info("pymysql.Connect() ok, {0}".format(id(self.conn)))

except Exception as e:

raise e

def __del__(self):

self.close()

def close(self):

if self.conn:

self.logger.info("conn.close() {0}".format(id(self.conn)))

self.conn.close()

def execute_query(self, sql_str, sql_params=(), first=True):

res_list = None

cur = None

try:

cur = self.conn.cursor()

cur.execute(sql_str, sql_params)

res_list = cur.fetchall()

<p>

except Exception as e:

err = str(e)

self.logger.error('execute_query: {0}'.format(err))

if first:

retry = self._deal_with_network_exception(err)

if retry:

return self.execute_query(sql_str, sql_params, False)

finally:

if cur is not None:

cur.close()

return res_list

def execute_write(self, sql_str, sql_params=(), first=True):

cur = None

n = None

err = None

try:

cur = self.conn.cursor()

n = cur.execute(sql_str, sql_params)

except Exception as e:

err = str(e)

self.logger.error('execute_query: {0}'.format(err))

if first:

retry = self._deal_with_network_exception(err)

if retry:

return self.execute_write(sql_str, sql_params, False)

finally:

if cur is not None:

cur.close()

return n, err

def _deal_with_network_exception(self, stre):

errno_str = self._get_errorno_str(stre)

if errno_str != '2006' and errno_str != '2013' and errno_str != '0':

return False

try:

self.conn.ping()

except Exception as e:

return False

return True

def _get_errorno_str(self, stre):

searchObj = self.re_errno.search(stre)

if searchObj:

errno_str = searchObj.group(1)

else:

errno_str = '-1'

return errno_str

def _is_duplicated(self, stre):

errno_str = self._get_errorno_str(stre)

# 1062:字段值重复,入库失败

# 1169:字段值重复,更新记录失败

if errno_str == "1062" or errno_str == "1169":

return True

return False

</p>

  类HelloPythond(DataKitFramework):

  =(unix_timestamp()-%s)“sql_params = ('3600')n = mysql_conn.execute_query(query_str, sql_params)data = [{”measurement“: ”hour_logined_customers_count“, # 指标名称。“标签”: {“tag_name”: “tag_value”, # 自定义标签,根据要标记的内容填写,我在这里随便写}, “字段”: {“计数”: n[0][0], # 指标,这里是每小时登录的用户数},},]in_data = {'M':d ata,'input': “pyfromgit”}返回self.report( in_data) # 你必须在这里调用self.report“ title=”“ data-bs-original-title=”copy“ aria-label=”copy“ >

  __name = 'HelloPythond'

interval = 10 # 每 10 秒钟采集上报一次。这个根据实际业务进行调节,这里仅作演示。

# if your datakit ip is 127.0.0.1 and port is 9529, you won't need use this,

# just comment it.

# def __init__(self, **kwargs):

# super().__init__(ip = '127.0.0.1', port = 9529)

def run(self):

config = {

"host": "172.16.2.203",

"port": 30080,

"user": "root",

"password": "Kx2ADer7",

"db": "df_core",

"autocommit": True,

# "cursorclass": pymysql.cursors.DictCursor,

<p>

"charset": "utf8mb4"

}

mysql_conn = MysqlConn(logging.getLogger(''), config)

query_str = "select count(1) from customers where last_logined_time>=(unix_timestamp()-%s)"

sql_params = ('3600')

n = mysql_conn.execute_query(query_str, sql_params)

data = [

{

"measurement": "hour_logined_customers_count", # 指标名称。

"tags": {

"tag_name": "tag_value", # 自定义 tag,根据自己想要标记的填写,我这里是随便写的

},

"fields": {

"count": n[0][0], # 指标,这里是每个小时登录的用户数

},

},

]

in_data = {

'M':data,

'input': "pyfromgit"

}

return self.report(in_data) # you must call self.report here

</p>

  1.4. 将自定义脚本放置在正确的位置

  在 Datakit 安装目录的 python.d 目录中创建一个新文件夹,并命名为 hellopythond,这个文件夹名应该和上面写的类名一样,即 hellopythond。

  然后将上面写 hellopythond.py 脚本放在这个文件夹下,即最终的目录结构如下:

  ├── ...

  ├── 资料套件

  └── 蟒蛇

  ├── core

│   ├── datakit_framework.py

│   └── demo.py

└── hellopythond

└── hellopythond.py

  上面的核心文件夹是 Python 的核心文件夹,不要动。

  以上是未启用 gitrepos 功能,

  如果启用了 gitrepos 函数,则路径结构如下所示:

  ├── ...

  ├── 资料套件

  ├── 蟒蛇

  ├── 吉曲波斯

  │ └── 您的项目

  │├── 确认

  │├── 流水线

  │└── 蟒蛇

  │└── 你好蟒蛇

  │└── hellopythond.py

  1.5. 启用 pythond 配置文件

  复制 Pythond 配置文件。将 pythond.conf.sample 复制为 conf.d/pythond 目录中的 pythond.conf,然后按如下方式配置:[

  [inputs.pythond]]

  # Python 采集器名称

name = 'some-python-inputs' # required

# 运行 Python 采集器所需的环境变量

#envs = ['LD_LIBRARY_PATH=/path/to/lib:$LD_LIBRARY_PATH',]

# Python 采集器可执行程序路径(尽可能写绝对路径)

cmd = "python3" # required. python3 is recommended.

# 用户脚本的相对路径(填写文件夹,填好后该文件夹下一级目录的模块和 py 文件都将得到应用)

dirs = ["hellopythond"] # 这里填的是文件夹名,即类名

  1.6. 重启数据工具包

  sudo datakit --restart

  1.7. 效果图

  如果一切顺利,我们将能够在大约 1 分钟内看到中间的指标图。

  1.8. 参考文档

  ● 官方手册:用 Python 开发自定义采集器

  ● 官方手册:通过 Git 管理配置文件

  秘密:信息收集组合拳之从废弃接口中寻找漏洞

  使用OneForAll工具按域名采集url和ip

  工具地址:GitHub - shmilylty/OneForAll:OneForAll是一款强大的子域名采集工具

  常用命令:python oneforall.py --targets targets.txt run

  将需要扫描的域名放在targets.txt中,运行后会在results文件夹下生成扫描结果

  在运行结果中,我们可以看到有url,子域名,ip

  其中,运行结果中的ip多行重复。我们需要把ip提取出来,转换成txt文本,每行一个ip,不重复,方便其他工具扫描

  脚本:删除重复的 ip

  #!/usr/bin/env python# conding:utf-8<br />##把同一行的ip换行,然后写进result.txt的文件里with open('ip.txt','r',encoding='utf-8') as readlist: for dirs in readlist.readlines(): with open('result.txt','a',encoding='utf-8') as writelist: b = dirs.replace(",", '\n') writelist.write(b)<br />#去除重复ip,然后把结果写进only.txt文件里with open('result.txt','r',encoding='utf-8') as readlist: lines_seen = set() for line in readlist.readlines(): if line not in lines_seen: lines_seen.add(line) with open('only.txt','a',encoding='utf-8') as writelist: writelist.write(line)<br />#参考文章:https://blog.csdn.net/qq_22764813/article/details/73187473?locationNum=1&fps=1

  提取成这样的一行只有一个ip,没有重复的文字,我们可以放到goby,fscan,小米风扇等工具中扫描

  fscan工具扫描ip

  工具地址:GitHub - shadow1ng/fscan:一款全面的内网扫描工具,方便一键自动化,全方位漏扫。

  本工具主要用于内网扫描、资产发现、漏洞扫描和弱口令爆破。它运行得非常快。对于一些网络资产的外部网络检测和发现,它也是一个不错的选择。

  常用命令:全端口扫描 fscan64.exe -hf ip.txt -p 1-65535 -o result.txt

  将要扫描的ip地址放在ip.txt中,result.txt就是运行结果

  

  小米风扇

  工具地址:我的小工具-标签-范世强-博客园

  (如果找不到这个版本的地址,就贴出作者博客的地址)

  JSFinder扫描js和url

  工具地址:GitHub - Threezh1/JSFinder:JSFinder是一款快速从网站JS文件中提取URL和子域名的工具。

  常用命令:python JSFinder.py -f targets.txt -d -ou JSurl.txt -os JSdomain.txt

  将要扫描的url放在targets.txt中,运行后会生成两个txt文本,JSurl.txt为URL,JSdomain.txt为子域名

  上述工具的扫描结果收录大量的url。如果我们需要更高效,我们可以先从参数入手,这样我们就需要对收录参数的url进行过滤。

  脚本:带参数提取url

  #!/usr/bin/env python# conding:utf-8<br />#字符串中有“?”且不在字符串的结尾的就写入result.txt中with open('JSurl.txt','r',encoding='utf-8') as readlist: for dirs in readlist.readlines(): # re_result=re.search(r"'?'",dirs) # re_result=str(re_result) if "?" in dirs :#判断字符中是否有“?”,如果有则返回该字符串的位置,是从坐标0开始算的 re = dirs.find("?") # a=len(dirs)-2是为了判断“?”是不是在最后一个字符,len()与find()不同是从一开始算字符串的长度的,在加上每行字符中\n换行符也占了一个字符,所以要减2 a=len(dirs)-2#判断字符串中“?”是不是在字符的最后 if re < a : with open('result.txt','a',encoding='utf-8') as writelist: writelist.write(dirs)<br />#去除result.txt中的重复字符串,然后把结果写进only.txt文件里with open('result.txt','r',encoding='utf-8') as readlist: lines_seen = set() for line in readlist.readlines(): if line not in lines_seen: lines_seen.add(line) with open('only.txt','a',encoding='utf-8') as writelist: writelist.write(line)<br />#参考文章:https://www.cnblogs.com/luguankun/p/11846401.html(判断一个字符是否在一个字符串中)

  <br style="outline: 0px;" />

  脚本运行后生成的带参数的url如下

  从已弃用的接口中查找漏洞

  有的网站经过多轮渗透,正常业务测试很差,连逻辑漏洞都找不到。经过以上信息采集,一般就可以采集到网站url的历史业务了。

  然后我们使用脚本对url进行处理和筛选,批量放入sqlmap中运行。还有一些敏感接口可以尝试发现未授权访问、信息泄露等。

  sqlmap批量扫描

  

  常用命令:python sqlmap.py -m urls.txt --batch

  在 urls.txt 文件中,放入我们使用脚本“Extract url with parameters”过滤的 url

  除了参数,还可以用同样的思路修改脚本,找到敏感接口和url跳转参数等。

  通用敏感接口

  常用跳转参数

  toUrl=

  登录网址=

  注册网址

  重定向网址=

  加载网址=

  proxy_url=

  文件网址=

  跳转网址=

  在某个项目中,客户想知道我是如何找到接口的

  侵权请私聊公众号删文

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线