微博关键词爬虫,数据解析最关键的一步!

优采云 发布时间: 2021-07-02 23:17

  微博关键词爬虫,数据解析最关键的一步!

  requests 库是 Python 爬虫中最常用的库。与内置的urllib库相比,更加简洁高效。是每个爬虫必须掌握的基础;但它也有缺点,就是不支持异步操作,可以通过多线程解决,但是当需要发送大量请求时,创建大量线程会浪费太多资源;这时候出现了一个新的库aiohttp,它支持异步操作,可以在线程中,通过异步多任务快速发送请求,提高效率。这一次,我基于这两个库做了一个高效的微博关键词爬虫。源代码在文章末尾。

  首先,我从微博的移动地址开始,发现是一个 ajsx 请求。在请求参数中,除了页码,其他都是不变的。因此,要实现多页请求,直接发送页码作为参数。能够。但是页面返回的json数据并没有直接表示总页数,需要自己计算。经过进一步分析,发现数据收录微博帖子总数和每页帖子数。这是突破点。对它进行简单的计算就可以得到总页数。这里只需要发送一次请求即可获取信息,所以这里使用的是requests。

  

  

  def get_page():

"""

先用requests构造请求,解析出关键词搜索出来的微博总页数

:return: 返回每次请求需要的data参数

"""

data_list = []

data = {

'containerid': '100103type=1&q={}'.format(kw),

'page_type': 'searchall'}

resp = requests.get(url=url, headers=headers, params=data)

total_page = resp.json()['data']['cardlistInfo']['total'] # 微博总数

# 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1

if total_page % 10 == 0:

page_num = int(total_page / 10)

else:

page_num = int(total_page / 10) + 1

# 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数

if page_num == 1:

data_list.append(data)

return data_list

else:

for i in range(1, page_num + 1):

data['page'] = i

data_list.append(copy.deepcopy(data))

return data_list

  页码分析

  得到页码后,就可以分析数据了。每个页面都需要发送一个单独的请求。这里为了提高效率,使用了aiohttp。通过 async关键词 定义一个特殊的函数并返回一个协程对象。请注意,函数内的所有代码都必须支持异步操作。构造请求时需要注意具体的格式。

  

  

  # async定义函数,返回一个协程对象

async def crawl(data):

"""

多任务异步解析页面,存储数据

:param data: 请求所需的data参数

:return: None

"""

async with aiohttp.ClientSession() as f: # 实例化一个ClientSession

async with await f.get(url=url, headers=headers, params=data) as resp: # 携带参数发送请求

text = await resp.text() # await 等待知道获取完整数据

text_dict = json.loads(text)['data']['cards']

parse_dict = {}

for card in text_dict:

if card['card_type'] == 9:

scheme = card['scheme']

if card['mblog']['isLongText'] is False:

text = card['mblog']['text']

text = re.sub(r'|\n+', '', text)

else:

text = card['mblog']['longText']['longTextContent']

user = card['mblog']['user']['profile_url']

comments_count = card['mblog']['comments_count']

attitudes_count = card['mblog']['attitudes_count']

parse_dict['url'] = scheme

parse_dict['text'] = text

parse_dict['author'] = user

parse_dict['comments_count'] = comments_count

parse_dict['attitudes_count'] = attitudes_count

parse_dict_list.append(copy.deepcopy(parse_dict))

  数据分析

  最关键的一步是将协程对象添加到事件循环中,实现异步执行。

  

  

  task_list = [] # 定义一个任务列表

for data in data_list:

c = crawl(data) # 调用协程,传参

task = asyncio.ensure_future(c) # 创建任务对象

task_list.append(task) # 将任务添加到列表中

loop = asyncio.get_event_loop() # 创建事件循环

loop.run_until_complete(asyncio.wait(task_list)) # 开启循环,并将阻塞的任务挂起

  事件循环

  以上部分是整个爬虫的关键。剩下的数据写入(导出到excel)直接放在源码里面。如有不足之处请指正!

  

  

  import copy

import aiohttp

import requests

import re

import asyncio

import json

import xlwt

def get_page():

"""

先用requests构造请求,解析出关键词搜索出来的微博总页数

:return: 返回每次请求需要的data参数

"""

data_list = []

data = {

'containerid': '100103type=1&q={}'.format(kw),

'page_type': 'searchall'}

resp = requests.get(url=url, headers=headers, params=data)

total_page = resp.json()['data']['cardlistInfo']['total'] # 微博总数

# 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1

if total_page % 10 == 0:

page_num = int(total_page / 10)

else:

page_num = int(total_page / 10) + 1

# 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数

if page_num == 1:

data_list.append(data)

return data_list

else:

for i in range(1, page_num + 1):

data['page'] = i

data_list.append(copy.deepcopy(data))

return data_list

# async定义函数,返回一个协程对象

async def crawl(data):

"""

多任务异步解析页面,存储数据

:param data: 请求所需的data参数

:return: None

"""

async with aiohttp.ClientSession() as f: # 实例化一个ClientSession

async with await f.get(url=url, headers=headers, params=data) as resp: # 携带参数发送请求

text = await resp.text() # await 等待知道获取完整数据

text_dict = json.loads(text)['data']['cards']

parse_dict = {}

for card in text_dict:

if card['card_type'] == 9:

scheme = card['scheme']

if card['mblog']['isLongText'] is False:

text = card['mblog']['text']

text = re.sub(r'|\n+', '', text)

else:

text = card['mblog']['longText']['longTextContent']

user = card['mblog']['user']['profile_url']

comments_count = card['mblog']['comments_count']

attitudes_count = card['mblog']['attitudes_count']

parse_dict['url'] = scheme

parse_dict['text'] = text

parse_dict['author'] = user

parse_dict['comments_count'] = comments_count

parse_dict['attitudes_count'] = attitudes_count

parse_dict_list.append(copy.deepcopy(parse_dict))

def insert_data(file_name):

"""

将数据导出到excle中

:param file_name: 文件名

:return:

"""

wr = xlwt.Workbook(encoding='utf8')

table = wr.add_sheet(file_name)

table.write(0, 0, '原链接')

table.write(0, 1, '正文')

table.write(0, 2, '作者首页')

table.write(0, 3, '评论数')

table.write(0, 4, '点赞数')

for index, data in enumerate(parse_dict_list):

table.write(index + 1, 0, data['url'])

table.write(index + 1, 1, data['text'])

table.write(index + 1, 2, data['author'])

table.write(index + 1, 3, data['comments_count'])

table.write(index + 1, 4, data['attitudes_count'])

file_path = file_name + '.xls'

wr.save(file_path)

def main(file_name):

"""

开启多任务循环

:return: None

"""

data_list = get_page() # 接收data参数列表

task_list = [] # 定义一个任务列表

for data in data_list:

c = crawl(data) # 调用协程,传参

task = asyncio.ensure_future(c) # 创建任务对象

task_list.append(task) # 将任务添加到列表中

loop = asyncio.get_event_loop() # 创建事件循环

loop.run_until_complete(asyncio.wait(task_list)) # 开启循环,并将阻塞的任务挂起

insert_data(file_name)

if __name__ == '__main__':

kw = input('关键词:')

headers = {

'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'}

url = 'https://m.weibo.cn/api/container/getIndex'

parse_dict_list = [] # 临时存放爬取的数据

main(kw)

  完整代码

  注意,由于微博的反爬虫机制,每次短时间的大量请求都会导致ip短时间被禁用,这里可以通过添加代理来解决。我的想法是在页码分析部分添加代理池,随机选择代理。如果当前ip返回的状态码为200,会解析页码,将ip带到页面进行分析;如果状态码不是200,会重复选择下一个ip。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线