解读:《文章推荐系统》系列之收集用户行为数据

优采云 发布时间: 2022-11-01 10:45

  解读:《文章推荐系统》系列之收集用户行为数据

  在上一篇文章中,我们完成了业务数据的同步。推荐系统中的另一个重要数据是用户行为数据。可以说,用户行为数据是推荐系统的基石。,所以接下来,我们需要将用户的行为数据同步到推荐系统数据库中。

  在文章推荐系统中,用户行为包括曝光、点击、停留、采集、分享等,所以我们这里定义的用户行为数据的字段包括:发生时间(actionTime)、停留时间(readTime)、通道ID(channelId)、事件名称(action)、用户ID(userId)、文章 ID(articleId)、算法ID(algorithmCombine),json格式,如下图

  # 曝光的参数<br />{"actionTime":"2019-04-10 18:15:35","readTime":"","channelId":0,"param":{"action": "exposure", "userId": "2", "articleId": "[18577, 14299]", "algorithmCombine": "C2"}}<br /><br /># 对文章触发行为的参数<br />{"actionTime":"2019-04-10 18:15:36","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "18577", "algorithmCombine": "C2"}}<br />{"actionTime":"2019-04-10 18:15:38","readTime":"1621","channelId":18,"param":{"action": "read", "userId": "2", "articleId": "18577", "algorithmCombine": "C2"}}<br />{"actionTime":"2019-04-10 18:15:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "1", "articleId": "14299", "algorithmCombine": "C2"}}<br />{"actionTime":"2019-04-10 18:15:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "14299", "algorithmCombine": "C2"}}<br />{"actionTime":"2019-04-10 18:15:41","readTime":"914","channelId":18,"param":{"action": "read", "userId": "2", "articleId": "14299", "algorithmCombine": "C2"}}<br />{"actionTime":"2019-04-10 18:15:47","readTime":"7256","channelId":18,"param":{"action": "read", "userId": "1", "articleId": "14299", "algorithmCombine": "C2"}}<br />

  用户离线行为数据

  由于用户行为数据规模巨大,通常每天更新一次,用于离线计算。首先在Hive中创建用户行为数据库profile和用户行为表user_action,按日期设置分区,匹配json格式

  -- 创建用户行为数据库<br />create database if not exists profile comment "use action" location '/user/hive/warehouse/profile.db/';<br />-- 创建用户行为信息表<br />create table user_action<br />(<br /> actionTime STRING comment "user actions time",<br /> readTime STRING comment "user reading time",<br /> channelId INT comment "article channel id",<br /> param map comment "action parameter"<br />)<br /> COMMENT "user primitive action"<br /> PARTITIONED BY (dt STRING) # 按照日期分区<br /> ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' # 匹配json格式<br /> LOCATION '/user/hive/warehouse/profile.db/user_action';<br />

  通常用户行为数据存储在应用服务器的日志文件中。我们可以使用 Flume 监控应用服务器上的日志文件,并将用户行为数据采集到 Hive 的 user_action 表对应的 HDFS 目录中。Flume配置如下

  a1.sources = s1<br />a1.sinks = k1<br />a1.channels = c1<br /><br />a1.sources.s1.channels= c1<br />a1.sources.s1.type = exec<br />a1.sources.s1.command = tail -F /root/logs/userClick.log<br />a1.sources.s1.interceptors=i1 i2<br />a1.sources.s1.interceptors.i1.type=regex_filter<br />a1.sources.s1.interceptors.i1.regex=\\{.*\\}<br />a1.sources.s1.interceptors.i2.type=timestamp<br /><br /># c1<br />a1.channels.c1.type=memory<br />a1.channels.c1.capacity=30000<br />a1.channels.c1.transactionCapacity=1000<br /><br /># k1<br />a1.sinks.k1.type=hdfs<br />a1.sinks.k1.channel=c1<br />a1.sinks.k1.hdfs.path=hdfs://192.168.19.137:9000/user/hive/warehouse/profile.db/user_action/%Y-%m-%d<br />a1.sinks.k1.hdfs.useLocalTimeStamp = true<br />a1.sinks.k1.hdfs.fileType=DataStream<br />a1.sinks.k1.hdfs.writeFormat=Text<br />a1.sinks.k1.hdfs.rollInterval=0<br />a1.sinks.k1.hdfs.rollSize=10240<br />a1.sinks.k1.hdfs.rollCount=0<br />a1.sinks.k1.hdfs.idleTimeout=60<br />

  编写 Flume 启动脚本 collect_click.sh

  #!/usr/bin/env bash<br /><br />export JAVA_HOME=/root/bigdata/jdk<br />export HADOOP_HOME=/root/bigdata/hadoop<br />export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin<br /><br />/root/bigdata/flume/bin/flume-ng agent -c /root/bigdata/flume/conf -f /root/bigdata/flume/conf/collect_click.conf -Dflume.root.logger=INFO,console -name a1<br />

  Flume自动生成目录后,需要手动关联Hive分区才能加载到数据中

  

  alter table user_action add partition (dt='2019-11-11') location "/user/hive/warehouse/profile.db/user_action/2011-11-11/"<br />

  用户实时行为数据

  为了提高推荐的实时性,我们还需要采集用户的实时行为数据进行在线计算。这里使用 Flume 将日志采集到 Kafka 中,在线计算任务可以从 Kafka 中读取实时的用户行为数据。首先,启动zookeeper并将其作为守护进程运行

  /root/bigdata/kafka/bin/zookeeper-server-start.sh -daemon /root/bigdata/kafka/config/zookeeper.properties<br />

  打开卡夫卡

  /root/bigdata/kafka/bin/kafka-server-start.sh /root/bigdata/kafka/config/server.properties<br /><br /># 开启消息生产者<br />/root/bigdata/kafka/bin/kafka-console-producer.sh --broker-list 192.168.19.19092 --sync --topic click-trace<br /># 开启消费者<br />/root/bigdata/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.137:9092 --topic click-trace<br />

  修改Flume的日志采集配置文件,添加c2和k2,采集日志数据到Kafka

  a1.sources = s1<br />a1.sinks = k1 k2<br />a1.channels = c1 c2<br /><br />a1.sources.s1.channels= c1 c2<br />a1.sources.s1.type = exec<br />a1.sources.s1.command = tail -F /root/logs/userClick.log<br />a1.sources.s1.interceptors=i1 i2<br />a1.sources.s1.interceptors.i1.type=regex_filter<br />a1.sources.s1.interceptors.i1.regex=\\{.*\\}<br />a1.sources.s1.interceptors.i2.type=timestamp<br /><br /># c1<br />a1.channels.c1.type=memory<br />a1.channels.c1.capacity=30000<br />a1.channels.c1.transactionCapacity=1000<br /><br /># c2<br />a1.channels.c2.type=memory<br />a1.channels.c2.capacity=30000<br />a1.channels.c2.transactionCapacity=1000<br /><br /># k1<br />a1.sinks.k1.type=hdfs<br />a1.sinks.k1.channel=c1<br />a1.sinks.k1.hdfs.path=hdfs://192.168.19.137:9000/user/hive/warehouse/profile.db/user_action/%Y-%m-%d<br />a1.sinks.k1.hdfs.useLocalTimeStamp = true<br />a1.sinks.k1.hdfs.fileType=DataStream<br />a1.sinks.k1.hdfs.writeFormat=Text<br />a1.sinks.k1.hdfs.rollInterval=0<br />a1.sinks.k1.hdfs.rollSize=10240<br />a1.sinks.k1.hdfs.rollCount=0<br />a1.sinks.k1.hdfs.idleTimeout=60<br /><br /># k2<br />a1.sinks.k2.channel=c2<br />a1.sinks.k2.type=org.apache.flume.supervisorctl<br />我们可以利用supervisorctl来管理supervisor。sink.kafka.KafkaSink<br />a1.sinks.k2.kafka.bootstrap.servers=192.168.19.137:9092<br />a1.sinks.k2.kafka.topic=click-trace<br />a1.sinks.k2.kafka.batchSize=20<br />a1.sinks.k2.kafka.producer.requiredAcks=1<br />

  编写Kafka启动脚本start_kafka.sh

  #!/usr/bin/env bash<br /># 启动zookeeper<br />/root/bigdata/kafka/bin/zookeeper-server-start.sh -daemon /root/bigdata/kafka/config/zookeeper.properties<br /># 启动kafka<br />/root/bigdata/kafka/bin/kafka-server-start.sh /root/bigdata/kafka/config/server.properties<br /># 增加topic<br />/root/bigdata/kafka/bin/kafka-topics.sh --zookeeper 192.168.19.137:2181 --create --replication-factor 1 --topic click-trace --partitions 1<br />

  流程管理

  我们在这里使用 Supervisor 进行流程管理。当进程出现异常时,可以自动重启。Flume流程配置如下

  

  [program:collect-click]<br />command=/bin/bash /root/toutiao_project/scripts/collect_click.sh<br />user=root<br />autorestart=true<br />redirect_stderr=true<br />stdout_logfile=/root/logs/collect.log<br />loglevel=info<br />stopsignal=KILL<br />stopasgroup=true<br />killasgroup=true<br />

  Kafka进程配置如下

  [program:kafka]<br />command=/bin/bash /root/toutiao_project/scripts/start_kafka.sh<br />user=root<br />autorestart=true<br />redirect_stderr=true<br />stdout_logfile=/root/logs/kafka.log<br />loglevel=info<br />stopsignal=KILL<br />stopasgroup=true<br />killasgroup=true<br />

  启动主管

  supervisord -c /etc/supervisord.conf<br />

  启动Kafka消费者并将日志数据写入应用服务器日志文件,Kafka消费者可以采集实时行为数据

  # 启动Kafka消费者<br />/root/bigdata/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.137:9092 --topic click-trace<br /><br /># 写入日志数据<br />echo {\"actionTime\":\"2019-04-10 21:04:39\",\"readTime\":\"\",\"channelId\":18,\"param\":{\"action\": \"click\", \"userId\": \"2\", \"articleId\": \"14299\", \"algorithmCombine\": \"C2\"}} >> userClick.log<br /><br /># 消费者接收到日志数据<br />{"actionTime":"2019-04-10 21:04:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "14299", "algorithmCombine": "C2"}}<br />

  supervisor常用命令如下

  supervisorctl<br /><br />> status # 查看程序状态<br />> start apscheduler # 启动apscheduler单一程序<br />> stop toutiao:* # 关闭toutiao组程序<br />> start toutiao:* # 启动toutiao组程序<br />> restart toutiao:* # 重启toutiao组程序<br />> update # 重启配置文件修改过的程序<br />

  参考

  ⬇️⬇️⬇️精彩继续⬇️⬇️⬇️

  专业知识:消费者行为分析实训系统

  一、平台概况

  互联网业务数据采集平台是基于智能算法开发的智能采集系统,可实现对采集对象的智能识别。系统不仅可以对采集的数据进行自动化处理,还可以对采集过程中的数据进行清洗,可以快速准确的获取海量网页数据。

  二、主要参数

  1.在采集器首页的输入框中:只能输入一个URL。这里输入网址后,软件会直接跳转到任务编辑界面,用户可以继续操作。

  2. 我的使命

  (1) 导入任务:可以选择添加要导入的文件,在选中的组名下导入。

  (2)创建任务:可以创建新的任务组;您可以添加智能模式任务和流程图模式任务。

  (3)查看所有任务:可以搜索查看所有任务信息,包括任务名称、任务id、创建时间、结束时间、采集结果、状态、操作;可以查看自动导出任务列表信息,包括组名、任务名、自动导出名、状态、导出成功(条)、导出失败(条)、操作。

  3、创建流程图模式:基于人工智能算法,输入URL即可自动识别网页内容和分页,无需配置采集规则,一键式采集数据。

  4、创建流程图模式:可视化流程操作可以根据提示点击网页中的内容生成采集规则,可以模拟任意操作。

  5.任务优先界面

  (1)任务分组:可以对任务进行分组

  (2)任务名称:可以通过三种方式设置任务名称:自定义输入、网页标题、任务组名称_编号

  (3) URL导入:需要采集的URL链接可以通过手动导入、文件导入、量产等方式导入。

  (4) URL 预览:可以预览和查看添加的 URL 链接。

  6、页面类型:在页面类型中,数据可以通过设置列表类型和单页类型为采集。列表类型可以设置为自动识别、手动点击列表、编辑列表xpath。

  7.分页设置:分页设置可以设置分页按钮、瀑布分页、不启用分页。在分页按钮中,可以选择和设置自动识别分页,点击分页按钮,编辑分页xpath。

  

  8. 设置 采集 范围

  (1)设置起始页:可以设置当前页或自定义起始页数

  (2)设置结束页:可以设置下一页或自定义结束页数

  (3)设置跳过项:可以设置每页前后要跳过的数据条数

  (4) 新条件:满足设置条件时停止采集;可以在新条件中添加组织关系、组内关系、字段名、条件、值等信息,完成新组、新条件、删除条件。等等。

  9、数据过滤:可以添加条件设置数据过滤。在数据过滤中,可以完成新建分组、新建条件、删除条件等操作。

  10. 全部清除:可以清除所有选择准备采集的数据。

  11. 深入采集:可以为页面链接设置详情页数据抓取

  12. 字段设置

  (1) 添加字段:可根据数据抓取的需要添加字段

  (2) 可以修改字段名、合并字段、在页面中选择、编辑字段xpath、删除字段等。

  (3)设置值属性:可以对模块进行设置操作,如提取文本、提取内部html、提取外部html、提取链接地址、提取图片等媒体地址、提取输入框内容、下载按钮等。

  (4) 更改为特殊字段:可以设置特殊字段包括采集处的时间、采集处的时间戳、当前网页的url、当前网页的标题,以及当前网页的源代码。

  13. 流程图组件

  (1) 打开网页:当流程图任务创建时,会自动生成一个打开网页组件。该组件作为任务的组件,不能拖拽删除,并且可以编辑修改任务栏的URL。

  (2)点击:点击中的可点击元素可以设置在循环组件中收录分页按钮,在循环组件中依次点击列表中的元素,手动点击该元素;点击方式可设置包括单机、双击;点击新标签等内容操作后是否打开。

  (3) 提取数据:在提取数据时,可以设置停止条件、过滤数据、清除所有字段、下钻采集、添加字段。

  (4) 定时等待:可以设置等待组件的内容

  

  (5) 滚动页面:可以添加滚动到页面底部和滚动一屏高度的滚动方式;您可以设置每次滚动后的等待时间。

  (6) 输入文本:在文本组件中,可以设置输入文本的选择输入框、文本内容条件、组合文本的列、输入后回车。

  (7) 移动鼠标:移动鼠标组件的作用是将鼠标移动到网页中的元素上以显示内容。您可以在组件中设置移动鼠标。

  (8) 下拉框:下拉框组件的功能是针对网页中的下拉框选项,可以在组件中设置选择下拉框,选择单个选项。

  (9)判断:判断组件可以根据不同的条件进行判断,从而进行不同的操作。可以在判断组件中设置判断条件、判断文本、判断范围等内容操作

  (10)循环:循环组件可以进行一些操作,循环模式和选择列表元素可以在组件中进行操作和设置

  (11) 返回:返回组件可以返回上一页。

  (12) 复制:复制组件可以复制页面中的元素内容。

  (13)验证码:可以选择验证码输入框,选择验证码图片,选择验证码提交按钮,选择验证码错误提示,在验证码组件中启用自动编码。

  (14) 跳出循环:跳出循环组件通常与判断组件配合使用,即在满足什么条件或不满足什么条件时提前结束循环。

  14. 开始 采集

  (1) 定时启动:定时启动可以设置循环采集的间隔时间、单次运行时间、开始频率、开始日期、开始时间、停止时间。

  (2)智能策略:智能切换和手动切换的条件可以设置和添加。

  (3) 自动导出:对于自动导出的数据,可以新建任务或删除自动导出。

  (4)文件下载:可以在采集的时候下载文件,可以设置采集下载的文件类型、下载文件的存放路径、文件夹规则选择、文件名规则选择,以及文件等Module操作。

  (5) 加速引擎:可根据引擎情况开启加速引擎。

  (6) 重复数据删除:可以选择或添加重复数据删除条件,并将其执行动作设置为重复数据时跳过继续采集,重复数据时停止任务。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线