优化的解决方案:基于SQL的实时股票分析
优采云 发布时间: 2020-09-11 08:22基于SQL的实时库存分析
该文章还发布在个人博客上:基于SQL的实时库存分析
概述
这一次,我将基于我们公司的StreamSQL流处理功能及其业务数据,为一家经纪公司共享一个实时流处理案例,以进行实时库存分析。
主要功能是采集
DBF实时股票交易数据并将其写入消息队列。使用StreamSQL组件实时接收消息队列中的数据并进行统计分析。
流处理功能是:
每天4小时保存大约5-6百万次的实时详细数据。计算当前营业额中的最高股票(实际效果与Flush看到的营业额相同)在5分钟内实时计算营业额中的最高股票[标题]
过程架构图
过程架构图[/标题]
由于时间限制,没有进行更多的深入功能。实际上,结合其他经纪数据和实时数据,您可以进行很多有价值的实时分析(例如:离线数据模型训练,实时重库存和模型数据组合分析以提出库存建议),为经纪产品提供基本数据支持。
Stream SQL简介
Transwarp Stream是由Transwarp专门为企业用户创建的流计算引擎。它主要用于实时应用程序场景。例如,金融业需要实时的市场波动预警;运输行业需要实时存储*敏*感*词*数据,并使用图像识别功能在线识别车牌车辆以进行预警等;银行需要在线分析服务,以便及时发现欺诈和其他违规行为;使用复杂物联网的行业,例如机场和风力发电,需要对大量传感器数据进行实时分析和数据挖掘。
在实时性能,吞吐量,高可用性,易用性,安全性和稳定性方面,企业用户通常对流处理产品有极其苛刻的要求。 Transwarp凭借其强大的技术实力和在中国最复杂的流处理案例中的经验,开发了一种Transwarp流流计算引擎,可以满足这些苛刻的要求:
Transwarp流体系*敏*感*词*
有关StreamSQL产品的更多介绍,请参阅Transwarp Stream:业界最强大的SQL支持流计算引擎,能够进行实时数据挖掘
数据采集
实时库存数据交易数据通常存储在DBF文件中。这种格式已经在证券行业中使用了20多年。
数据采集
部分,包括功能:
定期分析dbf文件(判断dbf文件是否已更改)并将其写入Kafka消息队列中,以为后续的streamSQL提供数据
注意,需要引入解析dbf格式的jar包dbf.jar
部分代码如下:
InputStream fis = null;
// 读取文件的输入流
fis = new FileInputStream(path);
// 根据输入流初始化一个DBFReader实例,用来读取DBF文件信息
DBFReader reader = new DBFReader(fis);
reader.setCharactersetName("gbk");
// 调用DBFReader对实例方法得到path文件中字段的个数
int fieldsCount = reader.getFieldCount();
// 取出字段信息
// for (int i = 0; i < fieldsCount; i++) {
// DBFField field = reader.getField(i);
// // logger.info(field.getName() + "\t");
// }
Object[] rowValues;
int num = 1;
String time = null;
A: while ((rowValues = reader.nextRecord()) != null) {
//提出DBF中的当前时间,同时判断文件是否修改
if (num == 1 && rowValues[0].equals("000000")) {
Double t = (Double) rowValues[7];
time = rowValues[1] + "" + t.intValue();
if (now == null) {
now = time;
continue A;
} else if (now.equals(time)) {
break A;
} else {
now = time;
continue A;
}
}
num++;
StringBuffer message = new StringBuffer();
message.append(time + ",");
for (int i = 0; i < rowValues.length; i++) {
message.append(rowValues[i] + ",");
}
logger.info(message.toString());
producer.send(new KeyedMessage(topic, message.toString()));
}
流处理
在流处理部分,使用StreamSQL组件编写SQL以完成操作。
详细保存StreamJOB
由于稍后需要对详细信息进行统计分析,因此数据将基于内存和SSD存储语句存储在列存储的holodesk组件中。
create streamjob holo_detail_stream_job as ("insert into holo_stream_zq_detail select * from stream_demo") JOBPROPERTIES('stream.number.receivers'='4');
StreamJOB 5分钟交易量
根据详细数据,在5分钟内实时计算营业额
create streamjob holo_count_stream_job as ("insert into hb_stream_holo select concat(TDH_TODATE(created,'yyyyMMddHHmmss','yyyyMMdd'),row_number() OVER(ORDER BY cjl desc)),created,hqzqdm,HQZQJC,cjl from (select max(created) as created,hqzqdm,HQZQJC,sum(HQZJCJ*HQCJBS) as cjl from (select created,hqzqdm,HQZQJC,HQZJCJ,HQCJBS from holo_stream_zq_detail union select created,hqzqdm,HQZQJC,HQZJCJ,HQCJBS from stream_demo)holo_stream_zq_detail where TDH_TODATE(created,'yyyyMMddHHmmss','yyyy-MM-dd HH:mm:ss:SSS')>CAST(sysdate-TO_MINUTE_INTERVAL(5) AS STRING) group by hqzqdm,HQZQJC order by cjl desc limit 20) t") JOBPROPERTIES('stream.number.receivers'='4');
start streamjob holo_count_stream_job;
显示计算结果时,仅采集
深圳A股和创业板的数据,并计算当前成交量最大的股票。实际效果与Flush看到的营业额一致。 5分钟内实时计算成交量最高的股票。刷新时没有统计信息,并且参考值很小。基于此,还可以进行实时的大体积分析。
报告显示
借助实时分析结果,结合报表工具的股价图表,实现显示(图中的数据,缺少高低数据)。
摘要
基于StreamSQL对SQL的完全支持以及实时,吞吐量,高可用性和易用性等功能,实时分析变得更加简单。只需执行数据采集
,并根据需要随时调整统计SQL,即可完成实时分析。
当然,这个案例使我对股票有了更深的了解。
参考链接
有关StreamSQL产品的更多介绍,请参阅