这篇文章主要介绍了Spark中任务的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
我们提供的服务有:成都网站设计、网站建设、微信公众号开发、网站优化、网站认证、宣汉ssl等。为上1000家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的宣汉网站制作公司
任务(Task)是Spark的最小执行单元,Spark任务是通过Task来执行的。Spark的任务体系是最神秘也是最容易学习的核心模块,任务执行机制点透了那么Spark也就了解的更深入了。Task是任务体系的一个抽象类,有两个子类:ResultTask和ShuffleMapTask,这三个类构成了任务系统的核心。
ResultTask好理解,就是直接执行Task中RDD某个分区的数据操作,还记得之前的RDD的结构吗,里面有一个compute函数,任务就是执行compute函数。
ShuffleMapTask也是执行Task中RDD某个分区的数据操作,所不同的是输出结果的存储方式不一样。ShuffleMapTask会把数据操作的结果保存到类似BlockManager的全局存储中,ShuffleMapTask的结果可供下一个Task作为输入数据。为什么分两种呢?换个说法就很清楚了,ResultTask对应窄依赖的RDD,ShuffleMapTask对应宽依赖的RDD操作(如全连接操作)。ShuffleMapTask需要对数据的读写进行特殊的处理,要用BlockManager来输出数据集的;同样,ShuffleMapTask的子RDD的读取数据集也是从BlockManager来的。
ResultTask和ShuffleMapTask的类的代码非常简单,就是重写runTask方法。
Task通过Task描述对象来反序列化,获得RDD和分区等对象后,创建TaskContextImpl作为任务上下文,然后执行run方法运行任务,读取RDD中的迭代器数据并处理数据。run方法实际是调用子类重写的runTask方法具体执行的。而runTask方法在ResultTask和ShuffleMapTask中被重写。
直接结果任务,这类任务执行完也就完了,其数据不需要被下一个任务再次处理。可以任务是终结者任务。
重写runTask方法。runTask方法的核心代码如下:
override def runTask(context: TaskContext): U = { val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) func(context, rdd.iterator(partition, context)) }
反序列化得到RDD中定义的数据处理函数func,func符合格式:
(TaskContext, Iterator[T]) => U
然后执行:
func(context, rdd.iterator(partition, context))
这方法的意思就是对rdd分区的数据迭代器轮询,每次取出一条数据执行func操作。ResultTask的重写部分就是这么简单。
ShuffleMap格式的任务,这类任务的执行结果是要被下一个RDD消费的,因此输出数据需要写出到Shuffle区域。Shuffle区域会在分区数据管理中详细的介绍。
重写runTask方法。runTask方法的核心代码如下:
override def runTask(context: TaskContext): MapStatus = { val ser = SparkEnv.get.closureSerializer.newInstance() val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) val rdd = rddAndDep._1 val dep = rddAndDep._2 dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition) }
前半段和Result类似,反序列化得到RDD和分区,以及依赖分区dep。然后迭代rdd中的数据并写入到依赖dep的shuffle区域中。
感谢你能够认真阅读完这篇文章,希望小编分享的“Spark中任务的示例分析”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!