spark资源动态分配 (spark调度的过程)

资源调度源码分析

资源请求简单图

spark调度配置,spark任务调度

资源调度Master路径:

spark调度配置,spark任务调度

路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala

提交应用程序,submit的路径:

spark调度配置,spark任务调度

路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala

总结:

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
  4. 默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。

结论演示

使用Spark-submit提交任务演示。也可以使用spark-shell

  1. 默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。
./spark-submit --master spark://node01:7077
 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

运行结果

spark调度配置,spark任务调度

2.在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit  --master  spark://node01:7077 --executor-cores 1 
 --class org.apache.spark.examples.SparkPi  ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

运行结果

spark调度配置,spark任务调度

3.内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit  --master  spark://node01:7077 --executor-cores 1  --executor-memory 3g 
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

4.--total-executor-cores集群*共中**使用多少cores

注意:一个进程不能让集群多个节点共同启动。

./spark-submit  --master  spark://node01:7077  --executor-cores 1  --executor-memory 2g 
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000

任务调度源码分析

Action算子开始分析

任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。

划分stage,以taskSet形式提交任务

DAGScheduler 类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:

spark调度配置,spark任务调度

二次排序

在项目中添加一个SecondSort.txt文件

排序前文件中内容

spark调度配置,spark任务调度

编写代码

package com.gw.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
object SparkSecondSort {
 def main(args: Array[String]): Unit = {
 val sconf=new SparkConf().setAppName("SecondSort").setMaster("local")
 val sc=new SparkContext(sconf)
 val lines=sc.textFile("secondSort.txt")
 
 val pairs=  lines.map { x=>(new SecondSortKey(x.split(" ")(0).toInt,x.split(" ")(1).toInt),x) }
 val sortedPairs= pairs.sortByKey(false)
//      val sortedPairs = pairs.sortBy(_._1, false)
     sortedPairs.map(_._2).foreach {println }
     sc.stop()
  }
}
class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable {
 def compare(that: SecondSortKey): Int = {
 if(this.first-that.first==0)
 this.second- that.second 
 else 
 this.first-that.first
  }
}

运行效果

spark调度配置,spark任务调度

topN和分组取topN

topN

需求:获取成绩单中,成绩排在前五的学生信息

在项目中添加一个top.txt文件,在文件中以K,V(K表示成绩,V表示姓名)对的形式添加数据,如下图:

spark调度配置,spark任务调度

编写代码

package com.gw.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object SparkTopN {
 def main(args: Array[String]): Unit = {
 val conf = new SparkConf().setAppName("TopN").setMaster("local")
 val sc = new SparkContext(conf)
 val lines=sc.textFile("top.txt")
 val lineList=lines.map(x=>(x.split(",")(0),x))
 val sortRdd = lineList.sortByKey(false)
 val resultRDD  = sortRdd.map(x=>x._2)
 for(a <-resultRDD.take(5)){
      println(a)
    }
    sc.stop()
  }
}

运行结果

spark调度配置,spark任务调度

分组取topN

需求:给每个班级的学生成绩排序

在项目中添加一个文件scores.txt,在文件中编写K,V(K表示班级,V表示成绩)格式的数据,如下图

spark调度配置,spark任务调度

编写代码

package com.gw.scala
import org.apache.spark.{SparkContext, SparkConf}
object SparkGroupTopN {
 def main(args: Array[String]): Unit = {
 val conf = new SparkConf().setAppName("GroupTopN").setMaster("local")
 val sc = new SparkContext(conf)
 val lines=sc.textFile("scores.txt")
 val lineList=lines.map(x=>(x.split("\t")(0),x.split("\t")(1))).groupByKey()
 val topList=lineList.map(x=>{
 var t = List[Int]()
 for(a<-x._2){
        t = t.::(a.toInt)
      }
      println(x._1)
      t.sortBy { x => -x }.take(3)
    })
    topList.foreach { println }
  }
}

运行结果

spark调度配置,spark任务调度