解决方案:Serverless在游戏运营行业进行数据采集分析的最佳实践
优采云 发布时间: 2022-11-07 16:58解决方案:Serverless在游戏运营行业进行数据采集分析的最佳实践
• 触发器名称:defaultTrigger
• 身份验证方法:匿名(即无需身份验证)
• 请求方法:GET、POST
创建函数后,我们通过在线编辑器编写代码:
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD = b'Hello world!\n'
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 接收回传的数据
request_body = environ['wsgi.input'].read(request_body_size)
request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
request_body_obj = json.loads(request_body_str)
logger.info(request_body_obj["action"])
logger.info(request_body_obj["articleAuthorId"])
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
这时候的代码很简单,就是要接收用户的参数,我们可以调用接口进行验证:
您可以在函数的日志查询中看到此调用的日志:
同时,我们还可以查看函数的link trace来分析每一步的调用时间,比如函数接收请求的过程→冷启动(没有活动实例时)→准备代码→执行初始化方法→执行入口函数逻辑:
从调用链接图中可以看出,之前的请求收录了冷启动时间,因为当时没有活动实例,整个过程耗时418毫秒,入口函数代码的实际执行时间为8毫秒。
再次调用该接口时,可以看到直接执行了入口函数的逻辑,因为此时已经有一个实例在运行,整个时间只有2.3毫秒:
2. 处理数据的函数
第一个函数是通过函数计算控制台在界面上创建的,运行环境选择Python3。我们可以在官方文档中查看预设的Python3运行环境中构建了哪些模块,因为第二个功能需要操作Kafka和RDS,所以需要确认对应的模块。
从文档中可以看出,内置模块包括RDS的SDK模块,但没有Kafka的SDK模块。这时候我们需要手动安装Kafka SDK模块,创建功能也会使用另一种方式。
Funcraft
Funcraft是一款支持Serverless应用部署的命令行工具,可以帮助我们方便的管理函数计算、API网关、日志服务等资源。它通过资源配置文件(template.yml)帮助我们进行开发、构建和部署。
所以第二个函数我们需要用Fun来操作,整个操作分为四步:
• 安装有趣的工具。
• 编写template.yml 模板文件来描述函数。
• 安装我们需要的第 3 方依赖项。
• 上传部署功能。
安装乐趣
Fun提供了三种安装方式:
• 通过npm 包管理安装- 适用于所有平台(Windows/Mac/Linux)上预装npm 的开发人员。
• 通过下载二进制文件安装- 适用于所有平台(Windows/Mac/Linux)。
• 通过 Homebrew 包管理器安装——适用于 Mac 平台,更符合 MacOS 开发者习惯。
文例环境是Mac,所以使用npm安装,很简单,一行命令搞定:
sudo npm install @alicloud/fun -g
安装完成后。在控制终端输入fun命令查看版本信息:
$ fun --version
3.6.20
第一次使用fun前,需要执行fun config命令进行配置,按照提示依次配置Account ID、Access Key Id、Secret Access Key、Default Region Name。Account ID 和 Access Key Id 可以在函数计算控制台首页右上角获取:
有趣的配置
? 阿里云账号ID 01
? 阿里云Access Key ID qef6j
? 阿里云Access Key Secret ***UFJG
? 默认地域名称 cn-hangzhou
? 每个 SDK 客户端调用的超时时间(秒) 60
? 每个 SDK 客户端的最大重试次数 3
编写模板.yml
创建一个新目录,并在此目录中创建一个名为 template.yml 的 YAML 文件。该文件主要描述了要创建的函数的配置。说白了,函数计算控制台上配置的配置信息是用 YAML 格式写的。在文件中:
ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
FCBigDataDemo:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'local invoke demo'
VpcConfig:
VpcId: 'vpc-xxxxxxxxxxx'
VSwitchIds: [ 'vsw-xxxxxxxxxx' ]
SecurityGroupId: 'sg-xxxxxxxxx'
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: './'
Description: ''
Runtime: python3
我们来解析一下上面文件的核心内容:
• FCBigDataDemo:自定义服务名称。该服务由以下Type属性表示,即Aliyun::Serverless::Service。
• 属性:属性下的属性都是服务的配置项。
• VpcConfig:服务的VPC配置,包括:
VpcId:VPC ID。VSwitchIds:交换机ID,这里是一个数组,可以配置多个交换机。SecurityGroupId:安全组 ID。
• LogConfig:服务绑定日志服务 (SLS) 配置,包括:
项目:日志服务项目。日志存储:日志存储名称。
• dataToKafka:该服务下用户定义的函数名。该函数由以下Type属性表示,即Aliyun::Serverless::Function。
• 属性:属性下的属性都是该功能的配置项。
• Initializer:配置初始化函数。
• Handler:配置入口函数。
• 运行时:函数运行时环境。
目录结构为:
安装第三方依赖
在创建了服务和功能的模板之后,让我们安装我们需要使用的第三方依赖项。在本例的场景中,第二个功能需要用到Kafka SDK,所以可以通过fun工具结合Python包管理工具pip来安装:
有趣的安装 --runtime python3 --package-type pip kafka-python
执行命令后,出现如下提示信息:
此时我们会发现目录下会生成一个.fun文件夹,我们安装的依赖都在这个目录下:
部署功能
现在模板文件写好了,我们需要的Kafka SDK也安装好了,我们需要添加我们的代码文件index.py。代码内容如下:
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer = None
def my_initializer(context):
logger = logging.getLogger()
logger.info("init kafka producer")
global producer
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
def handler(event, context):
logger = logging.getLogger()
# 接收回传的数据
event_str = json.loads(event)
event_obj = json.loads(event_str)
logger.info(event_obj["action"])
logger.info(event_obj["articleAuthorId"])
# 向Kafka发送消息
global producer
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
producer.close()
return 'hello world'
代码很简单,这里简单分析一下:
• my_initializer:当函数实例被拉起时,会先执行函数,然后再执行handler函数。当函数实例运行时,后续请求不会执行 my_initializer 函数。一般用于各种连接的初始化。初始化Kafka Producer的方法放在这里,避免重复初始化Producer。
• handler:这个函数只有两个逻辑,接收返回的数据和将数据发送到Kafka的指定topic。
让我们使用 fun deploy 命令来部署函数,它做了两件事:
• 根据template.yml 中的配置创建服务和功能。
• 将 index.py 和 .fun 上传到函数中。
登录函数计算控制台,可以看到通过 fun 命令部署的服务和函数:
进入函数,还可以清晰的看到第三方依赖包的目录结构:
3.函数之间的调用
目前,这两个功能都已创建。下面的工作就是在第一个函数接收到数据后,拉起第二个函数向Kafka发送消息。我们只需要对第一个函数进行一些更改:
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD = b'Hello world!\n'
client = None
def my_initializer(context):
logger = logging.getLogger()
logger.info("init fc client")
global client
client = fc2.Client(
endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
accessKeyID="your_ak",
accessKeySecret="your_sk"
)
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 接收回传的数据
request_body = environ['wsgi.input'].read(request_body_size)
request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
request_body_obj = json.loads(request_body_str)
logger.info(request_body_obj["action"])
logger.info(request_body_obj["articleAuthorId"])
global client
client.invoke_function(
'FCBigDataDemo',
'dataToKafka',
payload=json.dumps(request_body_str),
headers = {'x-fc-invocation-type': 'Async'}
)
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
<p>
return [HELLO_WORLD]</p>
如上代码所示,对第一个函数的代码做了三处改动:
• 导入函数计算的库:import fc2
• 添加创建函数计算客户端的初始化方法:
def my_initializer(context):
logger = logging.getLogger()
logger.info("init fc client")
global client
client = fc2.Client(
endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
accessKeyID="your_ak",
accessKeySecret="your_sk"
)
这里需要注意的是,我们在代码中添加初始化方法时,需要在函数配置中指定初始化方法的入口:
• 通过函数计算客户端调用第二个函数:
global client
client.invoke_function(
'FCBigDataDemo',
'dataToKafka',
payload=json.dumps(request_body_str),
headers = {'x-fc-invocation-type': 'Async'}
)
invoke_function 函数有四个参数:
• 第一个参数:调用函数的服务的名称。
• 第二个参数:调用函数的函数名。
• 第三个参数:传递给调用函数的数据。
• 第四个参数:调用第二个函数Request Header 信息。这里x-fc-invocation-type的key用来设置是同步调用还是异步调用。这里 Async 设置为异步调用。
通过这个设置,我们可以验证请求是通过第一个函数提供的HTTP接口发起的→采集数据→调用第二个函数→将数据作为消息传递给Kafka。
使用两个函数的目的
这里有同学可能会有疑问,为什么需要两个函数而不是第一个函数直接向Kafka发送数据呢?我们先来看这张图:
当我们使用异步调用函数时,请求的数据会在函数内部默认放入消息队列进行第一次削峰填谷,然后每个队列会通过对应的函数实例的弹性拉起多个实例函数实例。进行第二次削峰填谷。所以这也是这个架构能够稳定承载大并发请求的核心原因之一。
4.配置卡夫卡
在游戏运营场景中,数据量比较大,所以对Kafka的性能要求比较高。与开源自建相比,使用云上的Kafka节省了大量的运维操作,例如:
• 我们不再需要维护Kafka 集群的各个节点。
• 无需关心主从节点之间的数据同步。
• 可快速动态扩展Kafka集群规格,动态增加topic,动态增加partition数量。
• 完善的指标监控功能和消息查询功能。
一般来说,所有的SLA都被云端覆盖了,我们只需要关注消息发送和消息消费即可。
因此,我们可以打开Kafka激活界面,根据实际场景需要一键激活Kafka实例,激活Kafka后登录控制台,在基本信息中查看Kafka接入点:
• 默认接入点:VPC 内网场景的接入点。
• SSL 接入点:公网场景中的接入点。
您可以将默认接入点配置到函数计算的第二个功能中。
....
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
....
然后点击左侧控制台的Topic Management,创建一个Topic:
将创建的 Topic 配置到函数计算的第二个函数中。
...
# 第一个参数为Topic名称
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
...
上面已经列出了云上 Kafka 的优势,比如动态增加一个主题的分区数,我们可以在主题列表中动态调整一个主题的分区数:
单个主题最多支持360个分区,这是开源自建无法实现的。
接下来,点击控制台左侧的Consumer Group Management,创建一个Consumer Group:
至此,云上的Kafka就配置好了,即Producer可以向刚刚创建的topic发送消息,Consumer可以设置刚刚创建的GID订阅该Topic进行消息的接收和消费。
Flink 卡夫卡消费者
在这种场景下,Kafka 后面往往会跟着 Flink,所以这里简单介绍一下如何在 Flink 中创建 Kafka Consumer 和消费数据。代码片段如下:
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");
String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource dataStreamByEventTime = env.addSource(kafka);
以上是构建 Flink Kafka Consumer 并添加 Kafka Source 的代码片段,非常简单。
压力测试验证
至此,整个数据采集的架构已经搭建完成。接下来,我们将通过压力测试来测试整个架构的性能。这里使用阿里云PTS进行压力测试。
创建压力测试场景
打开PTS控制台,点击左侧菜单创建压力测试/创建PTS场景:
在场景配置中,使用第一个函数计算函数暴露的HTTP接口作为串口,如下图所示:
接口配置好之后,我们来配置压力:
• 压力模式:
• 并发模式:指定有多少并发用户同时发出请求。
• RPS 模式:指定每秒有多少请求。
• 增量模式:在压力测量过程中,可以手动调节压力,也可以按百分比自动增加压力。
• 最大并发:有多少虚拟用户可以同时发起请求。
• 增量百分比:如果是自动增量,则在此处按百分比增量。
• 单级持续时间:当未完全达到全压时,每一级梯度的压力保持持续时间。
• 压力测试总持续时间:压力测试的总持续时间。
这里由于资源成本的原因,将并发用户数设置为2500进行验证。
从上图压测的情况来看,TPS已经达到2w上限,549w+请求,99.99%的请求成功,369异常也可以点击查看,都是请求超时造成的的压力测试工具。
总结
至此,整个基于serverless的大数据采集传输架构搭建完成,并进行了压力测试,验证整体性能也不错,整个架构非常简单易懂。这种架构不仅适用于游戏运营行业,其实任何大数据采集传输场景都适用。目前,已经有很多客户在基于serverless架构的生产环境中运行,或者正在对serverless架构进行改造。在途中。
解决方案:模拟手工采集美团网商家的数据采集产品详细介绍-美团商家
模拟手动采集美团商户数据采集软件,可以采集指定城市,指定关键词商户信息,包括姓名、地址、电话、来源网址、等等
美团商户数据采集产品详情
一、软件功能
1.基于美团网公开数据采集。
2.内置数据库保存采集的数据,支持数十万条数据,支持库内去重,即采集到数据库的数据不会被删除重复。
3. 多种采集算法采集更多数据。
4.数据记忆功能商务手机采集器,意外关机或下次开机数据还在。
5、一键导出为CSV、EXCEL、VCF等文件。
6.可将VCF文件导入手机*敏*感*词*,方便快捷。
7.实时采集,而不是查看自己的数据库。
8.支持Win XP、Win7和Win10等操作系统
2.使用帮助
1.手动选择城市商务电话的电话号码软件采集器采集,可以选择多个。
2.软件采集过程中,为了效率,软件界面只显示不超过3000条数据。软件内置数据库保存采集的数据,支持数十万条数据。
3、关于杀毒软件的误报,部分杀毒软件会因为软件保护原因误报误报。如果它被阻止,请允许它。建议使用或金山毒霸。
4.支持导出到VCF文件。VCF文件是标准的手机*敏*感*词*格式文件,可以导入到手机*敏*感*词*中。方法是通过qq将vcf文件传输到手机,点击打开vcf文件,选择用手机*敏*感*词*打开,按照提示导入即可。
5. 搜索关键词最好按行业关键词。一次输入一个单词,不带标点符号。6. 选择城市和关键词后,点击“开始采集”按钮。美团商户数据采集-数据采集软件-顺鑫科技官网|百度地图数据采集器|高德地图数据采集器|抖音快手数据采集器