博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming(8):windows窗口操作
阅读量:4280 次
发布时间:2019-05-27

本文共 2253 字,大约阅读时间需要 7 分钟。

一、概念

1.基本功能

在一定的时间间隔(interval)进行一个时间段(window length)内的数据处理。

【参考:】

2.核心

(1)window length  : 窗口的长度(下图是3)

(2)sliding interval: 窗口的间隔(下图是2)

(3)这2个参数和Streaming的batch size都是倍数关系,否则会报错!

3.实例(官方)

  每10s计算前30s的数据

// Reduce last 30 seconds of data, every 10 secondsval windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

     【注意:】

      Seconds(30), //窗口大小,指定计算最近多久的数据量,要求是父DStream的批次产生时间的整数倍

      Seconds(10) //滑动大小/新的DStream批次产生间隔时间,就是几秒钟来一次数据,要求是父DStream的批次产生时间的整数倍

二、实例代码

1.源码

package _0809kafkaimport org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.streaming.{Seconds, StreamingContext}/**  * Created by Administrator on 2018/10/20.  */object WindowsReduceStream_simple_1020 {  def main(args: Array[String]): Unit = {    val sparkconf=new SparkConf().setMaster("local[2]").setAppName("WindowsReduceStream_simple_1020")    val sc=new  SparkContext(sparkconf)    val ssc = new StreamingContext(sc, Seconds(2))    val checkpointPathDir = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_08"    ssc.checkpoint(checkpointPathDir)    val dstream = ssc.socketTextStream("hadoop01", 9999)    val batchResultDStream = dstream.flatMap(_.split(" ")).map(word => {      (word,1)    }).reduceByKey(_ + _)    val resultDStream: DStream[(String, Int)] = batchResultDStream.reduceByKeyAndWindow(      (a:Int,b:Int) => a+b,      Seconds(6), //窗口大小,指定计算最近多久的数据量,要求是父DStream的批次产生时间的整数倍      Seconds(2) //滑动大小/新的DStream批次产生间隔时间,就是几秒钟来一次数据,要求是父DStream的批次产生时间的整数倍    )    resultDStream.print()    ssc.start()             // 启动    ssc.awaitTermination()  }}

2.测试

  -》开启9999端口

nc -lt 9999

  -》打开程序

  -》结果:

-------------------------------------------	Time: 1540020870000 ms	-------------------------------------------	(hadoophadoop,15)	(hadoop,60)	(ccs,45)

3.扩展

(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新 的 Dstream;

(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对 的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。

(测试成功!)

你可能感兴趣的文章
OK6410A 开发板 (三) 18 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 env
查看>>
OK6410A 开发板 (三) 19 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 driver model
查看>>
OK6410A 开发板 (三) A u-boot-2021.01 OK6410A 文章整理
查看>>
OK6410A 开发板 (三) 20 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 system clock
查看>>
OK6410A 开发板 (三) 21 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 standalone
查看>>
OK6410A 开发板 (三) 22 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 malloc
查看>>
OK6410A 开发板 (三) 23 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 DM 的一次实例分析 - 串口
查看>>
OK6410A 开发板 (三) 24 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 fs-fat
查看>>
OK6410A 开发板 (三) 25 u-boot-2021.01 boot 解析 U-boot 内存命令 md
查看>>
OK6410A 开发板 (三) 26 u-boot-2021.01 u-boot镜像
查看>>
OK6410A 开发板 (八) A linux-5.11 OK6410A 文章整理
查看>>
OK6410A 开发板 (六) 4 OK6410A linux-5.11 镜像生成过程解析
查看>>
u-boot-2021.01引导linux-5.11(uImage)的过程详解
查看>>
OK6410A 开发板 (八) 1 linux-5.11 OK6410A ethernet dm9000 移植
查看>>
OK6410A 开发板 (八) 2 linux-5.11 OK6410A linux开发环境搭建
查看>>
OK6410A 开发板 (三) 27 u-boot-2021.01 boot 解析 U-boot 镜像运行部分 console
查看>>
市面上的单板计算机
查看>>
OK6410A 开发板 (八) 3 linux-5.11 OK6410A lcd wxcat43 移植
查看>>
OK6410A 开发板 (八) 4 linux-5.11 OK6410A 外围驱动
查看>>
OK6410A 开发板 (八) 18 linux-5.11 OK6410A start_kernel 功能角度 第二阶段之idle进程
查看>>