解决方案:Serverless在游戏运营行业进行数据采集分析的最佳实践

优采云 发布时间: 2022-11-07 16:58

  解决方案:Serverless在游戏运营行业进行数据采集分析的最佳实践

  • 触发器名称:defaultTrigger

  • 身份验证方法:匿名(即无需身份验证)

  • 请求方法:GET、POST

  创建函数后,我们通过在线编辑器编写代码:

  # -*- coding: utf-8 -*-

import logging

import json

import urllib.parse

HELLO_WORLD = b'Hello world!\n'

def handler(environ, start_response):

logger = logging.getLogger()

context = environ['fc.context']

request_uri = environ['fc.request_uri']

for k, v in environ.items():

if k.startswith('HTTP_'):

# process custom request headers

pass

try:

request_body_size = int(environ.get('CONTENT_LENGTH', 0))

except (ValueError):

request_body_size = 0

# 接收回传的数据

request_body = environ['wsgi.input'].read(request_body_size)

request_body_str = urllib.parse.unquote(request_body.decode("GBK"))

request_body_obj = json.loads(request_body_str)

logger.info(request_body_obj["action"])

logger.info(request_body_obj["articleAuthorId"])

status = '200 OK'

response_headers = [('Content-type', 'text/plain')]

start_response(status, response_headers)

return [HELLO_WORLD]

  这时候的代码很简单,就是要接收用户的参数,我们可以调用接口进行验证:

  您可以在函数的日志查询中看到此调用的日志:

  同时,我们还可以查看函数的link trace来分析每一步的调用时间,比如函数接收请求的过程→冷启动(没有活动实例时)→准备代码→执行初始化方法→执行入口函数逻辑:

  从调用链接图中可以看出,之前的请求收录了冷启动时间,因为当时没有活动实例,整个过程耗时418毫秒,入口函数代码的实际执行时间为8毫秒。

  再次调用该接口时,可以看到直接执行了入口函数的逻辑,因为此时已经有一个实例在运行,整个时间只有2.3毫秒:

  2. 处理数据的函数

  第一个函数是通过函数计算控制台在界面上创建的,运行环境选择Python3。我们可以在官方文档中查看预设的Python3运行环境中构建了哪些模块,因为第二个功能需要操作Kafka和RDS,所以需要确认对应的模块。

  从文档中可以看出,内置模块包括RDS的SDK模块,但没有Kafka的SDK模块。这时候我们需要手动安装Kafka SDK模块,创建功能也会使用另一种方式。

  Funcraft

  Funcraft是一款支持Serverless应用部署的命令行工具,可以帮助我们方便的管理函数计算、API网关、日志服务等资源。它通过资源配置文件(template.yml)帮助我们进行开发、构建和部署。

  所以第二个函数我们需要用Fun来操作,整个操作分为四步:

  • 安装有趣的工具。

  • 编写template.yml 模板文件来描述函数。

  • 安装我们需要的第 3 方依赖项。

  • 上传部署功能。

  安装乐趣

  Fun提供了三种安装方式:

  • 通过npm 包管理安装- 适用于所有平台(Windows/Mac/Linux)上预装npm 的开发人员。

  • 通过下载二进制文件安装- 适用于所有平台(Windows/Mac/Linux)。

  • 通过 Homebrew 包管理器安装——适用于 Mac 平台,更符合 MacOS 开发者习惯。

  文例环境是Mac,所以使用npm安装,很简单,一行命令搞定:

  sudo npm install @alicloud/fun -g

  安装完成后。在控制终端输入fun命令查看版本信息:

  $ fun --version

3.6.20

  第一次使用fun前,需要执行fun config命令进行配置,按照提示依次配置Account ID、Access Key Id、Secret Access Key、Default Region Name。Account ID 和 Access Key Id 可以在函数计算控制台首页右上角获取:

  有趣的配置

  ? 阿里云账号ID 01

  ? 阿里云Access Key ID qef6j

  ? 阿里云Access Key Secret ***UFJG

  ? 默认地域名称 cn-hangzhou

  ? 每个 SDK 客户端调用的超时时间(秒) 60

  ? 每个 SDK 客户端的最大重试次数 3

  编写模板.yml

  创建一个新目录,并在此目录中创建一个名为 template.yml 的 YAML 文件。该文件主要描述了要创建的函数的配置。说白了,函数计算控制台上配置的配置信息是用 YAML 格式写的。在文件中:

  ROSTemplateFormatVersion: '2015-09-01'

Transform: 'Aliyun::Serverless-2018-04-03'

Resources:

FCBigDataDemo:

Type: 'Aliyun::Serverless::Service'

Properties:

Description: 'local invoke demo'

VpcConfig:

VpcId: 'vpc-xxxxxxxxxxx'

VSwitchIds: [ 'vsw-xxxxxxxxxx' ]

SecurityGroupId: 'sg-xxxxxxxxx'

LogConfig:

Project: fcdemo

Logstore: fc_demo_store

dataToKafka:

Type: 'Aliyun::Serverless::Function'

Properties:

Initializer: index.my_initializer

Handler: index.handler

CodeUri: './'

Description: ''

Runtime: python3

  我们来解析一下上面文件的核心内容:

  • FCBigDataDemo:自定义服务名称。该服务由以下Type属性表示,即Aliyun::Serverless::Service。

  • 属性:属性下的属性都是服务的配置项。

  • VpcConfig:服务的VPC配置,包括:

  

  VpcId:VPC ID。VSwitchIds:交换机ID,这里是一个数组,可以配置多个交换机。SecurityGroupId:安全组 ID。

  • LogConfig:服务绑定日志服务 (SLS) 配置,包括:

  项目:日志服务项目。日志存储:日志存储名称。

  • dataToKafka:该服务下用户定义的函数名。该函数由以下Type属性表示,即Aliyun::Serverless::Function。

  • 属性:属性下的属性都是该功能的配置项。

  • Initializer:配置初始化函数。

  • Handler:配置入口函数。

  • 运行时:函数运行时环境。

  目录结构为:

  安装第三方依赖

  在创建了服务和功能的模板之后,让我们安装我们需要使用的第三方依赖项。在本例的场景中,第二个功能需要用到Kafka SDK,所以可以通过fun工具结合Python包管理工具pip来安装:

  有趣的安装 --runtime python3 --package-type pip kafka-python

  执行命令后,出现如下提示信息:

  此时我们会发现目录下会生成一个.fun文件夹,我们安装的依赖都在这个目录下:

  部署功能

  现在模板文件写好了,我们需要的Kafka SDK也安装好了,我们需要添加我们的代码文件index.py。代码内容如下:

  # -*- coding: utf-8 -*-

import logging

import json

import urllib.parse

from kafka import KafkaProducer

producer = None

def my_initializer(context):

logger = logging.getLogger()

logger.info("init kafka producer")

global producer

producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')

def handler(event, context):

logger = logging.getLogger()

# 接收回传的数据

event_str = json.loads(event)

event_obj = json.loads(event_str)

logger.info(event_obj["action"])

logger.info(event_obj["articleAuthorId"])

# 向Kafka发送消息

global producer

producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))

producer.close()

return 'hello world'

  代码很简单,这里简单分析一下:

  • my_initializer:当函数实例被拉起时,会先执行函数,然后再执行handler函数。当函数实例运行时,后续请求不会执行 my_initializer 函数。一般用于各种连接的初始化。初始化Kafka Producer的方法放在这里,避免重复初始化Producer。

  • handler:这个函数只有两个逻辑,接收返回的数据和将数据发送到Kafka的指定topic。

  让我们使用 fun deploy 命令来部署函数,它做了两件事:

  • 根据template.yml 中的配置创建服务和功能。

  • 将 index.py 和 .fun 上传到函数中。

  登录函数计算控制台,可以看到通过 fun 命令部署的服务和函数:

  进入函数,还可以清晰的看到第三方依赖包的目录结构:

  3.函数之间的调用

  目前,这两个功能都已创建。下面的工作就是在第一个函数接收到数据后,拉起第二个函数向Kafka发送消息。我们只需要对第一个函数进行一些更改:

  # -*- coding: utf-8 -*-

import logging

import json

import urllib.parse

import fc2

HELLO_WORLD = b'Hello world!\n'

client = None

def my_initializer(context):

logger = logging.getLogger()

logger.info("init fc client")

global client

client = fc2.Client(

endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",

accessKeyID="your_ak",

accessKeySecret="your_sk"

)

def handler(environ, start_response):

logger = logging.getLogger()

context = environ['fc.context']

request_uri = environ['fc.request_uri']

for k, v in environ.items():

if k.startswith('HTTP_'):

# process custom request headers

pass

try:

request_body_size = int(environ.get('CONTENT_LENGTH', 0))

except (ValueError):

request_body_size = 0

# 接收回传的数据

request_body = environ['wsgi.input'].read(request_body_size)

request_body_str = urllib.parse.unquote(request_body.decode("GBK"))

request_body_obj = json.loads(request_body_str)

logger.info(request_body_obj["action"])

logger.info(request_body_obj["articleAuthorId"])

global client

client.invoke_function(

'FCBigDataDemo',

'dataToKafka',

payload=json.dumps(request_body_str),

headers = {'x-fc-invocation-type': 'Async'}

)

status = '200 OK'

response_headers = [('Content-type', 'text/plain')]

start_response(status, response_headers)

<p>

return [HELLO_WORLD]</p>

  如上代码所示,对第一个函数的代码做了三处改动:

  • 导入函数计算的库:import fc2

  • 添加创建函数计算客户端的初始化方法:

  def my_initializer(context):

logger = logging.getLogger()

logger.info("init fc client")

global client

client = fc2.Client(

endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",

accessKeyID="your_ak",

accessKeySecret="your_sk"

)

  这里需要注意的是,我们在代码中添加初始化方法时,需要在函数配置中指定初始化方法的入口:

  • 通过函数计算客户端调用第二个函数:

  global client

client.invoke_function(

&#39;FCBigDataDemo&#39;,

&#39;dataToKafka&#39;,

payload=json.dumps(request_body_str),

headers = {&#39;x-fc-invocation-type&#39;: &#39;Async&#39;}

)

  invoke_function 函数有四个参数:

  • 第一个参数:调用函数的服务的名称。

  • 第二个参数:调用函数的函数名。

  • 第三个参数:传递给调用函数的数据。

  • 第四个参数:调用第二个函数Request Header 信息。这里x-fc-invocation-type的key用来设置是同步调用还是异步调用。这里 Async 设置为异步调用。

  通过这个设置,我们可以验证请求是通过第一个函数提供的HTTP接口发起的→采集数据→调用第二个函数→将数据作为消息传递给Kafka。

  使用两个函数的目的

  这里有同学可能会有疑问,为什么需要两个函数而不是第一个函数直接向Kafka发送数据呢?我们先来看这张图:

  当我们使用异步调用函数时,请求的数据会在函数内部默认放入消息队列进行第一次削峰填谷,然后每个队列会通过对应的函数实例的弹性拉起多个实例函数实例。进行第二次削峰填谷。所以这也是这个架构能够稳定承载大并发请求的核心原因之一。

  4.配置卡夫卡

  在游戏运营场景中,数据量比较大,所以对Kafka的性能要求比较高。与开源自建相比,使用云上的Kafka节省了大量的运维操作,例如:

  • 我们不再需要维护Kafka 集群的各个节点。

  • 无需关心主从节点之间的数据同步。

  • 可快速动态扩展Kafka集群规格,动态增加topic,动态增加partition数量。

  • 完善的指标监控功能和消息查询功能。

  一般来说,所有的SLA都被云端覆盖了,我们只需要关注消息发送和消息消费即可。

  因此,我们可以打开Kafka激活界面,根据实际场景需要一键激活Kafka实例,激活Kafka后登录控制台,在基本信息中查看Kafka接入点:

  • 默认接入点:VPC 内网场景的接入点。

  • SSL 接入点:公网场景中的接入点。

  您可以将默认接入点配置到函数计算的第二个功能中。

  ....

producer = KafkaProducer(bootstrap_servers=&#39;XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092&#39;)

....

  然后点击左侧控制台的Topic Management,创建一个Topic:

  将创建的 Topic 配置到函数计算的第二个函数中。

  ...

# 第一个参数为Topic名称

producer.send(&#39;ikf-demo&#39;, json.dumps(event_str).encode(&#39;utf-8&#39;))

...

  上面已经列出了云上 Kafka 的优势,比如动态增加一个主题的分区数,我们可以在主题列表中动态调整一个主题的分区数:

  单个主题最多支持360个分区,这是开源自建无法实现的。

  接下来,点击控制台左侧的Consumer Group Management,创建一个Consumer Group:

  至此,云上的Kafka就配置好了,即Producer可以向刚刚创建的topic发送消息,Consumer可以设置刚刚创建的GID订阅该Topic进行消息的接收和消费。

  Flink 卡夫卡消费者

  在这种场景下,Kafka 后面往往会跟着 Flink,所以这里简单介绍一下如何在 Flink 中创建 Kafka Consumer 和消费数据。代码片段如下:

  final ParameterTool parameterTool = ParameterTool.fromArgs(args);

String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");

String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");

Properties kafkaProps = new Properties();

kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");

FlinkKafkaConsumer kafka = new FlinkKafkaConsumer(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);

kafka.setStartFromLatest();

kafka.setCommitOffsetsOnCheckpoints(false);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource dataStreamByEventTime = env.addSource(kafka);

  以上是构建 Flink Kafka Consumer 并添加 Kafka Source 的代码片段,非常简单。

  压力测试验证

  至此,整个数据采集的架构已经搭建完成。接下来,我们将通过压力测试来测试整个架构的性能。这里使用阿里云PTS进行压力测试。

  创建压力测试场景

  打开PTS控制台,点击左侧菜单创建压力测试/创建PTS场景:

  在场景配置中,使用第一个函数计算函数暴露的HTTP接口作为串口,如下图所示:

  接口配置好之后,我们来配置压力:

  • 压力模式:

  • 并发模式:指定有多少并发用户同时发出请求。

  • RPS 模式:指定每秒有多少请求。

  • 增量模式:在压力测量过程中,可以手动调节压力,也可以按百分比自动增加压力。

  • 最大并发:有多少虚拟用户可以同时发起请求。

  • 增量百分比:如果是自动增量,则在此处按百分比增量。

  • 单级持续时间:当未完全达到全压时,每一级梯度的压力保持持续时间。

  • 压力测试总持续时间:压力测试的总持续时间。

  这里由于资源成本的原因,将并发用户数设置为2500进行验证。

  从上图压测的情况来看,TPS已经达到2w上限,549w+请求,99.99%的请求成功,369异常也可以点击查看,都是请求超时造成的的压力测试工具。

  总结

  至此,整个基于serverless的大数据采集传输架构搭建完成,并进行了压力测试,验证整体性能也不错,整个架构非常简单易懂。这种架构不仅适用于游戏运营行业,其实任何大数据采集传输场景都适用。目前,已经有很多客户在基于serverless架构的生产环境中运行,或者正在对serverless架构进行改造。在途中。

  解决方案:模拟手工采集美团网商家的数据采集产品详细介绍-美团商家

  模拟手动采集美团商户数据采集软件,可以采集指定城市,指定关键词商户信息,包括姓名、地址、电话、来源网址、等等

  美团商户数据采集产品详情

  一、软件功能

  1.基于美团网公开数据采集。

  2.内置数据库保存采集的数据,支持数十万条数据,支持库内去重,即采集到数据库的数据不会被删除重复。

  3. 多种采集算法采集更多数据。

  

  4.数据记忆功能商务手机采集器,意外关机或下次开机数据还在。

  5、一键导出为CSV、EXCEL、VCF等文件。

  6.可将VCF文件导入手机*敏*感*词*,方便快捷。

  7.实时采集,而不是查看自己的数据库。

  8.支持Win XP、Win7和Win10等操作系统

  

  2.使用帮助

  1.手动选择城市商务电话的电话号码软件采集器采集,可以选择多个。

  2.软件采集过程中,为了效率,软件界面只显示不超过3000条数据。软件内置数据库保存采集的数据,支持数十万条数据。

  3、关于杀毒软件的误报,部分杀毒软件会因为软件保护原因误报误报。如果它被阻止,请允许它。建议使用或金山毒霸。

  4.支持导出到VCF文件。VCF文件是标准的手机*敏*感*词*格式文件,可以导入到手机*敏*感*词*中。方法是通过qq将vcf文件传输到手机,点击打开vcf文件,选择用手机*敏*感*词*打开,按照提示导入即可。

  5. 搜索关键词最好按行业关键词。一次输入一个单词,不带标点符号。6. 选择城市和关键词后,点击“开始采集”按钮。美团商户数据采集-数据采集软件-顺鑫科技官网|百度地图数据采集器|高德地图数据采集器|抖音快手数据采集器

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线