如何利用开源的Scrapy爬虫框架来爬取新闻网站的数据

优采云 发布时间: 2021-03-23 00:10

  如何利用开源的Scrapy爬虫框架来爬取新闻网站的数据

  在当今的Internet环境中,越来越多地将Internet上的各种业务数据(例如新闻,社交网站,交易,政府公共数据,天气数据等)应用于公司数据。外部数据和内部数据之间的通道,两者相互碰撞。这些数据通常收录大量数据,这是最适合用MaxCompute分析和处理的数据类型。特别是,您可以使用MaxCompute的机器学习功能来完成一些数据挖掘业务场景。本文介绍如何使用开源Scrapy采集器框架。将新闻网站的数据抓取到MaxCompute中。

  

  一、 Scrapy简介

  Scrapy是一个用Python编写的Crawler框架,简单,轻巧,非常方便。

  Scrapy使用Twisted(一个异步网络库)来处理网络通信。它具有清晰的体系结构,并包括各种中间件接口,可以灵活地满足各种要求。整体结构如下图所示:

  

  绿线是数据流向。首先,调度程序将从初始URL开始,将其交给下载器进行下载,然后在下载之后将其交给Spider进行分析。 Spider分析的结果有两个:一个是需要进一步的爬网,例如,到之前分析的“下一页”的链接,这些东西将被发送回调度程序。另一个是需要保存的数据,然后将它们发送到项目管道,这是对数据的后处理(详细的分析,过滤,存储等)。另外,可以在数据流通道中安装各种中间件以执行必要的处理。

  二、 Scrapy环境安装系统环境要求:

  Linux

  软件环境要求:已安装:Python 2. 7(下载链接:)已安装:pip(请参阅:安装Scrapy安装

  执行安装命令:

  pip install Scrapy

  草率验证

  执行命令:

  scrapy

  执行结果:

  

  ODPS Python安装

  执行安装命令:

  pip install pyodps

  ODPS Python验证

  执行命令:

  python -c "from odps import ODPS"

  执行结果:如果未报告任何错误,则说明安装成功

  三、创建一个Scrapy项目

  在要创建Scrapy项目的目录中,执行:

  scrapy startproject hr_scrapy_demo

  在Scrapy创建项目后查看目录结构:

  hr_scrapy_demo /

scrapy.cfg # 全局配置文件

hr_scrapy_demo / # 项目下的Python模块,你可以从这里引用该Python模块

__init__.py

items.py # 自定义的Items

pipelines.py # 自定义的Pipelines

settings.py # 自定义的项目级配置信息

spiders/ # 自定义的spiders

__init__.py

  四、创建OdpsPipelines

  在hr_scrapy_demo / pipelines.py中,我们可以自定义数据处理管道。以下是我之前写过的OdpsPipeline。此管道可用于将我们采集的项目保存到ODPS,但还有几点需要说明:

  ODPS中的表必须已经预先创建。 Spider中采集的项目必须收录表的所有字段,并且名称必须一致,否则将引发异常。支持分区表和非分区表。

  在您的项目中将以下代码替换为pipelines.py

  

# -*- coding: utf-8 -*-

# Define your item pipelines here

#

# Don't forget to add your pipeline to the ITEM_PIPELINES setting

# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html

from odps import ODPS

import logging

logger = logging.getLogger('OdpsPipeline')

class OdpsPipeline(object):

collection_name = 'odps'

records = []

def __init__(self, odps_endpoint, odps_project,accessid,accesskey,odps_table,odps_partition=None,buffer=1000):

self.odps_endpoint = odps_endpoint

self.odps_project = odps_project

self.accessid = accessid

self.accesskey = accesskey

self.odps_table = odps_table

self.odps_partition = odps_partition

self.buffer = buffer

@classmethod

def from_crawler(cls, crawler):

return cls(

odps_endpoint=crawler.settings.get('ODPS_ENDPOINT'),

odps_project=crawler.settings.get('ODPS_PROJECT'),

accessid=crawler.settings.get('ODPS_ACCESSID'),

accesskey=crawler.settings.get('ODPS_ACCESSKEY'),

odps_table=crawler.settings.get('ODPS_TABLE'),

odps_partition=crawler.settings.get('ODPS_PARTITION'),

buffer=crawler.settings.get('WRITE_BUFFER')

)

def open_spider(self, spider):

self.odps = ODPS(self.accessid,self.accesskey,project=self.odps_project,endpoint=self.odps_endpoint)

self.table = self.odps.get_table(self.odps_table)

if(self.odps_partition is not None and self.odps_partition != ""):

self.table.create_partition(self.odps_partition,if_not_exists=True)

def close_spider(self, spider):

self.write_to_odps()

'''

将数据写入odps

'''

def write_to_odps(self):

if(len(self.records) is None or len(self.records) == 0):

return

if(self.odps_partition is None or self.odps_partition == ""):

with self.table.open_writer() as writer:

writer.write(self.records)

logger.info("write to odps {0} records. ".format(len(self.records)))

self.records = []

else:

with self.table.open_writer(partition=self.odps_partition) as writer:

writer.write(self.records)

logger.info("write to odps {0} records. ".format(len(self.records)))

self.records = []

def isPartition(self,name):

for pt in self.table.schema.partitions:

if(pt.name == name):

return True

return False

def process_item(self, item, spider):

cols = []

for col in self.table.schema.columns:

if(self.isPartition(col.name)):

continue

c = None

for key in item.keys():

if(col.name == key):

c = item[key]

break

if(c is None):

raise Exception("{0} column not found in item.".format(col.name))

cols.append(c)

self.records.append(self.table.new_record(cols))

#logger.info("records={0} : buffer={1}".format(len(self.records),self.buffer))

if( len(self.records) >= int(self.buffer)):

self.write_to_odps()

return item

  将管道注册到hr_scrapy_demo / setting.py并将ITEM_PIPELINES的值修改为:

  # Configure item pipelines

# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html

ITEM_PIPELINES = {

'hr_scrapy_demo.pipelines.OdpsPipeline': 300,

}

#300代表Pipeline的优先级,可以同时存在多个pipeline,依据该数值从小到大依次执行pipeline

  五、配置ODPS基本信息

  在hr_scrapy_demo / setting.py中,添加如下参数:

  六、创建自己的蜘蛛

  Spider主要用于采集 网站数据,并分析网站数据并将其转换为相应的项目,然后由管道对其进行处理。对于需要采集的每个网站,我们需要分别创建一个相应的Spider。

  以下是基于采集南方新闻网的重要新闻的蜘蛛示例。

  

# -*- coding:utf-8 -*-

import scrapy

import logging

logger = logging.getLogger('NanfangSpider')

class NanfangSpider(scrapy.Spider):

name = "nanfang"

'''

设置你要采集的其实网址,可以是多个.

此处以南方新闻网-要闻-首页为例.

'''

start_urls = [

'http://www.southcn.com/pc2016/yw/node_346416.htm'

]

'''

[ODPS配置信息]

ODPS_TABLE:ODPS表名

ODPS_PARTITION:ODPS表的分区值(可选)

WRITE_BUFFER:写入缓存(默认1000条)

'''

custom_settings = {

'ODPS_TABLE':'hr_scrapy_nanfang_news',

#'ODPS_PARTITION':'pt=20170209',

'WRITE_BUFFER':'1000'

}

'''

ODPS Demo DDL:

drop table if exists hr_scrapy_nanfang_news;

create table hr_scrapy_nanfang_news

(

title string,

source string,

times string,

url string,

editor string,

content string

);

'''

'''

对start_urls的url的解析方法,返回结果为item.

关于具体解析API可参考:https://doc.scrapy.org/en/latest/intro/tutorial.html

'''

def parse(self, response):

#查找网页中DIV元素,且其class=j-link,并对其进行遍历

for quote in response.css("div.j-link"):

#查找该DIV中的所有<a>超链接,并获取其href

href = quote.css("a::attr('href')").extract_first()

#进入该href链接,此处跳转到方法:parse_details,对其返回HTML进行再次处理。

yield scrapy.Request(response.urljoin(href),callback=self.parse_details)

#查找下一页的连接,此处用xpath方式获取,因css语法简单,无法获取

nexthref = response.xpath(u'//div[@id="displaypagenum"]//center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()

#如找到下一页,则跳转到下一页,并继续由parse对返回HTML进行处理。

if(nexthref is not None):

yield scrapy.Request(response.urljoin(nexthref),callback=self.parse)

'''

新闻详情页处理方法

'''

def parse_details(self, response):

#找到正文

main_div = response.css("div.main")

#因新闻详情也可能有分页,获取下一页的链接

next_href = main_div.xpath(u'//div[@id="displaypagenum"]/center/a[last()][text()="\u4e0b\u4e00\u9875"]/@href').extract_first()

#获取正文内容,仅取DIV内所有<p>元素下的文本。

content = main_div.xpath('//div[@class="content"]//p//text()').extract()

content = "\n".join(content)

if(next_href is None):

#最后一页,则获取所有内容,返回item

title = main_div.css('div.m-article h2::text').extract_first()

source = main_div.css('div.meta span[id="pubtime_baidu"]::text').extract_first()

times = main_div.css('div.meta span[id="source_baidu"]::text').extract_first()

url = response.url

editor = main_div.css('div.m-editor::text').extract_first()

item = {}

if('item' in response.meta):

item = response.meta['item']

item['title'] = title

item['source'] = source

item['times'] = times

item['url'] = url

item['editor'] = editor

if('content' in item):

item['content'] += '\n'+content

else:

item['content'] = content

yield item

else:

#非最后一页 ,则取出当前页content,并拼接,然后跳转到下一页

request = scrapy.Request(response.urljoin(next_href),

callback=self.parse_details)

item = {}

if('item' in response.meta and 'content' in response.meta['item']):

item = response.meta['item']

item['content'] += '\n'+content

else:

item['content'] = content

request.meta['item'] = item

yield request

  七、运行Scrapy

  切换到您的项目目录并执行以下命令:

  Scrapy crawl nanfang –loglevel INFO

执行结果如下图所示:

  

  八、验证抓取结果

  完成数据采集后,登录到DATA IDE以查看采集的内容:

  

  本文仅演示一个简单的案例。实际生产中还需要考虑多线程处理,网站验证,分布式爬网等。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线