抓取网页新闻(一周怎么去爬取新浪网和每经网的上市公司新闻数据 )
优采云 发布时间: 2021-12-08 17:09抓取网页新闻(一周怎么去爬取新浪网和每经网的上市公司新闻数据
)
在过去的一周里,我一直在深入研究如何使用新闻数据进行量化投资。在正式进行文本挖掘和制定策略之前,当然要先准备好数据。“网络爬虫”和“数据爬虫”这两个词已经很臭了,说起来不难,但要做到精准并不容易。如果忽略数据采集的重置价格,数据将永远存在。只要爬到的网站服务器不删除数据,就会一直请求数据。但如果你只是认为自己很强大,那你就错了。一般服务器都会有反蜘蛛的,但在大多数情况下,反蜘蛛的需求不会影响网站的正常使用,也就是网站的功能要求必须高于反蜘蛛-爬虫要求。
下面我们就来看看如何爬取新浪网和上市公司的新闻数据。
在爬取数据之前,准备好数据库更方便。这里我比较喜欢非关系型数据库,优缺点就不多说了。这里我选择MongoDB。如果习惯了可视化管理数据的方式,当然不能错过 Robomongo。
接下来我们看一下两个网站的页面结构:
单线程爬行速度肯定没有多线程快,但是协程爬行和多线程爬行是不能完全区分的。协程虽然是轻量级线程,但是达到一定数量后,仍然会导致服务器崩溃和报错,比如下面的“cannot watch more than 1024 sockets”问题。解决这个问题的最好方法是限制并发协程的数量。
不考虑多进程,占用大量内存,启动时间特别长。新浪网的响应速度还是有优势的,页面的字节数也很大,这意味着在这种情况下,多线程相对于单线程的优势会更加明显。下面是抓取历史新闻的代码:
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 22 10:01:40 2018
@author: Damon
"""
import time
import re
import requests
import gevent
from gevent import monkey,pool
monkey.patch_all()
from concurrent import futures
from bs4 import BeautifulSoup
from pymongo import MongoClient
class WebCrawlFromSina(object):
def __init__(self,*arg,**kwarg):
self.totalPages = arg[0] #totalPages
self.Range = arg[1] #Range
self.ThreadsNum = kwarg['ThreadsNum']
self.dbName = kwarg['dbName']
self.colName = kwarg['collectionName']
self.IP = kwarg['IP']
self.PORT = kwarg['PORT']
self.Porb = .5
def countchn(self,string):
pattern = re.compile(u'[\u1100-\uFFFDh]+?')
result = pattern.findall(string)
chnnum = len(result)
possible = chnnum/len(str(string))
return (chnnum, possible)
def getUrlInfo(self,url): #get body text and key words
respond = requests.get(url)
respond.encoding = BeautifulSoup(respond.content, "lxml").original_encoding
bs = BeautifulSoup(respond.text, "lxml")
meta_list = bs.find_all('meta')
span_list = bs.find_all('span')
part = bs.find_all('p')
article = ''
date = ''
summary = ''
keyWords = ''
stockCodeLst = ''
for meta in meta_list:
if 'name' in meta.attrs and meta['name'] == 'description':
summary = meta['content']
elif 'name' in meta.attrs and meta['name'] == 'keywords':
keyWords = meta['content']
if summary != '' and keyWords != '':
break
for span in span_list:
if 'class' in span.attrs:
if span['class'] == ['date'] or span['class'] == ['time-source']:
string = span.text.split()
for dt in string:
if dt.find('年') != -1:
date += dt.replace('年','-').replace('月','-').replace('日',' ')
elif dt.find(':') != -1:
date += dt
break
if 'id' in span.attrs and span['id'] == 'pub_date':
string = span.text.split()
for dt in string:
if dt.find('年') != -1:
date += dt.replace('年','-').replace('月','-').replace('日',' ')
elif dt.find(':') != -1:
date += dt
break
for span in span_list:
if 'id' in span.attrs and span['id'].find('stock_') != -1:
stockCodeLst += span['id'][8:] + ' '
for paragraph in part:
chnstatus = self.countchn(str(paragraph))
possible = chnstatus[1]
if possible > self.Porb:
article += str(paragraph)
while article.find('') != -1:
string = article[article.find('')+1]
article = article.replace(string,'')
while article.find('\u3000') != -1:
article = article.replace('\u3000','')
article = ' '.join(re.split(' +|\n+', article)).strip()
return summary, keyWords, date, stockCodeLst, article
def GenPagesLst(self):
PageLst = []
k = 1
while k+self.Range-1 = .1:
self.Prob -= .1
summary, keyWords, date, stockCodeLst, article = self.getUrlInfo(a['href'])
self.Prob =.5
if article != '':
data = {'Date' : date,
'Address' : a['href'],
'Title' : a.string,
'Keywords' : keyWords,
'Summary' : summary,
'Article' : article,
'RelevantStock' : stockCodeLst}
self._collection.insert_one(data)
def ConnDB(self):
Conn = MongoClient(self.IP, self.PORT)
db = Conn[self.dbName]
self._collection = db.get_collection(self.colName)
def extractData(self,tag_list):
data = []
for tag in tag_list:
exec(tag + " = self._collection.distinct('" + tag + "')")
exec("data.append(" + tag + ")")
return data
def single_run(self):
page_ranges_lst = self.GenPagesLst()
for ind, page_range in enumerate(page_ranges_lst):
self.CrawlCompanyNews(page_range[0],page_range[1])
def coroutine_run(self):
jobs = []
page_ranges_lst = self.GenPagesLst()
for page_range in page_ranges_lst:
jobs.append(gevent.spawn(self.CrawlCompanyNews,page_range[0],page_range[1]))
gevent.joinall(jobs)
def multi_threads_run(self,**kwarg):
page_ranges_lst = self.GenPagesLst()
print(' Using ' + str(self.ThreadsNum) + ' threads for collecting news ... ')
with futures.ThreadPoolExecutor(max_workers=self.ThreadsNum) as executor:
future_to_url = {executor.submit(self.CrawlCompanyNews,page_range[0],page_range[1]) : \
ind for ind, page_range in enumerate(page_ranges_lst)}
if __name__ == '__main__':
t1 = time.time()
WebCrawl_Obj = WebCrawlFromSina(5000,100,ThreadsNum=4,IP="localhost",PORT=27017,\
dbName="Sina_Stock",collectionName="sina_news_company")
WebCrawl_Obj.coroutine_run() #Obj.single_run() #Obj.multi_threads_run()
t2 = time.time()
print(' running time:', t2 - t1)
因为在爬取的过程中很容易因为对方服务器断了连接停了很久,或者长时间没有响应,但是又不想在重启程序的时候脑补多余的数据,所以启动时,可以先更改数据库中的地址标签数据或日期。获取到数据,然后在插入新的爬取数据之前,先比较是否有重复的Address或Date,然后选择是否插入新的数据。运行下图:首先比较是否有重复的地址或日期,然后选择是否插入新数据。运行下图:首先比较是否有重复的地址或日期,然后选择是否插入新数据。运行下图:
爬取各个网络的时候出现了一个小分叉,检索页面时出现各种连接中断的问题。即使连接没问题,很多爬取的数据只有标题,没有文字和时间。一开始以为是自己写的代码没被抓到,后来发现是被爬回来了。
所以在代码中,你要记录爬取成功的网址和爬取不成功的网址,然后当然继续爬取,直到革命胜利。这种访问同一个链接的连续循环,很容易锁定对方服务器的IP。如果真的是打嗝,就得换个IP才能玩。因此,最好每个周期睡眠一定次数。当然,如果不麻烦的话,睡一个随机数的间隔(看起来比较做作),然后继续爬行。在这里我只是睡了1秒钟继续攀登。一开始在多线程中调用CrawlCompanyNews函数,然后统计返回一个没有被抓取的url_lst_withoutNews,然后传入ReCrawlNews函数,并且单线程被一一重新捕获。最终捕获如下图所示: