文章采集器( Python实现公众号文章采集Liuli的方法代码如下:从代码可知)
优采云 发布时间: 2022-02-20 06:09文章采集器(
Python实现公众号文章采集Liuli的方法代码如下:从代码可知)
介绍
偶然发现了琉璃这个项目,项目Github:
看了它的文章,发现琉璃是用Python实现的,所以打算简单看一下它的实现细节,老规矩,看项目,先把好奇的点写下来:
是的,我对这两点很感兴趣。经过一番阅读,关于好奇心 1、其实没有人实现过漂亮的PC软件界面。琉璃只是采集,然后推送内容,所以这篇文章的重点,就看怎么了采集公众号文章,另外,在阅读的过程中,我发现LiuLi还用了一个简单的方法来识别文章是否是广告文章,这个也很有意思,也记录一下。
公众号文章采集
琉璃基于搜狗微信()对公众号文章进行采集,实现了两种方法:
我们可以通过相应的配置文件来控制琉璃使用哪种方式执行文章采集,它使用ruia默认的方式执行采集。
琉璃将功能划分为多个模块,然后通过调度器调度不同的模块。调度器启动方法代码如下:
# src/liuli_schedule.py
def start(ll_config_name: str = ""):
"""调度启动函数
Args:
task_config (dict): 调度任务配置
"""
if not ll_config_name:
freeze_support()
# 默认启动 liuli_config 目录下所有配置
ll_config_name_list = []
for each_file in os.listdir(Config.LL_CONFIG_DIR):
if each_file.endswith("json"):
# 加入启动列表
ll_config_name_list.append(each_file.replace(".json", ""))
# 进程池
p = Pool(len(ll_config_name_list))
for each_ll_config_name in ll_config_name_list:
LOGGER.info(f"Task {each_ll_config_name} register successfully!")
p.apply_async(run_liuli_schedule, args=(each_ll_config_name,))
p.close()
p.join()
else:
run_liuli_schedule(ll_config_name)
从代码可以看出,调度器会启动Python进程池,然后将run_liuli_schedule异步任务加入其中。在这个异步任务中,会执行run_liuli_task方法,这是一个完整的任务流程。代码如下:
def run_liuli_task(ll_config: dict):
"""执行调度任务
Args:
ll_config (dict): Liuli 任务配置
"""
# 文章源, 用于基础查询条件
doc_source: str = ll_config["doc_source"]
basic_filter = {"basic_filter": {"doc_source": doc_source}}
# 采集器配置
collector_conf: dict = ll_config["collector"]
# 处理器配置
processor_conf: dict = ll_config["processor"]
# 分发器配置
sender_conf: dict = ll_config["sender"]
sender_conf.update(basic_filter)
# 备份器配置
backup_conf: dict = ll_config["backup"]
backup_conf.update(basic_filter)
# 采集器执行
LOGGER.info("采集器开始执行!")
for collect_type, collect_config in collector_conf.items():
collect_factory(collect_type, collect_config)
LOGGER.info("采集器执行完毕!")
# 采集器执行
LOGGER.info("处理器(after_collect): 开始执行!")
for each in processor_conf["after_collect"]:
func_name = each.pop("func")
# 注入查询条件
each.update(basic_filter)
LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")
processor_dict[func_name](**each)
LOGGER.info("处理器(after_collect): 执行完毕!")
# 分发器执行
LOGGER.info("分发器开始执行!")
send_doc(sender_conf)
LOGGER.info("分发器执行完毕!")
# 备份器执行
LOGGER.info("备份器开始执行!")
backup_doc(backup_conf)
LOGGER.info("备份器执行完毕!")
从 run_liuli_task 方法中,需要执行一个 Liuli 任务:
关于琉璃的功能,可以看作者自己的文章:基于琉璃搭建纯RSS公众号信息流,这里只关注公众号采集的逻辑。
因为采集器有两种不同的实现方式,ruia和playwright,使用哪一种由配置文件决定,然后通过import_module方法动态导入对应的模块,然后运行模块的run方法,从而实现文章的公众号 bool:
"""
采集器工厂函数
:param collect_type: 采集器类型
:param collect_config: 采集器配置
:return:
"""
collect_status = False
try:
# import_module方法动态载入具体的采集模块
collect_module = import_module(f"src.collector.{collect_type}")
collect_status = collect_module.run(collect_config)
except ModuleNotFoundError:
LOGGER.error(f"采集器类型不存在 {collect_type} - {collect_config}")
except Exception as e:
LOGGER.error(f"采集器执行出错 {collect_type} - {collect_config} - {e}")
return collect_status
编剧采集模块实现
Playwright 是微软出品的自动化库。它类似于硒。它定位于网页测试,但也被人们用来获取网页信息。当然,一些前端的反爬措施,编剧是无法突破的。
与selenium相比,playwright支持python的async,性能有所提升(但还是比不上直接请求)。下面是获取公众号下最新文章的一些逻辑(完整代码太长):
async def playwright_main(wechat_name: str):
"""利用 playwright 获取公众号元信息,输出数据格式见上方
Args:
wechat_name ([str]): 公众号名称
"""
wechat_data = {}
try:
async with async_playwright() as p:
# browser = await p.chromium.launch(headless=False)
browser = await p.chromium.launch()
context = await browser.new_context(user_agent=Config.SPIDER_UA)
page = await context.new_page()
# 进行公众号检索
await page.goto("https://weixin.sogou.com/")
await page.wait_for_load_state()
await page.click('input[name="query"]')
await page.fill('input[name="query"]', wechat_name)
await asyncio.sleep(1)
await page.click("text=搜公众号")
await page.wait_for_load_state()
从上面的代码可以看出,playwright的用法和selenium很相似,通过自动化用户操作网站的过程可以得到对应的数据。
ruia 采集 模块实现
ruia 是一个轻量级的 Python 异步爬虫框架。因为它比较轻量级,所以我也把它的代码看成了下一篇文章文章的内容。
它的用法有点像scrapy。需要定义一个继承自ruia.Spider的子类,然后调用start方法请求目标网站,然后ruia会自动调用parse方法解析网页内容。我们来看看具体的代码,首先是入口逻辑:
def run(collect_config: dict):
"""微信公众号文章抓取爬虫
Args:
collect_config (dict, optional): 采集器配置
"""
s_nums = 0
wechat_list = collect_config["wechat_list"]
delta_time = collect_config.get("delta_time", 5)
for wechat_name in wechat_list:
SGWechatSpider.wechat_name = wechat_name
SGWechatSpider.request_config = {
"RETRIES": 3,
"DELAY": delta_time,
"TIMEOUT": 20,
}
sg_url = f"https://weixin.sogou.com/weixin?type=1&query={wechat_name}&ie=utf8&s_from=input&_sug_=n&_sug_type_="
SGWechatSpider.start_urls = [sg_url]
try:
# 启动爬虫
SGWechatSpider.start(middleware=ua_middleware)
s_nums += 1
except Exception as e:
err_msg = f" 公众号->{wechat_name} 文章更新失败! 错误信息: {e}"
LOGGER.error(err_msg)
msg = f" 微信公众号文章更新完毕({s_nums}/{len(wechat_list)})!"
LOGGER.info(msg)
上面代码中,爬虫是通过SGWechatSpider.start(middleware=ua_middleware)启动的,它会自动请求start_urls的url,然后回调parse方法。parse方法的代码如下:
async def parse(self, response: Response):
"""解析公众号原始链接数据"""
html = await response.text()
item_list = []
async for item in SGWechatItem.get_items(html=html):
if item.wechat_name == self.wechat_name:
item_list.append(item)
yield self.request(
url=item.latest_href,
metadata=item.results,
# 下一个回调方法
callback=self.parse_real_wechat_url,
)
break
在parse方法中,通过self.request请求一个新的url,然后回调self.parse_real_wechat_url方法。一切都与scrapy如此相似。
至此采集模块的阅读就结束了(代码中还涉及到一些简单的数据清洗,本文不做讨论),没有特别复杂的部分,从代码来看,作者没被派去做反爬逻辑处理,搜狗微信没反爬?
广告文章标识
然后看广告文章的识别,琉璃依然会采集为广告文章,经过采集,在文章处理模块中,广告 dict:
"""
对文本相似度进行预测
:param text: 文本
:param cos_value: 阈值 默认是0.9
:return:
"""
max_pro, result = 0.0, 0
for each in self.train_data:
# 余弦值具体的运算逻辑
cos = CosineSimilarity(self.process_text(text), each)
res_dict = cos.calculate()
value = res_dict["value"]
# 大于等于cos_value,就返回1,则表示当前的文章是广告文章
result = 1 if value >= cos_value else 0
max_pro = value if value > max_pro else max_pro
if result == 1:
break
return {"result": result, "value": max_pro}
余弦值的具体操作逻辑在CosineSimilarity的calculate方法中,都是和数学有关的,我就不看了。核心是判断当前文章与广告文章的相似度。可以通过TFIDF、文本聚类等算法来完成,相关库几行代码就可以搞定(所以感觉自己写在这里)。
剩下的可以参考逻辑结束
琉璃是一个不错的学习项目,下一部分文章,一起来学习ruia Python轻量级异步爬虫框架的代码。