实时抓取网页数据(废话如何知道你写的爬虫有没有正常运行,请求了多少个网页)

优采云 发布时间: 2022-03-04 17:18

  实时抓取网页数据(废话如何知道你写的爬虫有没有正常运行,请求了多少个网页)

  效果图:

  

  废话

  怎么知道自己写的爬虫是否正常运行,运行了多长时间,请求了多少网页,抓取了多少条数据?官方其实提供了一个收录一些爬取相关信息的字典:crawler.stats.get_stats(),crawler是scrapy中的一个组件。您可以在许多组件中访问它,例如所有收录 from_crawler(cls, crawler) 方法的组件。

  既然可以获取scrapy的运行状态,那么实时显示应该很简单。我们还使用了上一篇博客中使用的influxdb+grafana来展示数据。我们只需要将一些scrapy的操作信息实时同步到influxdb数据库中,然后我们就可以通过grafana将数据库的内容以图形的形式展示出来。

  写数据库

  如何将字典实时同步到数据库?必须在此处设置同步间隔,例如 5 秒。那么我们的要求就是让scrapy每隔5秒将爬虫的运行状态信息写入数据库。如上所述,可以访问 crawler.stats.get_stats() 的组件有很多,例如中间件、管道和爬虫。我们应该在哪个组件中同步信息?

  对此,我们可以先看一些内置组件实现了哪些功能,然后再看与需求最相似的功能。显然,最适合的功能是扩展组件。很多人可能没有使用过这个组件。看了很多博客很少提到这个组件,因为这个组件能做什么,别人也能做什么,用它只是为了让分工更清晰。因此,一些附加功能一般都写在扩展中。我们先来看看内置的实现了哪些功能。

  日志统计扩展就是将 crawler.stats.get_stats() 的字典信息写入日志,和我要实现的功能基本类似。所以代码可以参考参考。看看我的代码:

  import logging

from scrapy import signals

import datetime

from threading import Timer

from influxdb import InfluxDBClient

logger = logging.getLogger(__name__)

class SpiderStatLogging:

def __init__(self, crawler, dbparams, interval):

self.exit_code = False

self.interval = interval

self.crawler = crawler

self.client = InfluxDBClient(**dbparams)

self.stats_keys = set()

self.cur_d = {

'log_info': 0,

'log_warning': 0,

'requested': 0,

'request_bytes': 0,

'response': 0,

'response_bytes': 0,

'response_200': 0,

'response_301': 0,

'response_404': 0,

'responsed': 0,

'item': 0,

'filtered': 0,

}

@classmethod

def from_crawler(cls, crawler):

dbparams = crawler.settings.get('INFLUXDB_PARAMS')

interval = crawler.settings.get('INTERVAL', 60)

ext = cls(crawler, dbparams, interval)

crawler.signals.connect(ext.engine_started, signal=signals.engine_started)

crawler.signals.connect(ext.engine_stopped, signal=signals.engine_stopped)

crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)

crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)

return ext

def spider_closed(self, spider, reason):

logger.info(self.stats_keys)

influxdb_d = {

"measurement": "spider_closed",

"time": datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),

"tags": {

'spider_name': spider.name

},

"fields": {

'end_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),

'reason': reason,

'spider_name':spider.name

}

}

if not self.client.write_points([influxdb_d]):

raise Exception('写入influxdb失败!')

def spider_opened(self, spider):

influxdb_d = {

"measurement": "spider_opened",

"time": datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),

"tags": {

'spider_name': spider.name

},

"fields": {

'start_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),

'spider_name':spider.name

}

}

if not self.client.write_points([influxdb_d]):

raise Exception('写入influxdb失败!')

def engine_started(self):

Timer(self.interval, self.handle_stat).start()

def engine_stopped(self):

self.exit_code = True

def handle_stat(self):

stats = self.crawler.stats.get_stats()

d = {

'log_info': stats.get('log_count/INFO', 0),

'dequeued': stats.get('scheduler/dequeued/redis', 0),

'log_warning': stats.get('log_count/WARNING', 0),

'requested': stats.get('downloader/request_count', 0),

'request_bytes': stats.get('downloader/request_bytes', 0),

'response': stats.get('downloader/response_count', 0),

'response_bytes': stats.get('downloader/response_bytes', 0),

'response_200': stats.get('downloader/response_status_count/200', 0),

'response_301': stats.get('downloader/response_status_count/301', 0),

'response_404': stats.get('downloader/response_status_count/404', 0),

'responsed': stats.get('response_received_count', 0),

'item': stats.get('item_scraped_count', 0),

'depth': stats.get('request_depth_max', 0),

'filtered': stats.get('bloomfilter/filtered', 0),

'enqueued': stats.get('scheduler/enqueued/redis', 0),

'spider_name': self.crawler.spider.name

}

for key in self.cur_d:

d[key], self.cur_d[key] = d[key] - self.cur_d[key], d[key]

influxdb_d = {

"measurement": "newspider",

"time": datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),

"tags": {

'spider_name': self.crawler.spider.name

},

"fields": d

}

if not self.client.write_points([influxdb_d]):

raise Exception('写入influxdb失败!')

self.stats_keys.update(stats.keys())

if not self.exit_code:

Timer(self.interval, self.handle_stat).start()

  代码应该不难理解。从settings.py中读取两个变量'INFLUXDB_PARAMS'和'INTERVAL',然后在引擎启动时启动一个定时器,每隔INTERVAL秒执行一次handle_stat函数。handle_stat函数的作用就是把 crawler.stats.get_stats() 这个字典写入influxdb数据库。然后只需在配置文件中启用这个扩展,

  EXTENSIONS = {

'项目名称.文件名称.SpiderStatLogging': 1,

# 假设上面的代码都保存在extensions.py中,放在和settings.py同级目录,

# 则可以写成:项目名称.extensions..SpiderStatLogging

}

  显示数据库

  关于granfana我就不多说了,不明白的请百度,或者看我上一篇博客再百度。

  图表json:(如果太长,直接放到网盘,复制到grafana导入)

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线