Spark Sql

Spark Sql 实战入门

Posted by leone on 2018-09-13

Spark SQL

Spark SQL 核心概念

什么是 Spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

Spark SQL 特点

1.易整合

2.统一的数据访问方式

3.兼容Hive

4.标准的数据连接

SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式。

RDD vs DataFrames vs DataSet

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。
在后期的Spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口。

  • RDD

RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。
RDD的最大好处就是简单,API的人性化程度很高。
RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高

  • Dataframe

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验

RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。
DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待DataFrame也是懒执行的。
性能上比RDD要高,主要有两方面原因:
定制化内存管理
数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。

DataFrame和DataSet

首先,最简单的理解我们可以认为DataFrame就是Spark中的数据表(类比传统数据库),DataFrame的结构如下:
DataFrame(表)= Schema(表结构) + Data(表数据)。DataFrame(表)是Spark SQL对结构化数据的抽象。可以将DataFrame看做RDD。

Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。

三者的共性

1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过.

1
2
3
4
5
6
7
8
val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// map不运行
rdd.map{line=>
println("运行")
line._1
}

3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

4、三者都有partition的概念
5、三者有许多共同的函数,如filter,排序等
6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持

1
import spark.implicits._

7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型DataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
testDF.map{
case Row(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case _=>
""
}
Dataset:
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
testDS.map{
case Coltest(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case _=>
""
}

sparkSql 实战

创建DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 启动spark shell
$ bin/spark-shell

# 创建DataFrame

# 定义case class代表表的结构schema
scala> case class User(userId: Int,username: String,sex: String,age: Int,email: String)

# 读取Linux本地数据
scala> val lines = sc.textFile("file:///root/data/user.csv").map(_.split(","))

# 或者读取hdfs上的数据
scala> val lines = sc.textFile("hdfs://node-1:9000/spark-2.1.3/input/user.csv").map(_.split(","))

# 生成表: DataFrame
scala> val userDF = lines.map(x => User(x(0).toInt,x(1),x(2),x(3).toInt,x(4))).toDF

# 操作: DSL语句
scala> userDF.show ----> select * from userDF
scala> userDF.printSchema ----> desc userDF

使用SparkSession对象创建DataFrame

Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。

在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。通过SparkSession可以访问Spark所有的模块!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 定义case class代表表的结构schema
scala> case class User(userId: Int,username: String,sex: String,age: Int,email: String)

# 读取linux本地文件系统的结构化数据
scala> val lines = sc.textFile("file:///root/data/user.csv").map(_.split(","))

# 或者读取HDFS数据
scala> val lines = sc.textFile("hdfs://node-1:9000/spark-2.1.3/input/user.csv").map(_.split(","))

# 定义schema:StructType
scala> import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._

scala> val schema = StructType(List(
StructField("userId", DataTypes.IntegerType),
StructField("username", DataTypes.StringType),
StructField("sex", DataTypes.StringType),
StructField("age", DataTypes.IntegerType),
StructField("email", DataTypes.StringType)))

# 把读入的每一行数据映射成一个个Row
scala>val rowRDD = lines.map(x => User(x(0).toInt,x(1),x(2),x(3).toInt,x(4)))

# 使用SparkSession.createDataFrame创建表
scala>val df = spark.createDataFrame(rowRDD,schema)

Spark SQL java 读写数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
* <p> spark jdbc 查询数据库
*
* @author leone
* @since 2019-01-10
**/
public class JavaSparkSqlJdbcTest {

public static void main(String[] args) {
// 创建 sparkSql 上下文
SparkSession spark = SparkSession.builder()
.appName("javaSql").config("spark.master", "local[*]").getOrCreate();

Dataset<Row> df = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/spark")
.option("dbtable", "t_logs")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.jdbc.Driver")
.load();

df.show();

df.select(new Column("id"), new Column("name")).where("name like 'andy'").distinct().show();

Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "root");
prop.put("driver", "com.mysql.jdbc.Driver");
df.write().jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false", "t_logs_1", prop);

spark.close();
}
}

Spark SQL java join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/**
* <p>
*
* @author leone
* @since 2019-01-10
**/
public class JavaSparkSqlJsonTest {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("javaSql");
JavaSparkContext context = new JavaSparkContext(conf);

// 创建 sparkSql 上下文
SparkSession spark = SparkSession.builder()
.appName("javaSql")
.config("spark.master", "local[*]").getOrCreate();

Dataset<Row> df = spark.read().json("file:///E:/tmp/spark/data/user.json");
df.show(3);

df.createOrReplaceTempView("customer");

spark.sql("select * from customer where age > 22").show();

// df.where("age > 23").show();
// spark.sql("select count(1) from customer").show();

JavaRDD<Row> javaRDD = df.toJavaRDD();

// dataFrame 和 RDD转换
// javaRDD.foreach(e -> System.out.println(e.getLong(0) + "\t" + e.getLong(1) + "\t" + e.getString(2)));


df.write().mode(SaveMode.Append).json("file:///E:/tmp/spark/a.json");
// df.write().json("file:///E:/tmp/spark/output5");

}

}

Spark SQL scala DateFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
* <p>
*
* @author leone
* @since 2019-01-10
**/
object DataFrameTest {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("sql").setMaster("local[*]")
val sc = new SparkContext(conf)
val session = SparkSession.builder().appName("sparkSql").master("local[*]").getOrCreate()

val array: Array[String] = Array("1,james,12", "2,jack,23", "3,andy,34")

val rdd: RDD[String] = sc.makeRDD(array)

val rdd2 = rdd.map(e => {
val arr = e.split(",")
Customer(arr(0).toInt, arr(1), arr(2).toInt)
})

val frame = session.createDataFrame(rdd2)
frame.printSchema()
frame.show()
frame.createTempView("customer")

val sql = session.sql("select * from customer where age > 20 order by age desc")
sql.show()

// frame.selectExpr("id", "name").show()

// frame.where("age > 20").show()

// frame.agg(sum("age"),sum("id"))

}


}

case class Customer(id: Int, name: String, age: Int) {}

Spark SQL scala 操作mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73


import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
* <p>
*
* @author leone
* @since 2019-01-07
**/
object JdbcDataSource {

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("jdbcDataSource").master("local[*]").getOrCreate()

val dbTable: DataFrame = spark.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/db01?useSSL=false",
"driver" -> "com.mysql.jdbc.Driver",
"user" -> "root",
"dbtable" -> "t_user",
"password" -> "root")
).load()

// 表字段
dbTable.printSchema()

// 数据信息
dbTable.show()


// 使用函数式编程过滤
/*val filter: Dataset[Row] = dbTable.filter(e => {
e.getAs[Int](2) <= 6
})
filter.show()*/


// 使用lambda的方式
// val r = dbTable.filter($"age" <= 23)
// r.show()
// val result: DataFrame = dbTable.select($"id",$"name",$"age" * 10 as "age")
// val result: DataFrame = dbTable.select($"name")
// result.show()

// 写回数据库
// val props = new Properties()
// props.put("user", "root")
// props.put("password", "root")
// result.write.mode("ignore").jdbc("jdbc:mysql://localhost:3306/spark","t_logs_bak",props)

// result.write.text("E:\\tmp\\spark\\text")
// result.write.json("E:\\tmp\\spark\\json")
// result.write.csv("E:\\tmp\\spark\\csv")
// result.write.parquet("E:\\tmp\\spark\\parquet")


// 读取指定的json文件
// val json: DataFrame = sprak.read.json("E:\\tmp\\spark\\json")
// val filter: DataFrame = json.where($"age" <= 300)
// filter.show()

// val csv: DataFrame = sprak.read.csv("E:\\tmp\\spark\\csv")
// val pdf: DataFrame = csv.toDF("id", "name", "age")
// csv.show()

// val parquet: DataFrame = spark.read.parquet("E:\\tmp\\spark\\parquet")
// parquet.show()

spark.close()
}

}