解决方案:问我,问我社区,问我学院,专注软硬件开发,测试和运维平台技术文章分享

优采云 发布时间: 2022-12-15 04:41

  解决方案:问我,问我社区,问我学院,专注软硬件开发,测试和运维平台技术文章分享

  交易所安全测试-信息采集一、概述

  对于所有与安全相关的测试,信息采集是非常重要和必要的第一步。有时非常全面和完善的信息采集甚至会占渗透测试总工程量的70%到80%。后续工作节省了大量能源,提供了便利。数字货币交易所的安全测试也是如此。信息采集的第一步至关重要。本文将展示零时科技安全团队多年攻防经验,以及大量交易所客户真实案例。虽然我们对此知之甚少,但我们可以通过案例了解数字货币交易所在安全测试过程中有哪些信息可供黑客采集。使用及其造成的危害。

  2.测试清单

  信息采集清单

  三、案例分析

  关于信息采集,众说纷纭,甚至有人说信息采集是考试中最没用的部分。诚然,并不是所有的信息都是有效的,都可以利用的,但有一部分确实是在情况不佳的时候可以利用的。,从而再次找到新的突破口。

  以下案例将揭示信息采集阶段如何在测试中为整个测试过程做出贡献。

  服务器真实IP发现

  开启CDN后,网站会根据用户所在位置访问CDN节点服务器,不会直接访问源服务器。由于CDN节点的阻断保护,无论服务器被渗透还是DD0S攻击,攻击的目标都是CDN节点,可以更好的保护服务器的安全。

  在黑客攻击过程中找到目标的真实IP地址非常重要。攻击者可以通过各种方式绕过保护找到服务器的真实IP地址。最常见的方法是通过查询历史DNS记录来获取服务器的真实IP。直接通过真实IP绕过防护,进行端口扫描、服务指纹识别,绕过常规Web安全防护,扩大攻击面。

  下图是通过DNS记录得到的某交易所的真实IP:

  

  目标子域检测

  子域检测是查找一个或多个域的子域的过程。这是信息采集阶段的重要组成部分。子域检测可以帮助我们在渗透测试中发现更多的服务,这会增加发现漏洞的可能性,并且发现一些被遗忘的用户较少的子域,运行在其上的应用程序可能会导致我们发现关键漏洞。

  检测子域的方法有很多,例如利用DNS域传输漏洞、检查HTTPS证书、枚举挖掘等。至于交易所后台的发现,经过大量测试,发现交易所的部分后台会隐藏在其二级域名下,以确保安全。

  下图为某交易所后台登录界面,其子域名为admin的MD5:

  从某种意义上说,后台与主站分离增加了管理后台被攻击者发现的成本,但也无法避免自身缺陷带来的安全问题。因此,在保证隐蔽性的前提下,管理后台可以使用白名单IP访问限制、强密码、手机令牌等更加安全的登录方式。

  API接口信息泄露

  API的使用频率越来越高,占比也越来越大。所谓“能力越大,责任越大”。安全的API使用固然可以带来极大的便利,但是一旦API安全出现问题,就会带来严重的问题。后果将是毁灭性的。在测试的第一步,在信息采集领域,我们首先能接触到的是API的具体参数等信息的保密状态。

  零时科技安全团队在对某交易所进行安全测试时,发现该交易所的代码是外包公司编写的。在后续的信息采集过程中,零时科技的安全团队在谷歌上找到了外包公司在编写代码时留下的、托管在团队协作平台上的API文档。文档详细解释了使用API​​时所用到的各种参数,以及它们的类型、具体含义和用途,测试中用到的一些具体参数都留在sample中,为后续测试提供了很大的帮助。

  域名 Whois 和备案信息采集

  虽然已经有一些交易所在注册域名时使用了域名注册商提供的服务,并且没有在Whois等域名信息备案上泄露公司或相关人员信息网站,但还是有一些交易所会亲自注册域名,此时使用Whois或其他工具查找交易所域名注册公司或相关人员的详细信息。而这些不起眼的信息对后续的测试手段(如密码猜测、社会工程学攻击等)会有很大的帮助,可以大大提高其成功率。

  零时科技安全团队在对另一家交易所进行安全测试时,根据该交易所在Whois上留下的门户网站域名备案信息,找到了其注册公司,进而找到了部分手机公司经理(也是股东之一)的电话号码、QQ号、微信号和注册邮箱地址。虽然由于授权原因没有进行后续的社会工程学攻击等测试手段,但这些信息无疑会在真正需要特殊攻击手段时大大提高成功率,让测试人员更加冷静,轻松撕破,完成测试。

  

  发现 GitHub 源代码泄漏

  有些开发者在写代码的时候会习惯性的将源码上传到github等代码托管平台,而这些源码正是每个测试人员日以继夜想要得到的。毕竟拿到源码就可以审计,直接找写源码时留下的漏洞和疏忽。这将使整个测试过程变得更加简单,并减少大量工作。同时,通过直接审计源代码可以发现的问题和漏洞会更加全面和有针对性。

  同样,寻找交易所使用的源代码也是信息采集的重要环节。以下是在交易所 网站 上找到的 /.git 源代码文件。零时科技安全团队对源代码进行了审计,将审计过程中发现的敏感信息、评论中存储的评论等审计结果与发现的漏洞进行了验证,在测试过程中成功获取了服务器的控制权,并完成了测试。这个测试。

  敏感文件发现

  敏感文件的种类很多,其中最经典,往往在测试过程中效果最好的是robots.txt、sitemap.xml等文件。一些敏感文件甚至可以成为测试的突破口。

  以下只是两个交易所 网站 中 robots.txt 中的一些信息的示例。对于测试人员来说,有了这些信息,很容易找到交易所 网站 中确实存在但不允许轻易访问的敏感页面。如果这些页面有一定的规律或者特点,你甚至可以找到使用的组件,cms等信息,然后进行更有针对性的测试。

  在对交易所进行测试的过程中,零时科技的安全团队确实利用了这些信息,并配合其他手段,成功攻入了交易所后台。

  解决方案:中文开源技术交流社区

  Spark Streaming 用于流式数据处理。Spark Streaming支持多种数据输入源,如Kafka、Flume、Twitter、ZeroMQ和简单的TCP sockets等。数据输入后,Spark的高度抽象原语如:map、reduce、join、window等可以用于计算。并且结果还可以保存在很多地方,比如HDFS、数据库等。

  类似于Spark基于RDD的概念,Spark Streaming使用离散化的流作为抽象表示,称为DStream。DStream 是随时间接收的数据序列。在内部,每个时间间隔接收到的数据以 RDD 的形式存在,DStream 是这些 RDD 的序列(因此得名“离散化”)。

  离线数据:不可更改的数据;实时数据:变化对数据;流处理;批量处理

  批处理(微批处理,不是流式处理)

  什么是DStream

  DSream表示一系列连续的RDD,DStream中的每个RDD都收录特定时间间隔的数据;离散流,一个或多个RDD

  Spark 流架构

  字数案例

  需求:使用netcat工具不断向9999端口发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

  StreamingContext中有这个构造方法: def this(conf: SparkConf, batchDuration: Duration)

  //测试Spark实时计算

object StreamWordCount {

def main(args: Array[String]): Unit = {

//创建配置对象

val conf: SparkConf = new SparkConf().setAppName("Streaming").setMaster("local[*]")

val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))

//通过监控端口创建DStream,读进来的数据为一行行

val socket: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

//将每一行数据做切分,形成一个个单词 读取是按一行一行来读 line ==> word

val dsTream: DStream[String] = socket.flatMap(_.split(" "))

//将单词映射成元组(word,1)

val word: DStream[(String, Int)] = dsTream.map((_, 1))

//reduceByKey

val wordCount: DStream[(String, Int)] = word.reduceByKey(_+_)

//打印

wordCount.print()

//启动采集器

streamContext.start()

//Driver不能停止,等待采集器的结束

streamContext.awaitTermination()

}

  [kris@hadoop101 ~]$ nc -lk 9999

Hello world

Hello

Hello java

Hello spark

  如果程序运行时日志过多,可以将log4j文件放到resources中的spark conf目录下,并将日志级别改为ERROR

  DStream是一系列连续的RDD来表示。每个 RDD 收录一个时间间隔的数据

  1.文件数据源

  文件数据流:可以读取所有兼容HDFS API的文件系统文件,通过fileStream方法读取。Spark Streaming 将监控 dataDirectory 目录并不断处理移入的文件。请记住,目前不支持嵌套目录。

  streamingContext.textFileStream(dataDirectory),其他代码同上;

  预防措施:

  1)文件需要具有相同的数据格式;

  2)文件进入dataDirectory的方式需要通过移动或者重命名来实现;

  3)文件一旦移动到目录中,就不能修改,即使修改也不会读取新的数据;

  2.自定义数据源

  需要继承Receiver并实现onStart和onStop方法来自定义数据源采集。自定义数据源,*敏*感*词*某个端口号,获取端口号的内容。

  自定义数据 采集器

  // 自定义数据采集器

class CustomerReceive(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ //有一个构造方法

var socket: Socket = null

//读数据并将数据发送给Spark

def receive(): Unit = {

//创建一个Socket

val socket = new Socket(host, port)

//字节流 ---->字符流

val inputStream: InputStream = socket.getInputStream //字节流

//字符流

val bufferedReader: BufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"))

var line: String = null

while ((line = bufferedReader.readLine()) != null){

if (!"--END--".equals(line)){

store(line) //存储到这里边

}else{

return

}

}

}

//启动采集器

//最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark

override def onStart(): Unit = {

new Thread(new Runnable{

override def run(): Unit = {

receive()

}

}).start()

}

//关闭采集器

override def onStop(): Unit = {

if (socket != null){

socket.close()

socket = null

}

}

}

  //测试:

object FileStream {

def main(args: Array[String]): Unit = {

// 创建流式处理环境对象

// 创建对象时,需要传递采集数据的周期(时间)

val conf: SparkConf = new SparkConf().setAppName("Streaming").setMaster("local[*]")

val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))

// 从端口号获取数据

val socketDStream: ReceiverInputDStream[String] = streamContext.receiverStream(new CustomerReceive("hadoop101", 9999))

// 一行一行的数据 line ==> word

val wordDStream: DStream[String] = socketDStream.flatMap(_.split(" "))

// word ==> (word, 1)

val wordToCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))

// reduceByKey

val wordToSumDStream: DStream[(String, Int)] = wordToCountDStream.reduceByKey(_ + _)

<p>

//打印数据

wordToSumDStream.print()

// TODO 启动采集器

streamContext.start()

// TODO Driver不能停止,等待采集器的结束

// wait, sleep

streamContext.awaitTermination()

}

}</p>

  3. Kafka数据源(重点)

  KafkaUtils 对象可以从 StreamingContext 和 JavaStreamingContext 中的 Kafka 消息创建 DStream。由于 KafkaUtils 可以订阅多个主题,因此它创建的 DStream 由成对的主题和消息组成。要创建流式流,请使用 StreamingContext 实例、以逗号分隔的 ZooKeeper 主机字符串列表、消费者组名称(唯一名称)以及从主题到该主题的接收线程数的映射,以调用 createStream()方法。

  //*敏*感*词*kafka消息

object KafkaStreaming {

def main(args: Array[String]): Unit = {

// 创建配置对象

val sparkConf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")

// 创建流式处理环境对象

// 创建对象时,需要传递采集数据的周期(时间)

val socket: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

// 一个类如果创建SparkContext,那么这个类我们称之为Driver类

// 从Kafka集群中获取数据

//定义kafka参数

val kafkaParams = Map[String, String](

"group.id" -> "kris",

"zookeeper.connect" -> "hadoop101:2181",

ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->"org.apache.kafka.common.serialization.StringDeserializer",//StringDeserializer的全类名,StringDeserializer implements Deserializer

ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"

) //别导错包流,是kafka.clients.consumer里对

//定义topic参数

val topicMap = Map("thrid" -> 3)

val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](

socket,

kafkaParams,

topicMap,

StorageLevel.MEMORY_ONLY) //StorageLevel别导错包流

val wordToCountDStream = kafkaDStream.map {

case (k, v) => {(v, 1)}

}

val wordToSumDStream: DStream[(String, Int)] = wordToCountDStream.reduceByKey(_ + _)

//打印数据

wordToSumDStream.print()

//启动采集器

socket.start()

//Driver不能停,等待采集器对结束

socket.awaitTermination()

}

}

  启动kafka,在控制台启动producer

  [kris@hadoop101 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic thrid

  打印:

  -------------------------------------------

Time: 1555065970000 ms

-------------------------------------------

(Hello world,1)

-------------------------------------------

Time: 1555065975000 ms

-------------------------------------------

(Hello,1)

-------------------------------------------

Time: 1555065980000 ms

-------------------------------------------

(Hello,1)

(java,1)

-------------------------------------------

Time: 1555065985000 ms

-------------------------------------------

(spark,1)

-------------------------------------------

  查看代码

  DStream转换

  DStream上的原语和RDD类似,分为Transformations(转换)和Output Operations(输出)。此外,在转换操作中还有一些特殊的原语,如:updateStateByKey()、transform()和各种Window相关的原语。

  4.有状态的转换操作(重点)UpdateStateByKey

  UpdateStateByKey 原语用于记录历史记录。有时,我们需要在 DStream 中跨批维护状态(例如在流计算中累积字数)。对于这种情况,updateStateByKey() 为我们提供了访问键值对 DStream 的状态变量的权限。给定一个由(key, event)对组成的DStream,并传递一个指定如何根据新事件更新每个key对应状态的函数,就可以构造一个内部数据为(key, state)对的新DStream。

  updateStateByKey() 的结果将是一个新的 DStream,其内部 RDD 序列由对应于每个时间间隔的 (key, state) 对组成。

  updateStateByKey 操作允许我们在使用新信息更新时保持任意状态。要使用此功能,您需要执行以下两个步骤:

  1.定义状态,可以是任意数据类型。

  2. 定义一个状态更新函数,阐明如何用输入流中的先前状态和新值更新状态。

  使用updateStateByKey需要配置checkpoint目录,会使用checkpoint保存状态。(只要key相同,它的状态就会更新)

  如果关键词相同,就会形成一组数量对,Seq[Int]就是那个数量(比如你好,1;你好,1;Seq是1 1 1);option只有两个值(有的有值,none没有值),为了解决空指针的出现,不需要判断当前对象是否为空,直接使用option即可

  更新状态:多条数据之间是否有关系,有状态的还是无状态的

  每周采集数据是无状态的,但是实时数据需要是有状态的,用checkPoint聚合--&gt;有状态

  将数据保存在CheckPoint中,缓冲临时缓冲

  //SparkStreaming有状态转换操作

object DStreamState {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("Stream").setMaster("local[*]")

val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))

//设置Checkpoints的目录

streamContext.sparkContext.setCheckpointDir("cp")

val socketDStream: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

val wordDStream: DStream[String] = socketDStream.flatMap(_.split(" "))

val wordToCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))

// 进行有状态的转换操作

<p>

val resultDStream: DStream[(String, Long)] = wordToCountDStream.updateStateByKey {// 要加范型

case (seq, buffer) => { //seq序列当前周期中单词对数量对集合, buffer表缓冲当中的值,所谓的checkPoint

val sumCount = seq.sum + buffer.getOrElse(0L)

Option(sumCount) //表往缓存里边更新对值  它需要返回一个Option

}

}

resultDStream.print()

streamContext.start()

streamContext.awaitTermination()

}

}</p>

  打印:

  有状态转换操作

-------------------------------------------

Time: 1555070600000 ms

-------------------------------------------

(Hello,1)

(world,1)

-------------------------------------------

Time: 1555070605000 ms

-------------------------------------------

(Hello,2)

(world,2)

-------------------------------------------

Time: 1555070610000 ms

-------------------------------------------

(Hello,3)

(java,1)

(world,2)

-------------------------------------------

Time: 1555070615000 ms

-------------------------------------------

(Hello,3)

(java,1)

(world,2)

  查看代码

  窗口操作

  Window Operations可以通过设置窗口的大小和滑动窗口的间隔来动态获取当前Steaming的允许状态。基于窗口的操作通过在比 StreamingContext 的批次间隔更长的时间范围内组合多个批次的结果来计算整个窗口的结果。

  窗口数据是指一段时间内的数据作为一个整体的使用情况。随着时间的推移,窗口数据也会发生变化。这样的函数称为窗口函数,这个窗口是可以变化的,也称为滑动窗口;

  object DStreamWindow {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("Stream").setMaster("local[*]")

val streamContext: StreamingContext = new StreamingContext(conf, Seconds(3))

val socketDStream: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

// 设定数据窗口:window

// 第一个参数表示窗口的大小(时间的范围,应该为采集周期的整数倍)

// 第二个参数表示窗口的滑动的幅度(时间的范围,应该为采集周期的整数倍)

val windowDStream: DStream[String] = socketDStream.window(Seconds(6), Seconds(3))

val wordDStream: DStream[String] = windowDStream.flatMap(_.split(" "))

val wordCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))

val wordSumDStream: DStream[(String, Int)] = wordCountDStream.reduceByKey(_+_)

wordSumDStream.print()

streamContext.start()

streamContext.awaitTermination()

}

}

  转换

  Transform 原语允许在 DStream 上执行任意 RDD-to-RDD 函数。即使这些函数没有暴露在 DStream API 中,Spark API 也可以通过这个函数轻松扩展。此函数每批次调度一次。其实就是对DStream中的RDD应用transformation。

  Transform和map对的区别:

  // TODO XXXXXX (Drvier) * 1,这里可写Driver代码但只执行一遍;

wordSumDStream.map{

case(word, sum) => {

// TODO YYYYYY (Executor) * N ,这里执行的是Executor代码可执行N遍

(word, 1)

}

}

// transform可以将DStream包装好的RDD抽取出来进行转换操作

// transform可以在每一个采集周期对rdd进行操作

  // TODO AAAAAA (Driver) * 1

wordSumDStream.transform{

rdd => {

// TODO BBBBBBB (Driver) * N

rdd.map{

case (word, sum) => {

// TODO CCCCCC (Executor) * N

(word, 1)

}

}

}

}

  数据流输出

  输出操作指定对流式数据进行转换操作得到的数据要进行的操作(如将结果推送到外部数据库或输出到屏幕)。类似于RDD中的lazy evaluation,如果一个DStream及其派生的DStreams还没有输出,那么这些DStreams将不会被求值。如果在 StreamingContext 中没有设置输出操作,则不会启动整个上下文。

  输出操作如下:

  (1)print():在运行流程序的驱动节点上打印DStream中每批数据的前10个元素。这用于开发和调试。在 Python API 中,相同的操作称为 print()。

  (2) saveAsTextFiles(prefix, [suffix]):以文本文件的形式存储这个DStream的内容。每个批次的存储文件名以参数中的前缀和后缀为准。“prefix-Time_IN_MS[.suffix]”。

  (3) saveAsObjectFiles(prefix, [suffix]):将Stream中的数据以Java对象序列化的形式保存为SequenceFiles。每个批次的存储文件名是基于参数中的“prefix-TIME_IN_MS[.suffix]”。Python目前不可用。

  (4) saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为Hadoop文件。每个batch的存储文件名以参数中的“prefix-TIME_IN_MS[.suffix]”为准。

  Python API 目前在 Python 中不可用。

  (5)foreachRDD(func):这是最通用的输出操作,即函数func用于从流中生成的每一个RDD。作为参数传入的函数func应该将每个RDD中的数据推送到外部系统,比如将RDD存储在文件中或者通过网络写入数据库。注意:函数func是在运行流应用的driver中执行的,它里面的通用函数RDD操作强制执行它对流RDD的操作。

  通用输出操作 foreachRDD(),用于在 DStream 中对 RDD 进行任意计算。这有点类似于 transform(),都允许我们访问任意 RDD。在 foreachRDD() 中,我们可以重用我们在 Spark 中实现的所有操作。

  例如,其中一个常见用例是将数据写入外部数据库(如 MySQL)。注意:

  (1) 连接不能写在驱动层;

  (2)如果写成foreach,每一个RDD都会被创建,得不偿失;

  (3)添加foreachPartition,在分区中创建。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线