
一、什么是Spark?
1、什么是Spark?生态体系结构
Apache Spark™ is a fast and general engine for large-scale data processing.
生态圈:
1、Spark Core
2、Spark SQL
3、Spark Streaming
4、Spark MLLib:机器学习
5、Spark GraphX:图计算
2、为什么要学习Spark?
复习:MapReduce的Shuffle过程
Spark的最大特点:基于内存
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

3、Spark的特点
(1)快:基于内存
同时也是缺点:没有对内存进行管理,把所有的内存管理都交给应用程序,容易出现OOM(outof memory 内存溢出)
如何分析Java内存溢出?? 工具:Java Heap Dump
https://www.cnblogs.com/JackDesperado/p/4798499.html
(2)易用:Java、Scala
(3)通用:不同的组件
Hive推荐使用Spark作为执行引擎 ------> 配置Hive On Spark非常麻烦,不成熟
提供文档:Hive On Spark
(4)兼容性:Hadoop的生态圈
二、Spark的体系结构和安装配置
1、体系结构:Client-Server(主从模式) ----> 单点故障:HA(ZooKeeper)
http://spark.apache.org/docs/latest/cluster-overview.html
准备工作:安装Linux、JDK、主机名、免密码登录
2、安装和部署:standalone
tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz -C ~/training/
注意:hadoop和spark命令脚本有冲突,只能设置一个
核心配置文件:spark-env.sh
(*)伪分布模式: bigdata11机器
spark-env.sh
export JAVA_HOME=/root/training/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata11
export SPARK_MASTER_PORT=7077
slave文件:
bigdata11
启动:sbin/start-all.sh
Web Console: http://ip:8080 (内置了一个tomcat)
(*)全分布模式: bigdata12 bigdata13 bigdata14
(1)在主节点上进行安装
spark-env.sh
export JAVA_HOME=/root/training/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata12
export SPARK_MASTER_PORT=7077
slave文件:
bigdata13
bigdata14
(2) 复制到从节点上
scp -r spark-2.1.0-bin-hadoop2.7/ root@bigdata13:/root/training
scp -r spark-2.1.0-bin-hadoop2.7/ root@bigdata14:/root/training
(3) 在主节点上启动
启动:sbin/start-all.sh
Web Console: http://ip:8080 (内置了一个tomcat)
3、Spark的HA实现:两种方式
(1)基于文件系统的单点故障恢复:只有一个主节点、只能用于开发测试
(*)特点:把Spark的运行信息写入到一个本地的恢复目录
如果Master死掉了,恢复master的时候从恢复目录上读取之前的信息
需要人为重启
(*)Spark的运行信息
Spark Application和Worker的注册信息
(*)配置:
(a)创建目录:mkdir /root/training/spark-2.1.0-bin-hadoop2.7/recovery
(b)参数:
spark.deploy.recoveryMode:取值:默认NONE--> 没有开启HA
FILESYSTEM ---> 基于文件系统的单点故障恢复
ZOOKEEPER ---> 基于ZooKeeper实现Standby的Master
spark.deploy.recoveryDirectory: 恢复目录
(c)修改spark-env.sh
增加:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
(*)测试:启动spark-shell
bin/spark-shell --master spark://bigdata12:7077
sbin/stop-master.sh
日志
scala> 18/02/09 00:40:42 WARN StandaloneAppClient$ClientEndpoint: Connection to bigdata12:7077 failed; waiting for master to reconnect...
18/02/09 00:40:42 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
18/02/09 00:40:42 WARN StandaloneAppClient$ClientEndpoint: Connection to bigdata12:7077 failed; waiting for master to reconnect...
重新启动master
(2)基于ZooKeeper实现Standby的Master
(*)复习:相当于是一个“数据库”
角色:leader、follower
功能:选举、数据同步、分布式锁(秒杀功能)
(*)原理:类似Yarn,参考讲义P41页
(*)参数
spark.deploy.recoveryMode设置为ZOOKEEPER开启单点恢复功能,默认值:NONE
spark.deploy.zookeeper.urlZooKeeper集群的地址
spark.deploy.zookeeper.dirSpark信息在ZK中的保存目录,默认:/spark
(*)修改spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"
(*)测试
bigdata12: sbin/start-all.sh
bigdata13(14):手动启动一个master
sbin/start-master.sh
三、执行Spark任务: 客户端
1、Spark Submit工具:提交Spark的任务(jar文件)
(*)spark提供的用于提交Spark任务工具
(*)example:/root/training/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar
(*)SparkPi.scala 例子:蒙特卡罗求PI
bin/spark-submit --master spark://bigdata11:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
Pi is roughly 3.1419547141954713
bin/spark-submit --master spark://bigdata11:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 300
Pi is roughly 3.141877971395932
2、Spark Shell 工具:交互式命令行工具、作为一个Application运行
两种模式:(1)本地模式
bin/spark-shell
日志:Spark context available as 'sc' (master = local[*], app id = local-1518181597235).
(2)集群模式
bin/spark-shell --master spark://bigdata11:7077
日志:Spark context available as 'sc' (master = spark://bigdata11:7077, app id = app-20180209210815-0002).
对象:Spark context available as 'sc'
Spark session available as 'spark' ---> 在Spark 2.0后,新提供
是一个统一的访问接口:Spark Core、Spark SQL、Spark Streaming
sc.textFile("hdfs://bigdata11:9000/input/data.txt") 通过sc对象读取HDFS的文件
.flatMap(_.split(" ")) 分词操作、压平
.map((_,1)) 每个单词记一次数
.reduceByKey(_+_) 按照key进行reduce,再将value进行累加
.saveAsTextFile("hdfs://bigdata11:9000/output/spark/day0209/wc")
多说一句:
.reduceByKey(_+_)
完整
.reduceByKey((a,b) => a+b)
Array((Tom,1),(Tom,2),(Mary,3),(Tom,6))
(Tom,(1,2,6))
1+2 = 3
3+6 = 9
3、开发WordCount程序
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.package
(1)Scala版本: 在IDEA中
(2)Java版本(比较麻烦) :在eclipse中
四、Spark任务运行机制及原理分析
1、WordCount程序执行的过程(画图)
2、Spark提交任务的流程
五、Spark的算子:方法、函数
1、什么是RDD? 最核心
(*)弹性分布式数据集,Resilent distributed DataSet
(*)Spark中数据的基本抽象
(*)结合源码,查看RDD的概念
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
一组分区,把数据分成了的不同的分区,每个分区可能运行在不同的worker
* - A function for computing each split
一个函数,用于计算每个分区中的数据
RDD的函数(算子)
(1)Transformation
(2)Action
* - A list of dependencies on other RDDs
RDD之间存在依赖关系:(1)窄依赖 (2)宽依赖
根据依赖的关系,来划分任务的Stage(阶段)
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
如何创建一个RDD?有两种方式
(1)使用sc.parallelize方法
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
(2)通过使用外部的数据源创建RDD:比如:HDFS
val rdd2 = sc.textFile("hdfs://bigdata11:9000/input/data.txt")
val rdd2 = sc.textFile("/root/temp/input/data.txt")
2、Transformation算子:不会触发计算、延时加载(lazy值)
map(func):该操作是对原来的RDD进行操作后,返回一个新的RDD
filter: 过滤操作、返回一个新的RDD
flatMap:类似map
mapPartitions:对每个分区进行操作
mapPartitionsWithIndex: 对每个分区进行操作,带分区的下标
union 并集
intersection 交集
distinct 去重
groupByKey: 都是按照Key进行分组
reduceByKey: 都是按照Key进行分组、会有一个本地操作(相当于:Combiner操作)
3、Action算子:会触发计算
collect: 触发计算、打印屏幕上。以数组形式返回
count: 求个数
first: 第一个元素(take(1))
take(n)
saveAsTextFile: 会转成String的形式,会调用toString()方法
foreach: 在RDD的每个元素上进行某个操作
4、RDD的缓存机制:默认在内存中
(*)提高效率
(*)默认:缓存在Memory中
(*)调用:方法:persist或者cache
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
(*)缓存的位置:StorageLevel定义的
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
(*)示例:
测试数据:Oracle数据库的订单变 sales表(大概92万)
步骤
(1)读入数据
val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")
(2)计算
rdd1.count ---> Action,这一次没有缓存
rdd1.cache ---> 缓存数据,但是不会触发计算,cache是一个Transformation
rdd1.count ----> 触发计算,将结果缓存
rdd1.count ----> ???会从哪里得到数据
IDEA功能键:ctrl + n 查找类
ctl+alt+shit+N 在类中找方法
5、RDD的容错机制:checkpoint检查点:两种类型 (1)本地目录 (2)HDFS目录
(1)复习检查点:HDFS中,合并元信息
Oracle中,会以最高优先级唤醒数据库写进程(DBWn),来内存中的脏数据---> 数据文件
(2)RDD的检查点:容错机制,辅助Lineage(血统)---> 整个计算的过程
如果lineage越长,出错的概率就越大
两种类型 (1)本地目录 : 需要将spark-shell运行在本地模式上
(2)HDFS目录: 需要将spark-shell运行在集群模式上
scala> sc.setCheckpointDir("hdfs://bigdata11:9000/spark/checkpoint")
scala> val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata11:9000/input/sales MapPartitionsRDD[41] at textFile at <console>:24
scala> rdd1.checkpoint
scala> rdd1.count
源码中的说明:
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
6、RDD的依赖关系、划分Spark任务的Stage(阶段)
(*)窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用
比方:独生子女
(*)宽依赖:多个子RDD的分区会依赖同一个父RDD的分区
比方:超生
参考:P52页
7、RDD算子的基础例子
六、Spark RDD的高级算子
1、mapPartitionsWithIndex: 对RDD中的每个分区进行操作,带有分区号
定义:def mapPartitionsWithIndex[U](f: (Int, Iterator[T])=>Iterator[U], preservesPartitioning: Boolean = false)
(implicit arg0: ClassTag[U]): RDD[U]
参数说明:
f: (Int, Iterator[T])=>Iterator[U]
(*)Int: 分区号
(*)Iterator[T]: 该分区中的每个元素
(*)返回值:Iterator[U]
Demo:
(1)创建一个RDD:val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
(2)创建一个函数,作为f的值
def func1(index:Int,iter:Iterator[Int]):Iterator[String] ={
iter.toList.map(x=>"[PartID:" + index +",value="+x+"]").iterator
}
(3)调用
rdd1.mapPartitionsWithIndex(func1).collect
(4)结果:
Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4],
[PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])
2、aggregate:聚合操作
定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
作用:先对局部进行操作,再对全局进行操作
举例:
val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
(1)求每个分区最大值的和
先查看每个分区中的元素:
rdd1..mapPartitionsWithIndex(func1).collect
rdd1.aggregate(0)(math.max(_,_),_+_)
(2)改一下:
rdd1.aggregate(0)(_+_,_+_) ====> 15
rdd1.aggregate(10)(math.max(_,_),_+_) ===> 30
(3)讲义上,还有一个字符串的例子 P57页
3、aggregateByKey
(1)类似aggregate,也是先对局部,再对全局
(2)区别:aggregateByKey操作<key,value>
(3)测试数据:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
每个分区中的元素(key,value)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
[partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)],
[partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]
(4)把每个笼子中,每种动物最多的个数进行求和
pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
4、coalesce和repartition
(*)都是将RDD中的分区进行重分区
(*)区别:coalesce 默认:不会进行shuffle(false)
repartition 会进行shuffle
(*)举例:
创建一个RDD
val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
进行重分区
val rdd5 = rdd4.repartition(3)
val rdd6 = rdd4.coalesce(3,false) ---> 分区的长度: 2
val rdd6 = rdd4.coalesce(3,true) ---> 分区的长度: 2
5、其他高级算子:参考文档
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
七、Spark编程案例(都是在本地模式上运行)
测试数据:tomcat的访问日志
1、案例一:求网站的访问量
需求:找到访问量最高的两个网页
2、案例二:创建自定义的分区
类似MapReduce中的分区,根据Map的输出进行分区
需求:根据网页进行分区,相同的jsp的日志放到同一个分区下
3、案例三:访问数据库
(*)问题:在哪里创建创建一个Connection对象
(*)针对RDD: foreach和foreachPartition
如果把Connection创建放在最上边,错误的!!!!!
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CConnection
(*)更好的一个做法:foreachPartition ---> 针对每个分区进行操作(比如:跟外部系统通信)
(*)使用JdbcRDD 直接访问数据
局限:An RDD that executes a SQL query on a JDBC connection and reads results.
For usage example, see test case JdbcRDDSuite.
只能进行select操作
只能接受两个参数
JdbcRDD(sc: SparkContext, SparkContext对象
getConnection: () ⇒ Connection, 数据库的链接
sql: String, Select语句
lowerBound: Long, Select语句下界,就是Select的第一个参数
upperBound: Long, Select语句上界,就是Select的第二个参数
numPartitions: Int, 分区个数, 启动多少个Executor
mapRow: (ResultSet) ⇒ 结果集
欢迎关注微信公众号:大数据AI之家,定期分享清华、北大、麻省理工、中科院的博士及专家的论文、思想及授课视频,助你快速掌握最前沿实用的大数据、机器学习、区块链等人工智能技术。