如何用Python编程实时监控拉盘或砸盘行为(图)

优采云 发布时间: 2021-06-09 22:19

  如何用Python编程实时监控拉盘或砸盘行为(图)

  一、想法整理

  在写了上一篇文章《如何使用Python编程实时监控拉币或砸币行为》后,发现公信宝的拉币每次都会在微博上提前通知,如下图

  另外,第三次回购持续了两天,所以如果你能及时在微博上收到通知,就会有更多消息。但是不可能随时盯着微博,所以想用python实时监控微博的新内容。

  在谷歌下,给出的解决方案大多是使用python模拟登录手机版微博,然后从自己的关注列表或者关注用户的UID中获取内容。但是我做了几次,花了几个小时才完成。主要原因是搜索文章基本在3月份之前,微博的反爬虫功能会越来越完善,导致很多原有的方法失效。

  我试过他的代码,运行没有错误。太好了,我可以从中学习。非常感谢作者公众号:编程的思考。

  借鉴他的方法,先爬取微博内容。之后就可以在此基础上编程实现自己想要的功能了。

  二、Grab 微博内容

  使用手机网页打开微博首页进行监控。比如我要监控的公信宝首页是%3D1%26q%3D%E5%85%AC%E4%BF%A1%E5%AE%9D。打开后按F12调出开发者工具网络选项卡,勾选隐藏数据网址,选择XHR,找到微博内容相关信息,发现隐藏在getIndex?uid=5598561921下,每次下载都会出现有了新标签页,我们监控新内容,只要第一页就够了。

  然后点击headers,找到我们需要的内容的Request URL。获取方法是requests.get,User-Agent,复制。

  这样我们就可以通过以下几项获取公信宝首页的数据,内容需要进行处理。

  导入请求

  importjson,time,datetime

  user_agent ='Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.368 3.86 移动版 Safari/537.36'

  header = {'User-Agent': user_agent}# 不需要 cookie

  #用户公信宝

  XHRUrl ='%3D1%26q%3D%E5%85%AC%E4%BF%A1%E5%AE%9D&containerid=61921'

  r = requests.get(XHRUrl,headers=header)

  # 打印(r.text)

  json_str = r.text#所有数据

  dict_ = json.loads(json_str)#转成json格式

  下一步是如何找到我们想要的微博内容。

  点击预览,一一展开,我们看到10个非常整齐的数据:

  再次展开每个数据栏,比如点击0,我们发现微博的内容对应有一个“文字”,然后我们通过处理字典数据就可以找到这个内容。

  返回搜狐,查看更多 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .

  n =1# 用于打印数量

  #对于一个可迭代/可遍历的对象(如列表、字符串),enumerate组成一个索引序列,可以同时获取索引和值

  WB_text_List = []#微博内容列表

  forindex,cardinenumerate(dict_['data']['cards']):

  ifdict_['data']['cards'][index]['card_type'] ==9:#

  }

  headers = {"Content-Type":"application/json ;charset=utf-8"}

  url ='#x27;+ robots_id

  body = json.dumps(msg)

  status = requests.post(url,data=body,headers=headers)

  ifstatus.status_code ==200:

  returnstatus.json()

  # 其他:

  #return response.json()

  返回状态

  exceptExceptionaserr:

  print('钉钉发送失败',err)

  user_agent ='Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.368 3.86 移动版 Safari/537.36'

  header = {'User-Agent': user_agent}# 不需要 cookie

  #用户公信宝

  XHRUrl ='%3D1%26q%3D%E5%85%AC%E4%BF%A1%E5%AE%9D&containerid=61921'

  r = requests.get(XHRUrl,headers=header)

  # 打印(r.text)

  json_str = r.text#所有数据

  dict_ = json.loads(json_str)#转成json格式

  n =1# 用于打印数量

  #对于一个可迭代/可遍历的对象(如列表、字符串),enumerate组成一个索引序列,可以同时获取索引和值

  WB_text_List = []#微博内容列表

  forindex,cardinenumerate(dict_['data']['cards']):

  ifdict_['data']['cards'][index]['card_type'] ==9:# {card_type: 9,只有9以下的内容

  # print(dict_['data']['cards'][index]['mblog']['id']) #每个微博的ID

  text = dict_['data']['cards'][index]['mblog']['text']#每条微博的内容

  text = text.split(')[]#发现内容混杂,使用split对字符串进行拆分,分离后的文本会在新列表的第一个,暂时去掉图片

  created_at = dict_['data']['cards'][index]['mblog']['created_at']#每条微博的发布时间

  # 打印(n,文本)

  打印(n,created_at,text)

  WB_text_List.append(text)# 当前内容列表

  n = n +1

  虽然为真:

  试试:

  r = requests.get(XHRUrl,headers=header)#使用.format()输入用户id和页码

  json_str = r.text#所有数据

  dict_ = json.loads(json_str)#转成json格式

  n =1# 用于打印数量

  forindex,cardinenumerate(dict_['data']['cards']):

  ifdict_['data']['cards'][index]['card_type'] ==9:# {card_type: 9,只有9以下的内容

  # print(dict_['data']['cards'][index]['mblog']['id']) #每个微博的ID

  text = dict_['data']['cards'][index]['mblog']['text']#每条微博的内容

  text = text.split(')[]#发现内容混杂,使用split对字符串进行拆分,分离后的文本会在新列表的第一个,暂时去掉图片

  created_at = dict_['data']['cards'][index]['mblog']['created_at']#每条微博的发布时间

  如果不是(textinWB_text_List):

  print('新微博',text,sep='\n')

  content ='新微博'+'\n'+ text

  send_msg1 = send_dingding_msg1(内容)

  打印(send_msg1)

  n = n +1

  time.sleep(5)

  exceptExceptionaserr:

  print("查询错误",err)

  time.sleep(1)

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线