解读:源码剖析 - 公众号采集阅读器 Liuli
优采云 发布时间: 2022-10-24 19:24解读:源码剖析 - 公众号采集阅读器 Liuli
介绍
偶然发现了琉璃这个项目,项目Github:
看了它的文章,发现琉璃是用Python实现的,所以打算简单看一下它的实现细节,老规矩,看项目,先把好奇的点写下来:
是的,我对这两点很感兴趣。经过一番阅读,关于好奇心 1、其实没有人实现过漂亮的PC软件界面。琉璃只是采集,然后推送内容,所以这篇文章的重点,就看怎么了采集公众号文章,另外,在阅读的过程中,我发现LiuLi还用了一个简单的方法来识别文章是不是广告文章,这个也很有意思,也记录一下。
公众号文章采集
琉璃基于搜狗微信()对公众号文章进行采集,实现了两种方法:
我们可以通过相应的配置文件来控制琉璃使用哪种方式执行文章采集,它使用ruia默认的方式执行采集。
琉璃将功能划分为多个模块,然后通过调度器调度不同的模块。调度器启动方法代码如下:
# src/liuli_schedule.py<br /><br />def start(ll_config_name: str = ""):<br /> """调度启动函数<br /><br /> Args:<br /> task_config (dict): 调度任务配置<br /> """<br /> if not ll_config_name:<br /> freeze_support()<br /><br /> # 默认启动 liuli_config 目录下所有配置<br /> ll_config_name_list = []<br /> for each_file in os.listdir(Config.LL_CONFIG_DIR):<br /> if each_file.endswith("json"):<br /> # 加入启动列表<br /> ll_config_name_list.append(each_file.replace(".json", ""))<br /> # 进程池<br /> p = Pool(len(ll_config_name_list))<br /> for each_ll_config_name in ll_config_name_list:<br /> LOGGER.info(f"Task {each_ll_config_name} register successfully!")<br /> p.apply_async(run_liuli_schedule, args=(each_ll_config_name,))<br /> p.close()<br /> p.join()<br /><br /> else:<br /> run_liuli_schedule(ll_config_name)<br />
从代码中可以看出,调度器会启动Python进程池,然后在其中添加run_liuli_schedule异步任务。在这个异步任务中,会执行run_liuli_task方法,这是一个完整的任务流程。代码如下:
def run_liuli_task(ll_config: dict):<br /> """执行调度任务<br /><br /> Args:<br /> ll_config (dict): Liuli 任务配置<br /> """<br /> # 文章源, 用于基础查询条件<br /> doc_source: str = ll_config["doc_source"]<br /> basic_filter = {"basic_filter": {"doc_source": doc_source}}<br /> # 采集器配置<br /> collector_conf: dict = ll_config["collector"]<br /> # 处理器配置<br /> processor_conf: dict = ll_config["processor"]<br /> # 分发器配置<br /> sender_conf: dict = ll_config["sender"]<br /> sender_conf.update(basic_filter)<br /> # 备份器配置<br /> backup_conf: dict = ll_config["backup"]<br /> backup_conf.update(basic_filter)<br /><br /> # 采集器执行<br /> LOGGER.info("采集器开始执行!")<br /> for collect_type, collect_config in collector_conf.items():<br /> collect_factory(collect_type, collect_config)<br /> LOGGER.info("采集器执行完毕!")<br /> # 采集器执行<br /> LOGGER.info("处理器(after_collect): 开始执行!")<br /> for each in processor_conf["after_collect"]:<br /> func_name = each.pop("func")<br /> # 注入查询条件<br /> each.update(basic_filter)<br /> LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")<br /> processor_dict[func_name](**each)<br /> LOGGER.info("处理器(after_collect): 执行完毕!")<br /> # 分发器执行<br /> LOGGER.info("分发器开始执行!")<br /> send_doc(sender_conf)<br /> LOGGER.info("分发器执行完毕!")<br /> # 备份器执行<br /> LOGGER.info("备份器开始执行!")<br /> backup_doc(backup_conf)<br /> LOGGER.info("备份器执行完毕!")<br />
从 run_liuli_task 方法中,需要执行一个 Liuli 任务:
关于琉璃的功能大家可以阅读作者自己的文章:,这里我们只关注公众号采集的逻辑。
因为ruia和playwright实现的采集器有两种不同的方式,使用哪一种由配置文件决定,然后通过import_module方法动态导入对应的模块,然后运行模块的run方法,从而实现文章的公众号采集,相关代码如下:
def collect_factory(collect_type: str, collect_config: dict) -> bool:<br /> """<br /> 采集器工厂函数<br /> :param collect_type: 采集器类型<br /> :param collect_config: 采集器配置<br /> :return:<br /> """<br /> collect_status = False<br /> try:<br /> # import_module方法动态载入具体的采集模块<br /> collect_module = import_module(f"src.collector.{collect_type}")<br /> collect_status = collect_module.run(collect_config)<br /> except ModuleNotFoundError:<br /> LOGGER.error(f"采集器类型不存在 {collect_type} - {collect_config}")<br /> except Exception as e:<br /> LOGGER.error(f"采集器执行出错 {collect_type} - {collect_config} - {e}")<br /> return collect_status<br />
编剧采集模块实现
Playwright 是微软出品的自动化库。它类似于硒。它定位于网页测试,但也被人们用来获取网页信息。当然,一些前端的反爬措施,编剧是无法突破的。
与selenium相比,playwright支持python的async,性能有所提升(但还是比不上直接请求)。下面是获取公众号下最新文章的一些逻辑(完整代码太长):
async def playwright_main(wechat_name: str):<br /> """利用 playwright 获取公众号元信息,输出数据格式见上方<br /> Args:<br /> wechat_name ([str]): 公众号名称<br /> """<br /> wechat_data = {}<br /> try:<br /> async with async_playwright() as p:<br /> # browser = await p.chromium.launch(headless=False)<br /> browser = await p.chromium.launch()<br /> context = await browser.new_context(user_agent=Config.SPIDER_UA)<br /> page = await context.new_page()<br /> # 进行公众号检索<br /> await page.goto("https://weixin.sogou.com/")<br /> await page.wait_for_load_state()<br /> await page.click('input[name="query"]')<br /> await page.fill('input[name="query"]', wechat_name)<br /> await asyncio.sleep(1)<br /> await page.click("text=搜公众号")<br /> await page.wait_for_load_state()<br />
从上面的代码可以看出,playwright的用法和selenium很相似,通过自动化用户操作网站的过程可以得到对应的数据。
ruia 采集 模块实现
ruia 是一个轻量级的 Python 异步爬虫框架。因为它比较轻量级,所以我也把它的代码看成了下一篇文章文章的内容。
它的用法有点像scrapy。需要定义一个继承自ruia.Spider的子类,然后调用start方法实现对目标网站的请求,然后ruia会自动调用parse方法解析网页内容。我们来看看具体的代码,首先是入口逻辑:
def run(collect_config: dict):<br /> """微信公众号文章抓取爬虫<br /><br /> Args:<br /> collect_config (dict, optional): 采集器配置<br /> """<br /> s_nums = 0<br /> wechat_list = collect_config["wechat_list"]<br /> delta_time = collect_config.get("delta_time", 5)<br /> for wechat_name in wechat_list:<br /> SGWechatSpider.wechat_name = wechat_name<br /> SGWechatSpider.request_config = {<br /> "RETRIES": 3,<br /> "DELAY": delta_time,<br /> "TIMEOUT": 20,<br /> }<br /> sg_url = f"https://weixin.sogou.com/weixin?type=1&query={wechat_name}&ie=utf8&s_from=input&_sug_=n&_sug_type_="<br /> SGWechatSpider.start_urls = [sg_url]<br /> try:<br /> # 启动爬虫<br /> SGWechatSpider.start(middleware=ua_middleware)<br /> s_nums += 1<br /> except Exception as e:<br /> err_msg = f" 公众号->{wechat_name} 文章更新失败! 错误信息: {e}"<br /> LOGGER.error(err_msg)<br /><br /> msg = f" 微信公众号文章更新完毕({s_nums}/{len(wechat_list)})!"<br /> LOGGER.info(msg)<br />
上面代码中,爬虫是通过SGWechatSpider.start(middleware=ua_middleware)启动的,它会自动请求start_urls的url,然后回调parse方法。parse方法的代码如下:
async def parse(self, response: Response):<br /> """解析公众号原始链接数据"""<br /> html = await response.text()<br /> item_list = []<br /> async for item in SGWechatItem.get_items(html=html):<br /> if item.wechat_name == self.wechat_name:<br /> item_list.append(item)<br /> yield self.request(<br /> url=item.latest_href,<br /> metadata=item.results,<br /> # 下一个回调方法<br /> callback=self.parse_real_wechat_url,<br /> )<br /> break<br />
在parse方法中,通过self.request请求一个新的url,然后回调self.parse_real_wechat_url方法。一切都与scrapy如此相似。
至此采集模块的阅读就结束了(代码中还涉及到一些简单的数据清洗,本文不做讨论),没有特别复杂的部分,从代码来看,作者没被派去做反爬逻辑处理,搜狗微信没反爬?
广告文章标识
然后看广告文章的识别,琉璃还是会采集为广告文章,经过采集,在文章处理模块中,广告文章标记出来,先分析广告文章标记的入口逻辑,回到liuli_schedule.py的run_lili_task方法,注意进程的逻辑(文章处理模块) ,代码如下:
LOGGER.info("处理器(after_collect): 开始执行!")<br /> for each in processor_conf["after_collect"]:<br /> func_name = each.pop("func")<br /> # 注入查询条件<br /> each.update(basic_filter)<br /> LOGGER.info(f"处理器(after_collect): {func_name} 正在执行...")<br /> processor_dict[func_name](**each)<br /> LOGGER.info("处理器(after_collect): 执行完毕!")<br />
从上面的代码可以看出,处理器的主要逻辑是processor_dict字典中的方法。字典的定义路径是 src/processor/__init__.py。代码如下:
from .rss_utils import to_rss<br />from .text_utils import (<br /> ad_marker,<br /> extract_core_html,<br /> extract_keyword_list,<br /> html_to_text_h2t,<br /> str_replace,<br />)<br /><br />processor_dict = {<br /> "to_rss": to_rss,<br /> "ad_marker": ad_marker,<br /> "str_replace": str_replace,<br />}<br />
ad_marker 方法是一种识别文章 是否是广告文章 的方法。其实写的有点绕。核心逻辑是计算当前文章和采集到广告文章构造词频向量的余弦值,判断余弦值的大小判断是否为广告文章,简单看一下相关逻辑。
在ad_marker方法中会调用model_predict_factory方法,传入当前文章的标题、文章的内容和分类的cos_value。相关代码如下(已清理上代码,只显示所需的部分):
def ad_marker(<br /> cos_value: float = 0.6,<br /> is_force=False,<br /> basic_filter={},<br /> **kwargs,<br />):<br /> # 基于余弦相似度<br /> cos_model_resp = model_predict_factory(<br /> model_name="cos",<br /> model_path="",<br /> input_dict={"text": doc_name + doc_keywords, "cos_value": cos_value},<br /> # input_dict={"text": doc_name, "cos_value": Config.COS_VALUE},<br /> ).to_dict()<br />
cos_value为0.6,即如果计算出当前文章与广告文章的余弦值大于等于0.6,则认为当前文章为广告文章,其最终预测逻辑在classifier/model_base/cos_model_loader.py的predict方法中,代码如下:
def predict(self, text: str, cos_value: float = 0.8) -> dict:<br /> """<br /> 对文本相似度进行预测<br /> :param text: 文本<br /> :param cos_value: 阈值 默认是0.9<br /> :return:<br /> """<br /> max_pro, result = 0.0, 0<br /> for each in self.train_data:<br /> # 余弦值具体的运算逻辑<br /> cos = CosineSimilarity(self.process_text(text), each)<br /> res_dict = cos.calculate()<br /> value = res_dict["value"]<br /> # 大于等于cos_value,就返回1,则表示当前的文章是广告文章<br /> result = 1 if value >= cos_value else 0<br /> max_pro = value if value > max_pro else max_pro<br /> if result == 1:<br /> break<br /><br /> return {"result": result, "value": max_pro}<br />
余弦值的具体操作逻辑在CosineSimilarity的calculate方法中,都是和数学有关的,我就不看了。核心是判断当前文章与广告文章的相似度。可以通过TFIDF、文本聚类等算法来完成,相关库几行代码就可以搞定(所以感觉就写在这里)。
剩下的可以参考逻辑结束
琉璃是一个不错的学习项目,下一部分文章,一起来学习ruia Python轻量级异步爬虫框架的代码。
最新信息:短视频seo霸屏全网,采集精准实时有效数据私信截流-运营解决方案
4.同一城市是否有视频和QR码流行?
在同一个城市,我们会直接给你一个完整的系统,和我们之前的客人一起刷一个模式,一个触摸,这个问题不用说了
5. 如何编辑短视频搜索引擎优化?您使用软件吗?我发布了一个,发布后我看不到它,这是怎么回事?
目前,系统中有编辑短视频的功能。如果编辑视频软件,可以使用剪辑PR来编辑那些,操作非常简单。发送后我看不到,一个很大的原因是因为短视频平台还没有获得批准
6. 我昨天发布了视频,我想看看几天是否有任何查询,我可以只看查询吗?
如果您发布视频,您通常会在第二天看到查询。
7. 我已随意测试了一个产品,如何删除查询列表中有关该产品的查询信息?
目前,查询暂时不支持删除,此功能将尽快更新
8.抖音搜索索引使用哪些工具?
我们可以直接在软件上搜索
9. 为客户开立账户时,对账户和密码有什么特殊要求吗?例如,特殊情况等
目前对开户的账号密码没有特殊要求
10.关键词排名信息从何而来?
关键词排名信息相当于在抖音中搜索单词以查找我们发布视频的位置。
排名前10位
11.查询信息会是一两年前吗?
查询信息将
被过滤,一般控制在查询的一个月内,并且我们对工作的实时数据进行监控,我们得到的查询信息会更准确
12. 手动监控是否有数量限制
目前对手动监视器的数量没有限制
13.客户开始选择主版本,如果以后想升级,该如何操作?我可以升级充值点差吗?
我们不像什么主销版本外面给你多少条查询信息什么的,我们直接给你最高层,你想要的查询信息随心所欲,所以没有这样的问题,当然,你得是代理还是独立建设另一个讨论
14.过去两天测试的账号,大部分的查询信息都是来自一个账号,就是我们的检索系统关注的行业本身并不多,后期需要客户手动监控视频来增加查询次数,如果是这样,根本就不需要使用这个系统, 客户自己通过大V大账号抖音查找评论
如果我们自动监控,我们会根据搜索用户,然后找到用户的视频,然后获取视频下方的所有评论来执行此操作。这样,如果用户的视频下有更多的意向数据,一个账号下可能会有短期信息。如果客户分发完成,将有其他用户的信息
15. 手动监控,如果ABC的多个用户同时监控一个抖音账户,分发查询的信息规则是什么?它们会是相同的数据吗?
可能有相同的数据,但不太可能
16.为什么我的查询没有更新?
这
查询信息将在提交后0.5-2小时内更新,如果暂时没有更新,请稍等片刻,可以去外面看很多小同行,基本上是24小时,我们会早点更新新版本
17. 提交视频发布后出现参数错误怎么办?
一般参数错误是由于发布的视频标题数量大于 55 字造成的
数量
主题文本 + #的数量 + 空格数 + 短视频帐户名称数 @ 和 @ 必须小于 55 字
18.上传的视频一直无法审核,会发生什么?
如果审核失败,一般是由于短视频平台认定其不适合公开,一般包括:处理、视频模糊、内容原因等。处理的可能性最大,所以在售后培训时多听,在我们公关技术培训时多听,并配合我们的模板去做,这不会发生
19. 本地客户如何准确掌握?例如,成都*敏*感*词*只在成都有客户进行婚纱摄影
本地用户在监控时可以添加区域字进行监控,如成都*敏*感*词*、成都婚纱摄影
20. 我可以自己控制数据的分发吗?例如,如果我卖出 500 个查询,而当发出 500 个查询时,我删除了帐户的采集
可以授予您此权限
21.没有公司可以推动吗?我必须上传我的营业执照吗?
?
目前没有可以晋升的公司
22.膜结构是否属于建材和家居装饰的范畴?该产品是膜结构车棚停车场也属于家装建材类
膜结构建材家居装修(这种行业问题直接在百度上搜索就很清楚了)说实话,我没有做过这个,也不太了解
23. 我已经监控了这个词,如果我添加它,为什么我不能添加它?
目前,监控词不能重复添加,一个单词监控一次和监控两次具有相同的效果。
24. 我是操作最终客户的操作模式还是教客户自己操作?
如果我们这样做,我们可以尝试向客户收取更换操作的费用,这通常称为代理操作
25.合作结束后,客户的同城流行二维码还能使用吗?以前发布的视频会产生影响吗?
这
同一城市爆竹的二维码不能使用,但之前发布的视频仍将存在。
26.电脑可以接收查询信息,但手机无法接收。
手机绑定不经常使用,一段时间不看后,它们会停止推送
27.发布的视频由手机品牌加水印,没有违规,评论或失败,是平台的问题吗?
如果审核失败,一般是由于短视频平台认定其不适合公开,一般包括:处理、视频模糊、内容原因等。处理的可能性最大,建议在售后组多询问技术,或者查看邮箱中的信息
28. 未通过审核的视频是否会计入套餐中指定的视频数量?
无法上传的内容将不会计入计划中指定的数字,只需单击“重新发布”即可
29.很多客户不做全国,做周边或本地,因为物流成本等,有没有办法做到这一点
监视
本地单词,可以添加本地单词进行监视。同时,正在增加本地呼叫的服务功能
30.经过所有检测后,竞争视频的总数会增加吗?
如果是,竞争视频的数量将会增加
一种新的添加剂,如果没有新的添加剂,它不会增加
31. 检测 929 个视频,潜在客户总数如何达到 9 个?如果检测到所有视频或 9 个视频,该怎么办?
在这种情况下,通常是由于行业内缺乏相关数据。建议更换监控字
32. 如果产品被重新测试,之前发布的查询是否会被重新分发?
在同一帐户中,它不会重复分发。建议不要重复关键词监视。如果监控关键词数量相对较少,建议更换关键词
33. 如果有来自已经测试过视频的新客户的消息,它会自动再次转到采集以增加挖矿次数吗?
视频那已经测试过了,如果有新的客户留言,只要他还在监控我们采集啊,这都是说,我们正在监控同行的工作采集最新的实时新数据
34. 如果潜在客户消息数据相同,采集监控的不同产品是否会重复分发?
监控不同的产品,基本相同的客户消息数据不会出现。也不会有重复的分发
35. 我们在后台发布的视频是否以这些抖音编号发布在作品中?
是的,在后台发布的视频直接发布到绑定到我们平台的抖音帐户
36. 我能否了解客户在后台发布的视频的发布位置?
是的,在后台视频管理中,发布视频,点击查看按钮,可以直接看到它
37.有些客户想先测试,我会有一个测试账号,怎么开个测试
现在不建议先给顾客试用,因为很多顾客试图先抓取一堆数据来实现白人的心理,它
不建议收费试炼,可以远程给他演示,或者不敢相信,叫他检查一下,网上白妓太多了
来自“ITPUB博客”,链接:,如果您需要转载,请注明出处,否则您将承担法律责任。