Structured Streaming 简介
Structured Streaming 在 Spark 2.0 版本于 2016 年引入, 是基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,对比传统的 Spark Streaming,由于复用了 Spark SQL 引擎,代码的写法和批处理 API (基于 Dataframe 和 Dataset API)一样,而且这些 API 非常的简单。
Structured Streaming 还支持使用 event time,通过设置 watermark 来处理延时到达的数据;而 Spark Streaming 只能基于 process time 做计算,显然是不够用的。
比如 .withWatermark("timestamp", "10 minutes")
表示用 DataFrame 里面的 timestamp
字段作为 event time,如果 event time 比 process time 落后超过 10 分钟,那么就不会处理这些数据。
Structured Streaming 默认情况下还是使用 micro batch 模式处理数据,不过从 Spark 2.3 开始提供了一种叫做 Continuous Processing 的模式,可以在至少一次语义下数据端到端只需 1ms 。
不过 Structured Streaming 的 Web UI 并没有和 Spark Streaming 一样的监控指标,所以有了这篇文章。
Grafana 监控准备工作
Structured Streaming 有个 StreamingQueryListener
用于异步报告指标,这是一个官方示例:
1 2 3 4 5 6 7 8 9 10 11 12 13
| val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() { override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { println("Query started: " + queryStarted.id) } override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { println("Query terminated: " + queryTerminated.id) } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { println("Query made progress: " + queryProgress.progress) } })
|
我们监控的话,主要是利用 onQueryProgress
方法来上报数据给监控系统。
我这里新建了一个类,继承了 StreamingQueryListener
,因为 Grafana 底层时序数据库是 Prometheus,所以用 graphite_exporter 来上报数据,同时 Prometheus 从它这里拉取数据用于展示,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| import java.net.InetSocketAddress import java.util.concurrent.TimeUnit
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter} import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent,QueryStartedEvent,QueryTerminatedEvent}
class GraphiteMetrics(prefix: String) extends StreamingQueryListener{ val metrics = new MetricRegistry()
var inputRowsPerSecond = 0D var processedRowsPerSecond = 0D var numInputRows = 0D var triggerExecution = 0L
override def onQueryStarted(event: QueryStartedEvent): Unit = { val graphite = new Graphite(new InetSocketAddress("192.168.1.xx", 9109)) val reporter: GraphiteReporter = GraphiteReporter .forRegistry(metrics) .prefixedWith(s"spark_${prefix}") .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .filter(MetricFilter.ALL) .build(graphite) reporter.start(30, TimeUnit.SECONDS) metrics.register(s"inputRowsPerSecond", new Gauge[Double] { override def getValue: Double = inputRowsPerSecond }) metrics.register(s"processedRowsPerSecond", new Gauge[Double] { override def getValue: Double = processedRowsPerSecond }) metrics.register("numInputRows", new Gauge[Double] { override def getValue: Double = numInputRows }) metrics.register("triggerExecution", new Gauge[Long] { override def getValue: Long = triggerExecution }) }
override def onQueryProgress(event: QueryProgressEvent): Unit = { inputRowsPerSecond = event.progress.inputRowsPerSecond processedRowsPerSecond = event.progress.processedRowsPerSecond numInputRows = event.progress.numInputRows triggerExecution = event.progress.durationMs.getOrDefault("triggerExecution", 0L) }
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = { println("onQueryTerminated") } }
|
这里只加了 4 个指标,可以自己随便修改添加;在主程序里面添加监听:
1
| spark.streams.addListener(new GraphiteMetrics(prefix = "click"))
|
首先需要启动 graphite_exporter,随便找一台服务器即可,有两个默认端口:
- 9109 用来上报数据,即 spark -> graphite_exporter
- 9108 是 Prometheus 从 graphite_exporter 拉去数据用的
还需要在 Prometheus 配置文件 prometheus.yml
里面配置读取数据
1 2 3 4
| scrape_configs: - job_name: 'spark' static_configs: - targets: ['192.168.1.xx:9108']
|
最后启动 spark 程序之后,就可以在 Grafana 里面配置图表了。
配置 Grafana 图表
比如我设置的 prefix
是 click
,那么我们在 Grafana 里面的 Explore 模块可以选择 Prometheus 数据源,输入指标 spark_click_inputRowsPerSecond
,点击 Query 就可以获取读取速率这个指标了,如图:
之后就可以随意创建图表啦~