优化的解决方案:基于SQL的实时股票分析

优采云 发布时间: 2020-09-11 08:22

  基于SQL的实时库存分析

  该文章还发布在个人博客上:基于SQL的实时库存分析

  

  概述

  这一次,我将基于我们公司的StreamSQL流处理功能及其业务数据,为一家经纪公司共享一个实时流处理案例,以进行实时库存分析。

  主要功能是采集

DBF实时股票交易数据并将其写入消息队列。使用StreamSQL组件实时接收消息队列中的数据并进行统计分析。

  流处理功能是:

  每天4小时保存大约5-6百万次的实时详细数据。计算当前营业额中的最高股票(实际效果与Flush看到的营业额相同)在5分钟内实时计算营业额中的最高股票[标题]

  

  过程架构图

  过程架构图[/标题]

  由于时间限制,没有进行更多的深入功能。实际上,结合其他经纪数据和实时数据,您可以进行很多有价值的实时分析(例如:离线数据模型训练,实时重库存和模型数据组合分析以提出库存建议),为经纪产品提供基本数据支持。

  Stream SQL简介

  Transwarp Stream是由Transwarp专门为企业用户创建的流计算引擎。它主要用于实时应用程序场景。例如,金融业需要实时的市场波动预警;运输行业需要实时存储*敏*感*词*数据,并使用图像识别功能在线识别车牌车辆以进行预警等;银行需要在线分析服务,以便及时发现欺诈和其他违规行为;使用复杂物联网的行业,例如机场和风力发电,需要对大量传感器数据进行实时分析和数据挖掘。

  在实时性能,吞吐量,高可用性,易用性,安全性和稳定性方面,企业用户通常对流处理产品有极其苛刻的要求。 Transwarp凭借其强大的技术实力和在中国最复杂的流处理案例中的经验,开发了一种Transwarp流流计算引擎,可以满足这些苛刻的要求:

  

  Transwarp流体系*敏*感*词*

  有关StreamSQL产品的更多介绍,请参阅Transwarp Stream:业界最强大的SQL支持流计算引擎,能够进行实时数据挖掘

  数据采集

  实时库存数据交易数据通常存储在DBF文件中。这种格式已经在证券行业中使用了20多年。

  数据采集

部分,包括功能:

  定期分析dbf文件(判断dbf文件是否已更改)并将其写入Kafka消息队列中,以为后续的streamSQL提供数据

  注意,需要引入解析dbf格式的jar包dbf.jar

  部分代码如下:

   InputStream fis = null;

// 读取文件的输入流

fis = new FileInputStream(path);

// 根据输入流初始化一个DBFReader实例,用来读取DBF文件信息

DBFReader reader = new DBFReader(fis);

reader.setCharactersetName("gbk");

// 调用DBFReader对实例方法得到path文件中字段的个数

int fieldsCount = reader.getFieldCount();

// 取出字段信息

// for (int i = 0; i < fieldsCount; i++) {

// DBFField field = reader.getField(i);

// // logger.info(field.getName() + "\t");

// }

Object[] rowValues;

int num = 1;

String time = null;

A: while ((rowValues = reader.nextRecord()) != null) {

//提出DBF中的当前时间,同时判断文件是否修改

if (num == 1 && rowValues[0].equals("000000")) {

Double t = (Double) rowValues[7];

time = rowValues[1] + "" + t.intValue();

if (now == null) {

now = time;

continue A;

} else if (now.equals(time)) {

break A;

} else {

now = time;

continue A;

}

}

num++;

StringBuffer message = new StringBuffer();

message.append(time + ",");

for (int i = 0; i < rowValues.length; i++) {

message.append(rowValues[i] + ",");

}

logger.info(message.toString());

producer.send(new KeyedMessage(topic, message.toString()));

}

  流处理

  在流处理部分,使用StreamSQL组件编写SQL以完成操作。

  详细保存StreamJOB

  由于稍后需要对详细信息进行统计分析,因此数据将基于内存和SSD存储语句存储在列存储的holodesk组件中。

  create streamjob holo_detail_stream_job as ("insert into holo_stream_zq_detail select * from stream_demo") JOBPROPERTIES('stream.number.receivers'='4');

  StreamJOB 5分钟交易量

  根据详细数据,在5分钟内实时计算营业额

  create streamjob holo_count_stream_job as ("insert into hb_stream_holo select concat(TDH_TODATE(created,'yyyyMMddHHmmss','yyyyMMdd'),row_number() OVER(ORDER BY cjl desc)),created,hqzqdm,HQZQJC,cjl from (select max(created) as created,hqzqdm,HQZQJC,sum(HQZJCJ*HQCJBS) as cjl from (select created,hqzqdm,HQZQJC,HQZJCJ,HQCJBS from holo_stream_zq_detail union select created,hqzqdm,HQZQJC,HQZJCJ,HQCJBS from stream_demo)holo_stream_zq_detail where TDH_TODATE(created,'yyyyMMddHHmmss','yyyy-MM-dd HH:mm:ss:SSS')>CAST(sysdate-TO_MINUTE_INTERVAL(5) AS STRING) group by hqzqdm,HQZQJC order by cjl desc limit 20) t") JOBPROPERTIES('stream.number.receivers'='4');

start streamjob holo_count_stream_job;

  显示计算结果时,仅采集

深圳A股和创业板的数据,并计算当前成交量最大的股票。实际效果与Flush看到的营业额一致。 5分钟内实时计算成交量最高的股票。刷新时没有统计信息,并且参考值很小。基于此,还可以进行实时的大体积分析。

  

  

  报告显示

  借助实时分析结果,结合报表工具的股价图表,实现显示(图中的数据,缺少高低数据)。

  

  摘要

  基于StreamSQL对SQL的完全支持以及实时,吞吐量,高可用性和易用性等功能,实时分析变得更加简单。只需执行数据采集

,并根据需要随时调整统计SQL,即可完成实时分析。

  当然,这个案例使我对股票有了更深的了解。

  参考链接

  有关StreamSQL产品的更多介绍,请参阅

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线