大数据spark基础编程 (基于spark大数据分析)

spark大数据处理平台的缺点,大数据spark基础编程

一、什么是Spark?

1、什么是Spark?生态体系结构

Apache Spark™ is a fast and general engine for large-scale data processing.

生态圈:

1、Spark Core

2、Spark SQL

3、Spark Streaming

4、Spark MLLib:机器学习

5、Spark GraphX:图计算

2、为什么要学习Spark?

复习:MapReduce的Shuffle过程

Spark的最大特点:基于内存

Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

spark大数据处理平台的缺点,大数据spark基础编程

3、Spark的特点

(1)快:基于内存

同时也是缺点:没有对内存进行管理,把所有的内存管理都交给应用程序,容易出现OOM(outof memory 内存溢出)

如何分析Java内存溢出?? 工具:Java Heap Dump

https://www.cnblogs.com/JackDesperado/p/4798499.html

(2)易用:Java、Scala

(3)通用:不同的组件

Hive推荐使用Spark作为执行引擎 ------> 配置Hive On Spark非常麻烦,不成熟

提供文档:Hive On Spark

(4)兼容性:Hadoop的生态圈

二、Spark的体系结构和安装配置

1、体系结构:Client-Server(主从模式) ----> 单点故障:HA(ZooKeeper)

http://spark.apache.org/docs/latest/cluster-overview.html

准备工作:安装Linux、JDK、主机名、免密码登录

2、安装和部署:standalone

tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz -C ~/training/

注意:hadoop和spark命令脚本有冲突,只能设置一个

核心配置文件:spark-env.sh

(*)伪分布模式: bigdata11机器

spark-env.sh

export JAVA_HOME=/root/training/jdk1.8.0_144

export SPARK_MASTER_HOST=bigdata11

export SPARK_MASTER_PORT=7077

slave文件:

bigdata11

启动:sbin/start-all.sh

Web Console: http://ip:8080 (内置了一个tomcat)

(*)全分布模式: bigdata12 bigdata13 bigdata14

(1)在主节点上进行安装

spark-env.sh

export JAVA_HOME=/root/training/jdk1.8.0_144

export SPARK_MASTER_HOST=bigdata12

export SPARK_MASTER_PORT=7077

slave文件:

bigdata13

bigdata14

(2) 复制到从节点上

scp -r spark-2.1.0-bin-hadoop2.7/ root@bigdata13:/root/training

scp -r spark-2.1.0-bin-hadoop2.7/ root@bigdata14:/root/training

(3) 在主节点上启动

启动:sbin/start-all.sh

Web Console: http://ip:8080 (内置了一个tomcat)

3、Spark的HA实现:两种方式

(1)基于文件系统的单点故障恢复:只有一个主节点、只能用于开发测试

(*)特点:把Spark的运行信息写入到一个本地的恢复目录

如果Master死掉了,恢复master的时候从恢复目录上读取之前的信息

需要人为重启

(*)Spark的运行信息

Spark Application和Worker的注册信息

(*)配置:

(a)创建目录:mkdir /root/training/spark-2.1.0-bin-hadoop2.7/recovery

(b)参数:

spark.deploy.recoveryMode:取值:默认NONE--> 没有开启HA

FILESYSTEM ---> 基于文件系统的单点故障恢复

ZOOKEEPER ---> 基于ZooKeeper实现Standby的Master

spark.deploy.recoveryDirectory: 恢复目录

(c)修改spark-env.sh

增加:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"

(*)测试:启动spark-shell

bin/spark-shell --master spark://bigdata12:7077

sbin/stop-master.sh

日志

scala> 18/02/09 00:40:42 WARN StandaloneAppClient$ClientEndpoint: Connection to bigdata12:7077 failed; waiting for master to reconnect...

18/02/09 00:40:42 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...

18/02/09 00:40:42 WARN StandaloneAppClient$ClientEndpoint: Connection to bigdata12:7077 failed; waiting for master to reconnect...

重新启动master

(2)基于ZooKeeper实现Standby的Master

(*)复习:相当于是一个“数据库”

角色:leader、follower

功能:选举、数据同步、分布式锁(秒杀功能)

(*)原理:类似Yarn,参考讲义P41页

(*)参数

spark.deploy.recoveryMode设置为ZOOKEEPER开启单点恢复功能,默认值:NONE

spark.deploy.zookeeper.urlZooKeeper集群的地址

spark.deploy.zookeeper.dirSpark信息在ZK中的保存目录,默认:/spark

(*)修改spark-env.sh

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"

(*)测试

bigdata12: sbin/start-all.sh

bigdata13(14):手动启动一个master

sbin/start-master.sh

三、执行Spark任务: 客户端

1、Spark Submit工具:提交Spark的任务(jar文件)

(*)spark提供的用于提交Spark任务工具

(*)example:/root/training/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar

(*)SparkPi.scala 例子:蒙特卡罗求PI

bin/spark-submit --master spark://bigdata11:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100

Pi is roughly 3.1419547141954713

bin/spark-submit --master spark://bigdata11:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 300

Pi is roughly 3.141877971395932

2、Spark Shell 工具:交互式命令行工具、作为一个Application运行

两种模式:(1)本地模式

bin/spark-shell

日志:Spark context available as 'sc' (master = local[*], app id = local-1518181597235).

(2)集群模式

bin/spark-shell --master spark://bigdata11:7077

日志:Spark context available as 'sc' (master = spark://bigdata11:7077, app id = app-20180209210815-0002).

对象:Spark context available as 'sc'

Spark session available as 'spark' ---> 在Spark 2.0后,新提供

是一个统一的访问接口:Spark Core、Spark SQL、Spark Streaming

sc.textFile("hdfs://bigdata11:9000/input/data.txt") 通过sc对象读取HDFS的文件

.flatMap(_.split(" ")) 分词操作、压平

.map((_,1)) 每个单词记一次数

.reduceByKey(_+_) 按照key进行reduce,再将value进行累加

.saveAsTextFile("hdfs://bigdata11:9000/output/spark/day0209/wc")

多说一句:

.reduceByKey(_+_)

完整

.reduceByKey((a,b) => a+b)

Array((Tom,1),(Tom,2),(Mary,3),(Tom,6))

(Tom,(1,2,6))

1+2 = 3

3+6 = 9

3、开发WordCount程序

http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.package

(1)Scala版本: 在IDEA中

(2)Java版本(比较麻烦) :在eclipse中

四、Spark任务运行机制及原理分析

1、WordCount程序执行的过程(画图)

2、Spark提交任务的流程

五、Spark的算子:方法、函数

1、什么是RDD? 最核心

(*)弹性分布式数据集,Resilent distributed DataSet

(*)Spark中数据的基本抽象

(*)结合源码,查看RDD的概念

* Internally, each RDD is characterized by five main properties:

*

* - A list of partitions

一组分区,把数据分成了的不同的分区,每个分区可能运行在不同的worker

* - A function for computing each split

一个函数,用于计算每个分区中的数据

RDD的函数(算子)

(1)Transformation

(2)Action

* - A list of dependencies on other RDDs

RDD之间存在依赖关系:(1)窄依赖 (2)宽依赖

根据依赖的关系,来划分任务的Stage(阶段)

* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

如何创建一个RDD?有两种方式

(1)使用sc.parallelize方法

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)

(2)通过使用外部的数据源创建RDD:比如:HDFS

val rdd2 = sc.textFile("hdfs://bigdata11:9000/input/data.txt")

val rdd2 = sc.textFile("/root/temp/input/data.txt")

2、Transformation算子:不会触发计算、延时加载(lazy值)

map(func):该操作是对原来的RDD进行操作后,返回一个新的RDD

filter: 过滤操作、返回一个新的RDD

flatMap:类似map

mapPartitions:对每个分区进行操作

mapPartitionsWithIndex: 对每个分区进行操作,带分区的下标

union 并集

intersection 交集

distinct 去重

groupByKey: 都是按照Key进行分组

reduceByKey: 都是按照Key进行分组、会有一个本地操作(相当于:Combiner操作)

3、Action算子:会触发计算

collect: 触发计算、打印屏幕上。以数组形式返回

count: 求个数

first: 第一个元素(take(1))

take(n)

saveAsTextFile: 会转成String的形式,会调用toString()方法

foreach: 在RDD的每个元素上进行某个操作

4、RDD的缓存机制:默认在内存中

(*)提高效率

(*)默认:缓存在Memory中

(*)调用:方法:persist或者cache

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

def cache(): this.type = persist()

(*)缓存的位置:StorageLevel定义的

val NONE = new StorageLevel(false, false, false, false)

val DISK_ONLY = new StorageLevel(true, false, false, false)

val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

(*)示例:

测试数据:Oracle数据库的订单变 sales表(大概92万)

步骤

(1)读入数据

val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")

(2)计算

rdd1.count ---> Action,这一次没有缓存

rdd1.cache ---> 缓存数据,但是不会触发计算,cache是一个Transformation

rdd1.count ----> 触发计算,将结果缓存

rdd1.count ----> ???会从哪里得到数据

IDEA功能键:ctrl + n 查找类

ctl+alt+shit+N 在类中找方法

5、RDD的容错机制:checkpoint检查点:两种类型 (1)本地目录 (2)HDFS目录

(1)复习检查点:HDFS中,合并元信息

Oracle中,会以最高优先级唤醒数据库写进程(DBWn),来内存中的脏数据---> 数据文件

(2)RDD的检查点:容错机制,辅助Lineage(血统)---> 整个计算的过程

如果lineage越长,出错的概率就越大

两种类型 (1)本地目录 : 需要将spark-shell运行在本地模式上

(2)HDFS目录: 需要将spark-shell运行在集群模式上

scala> sc.setCheckpointDir("hdfs://bigdata11:9000/spark/checkpoint")

scala> val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")

rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata11:9000/input/sales MapPartitionsRDD[41] at textFile at <console>:24

scala> rdd1.checkpoint

scala> rdd1.count

源码中的说明:

/**

* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

* directory set with `SparkContext#setCheckpointDir` and all references to its parent

* RDDs will be removed. This function must be called before any job has been

* executed on this RDD. It is strongly recommended that this RDD is persisted in

* memory, otherwise saving it on a file will require recomputation.

*/

6、RDD的依赖关系、划分Spark任务的Stage(阶段)

(*)窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用

比方:独生子女

(*)宽依赖:多个子RDD的分区会依赖同一个父RDD的分区

比方:超生

参考:P52页

7、RDD算子的基础例子

六、Spark RDD的高级算子

1、mapPartitionsWithIndex: 对RDD中的每个分区进行操作,带有分区号

定义:def mapPartitionsWithIndex[U](f: (Int, Iterator[T])=>Iterator[U], preservesPartitioning: Boolean = false)

(implicit arg0: ClassTag[U]): RDD[U]

参数说明:

f: (Int, Iterator[T])=>Iterator[U]

(*)Int: 分区号

(*)Iterator[T]: 该分区中的每个元素

(*)返回值:Iterator[U]

Demo:

(1)创建一个RDD:val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

(2)创建一个函数,作为f的值

def func1(index:Int,iter:Iterator[Int]):Iterator[String] ={

iter.toList.map(x=>"[PartID:" + index +",value="+x+"]").iterator

}

(3)调用

rdd1.mapPartitionsWithIndex(func1).collect

(4)结果:

Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4],

[PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])

2、aggregate:聚合操作

定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

作用:先对局部进行操作,再对全局进行操作

举例:

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)

(1)求每个分区最大值的和

先查看每个分区中的元素:

rdd1..mapPartitionsWithIndex(func1).collect

rdd1.aggregate(0)(math.max(_,_),_+_)

(2)改一下:

rdd1.aggregate(0)(_+_,_+_) ====> 15

rdd1.aggregate(10)(math.max(_,_),_+_) ===> 30

(3)讲义上,还有一个字符串的例子 P57页

3、aggregateByKey

(1)类似aggregate,也是先对局部,再对全局

(2)区别:aggregateByKey操作<key,value>

(3)测试数据:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

每个分区中的元素(key,value)

def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {

iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator

}

[partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)],

[partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]

(4)把每个笼子中,每种动物最多的个数进行求和

pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect

4、coalesce和repartition

(*)都是将RDD中的分区进行重分区

(*)区别:coalesce 默认:不会进行shuffle(false)

repartition 会进行shuffle

(*)举例:

创建一个RDD

val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

进行重分区

val rdd5 = rdd4.repartition(3)

val rdd6 = rdd4.coalesce(3,false) ---> 分区的长度: 2

val rdd6 = rdd4.coalesce(3,true) ---> 分区的长度: 2

5、其他高级算子:参考文档

http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

七、Spark编程案例(都是在本地模式上运行)

测试数据:tomcat的访问日志

1、案例一:求网站的访问量

需求:找到访问量最高的两个网页

2、案例二:创建自定义的分区

类似MapReduce中的分区,根据Map的输出进行分区

需求:根据网页进行分区,相同的jsp的日志放到同一个分区下

3、案例三:访问数据库

(*)问题:在哪里创建创建一个Connection对象

(*)针对RDD: foreach和foreachPartition

如果把Connection创建放在最上边,错误的!!!!!

Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CConnection

(*)更好的一个做法:foreachPartition ---> 针对每个分区进行操作(比如:跟外部系统通信)

(*)使用JdbcRDD 直接访问数据

局限:An RDD that executes a SQL query on a JDBC connection and reads results.

For usage example, see test case JdbcRDDSuite.

只能进行select操作

只能接受两个参数

JdbcRDD(sc: SparkContext, SparkContext对象

getConnection: () ⇒ Connection, 数据库的链接

sql: String, Select语句

lowerBound: Long, Select语句下界,就是Select的第一个参数

upperBound: Long, Select语句上界,就是Select的第二个参数

numPartitions: Int, 分区个数, 启动多少个Executor

mapRow: (ResultSet) ⇒ 结果集

欢迎关注微信公众号:大数据AI之家,定期分享清华、北大、麻省理工、中科院的博士及专家的论文、思想及授课视频,助你快速掌握最前沿实用的大数据、机器学习、区块链等人工智能技术。