MapReduce: MapTask Source Code Analysis (Output Stage)
Continuing from the previous section on the Input stage, this section analyzes the Output stage. The code lives in the runNewMapper() method:
    private <INKEY,INVALUE,OUTKEY,OUTVALUE>
    void runNewMapper(final JobConf job,
                      final TaskSplitIndex splitIndex,
                      final TaskUmbilicalProtocol umbilical,
                      TaskReporter reporter) {
      .......
      // This output is also wrapped into the map context, so anything emitted
      // inside map() ends up calling output.write()
      org.apache.hadoop.mapreduce.RecordWriter output = null;

      // Keep this value (0) in mind
      if (job.getNumReduceTasks() == 0) {   // check the number of ReduceTasks
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {   // > 0
        // Create a collector [its constructor shows that the output must be partitioned]
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }

      // ----------- new NewOutputCollector() begin ------------------
      NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                         JobConf job,
                         TaskUmbilicalProtocol umbilical,
                         TaskReporter reporter
                         ) throws IOException, ClassNotFoundException {
        // 1. Assignment. Skipped for now; covered in the next part
        collector = createSortingCollector(job, reporter);
        // 2. There are as many partitions as there are reduce tasks
        // Recall: a partition may hold several groups; records with the same key form one group
        partitions = jobContext.getNumReduceTasks();
        if (partitions > 1) {
          // Common pattern: create the instance by reflection; a user-defined
          // partitioner, if configured, replaces the default one.
          // The default algorithm is a simple hash modulo, which guarantees
          // that identical keys land in the same partition.
          partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
            ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
        } else {
          // Exactly one reduce task: every group goes into the single partition
          partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
            // Returns the partition number, which is always 0 here (partitions == 1)
            public int getPartition(K key, V value, int numPartitions) {
              return partitions - 1;
            }
          };
        }
      }
      // ----------- new NewOutputCollector() end ------------------

      // ----------- write(K key, V value) begin ------------------
      // Every record written by output carries a (k, v, p) triple
      public void write(K key, V value) throws IOException, InterruptedException {
        collector.collect(key, value,
                          partitioner.getPartition(key, value, partitions));
      }
      // ----------- write(K key, V value) end --------------------
      ..............
    }
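To make the "simple hash modulo" comment concrete, here is a minimal sketch of that partitioning rule in plain Java. It has no Hadoop dependency; the class name HashPartitionSketch and the sample keys are made up for illustration, and the formula reflects how the default hash partitioner is generally described rather than being copied from the source above.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of hash-modulo partitioning (hypothetical class, not Hadoop code). */
final class HashPartitionSketch {

    /** Maps a key to a partition number in the range [0, numPartitions). */
    static int getPartition(Object key, int numPartitions) {
        // Clear the sign bit first so a negative hashCode cannot produce a negative partition.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("apple", "banana", "apple", "cherry");
        for (String key : keys) {
            // Identical keys always print the same partition number.
            System.out.println(key + " -> partition " + getPartition(key, 3));
        }
    }
}
```

Because the result depends only on the key's hash and the partition count, every record with the same key is routed to the same reduce task, and with a single partition the result is always 0.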
Stepping into the createSortingCollector(job, reporter) method:
    private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
            createSortingCollector(JobConf job, TaskReporter reporter)
      throws IOException, ClassNotFoundException {
      // Create the collector instance by reflection.
      // Common pattern: if the user has not configured a collector, use the default.
      // The default, MapOutputBuffer, is the impressive part; more on it later.
      MapOutputCollector<KEY, VALUE> collector
        = (MapOutputCollector<KEY, VALUE>)
         ReflectionUtils.newInstance(
                          job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                          MapOutputBuffer.class, MapOutputCollector.class), job);
      MapOutputCollector.Context context =
                             new MapOutputCollector.Context(this, job, reporter);
      // What gets initialized here is the MapOutputBuffer; it must be
      // initialized before it can be used.
      // Important method, analyzed in the next part
      collector.init(context);
      return collector;
    }
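The "configured class if present, otherwise the default" pattern shows up twice above, so a stripped-down version may help. This is only a sketch using plain Java reflection: the Collector interface, both implementations, and the "collector.class" key are hypothetical stand-ins, not Hadoop's ReflectionUtils or JobConf.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of "use the configured class, else fall back to a default". */
final class ReflectiveDefaultSketch {

    interface Collector { String name(); }

    static final class DefaultCollector implements Collector {
        public String name() { return "default"; }
    }

    static final class CustomCollector implements Collector {
        public String name() { return "custom"; }
    }

    /** Instantiates the class named under "collector.class" (a made-up key), or the default. */
    static Collector createCollector(Map<String, String> conf) {
        try {
            String className = conf.get("collector.class");
            Class<? extends Collector> clazz = (className == null)
                ? DefaultCollector.class
                : Class.forName(className).asSubclass(Collector.class);
            // Requires a no-argument constructor, as reflective factories usually do.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create collector", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(createCollector(conf).name());
        conf.put("collector.class", CustomCollector.class.getName());
        System.out.println(createCollector(conf).name());
    }
}
```

The asSubclass call plays roughly the role of the third argument to job.getClass(...) in the source: it rejects a configured class that does not implement the expected interface.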
Now for the main event, the initialization step: collector.init(context). Non-essential code has been removed so the source is clean and pleasant to read.
    public void init(MapOutputCollector.Context context) {
      // 0. Basic setup; just skim it
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      // 1. Spill threshold, 0.8 by default; the remaining 0.2 of the buffer stays usable
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      // 2. Default buffer size in MB
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);

      // 3. Sorter: quicksort by default unless a custom one is configured.
      // Sorting boils down to comparisons (lexicographic or numeric), so the
      // sorter relies on a [comparator], discussed below
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);

      //-------- The famous circular buffer, a truly brilliant design --------
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;
      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      //--------------------------------------------------------------------

      // k/v serialization
      // 4. Get the [comparator] used for sorting; the default is used unless a custom one is set.
      // Hadoop's built-in serializable key types each come with their own comparator
      comparator = job.getOutputKeyComparator();
      .............
      // output counters
      .............
      // compression: compresses the data
      ............
      // combiner: merges records with the same key once on the map side, which
      // reduces the amount of data the reducers have to fetch. It is a tuning hook.
      // Nicknamed the "mini reduce"; it runs one or more times on the map side.
      // A later article covers its source
      .............

      // 5. Spill thread
      // When the circular buffer is 80% full, its data is written to disk.
      // At that point the buffer is shared between threads: one is writing to
      // disk while another keeps writing into the buffer.
      // How are the two kept from colliding? Answer: new data is written into
      // the buffer in the opposite direction
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
      } finally {
        spillLock.unlock();
      }
    }
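The buffer arithmetic in init() is easier to follow with concrete numbers. The sketch below repeats just those calculations in isolation; BufferSizingSketch is a hypothetical class, and NMETA = 4 and METASIZE = 16 are taken from the "4 ints, 16 bytes" index layout described in this article.

```java
/** Illustrative sketch of the sizing arithmetic in init() (hypothetical class). */
final class BufferSizingSketch {
    static final int NMETA = 4;            // ints per index record
    static final int METASIZE = NMETA * 4; // bytes per index record

    /** Buffer size in bytes: sortMb megabytes, rounded down to a multiple of METASIZE. */
    static int bufferBytes(int sortMb) {
        int maxMemUsage = sortMb << 20;    // MB -> bytes
        return maxMemUsage - maxMemUsage % METASIZE;
    }

    /** Number of occupied bytes at which a spill is triggered. */
    static int softLimit(int sortMb, float spillPercent) {
        return (int) (bufferBytes(sortMb) * spillPercent);
    }

    /** Upper bound on index records if the whole buffer were viewed as ints. */
    static int maxRecords(int sortMb) {
        int capacityInInts = bufferBytes(sortMb) / 4;
        return capacityInInts / NMETA;
    }

    public static void main(String[] args) {
        System.out.println("buffer bytes = " + bufferBytes(100));
        System.out.println("soft limit   = " + softLimit(100, 0.8f));
        System.out.println("max records  = " + maxRecords(100));
    }
}
```

With the defaults (100 MB and 0.8) this works out to a 104,857,600-byte buffer, a soft limit of 83,886,080 bytes, and at most 6,553,600 index records.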
There is no need to go through the rest of the source line by line, so here is a summary in prose.
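MapOutputBuffer:
The K-V pairs emitted by map are serialized into byte arrays and a partition number is computed for each, so every record ends up as a triple <k,v,p>
buffer is the circular buffer used during the map phase:
In essence it is a byte array;
Equator: the dividing point; K-V data is stored on one side of it and the index on the other;
Index: the index of a K-V pair, fixed at 16 bytes, i.e. 4 ints: the partition number P, the offset of K, the offset of V, and the length of V;
When the data in the buffer reaches the 80% threshold, the spill thread is started;
That 80% of the data is quick-sorted while the map output thread keeps writing into the remaining part of the buffer;
During the quicksort the keys are compared, but only the index entries are moved;
The spill only has to follow the sorted index for the spilled data to come out in order;
Note: the sort is a two-level sort:
Ordered by partition: reducers fetch their data partition by partition;
Ordered by key within each partition: the reduce computation works group by group;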
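The "compare keys, move only the index" idea and the two-level order can be sketched in a few lines. This is an illustration under simplifying assumptions: keys are plain strings in an array rather than serialized bytes, the index is just an array of record positions, and Arrays.sort stands in for Hadoop's QuickSort. IndexSortSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Illustrative sketch: sort an index by (partition, key) without moving the records. */
final class IndexSortSketch {

    /** Returns record positions ordered by partition, then by key; inputs are left untouched. */
    static Integer[] sortedOrder(String[] keys, int[] partitions) {
        Integer[] order = new Integer[keys.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i; // the index initially points at records in arrival order
        }
        // Level 1: partition number. Level 2: key within the same partition.
        Arrays.sort(order, Comparator
            .comparingInt((Integer i) -> partitions[i])
            .thenComparing(i -> keys[i]));
        return order;
    }

    public static void main(String[] args) {
        String[] keys = {"pear", "apple", "fig", "apple"};
        int[] partitions = {1, 0, 1, 0};
        for (int pos : sortedOrder(keys, partitions)) {
            // Walking the sorted index yields the records in spill order.
            System.out.println("partition " + partitions[pos] + ", key " + keys[pos]);
        }
    }
}
```

Only the small fixed-size index entries are rearranged, which is the reason the variable-length key and value bytes can stay where they were written.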
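Tuning: a combiner can run during the spill
It is effectively a reduce inside the map, aggregating group by group;
When it runs: after the sort has placed identical keys next to each other, the combiner runs, and then the data is spilled;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3)
: by the time the map finishes its output, the buffer has usually spilled several small files, and the map merges them into one output file to avoid fragmentation. If a combiner is configured and there are at least 3 spill files (the default), the combiner runs again during that merge [the small-files problem comes up again later]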
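As a rough picture of what a combiner does to a sorted run, the sketch below sums the values of adjacent equal keys, word-count style. It is an assumption-laden illustration: CombinerSketch is a hypothetical class, the input is two parallel arrays instead of Hadoop's RawKeyValueIterator, and summing is just one possible combine function.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of a sum combiner over records already sorted by key. */
final class CombinerSketch {

    /** Collapses each run of adjacent equal keys into one (key, sum) pair. */
    static List<Map.Entry<String, Integer>> combine(String[] sortedKeys, int[] values) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        String currentKey = null;
        int sum = 0;
        for (int i = 0; i < sortedKeys.length; i++) {
            if (currentKey != null && !currentKey.equals(sortedKeys[i])) {
                out.add(new SimpleEntry<>(currentKey, sum)); // the previous group is complete
                sum = 0;
            }
            currentKey = sortedKeys[i];
            sum += values[i];
        }
        if (currentKey != null) {
            out.add(new SimpleEntry<>(currentKey, sum)); // flush the last group
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "b", "c", "c", "c"};
        int[] ones = {1, 1, 1, 1, 1, 1};
        for (Map.Entry<String, Integer> e : combine(keys, ones)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

The single pass only works because the sort has already put identical keys side by side, which is why the combiner runs after sorting and before the data is written out.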
Appendix: source code related to the spill thread:
    protected class SpillThread extends Thread {

      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (!spillInProgress) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              // This is where sort-and-spill gets called
              sortAndSpill();
            } catch (Throwable t) {
              sortSpillException = t;
            } finally {
              spillLock.lock();
              if (bufend < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
              spillInProgress = false;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }
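The lock-and-condition handshake in SpillThread can be hard to follow at first read, so here is a much smaller sketch of the same shape. It keeps the names spillLock, spillReady, spillDone and spillInProgress from the source, but everything else is simplified and hypothetical: the class SpillHandshakeSketch, the spillAndWait() method, and the counter that stands in for the real sort and disk write.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative sketch of the spill handshake between a writer and a background thread. */
final class SpillHandshakeSketch {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillReady = spillLock.newCondition(); // "please spill"
    private final Condition spillDone = spillLock.newCondition();  // "spill finished"
    private boolean spillInProgress = false;
    private boolean running = false;
    private int spillCount = 0;

    SpillHandshakeSketch() {
        Thread t = new Thread(this::spillLoop, "SpillThread");
        t.setDaemon(true);
        spillLock.lock();
        try {
            t.start();
            while (!running) {
                spillDone.awaitUninterruptibly(); // wait until the thread is up
            }
        } finally {
            spillLock.unlock();
        }
    }

    private void spillLoop() {
        spillLock.lock();
        running = true;
        try {
            while (true) {
                spillDone.signalAll();            // wake anyone waiting on us
                while (!spillInProgress) {
                    spillReady.await();           // releases the lock while idle
                }
                spillLock.unlock();               // do the slow work without the lock
                try {
                    // the real code would sort and write to disk here
                } finally {
                    spillLock.lock();
                    spillCount++;
                    spillInProgress = false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            spillLock.unlock();
        }
    }

    /** Requests one spill, blocks until it completes, and returns the total spill count. */
    int spillAndWait() {
        spillLock.lock();
        try {
            while (spillInProgress) {
                spillDone.awaitUninterruptibly(); // let an earlier spill finish first
            }
            spillInProgress = true;
            spillReady.signal();
            while (spillInProgress) {
                spillDone.awaitUninterruptibly();
            }
            return spillCount;
        } finally {
            spillLock.unlock();
        }
    }
}
```

Unlike this sketch, the real map output thread does not wait for each spill to finish; it keeps writing into the free part of the buffer and only blocks when that runs out. The point here is just the pattern: state changes happen under the lock, and the expensive work happens with the lock released.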
sortAndSpill()
    private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      final long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int mstart = kvend / NMETA;
        final int mend = 1 + // kvend is a valid record
          (kvstart >= kvend
          ? kvstart
          : kvmeta.capacity() + kvstart) / NMETA;
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        int spindex = mstart;
        final IndexRecord rec = new IndexRecord();
        final InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            // The combiner is invoked here when one is configured
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                final int kvoff = offsetFor(spindex % maxRec);
                int keystart = kvmeta.get(kvoff + KEYSTART);
                int valstart = kvmeta.get(kvoff + VALSTART);
                key.reset(kvbuffer, keystart, valstart - keystart);
                getVBytesForOffset(kvoff, value);
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < mend &&
                  kvmeta.get(offsetFor(spindex % maxRec)
                            + PARTITION) == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }
            // ...... (the excerpt ends here)
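The mstart / mend computation and the spindex % maxRec lookups are what let the index wrap around the end of the buffer. The sketch below isolates that arithmetic. It assumes, as the formula suggests, that index records are appended toward lower positions so that kvend normally sits below kvstart unless the index has wrapped; SpillRangeSketch and recordSlots are hypothetical names.

```java
import java.util.Arrays;

/** Illustrative sketch of the wrap-around index range used when spilling. */
final class SpillRangeSketch {
    static final int NMETA = 4; // ints per index record

    /**
     * Lists the record slots visited between kvend and kvstart (both int offsets),
     * for an index area of capacityInts ints.
     */
    static int[] recordSlots(int kvstart, int kvend, int capacityInts) {
        int maxRec = capacityInts / NMETA;
        int mstart = kvend / NMETA;
        // If kvstart is below kvend the range has wrapped, so extend it past the end.
        int mend = 1 + (kvstart >= kvend ? kvstart : capacityInts + kvstart) / NMETA;
        int[] slots = new int[mend - mstart];
        for (int i = mstart; i < mend; i++) {
            slots[i - mstart] = i % maxRec; // fold the logical position back into range
        }
        return slots;
    }

    public static void main(String[] args) {
        // 8 record slots (32 ints). No wrap: records occupy slots 2..5.
        System.out.println(Arrays.toString(recordSlots(20, 8, 32)));
        // Wrapped: the range runs off the end and continues at slot 0.
        System.out.println(Arrays.toString(recordSlots(4, 24, 32)));
    }
}
```

Treating the range as a plain increasing interval [mstart, mend) and applying the modulo only at lookup time means the sorter and the spill loop never need a special case for the wrapped situation.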
Source: https://www.cnblogs.com/simon-1024/p/14874155.html