解决方案:【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践
优采云 发布时间: 2022-11-26 22:19解决方案:【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践
作者
美的暖通与建筑事业部高等研究中心智能技术部
美的暖通物联网数据平台建设背景
美的暖通及楼宇事业部(以下简称美的暖通)是美的集团五大板块之一。建筑弱电综合解决方案远销*敏*感*词*200多个国家。目前业务部门设备的数据上云仅停留在数据存储层面,缺乏挖掘数据价值的平台,造成大量数据浪费,并不断消耗存储资源,增加存储成本和维护成本. 另一方面,现有的数据驱动应用缺乏部署平台,难以产生真正的价值。因此,迫切需要一个统一、通用的物联网数据平台来支持设备运行数据的快速分析和建模。
我们物联网数据平台的建设是基于阿里云的Databricks数据洞察全托管Spark产品。下面是整体的业务架构图。在本文后续章节中,我们将分享一些关于物联网数据平台构建技术选型的思考,以及Spark技术栈的应用实践,尤其是Delta Lake场景。
选择Spark & Delta Lake
在数据平台计算引擎层的技术选型上,由于我们的数据团队刚刚成立,前期的架构选型我们做了大量的研究。综合考虑,我们希望选择一个成熟统一的平台:既能支持数据处理,又能支持数据分析场景,也能很好地支持数据科学场景。再加上团队成员在Python和Spark方面的丰富经验,从一开始就针对Spark技术栈。
选择 Databricks Data Studio Delta Lake
通过与阿里云计算平台团队的各种技术交流和实际的概念验证,我们最终选择了阿里云Databricks数据洞察产品。作为Spark引擎的母公司,其商业版的Spark引擎、完全托管的Spark技术栈、统一的数据工程和数据科学等都是我们决定选择Databricks Data Insights的重要原因。
具体来说,Databricks数据洞察提供的核心优势如下:
物联网数据平台总体架构
整体结构如上图所示。
我们访问的物联网数据分为两部分,历史存量数据和实时数据。目前,股票历史数据每天通过Spark SQL从不同的客户关系数据库批量导入Delta Lake表;实时数据通过IoT平台采集到云端Kafka,被Spark Structured Streaming消费后实时写入Delta Lake表。在这个过程中,我们将实时数据和历史数据都下沉到同一个Delta表中。这种批流一体化操作,可以大大简化我们的ETL流程(参考后面的案例部分)。在数据管道的下游,我们连接数据分析和数据科学工作流程。
物联网数据采集:从小数据到大数据
作为物联网场景的典型应用,美的暖通的核心数据来源于物联网终端设备。在整个物联网环境中,分布着无数的终端传感器。从小的角度来看,传感器本身产生的数据属于Small Data(或Little Data)。当所有传感器连接成一个大型物联网网络时,不同传感器产生的数据通过网关连接到云端,最终在云端形成大数据。
在我们的场景中,物联网平台本身会先对不同协议的数据进行解析,通过定制的硬件网络设备将解析后的半结构化JSON数据通过网络发送到云端的Kafka。Cloud Kafka作为整个数据管道的入口。
数据入湖:Delta Lake
物联网场景下的数据具有以下特点:
物联网数据的上述特点给数据处理、数据分析和数据科学带来了诸多挑战。幸运的是,使用 Spark 和 Delta Lake 可以很好地应对这些挑战。Delta Lake提供ACID事务保证,支持数据表增量更新,支持流批同步写入。通过Spark Structed Streaming,物联网时序数据可以实时流入湖中。
下面是Delta Lake经典的三级数据表架构。针对美的暖通物联网数据场景,我们定义了各个层级的数据表如下:
数据分析:临时查询
我们内部基于开源的Superset定制了内部版本的SQL查询和数据可视化平台,通过PyHive连接Databricks data insight Spark Thrift Server服务,可以将SQL提交到集群。商业版的thrift server在易用性和性能方面得到了增强,Databricks Data Insights提供了基于LDAP的用户认证实现,用于JDBC连接安全认证。借助 Superset,数据分析师和数据科学家可以快速高效地对 Delta Lake 表进行数据探索。
数据科学:工作区
建筑能耗预测和设备故障诊断预测是美的暖通物联网大数据平台建设的两大业务目标。在物联网数据管道的下游,需要连接一个机器学习平台。现阶段,为了更快捷方便地支持数据科学场景,我们将Databricks数据洞察集群与阿里云数据开发平台DDC打通。DDC集成了Jupyter Notebook,在数据科学场景下更加友好。通过在 Jupyter 上使用 PySpark,可以在 Databricks 数据洞察集群上运行作业;同时,作业也可以借助 Apache Airflow 进行调度。同时考虑机器学习模型构建、迭代训练、指标检测、部署等基本环节,
典型应用场景引入Delta Lake数据入湖(批流一体化)
使用UDF函数定义流数据写入Delta Lake的Merge规则
%spark
import org.apache.spark.sql._
import io.delta.tables._
// Function to upsert `microBatchOutputDF` into Delta table using MERGE
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
<p>
" />
MERGE INTO delta_{table_name} t
USING updates s
ON s.uuid = t.uuid
WHEN MATCHED THEN UPDATE SET
t.device_id = s.device_id,
t.indoor_temperature =
s.indoor_temperature,
t.ouoor_temperature = s.ouoor_temperature,
t.chiller_temperature =
s.chiller_temperature,
t.electricity = s.electricity,
t.protocal_version = s.protocal_version,
t.dt=s.dt,
t.update_time=current_timestamp()
WHEN NOT MATCHED THEN INSERT
(t.uuid,t.device_id,t.indoor_temperature,t.ouoor_temperature ,t.chiller_temperature
,t.electricity,t.protocal_version,t.dt,t.create_time,t.update_time)
values
(s.uuid,s.device_id,s.indoor_temperature,s.ouoor_temperature,s.chiller_temperature,s.electricity,s.protocal_version
,s.dt,current_timestamp(),current_timestamp())
""")
}</p>
使用 Spark Structured Streaming 将实时流写入 Delta Lake
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
def getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {
var streamingInputDF =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
<p>
" />
val resDF=streamingInputDF
.select(col("value").cast("string"))
.withColumn("newMessage",split(col("value"), " "))
.filter(col("newMessage").getItem(7).isNotNull)
.select(
col("newMessage").getItem(0).as("uuid"),
col("newMessage").getItem(1).as("device_id"),
col("newMessage").getItem(2).as("indoor_temperature"),
col("newMessage").getItem(3).as("ouoor_temperature"),
col("newMessage").getItem(4).as("chiller_temperature"),
col("newMessage").getItem(5).as("electricity"),
col("newMessage").getItem(6).as("protocal_version")
)
.withColumn("dt",date_format(current_date(),"yyyyMMdd"))
val query = resDF
.writeStream
.format("delta")
.option("checkpointLocation", checkpoint_dir)
.trigger(Trigger.ProcessingTime("60 seconds")) // 执行流处理时间间隔
.foreachBatch(upsertToDelta _) //引用upsertToDelta函数
.outputMode("update")
query.start()
}</p>
数据容灾:深度克隆
由于Delta Lake的数据只对接实时数据,对于股票历史数据,我们使用SparkSQL一次性下沉Delta Lake的表,这样流批处理时只维护一张Delta表,所以我们最初只维护这两个表。对一些数据进行 Merge 操作。同时,为了保证数据的高安全性,我们使用Databricks Deep Clone进行数据容灾,每天会定期更新,维护一张副表进行备份。对于每天新增的数据,使用Deep Clone只会插入新数据,更新需要更新的数据,可以大大提高执行效率。
CREATE OR REPLACE TABLE delta.delta_{table_name}_clone
DEEP CLONE delta.delta_{table_name};
性能优化:OPTIMIZE & Z-Ordering
在流处理场景下,会产生大量的小文件,大量小文件的存在会严重影响数据系统的读取性能。Delta Lake 提供了 OPTIMIZE 命令,可以合并压缩小文件。另外,对于Ad-Hoc查询场景,由于涉及单表多维数据的查询,我们可以通过Delta Lake提供的Z-Ordering机制,有效提升性能。查询性能。这大大提高了读取表的性能。DeltaLake本身提供了Auto Optimize选项,但是会牺牲少量的写入性能,增加数据写入delta表的延迟。相反,执行OPTIMIZE命令不会影响写入的性能,因为Delta Lake本身就支持MVCC,在支持OPTIMIZE的同时并发执行写操作。因此,我们采用定时触发OPTIMIZE执行的方案,每小时通过OPTIMIZE合并小文件,同时执行VACCUM清理过期数据文件:
OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature;
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta.delta_{table_name} RETAIN 1 HOURS;
另外,对于Ad-Hoc查询场景,由于涉及单表多维数据的查询,我们可以借助Delta Lake提供的Z-Ordering机制,有效提升查询的性能。
总结与展望
基于阿里云Databricks数据洞察产品提供的商业版Spark和Delta Lake技术栈,我们快速搭建了物联网数据处理平台。Databricks数据洞察全托管免运维,商业版引擎的性能优势和计算/存储分离的架构,为我们节省了整体成本。同时,Databricks数据洞察产品所提供的丰富特性也大大提升了我们数据团队的工作效率,为数据分析业务的快速开发和交付奠定了基础。未来,美的暖通希望与阿里云Databricks数据洞察团队合作,输出更多行业领先的物联网场景解决方案。
原文链接
解决方案:k-近邻算法
KNN概览
k最近邻(kNN,k-NearestNei*敏*感*词*or)算法是一种基本的分类和回归方法。我们这里只讨论分类问题中的k近邻算法。
k近邻算法的输入是实例的特征向量,对应特征空间中的点;输出是实例的类别,可以有多个类别。k近邻算法假设给定一个训练数据集,其中的实例类别已经确定。分类时,根据其k个最近邻训练实例的类别,通过多数投票的方式预测一个新实例。因此,k近邻算法没有明确的学习过程。
k近邻算法实际上是利用训练数据集划分特征向量空间,作为其分类的“模型”。k值的选择、距离度量和分类决策规则是k近邻算法的三个基本要素。
KNN场景
电影可以按题材来分类,那么如何区分动作片和爱情片呢?
1. 动作片:多打架 2. 爱情片:多接吻
根据电影中接吻和打斗的次数,使用k近邻算法构造程序,可以自动划分电影的类型。
现在根据上面我们得到的样本集中所有电影与未知电影的距离,按照距离递增排序,可以找到 k 个距离最近的电影。
假定 k=3,则三个最靠近的电影依次是, He's Not Really into Dudes 、 Beautiful Woman 和 California Man。
knn 算法按照距离最近的三部电影的类型,决定未知电影的类型,而这三部电影全是爱情片,因此我们判定未知电影是爱情片。
KNN 原理 KNN 的工作原理假设有一个带标签的样本数据集(训练样本集),其中收录
了每条数据与其类别之间的对应关系。输入没有标签的新数据后,将新数据的每个特征与样本集中数据的相应特征进行比较。计算新数据与样本数据集中每条数据的距离。对所有得到的距离进行排序(从小到大,越小越相似)。取前k(k一般小于等于20)样本数据对应的分类标签。
找到k个数据中出现次数最多的分类标签作为新数据的分类。KNN通俗理解
给定一个训练数据集,对于一个新的输入实例,在训练数据集中找到k个最近的实例,这k个实例中的大部分属于某一类,将输入实例归入该类。
KNN发展历程
收集数据:任何方法
准备数据:距离计算所需要的数值,最好是结构化的数据格式
分析数据:任何方法
训练算法:此步骤不适用于 k-近邻算法
测试算法:计算错误率
使用算法:输入样本数据和结构化的输出结果,然后运行 k-近邻算法判断输入数据分类属于哪个分类,最后对计算出的分类执行后续处理
KNN算法特点
优点:精度高、对异常值不敏感、无数据输入假定
缺点:计算复杂度高、空间复杂度高
适用数据范围:数值型和标称型
KNN项目案例项目案例一:优化交友网站的匹配效果项目概述
海伦使用约会网站寻找约会对象。一段时间后,她发现自己约会过三种类型的人:不喜欢有魅力的*非常有魅力的
她想: 1. 工作日和一般有魅力的人约会 2. 周末有非常有魅力的人 3. 排除不喜欢的人
现在她采集
了约会网站没有记录的数据,这有助于她对匹配进行更多分类。
开发过程
收集数据:提供文本文件
准备数据:使用 Python 解析文本文件
分析数据:使用 Matplotlib 画二维散点图
训练算法:此步骤不适用于 k-近邻算法
测试算法:使用海伦提供的部分数据作为测试样本。
测试样本和非测试样本的区别在于:
测试样本是已经完成分类的数据,如果预测分类与实际类别不同,则标记为一个错误。
使用算法:产生简单的命令行程序,然后海伦可以输入一些特征数据以判断对方是否为自己喜欢的类型。
采集
数据:提供文本文件
Helen 将这些约会对象的数据存储在文本文件 datingTestSet2.txt 中,该文件共有 1000 行。海伦的约会对象主要有以下3个特点:
文本文件数据格式如下:
40920 8.326976 0.953952 3
14488 7.153469 1.673904 2
26052 1.441871 0.805124 1
75136 13.147394 0.428964 1
38344 1.669788 0.134296 1
准备数据:使用 Python 解析文本文件
将文本记录转换为 NumPy 的解析器
def file2matrix(filename):
"""
Desc:
导入训练数据
parameters:
filename: 数据文件路径
return:
数据矩阵 returnMat 和对应的类别 classLabelVector
"""
fr = open(filename)
# 获得文件中的数据行的行数
numberOfLines = len(fr.readlines())
# 生成对应的空矩阵
# 例如:zeros(2,3)就是生成一个 2*3的矩阵,各个位置上全是 0
returnMat = zeros((numberOfLines, 3)) # prepare matrix to return
classLabelVector = [] # prepare labels return
fr = open(filename)
index = 0
for line in fr.readlines():
# str.strip([chars]) --返回移除字符串头尾指定的字符生成的新字符串
line = line.strip()
# 以 '\t' 切割字符串
listFromLine = line.split('\t')
# 每列的属性数据
returnMat[index, :] = listFromLine[0:3]
# 每列的类别数据,就是 label 标签数据
classLabelVector.append(int(listFromLine[-1]))
index += 1
# 返回数据矩阵returnMat和对应的类别classLabelVector
return returnMat, classLabelVector
分析数据:使用 Matplotlib 绘制二维散点图
import matplotlib
import matplotlib.pyplot as plt
fig = plt.figure()
<p>
" />
ax = fig.add_subplot(111)
ax.scatter(datingDataMat[:, 1], datingDataMat[:, 2], 15.0*array(datingLabels), 15.0*array(datingLabels))
plt.show()</p>
下图中利用了矩阵的第一列和第三列属性得到了很好的展示效果,清晰的识别出三个不同的样本分类区域,不同爱好的人有不同的类别区域。
序列号 玩电子游戏所花时间的百分比 年度飞行常客里程数 每周消耗的冰淇淋升数 样本类别 10.84000.5.933020 0001..12
样本 3 和样本 4 之间的距离:
对特征值进行归一化,消除特征之间的量级差异带来的影响
归一化的定义:我是这么认为的,归一化就是把你需要处理的数据(通过一定的算法)限制在你需要的一定范围内。首先,归一化是为了方便后续的数据处理,其次,在程序运行时加速了保正程序的收敛。方法如下: * 线性函数转换,表达式如下:
y=(x-MinValue)/(MaxValue-MinValue)
说明:x、y分别为转换前、后的值,MaxValue、MinValue分别为样本的最大值和最小值。
y=log10(x)
描述:以 10 为底数的对数函数转换。
如图所示:
y=atan(x)*2/PI
如图所示:
在统计学中,归一化的具体作用是总结均匀样本的统计分布。0-1之间的归一化是统计概率分布,-1--+1之间的归一化是统计坐标分布。
def autoNorm(dataSet):
"""
Desc:
归一化特征值,消除特征之间量级不同导致的影响
parameter:
dataSet: 数据集
return:
归一化后的数据集 normDataSet. ranges和minVals即最小值与范围,并没有用到
归一化公式:
Y = (X-Xmin)/(Xmax-Xmin)
其中的 min 和 max 分别是数据集中的最小特征值和最大特征值。该函数可以自动将数字特征值转化为0到1的区间。
"""
# 计算每种属性的最大值、最小值、范围
minVals = dataSet.min(0)
maxVals = dataSet.max(0)
# 极差
ranges = maxVals - minVals
normDataSet = zeros(shape(dataSet))
m = dataSet.shape[0]
# 生成与最小值之差组成的矩阵
normDataSet = dataSet - tile(minVals, (m, 1))
# 将最小值之差除以范围组成矩阵
normDataSet = normDataSet / tile(ranges, (m, 1)) # element wise divide
return normDataSet, ranges, minVals
Training Algorithm:这一步不适用于k近邻算法
由于每次都将测试数据与完整的训练数据进行比较,因此这个过程是不必要的。
测试算法:使用Helen提供的部分数据作为测试样本。如果预测类别与实际类别不同,则将其标记为错误。
针对交友网站的 kNN 分类器测试代码
def datingClassTest():
"""
Desc:
对约会网站的测试方法
parameters:
none
return:
错误数
"""
# 设置测试数据的的一个比例(训练数据集比例=1-hoRatio)
hoRatio = 0.1 # 测试范围,一部分测试一部分作为样本
# 从文件中加载数据
datingDataMat, datingLabels = file2matrix('input/2.KNN/datingTestSet2.txt') # load data setfrom file
# 归一化数据
normMat, ranges, minVals = autoNorm(datingDataMat)
# m 表示数据的行数,即矩阵的第一维
m = normMat.shape[0]
# 设置测试的样本数量, numTestVecs:m表示训练样本的数量
numTestVecs = int(m * hoRatio)
print 'numTestVecs=', numTestVecs
errorCount = 0.0
for i in range(numTestVecs):
# 对数据测试
classifierResult = classify0(normMat[i, :], normMat[numTestVecs:m, :], datingLabels[numTestVecs:m], 3)
print "the classifier came back with: %d, the real answer is: %d" % (classifierResult, datingLabels[i])
if (classifierResult != datingLabels[i]): errorCount += 1.0
print "the total error rate is: %f" % (errorCount / float(numTestVecs))
print errorCount
使用算法:生成一个简单的命令行程序,然后海伦可以输入一些特征数据来判断对方是否是她喜欢的类型。
交友网站预测功能
def clasdifyPerson():
resultList = ['not at all', 'in small doses', 'in large doses']
percentTats = float(raw_input("percentage of time spent playing video games ?"))
ffMiles = float(raw_input("frequent filer miles earned per year?"))
iceCream = float(raw_input("liters of ice cream consumed per year?"))
datingDataMat, datingLabels = file2matrix('datingTestSet2.txt')
<p>
" />
normMat, ranges, minVals = autoNorm(datingDataMat)
inArr = array([ffMils, percentTats, iceCream])
classifierResult = classify0((inArr-minVals)/ranges,normMat,datingLabels, 3)
print "You will probably like this person: ", resultList[classifierResult - 1]</p>
实际运行效果如下:
>>> kNN.classifyPerson()
percentage of time spent playing video games?10
frequent flier miles earned per year?10000
liters of ice cream consumed per year?0.5
You will probably like this person: in small doses
完整代码地址:/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py
项目案例二:手写数字识别系统项目概述
构建一个基于KNN分类器的手写数字识别系统,可以识别0到9的数字。
要识别的数字是存储在文本文件中的黑白图像,颜色和大小相同:宽和高都是32像素*32像素。
开发过程
收集数据:提供文本文件。
准备数据:编写函数 img2vector(), 将图像格式转换为分类器使用的向量格式
分析数据:在 Python 命令提示符中检查数据,确保它符合要求
训练算法:此步骤不适用于 KNN
测试算法:编写函数使用提供的部分数据集作为测试样本,测试样本与非测试样本的
区别在于测试样本是已经完成分类的数据,如果预测分类与实际类别不同,
则标记为一个错误
使用算法:本例没有完成此步骤,若你感兴趣可以构建完整的应用程序,从图像中提取
数字,并完成数字识别,美国的邮件分拣系统就是一个实际运行的类似系统
采集
数据:提供文本文件
目录trainingDigits收录
约2000个示例,每个示例的内容如下图所示,每个数字约有200个样本;目录 testDigits 收录
大约 900 个测试数据。
准备数据:编写函数img2vector(),将图片文本数据转化为分类器使用的向量
将图像文本数据转换为矢量
def img2vector(filename):
returnVect = zeros((1,1024))
fr = open(filename)
for i in range(32):
lineStr = fr.readLine()
for j in range(32):
returnVect[0,32*i+j] = int(lineStr[j])
return returnVect
分析数据:在 Python 命令提示符中检查数据以确保其符合要求
在 Python 命令行中输入以下命令来测试 img2vector 函数并将其与在文本编辑器中打开的文件进行比较:
>>> testVector = kNN.img2vector('testDigits/0_13.txt')
>>> testVector[0,0:31]
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
>>> testVector[0,31:63]
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
训练算法:此步骤不适用于 KNN
由于每次都将测试数据与完整的训练数据进行比较,因此这个过程是不必要的。
测试算法:编写一个函数,使用提供的部分数据集作为测试样本,并在预测类别与实际类别不同时标记错误
def handwritingClassTest():
# 1. 导入训练数据
hwLabels = []
trainingFileList = listdir('input/2.KNN/trainingDigits') # load the training set
m = len(trainingFileList)
trainingMat = zeros((m, 1024))
# hwLabels存储0~9对应的index位置, trainingMat存放的每个位置对应的图片向量
for i in range(m):
fileNameStr = trainingFileList[i]
fileStr = fileNameStr.split('.')[0] # take off .txt
classNumStr = int(fileStr.split('_')[0])
hwLabels.append(classNumStr)
# 将 32*32的矩阵->1*1024的矩阵
trainingMat[i, :] = img2vector('input/2.KNN/trainingDigits/%s' % fileNameStr)
# 2. 导入测试数据
testFileList = listdir('input/2.KNN/testDigits') # iterate through the test set
errorCount = 0.0
mTest = len(testFileList)
for i in range(mTest):
fileNameStr = testFileList[i]
fileStr = fileNameStr.split('.')[0] # take off .txt
classNumStr = int(fileStr.split('_')[0])
vectorUnderTest = img2vector('input/2.KNN/testDigits/%s' % fileNameStr)
classifierResult = classify0(vectorUnderTest, trainingMat, hwLabels, 3)
print "the classifier came back with: %d, the real answer is: %d" % (classifierResult, classNumStr)
if (classifierResult != classNumStr): errorCount += 1.0
print "\nthe total number of errors is: %d" % errorCount
print "\nthe total error rate is: %f" % (errorCount / float(mTest))
Using Algorithm:本例没有做这一步,有兴趣的可以自己搭建一个完整的应用,从图片中提取数字,完成数字识别。美国的邮件分拣系统是一个实际有效的类似系统
完整代码地址:/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py
KNN总结
经过上面的介绍,我们可以知道k近邻算法有3个基本要素:
. 使用的距离是欧几里德距离,但其他距离也是可能的,例如更一般的
距离,或 Minkowski 距离。