Spark Structured Streaming 对接 Grafana 监控

监控

Structured Streaming 简介

Structured Streaming 在 Spark 2.0 版本于 2016 年引入, 是基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,对比传统的 Spark Streaming,由于复用了 Spark SQL 引擎,代码的写法和批处理 API (基于 Dataframe 和 Dataset API)一样,而且这些 API 非常的简单。

Structured Streaming 还支持使用 event time,通过设置 watermark 来处理延时到达的数据;而 Spark Streaming 只能基于 process time 做计算,显然是不够用的。

比如 .withWatermark("timestamp", "10 minutes") 表示用 DataFrame 里面的 timestamp 字段作为 event time,如果 event time 比 process time 落后超过 10 分钟,那么就不会处理这些数据。

Structured Streaming 默认情况下还是使用 micro batch 模式处理数据,不过从 Spark 2.3 开始提供了一种叫做 Continuous Processing 的模式,可以在至少一次语义下数据端到端只需 1ms 。

不过 Structured Streaming 的 Web UI 并没有和 Spark Streaming 一样的监控指标,所以有了这篇文章。

Grafana 监控准备工作

Structured Streaming 有个 StreamingQueryListener 用于异步报告指标,这是一个官方示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})

我们监控的话,主要是利用 onQueryProgress 方法来上报数据给监控系统。

我这里新建了一个类,继承了 StreamingQueryListener,因为 Grafana 底层时序数据库是 Prometheus,所以用 graphite_exporter 来上报数据,同时 Prometheus 从它这里拉取数据用于展示,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit

import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent,QueryStartedEvent,QueryTerminatedEvent}

class GraphiteMetrics(prefix: String) extends StreamingQueryListener{
// prefix 自定义参数用来区分不同程序、随便写
val metrics = new MetricRegistry()

var inputRowsPerSecond = 0D
var processedRowsPerSecond = 0D
var numInputRows = 0D
var triggerExecution = 0L

override def onQueryStarted(event: QueryStartedEvent): Unit = {
// graphite_exporter 的默认参数 --graphite.listen-address=":9109"
val graphite = new Graphite(new InetSocketAddress("192.168.1.xx", 9109))
val reporter: GraphiteReporter = GraphiteReporter
.forRegistry(metrics)
.prefixedWith(s"spark_${prefix}") // 指标名称前缀,便于在 Grafana 里面使用
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.build(graphite)
reporter.start(30, TimeUnit.SECONDS)
// 注册指标:读取速率
metrics.register(s"inputRowsPerSecond", new Gauge[Double] {
override def getValue: Double = inputRowsPerSecond
})
// 处理速率
metrics.register(s"processedRowsPerSecond", new Gauge[Double] {
override def getValue: Double = processedRowsPerSecond
})
// 一个批次的读取条数
metrics.register("numInputRows", new Gauge[Double] {
override def getValue: Double = numInputRows
})
// 一个批次的处理时长
metrics.register("triggerExecution", new Gauge[Long] {
override def getValue: Long = triggerExecution
})
}

override def onQueryProgress(event: QueryProgressEvent): Unit = {
// 对各个指标进行赋值、上报
inputRowsPerSecond = event.progress.inputRowsPerSecond
processedRowsPerSecond = event.progress.processedRowsPerSecond
numInputRows = event.progress.numInputRows
triggerExecution = event.progress.durationMs.getOrDefault("triggerExecution", 0L)
}

override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
println("onQueryTerminated")
}
}

这里只加了 4 个指标,可以自己随便修改添加;在主程序里面添加监听:

1
spark.streams.addListener(new GraphiteMetrics(prefix = "click"))

首先需要启动 graphite_exporter,随便找一台服务器即可,有两个默认端口:

  • 9109 用来上报数据,即 spark -> graphite_exporter
  • 9108 是 Prometheus 从 graphite_exporter 拉去数据用的

还需要在 Prometheus 配置文件 prometheus.yml 里面配置读取数据

1
2
3
4
scrape_configs:
- job_name: 'spark'
static_configs:
- targets: ['192.168.1.xx:9108']

最后启动 spark 程序之后,就可以在 Grafana 里面配置图表了。

配置 Grafana 图表

比如我设置的 prefixclick,那么我们在 Grafana 里面的 Explore 模块可以选择 Prometheus 数据源,输入指标 spark_click_inputRowsPerSecond ,点击 Query 就可以获取读取速率这个指标了,如图:

Grafana Explore

之后就可以随意创建图表啦~