本文共 2253 字,大约阅读时间需要 7 分钟。
在一定的时间间隔(interval)进行一个时间段(window length)内的数据处理。
【参考:】
(1)window length : 窗口的长度(下图是3)
(2)sliding interval: 窗口的间隔(下图是2)
(3)这2个参数和Streaming的batch size都是倍数关系,否则会报错!
每10s计算前30s的数据
// Reduce last 30 seconds of data, every 10 secondsval windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
【注意:】
Seconds(30), //窗口大小,指定计算最近多久的数据量,要求是父DStream的批次产生时间的整数倍
Seconds(10) //滑动大小/新的DStream批次产生间隔时间,就是几秒钟来一次数据,要求是父DStream的批次产生时间的整数倍package _0809kafkaimport org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.streaming.{Seconds, StreamingContext}/** * Created by Administrator on 2018/10/20. */object WindowsReduceStream_simple_1020 { def main(args: Array[String]): Unit = { val sparkconf=new SparkConf().setMaster("local[2]").setAppName("WindowsReduceStream_simple_1020") val sc=new SparkContext(sparkconf) val ssc = new StreamingContext(sc, Seconds(2)) val checkpointPathDir = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_08" ssc.checkpoint(checkpointPathDir) val dstream = ssc.socketTextStream("hadoop01", 9999) val batchResultDStream = dstream.flatMap(_.split(" ")).map(word => { (word,1) }).reduceByKey(_ + _) val resultDStream: DStream[(String, Int)] = batchResultDStream.reduceByKeyAndWindow( (a:Int,b:Int) => a+b, Seconds(6), //窗口大小,指定计算最近多久的数据量,要求是父DStream的批次产生时间的整数倍 Seconds(2) //滑动大小/新的DStream批次产生间隔时间,就是几秒钟来一次数据,要求是父DStream的批次产生时间的整数倍 ) resultDStream.print() ssc.start() // 启动 ssc.awaitTermination() }}
-》开启9999端口
nc -lt 9999
-》打开程序
-》结果:------------------------------------------- Time: 1540020870000 ms ------------------------------------------- (hadoophadoop,15) (hadoop,60) (ccs,45)
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新 的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对 的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。