最近要在 Spark job 中通过 Spark SQL 的方式读取 Elasticsearch 数据,踩了一些坑,总结于此。
环境说明
Spark job 的编写语言为 Scala,scala-library 的版本为 2.11.8。
Spark 相关依赖包的版本为 2.3.2,如 spark-core、spark-sql。
Elasticsearch 数据
schema
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28{
"settings": {
"number_of_replicas": 1
},
"mappings": {
"label": {
"properties": {
"docId": {
"type": "keyword"
},
"labels": {
"type": "nested",
"properties": {
"id": {
"type": "long"
},
"label": {
"type": "keyword"
}
}
},
"itemId": {
"type": "long"
}
}
}
}
}sample data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48{
"took" : 141,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 17370929,
"max_score" : 1.0,
"hits" : [
{
"_index" : "aen-label-v1",
"_type" : "label",
"_id" : "123_ITEM",
"_score" : 1.0,
"_source" : {
"docId" : "123_ITEM",
"labels" : [
{
"id" : 7378,
"label" : "1kg"
}
],
"itemId" : 123
}
},
{
"_index" : "aen-label-v1",
"_type" : "label",
"_id" : "456_ITEM",
"_score" : 1.0,
"_source" : {
"docId" : "456_ITEM",
"labels" : [
{
"id" : 7378,
"label" : "2kg"
}
],
"itemId" : 456
}
}
]
}
}
准备工作
既然要用 Spark SQL,当然少不了其对应的依赖,
1 | dependencies { |
对于 ES 的相关库,如同 官网 所说,要在 Spark 中访问 ES,需要将 elasticsearch-hadoop
依赖包加入到 Spark job 运行的类路径中,具体而言就是添加到 Spark job 工程的依赖中,公司的 nexus 中当前最新的版本为 7.15.0,且目前我们是使用 gradle 管理依赖,故添加依赖的代码如下,
1 | dependencies { |
本地测试
对于 Spark,基于资源管理器的不同,可以在两种模式下运行:本地模式和集群模式,可通过 --master
参数来指定资源管理器的方式。本地模式时,不依赖额外的 Spark 集群,Spark 将在同一台机器上运行所有内容,非常方便用于本地测试,对于 Spark SQL,只需要在创建 SparkSession 时采用 local 的模式即可,
1 | class MyUtils extends Serializable { |
测试代码
1 | object LocalTest extends LazyLogging { |
知识点
Spark SQL Data Sources
Spark SQL 通过 DataFrameReader
类支持读取各种类型的数据源,比如 Parquet、ORC、JSON、CSV 等格式的文件,Hive table,以及其他 database。而 Elasticsearch 只不过是众多数据源中的一种,DataFrameReader
通过 format(...)
指定数据源格式,通过 option(...)
定制对应数据源下的配置,最后通过 load()
加载生成 DataFrame
,也就是 Dataset[Row]
的类型别名。有了 DataFrame
,就可以创建一个临时表,然后就能以 SQL 的方式读取数据。
在 Spark 1.5 以前,Elasticsearch 在 format(...)
中对应的 source 名需要是全包名 org.elasticsearch.spark.sql
,而在 Spark 1.5 以及之后的版本,source 名称简化为 es
。
Spark SQL 中 DataFrame 常用 API
- df.printSchema(),打印 schema
- df.show(),查看数据列表,默认是 truncate 前 20 条,传 false 时列出全部数据。
- df.createOrReplaceTempView(“view_name”),构建临时表视图,方便后续 SQL 操作。
- df.withColumn(),添加新列或替换现有列。
- df.withColumn(“final_result”, lit(“PASS”)) ,通过
lit
添加常量列。
- df.withColumn(“final_result”, lit(“PASS”)) ,通过
- df.filter(col(“label”).isNotNull),用指定的条件过滤行。
- df.dropDuplicates(“itemId”,”attributeId”),按指定列对行去重,返回新的数据集。
- df.union(otherDf),将两个 DataFrame 的记录合并且不去重,相当于 union all。
- df.toDF(“itemId”, “attributeId”, “label”, “final_result”),为 df 各列指定一个有意义的名称。
Scala 与 Java 类型映射
- scala.Long -> long
- Array[T] -> T[]
Scala 与 Java 类型转换
1 | import scala.collection.JavaConverters._ |
Scala 中的 : _*
:_*
是 type ascription 的一个特例,它会告诉编译器将序列类型的单个参数视为变参数序列,即 varargs。应用例子,
1 | val indices = Array("aen-label", "aen-label-seller") |
踩的坑
es.nodes.wan.only
该配置项表示连接器是否用于 WAN 上的云或受限环境如 AWS 中的 Elasticsearch 实例,默认为 false,而公司的 Elasticsearch 集群是在 AWS 上的,endpoint 只能在内网访问,因而刚开始测试时,遇到如下报错,
1 | Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available |
通过 option("es.nodes.wan.only", value = true)
将配置项设置为 true 后恢复正常。
importing spark.implicits._
在遍历 DataFrame 时遇到如下编译错误,
1 | Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ |
在处理 DataFrame 之前需要加上 importing spark.implicits._
,用于将常见的 Scala 对象转换为 DataFrame,通常在获取 SparkSession 后立马 import。
Spark SQL 读取 hive 表中 array 类型时,对于 Scala 语言,得到的类型是 WrappedArray
而不是 Array
当我们通过 createOrReplaceTempView("temp_labels")
构建一个临时表视图后,就可以通过 SQL 像操作 hive 表那样读取数据。例如读取指定的列,
1 | val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)") |
通过 sqlDf.printSchema()
可以看到 sqlDf 的 schema 长这样,
1 | root |
labels
是包含 struct 的数组,于是从 row 中将 labels
列读出时想尝试转换为 Array,
1 | val newDf = sqlDf.map( |
结果报错如下,
1 | java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lorg.apache.spark.sql.Row; |
可以看到 Spark SQL 在读取表中数组列时,是用的 scala.collection.mutable.WrappedArray
来存储结果的,看其类定义可知,它是间接实现 Seq 接口的,所以也可用 row.getAs[Seq[Row]]("labels")
来读取。**这里需要注意的是,Array[T] 虽然在 Scala 源码定义中是 class,但其对标的 Java 类型是原生数组 T[]**。
判断 Column 是否为 null 时,需要用 is null
或 is not null
,而不是 ===
或 !==
对于错误的用法,filter 并不会生效,就像下面这样
1 | newDf.filter(col("label") !== null) |
这一点和 hive 表以及 MySQL 表判断字段是否为 null,是保持一致的,应该像下面这样,
1 | newDf.filter(col("label").isNotNull) |
最终代码
1 | import com.google.common.base.Joiner |
补充:提交 spark job
将 job 工程打包为 Jar,上传到 AWS 的 s3,比如 s3://sherlockyb-test/1.0.0/artifacts/spark/
目录下,然后通过 Genie 提交 spark job 到 Spark 集群运行。Genie 是 Netflix 研发的联合作业执行引擎,提供 REST-full API 来运行各种大数据作业,如 Hadoop、Pig、Hive、Spark、Presto、Sqoop 等。
1 | def run_spark(job_name, spark_jar_name, spark_class_name, arg_str, spark_param=''): |