apache-spark PairDStreamFunctions.updateStateByKey
Example
updateStateByKey can be used to create a stateful DStream that is updated from incoming data. It takes an update function:
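object UpdateStateFunctions {
  // current: the values that arrived for this key in the current batch
  // previous: the key's state after the previous batch (None the first time the key is seen)
  def updateState(current: Seq[Double], previous: Option[StatCounter]): Option[StatCounter] = {
    previous.map(s => s.merge(current)).orElse(Some(StatCounter(current)))
  }
}
The function takes the sequence of current values and an Option holding the previous state, and returns an Option holding the updated state. Putting it all together: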
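To see the contract of the update function in isolation, the following sketch runs the same logic in plain Scala, without Spark. It assumes a hypothetical immutable `Stats` case class (count and sum only) as a stand-in for `org.apache.spark.util.StatCounter`, so it is not the Spark API. The three calls model a key's first batch (no previous state), a later batch, and a batch with no new values for that key.

```scala
// Hypothetical stand-in for org.apache.spark.util.StatCounter (count and sum only).
// Unlike StatCounter.merge, this merge returns a new instance instead of mutating.
final case class Stats(count: Long, sum: Double) {
  def merge(values: Seq[Double]): Stats = Stats(count + values.size, sum + values.sum)
}

object Stats {
  // Build the initial state from the first batch of values for a key.
  def apply(values: Seq[Double]): Stats = Stats(values.size.toLong, values.sum)
}

object UpdateStateSketch {
  // Same shape as UpdateStateFunctions.updateState: (new values, old state) => new state.
  def updateState(current: Seq[Double], previous: Option[Stats]): Option[Stats] =
    previous.map(s => s.merge(current)).orElse(Some(Stats(current)))

  def main(args: Array[String]): Unit = {
    val first = updateState(Seq(5.0), None)          // key seen for the first time
    val second = updateState(Seq(1.0, 99.0), first)  // new values merged into prior state
    val idle = updateState(Seq.empty, second)        // no new values: state carried over
    println(first)  // Some(Stats(1,5.0))
    println(second) // Some(Stats(3,105.0))
    println(idle)   // Some(Stats(3,105.0))
  }
}
```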
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.StatCounter
import scala.collection.mutable.Queue

object UpdateStateByKeyApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "updateStateByKey", new SparkConf())
    // 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    // Stateful operations such as updateStateByKey require a checkpoint directory
    ssc.checkpoint("/tmp/chk")

    // queueStream consumes one RDD from the queue per batch
    val queue = Queue(
      sc.parallelize(Seq(("foo", 5.0), ("bar", 1.0))),
      sc.parallelize(Seq(("foo", 1.0), ("foo", 99.0))),
      sc.parallelize(Seq(("bar", 22.0), ("foo", 1.0))),
      sc.emptyRDD[(String, Double)],
      sc.emptyRDD[(String, Double)],
      sc.emptyRDD[(String, Double)],
      sc.parallelize(Seq(("foo", 1.0), ("bar", 1.0)))
    )
    val inputStream: DStream[(String, Double)] = ssc.queueStream(queue)
    // Print the per-key state after every batch
    inputStream.updateStateByKey(UpdateStateFunctions.updateState _).print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
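The output of this program depends on the fact that, in every batch, Spark applies the update function to every key that already has state, whether or not new values arrived for it, and drops a key whose update returns None. The sketch below is a hypothetical, Spark-free model of that behavior, fed with the same batches as the queue above. The `LocalUpdateStateByKey.run` helper and the (count, sum) tuple state are illustrative assumptions, not Spark APIs. It suggests that the three empty batches keep printing the last state rather than printing nothing.

```scala
object LocalUpdateStateByKey {
  // Hypothetical local model of updateStateByKey's per-batch semantics (no Spark):
  // the update function is called for every key that has new values OR existing state.
  def run[K, V, S](batches: Seq[Seq[(K, V)]])(update: (Seq[V], Option[S]) => Option[S]): Seq[Map[K, S]] = {
    val snapshots = Seq.newBuilder[Map[K, S]]
    var state = Map.empty[K, S]
    for (batch <- batches) {
      // Group this batch's values by key, as Spark does before calling the update function.
      val newValues: Map[K, Seq[V]] = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
      val keys = state.keySet ++ newValues.keySet
      state = keys.flatMap { k =>
        // Returning None removes the key from the state.
        update(newValues.getOrElse(k, Seq.empty), state.get(k)).map(s => k -> s)
      }.toMap
      snapshots += state
    }
    snapshots.result()
  }

  def main(args: Array[String]): Unit = {
    // Same batches as the queue in UpdateStateByKeyApp.
    val batches: Seq[Seq[(String, Double)]] = Seq(
      Seq(("foo", 5.0), ("bar", 1.0)),
      Seq(("foo", 1.0), ("foo", 99.0)),
      Seq(("bar", 22.0), ("foo", 1.0)),
      Seq.empty, Seq.empty, Seq.empty,
      Seq(("foo", 1.0), ("bar", 1.0))
    )
    // State per key: (count, sum), a simplified stand-in for StatCounter.
    val snapshots = run[String, Double, (Long, Double)](batches) { (current, previous) =>
      val (n, s) = previous.getOrElse((0L, 0.0))
      Some((n + current.size, s + current.sum))
    }
    snapshots.zipWithIndex.foreach { case (st, i) => println(s"batch $i: $st") }
  }
}
```

For example, after the second batch the model holds `foo -> (3, 105.0)` and `bar -> (1, 1.0)`: `bar` received no new values, so its update was called with an empty `Seq` and its state was carried over unchanged.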