评论

收藏

[Java] MapTask阶段shuffle源码分析

编程语言 编程语言 发布于:2021-10-05 22:46 | 阅读数:381 | 评论:0

今天小编就为大家分享一篇关于MapTask阶段shuffle源码分析,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
1. 收集阶段
在mapper中,调用context.write(key,value)实际是调用代理newoutputcollector的wirte方法
public void write(keyout key, valueout value
      ) throws ioexception, interruptedexception {
  output.write(key, value);
 }
实际调用的是mapoutputbuffer的collect(),在进行收集前,调用partitioner来计算每个key-value的分区号
@override
  public void write(k key, v value) throws ioexception, interruptedexception {
   collector.collect(key, value,
      partitioner.getpartition(key, value, partitions));
  }
2. newoutputcollector对象的创建
@suppresswarnings("unchecked")
  newoutputcollector(org.apache.hadoop.mapreduce.jobcontext jobcontext,
      jobconf job,
      taskumbilicalprotocol umbilical,
      taskreporter reporter
      ) throws ioexception, classnotfoundexception {
  // 创建实际用来收集key-value的缓存区对象
   collector = createsortingcollector(job, reporter);
  // 获取总的分区个数
   partitions = jobcontext.getnumreducetasks();
   if (partitions > 1) {
  partitioner = (org.apache.hadoop.mapreduce.partitioner<k,v>)
   reflectionutils.newinstance(jobcontext.getpartitionerclass(), job);
   } else {
  // 默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区
  partitioner = new org.apache.hadoop.mapreduce.partitioner<k,v>() {
   @override
   public int getpartition(k key, v value, int numpartitions) {
    return partitions - 1;
   }
  };
   }
  }
3. 创建环形缓冲区对象
@suppresswarnings("unchecked")
 private <key, value> mapoutputcollector<key, value>
   createsortingcollector(jobconf job, taskreporter reporter)
  throws ioexception, classnotfoundexception {
  mapoutputcollector.context context =
   new mapoutputcollector.context(this, job, reporter);
  // 从当前job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用mapoutputbuffer.class
  class<?>[] collectorclasses = job.getclasses(
   jobcontext.map_output_collector_class_attr, mapoutputbuffer.class);
  int remainingcollectors = collectorclasses.length;
  exception lastexception = null;
  for (class clazz : collectorclasses) {
   try {
  if (!mapoutputcollector.class.isassignablefrom(clazz)) {
   throw new ioexception("invalid output collector class: " + clazz.getname() +
    " (does not implement mapoutputcollector)");
  }
  class<? extends mapoutputcollector> subclazz =
   clazz.assubclass(mapoutputcollector.class);
  log.debug("trying map output collector class: " + subclazz.getname());
   // 创建缓冲区对象
  mapoutputcollector<key, value> collector =
   reflectionutils.newinstance(subclazz, job);
   // 创建完缓冲区对象后,执行初始化
  collector.init(context);
  log.info("map output collector class = " + collector.getclass().getname());
  return collector;
   } catch (exception e) {
  string msg = "unable to initialize mapoutputcollector " + clazz.getname();
  if (--remainingcollectors > 0) {
   msg += " (" + remainingcollectors + " more collector(s) to try)";
  }
  lastexception = e;
  log.warn(msg, e);
   }
  }
  throw new ioexception("initialization of all the collectors failed. " +
   "error in last collector was :" + lastexception.getmessage(), lastexception);
 }
3. mapoutputbuffer的初始化   环形缓冲区对象
@suppresswarnings("unchecked")
  public void init(mapoutputcollector.context context
      ) throws ioexception, classnotfoundexception {
   job = context.getjobconf();
   reporter = context.getreporter();
   maptask = context.getmaptask();
   mapoutputfile = maptask.getmapoutputfile();
   sortphase = maptask.getsortphase();
   spilledrecordscounter = reporter.getcounter(taskcounter.spilled_records);
   // 获取分区总个数,取决于reducetask的数量
   partitions = job.getnumreducetasks();
   rfs = ((localfilesystem)filesystem.getlocal(job)).getraw();
   //sanity checks
   // 从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8
   final float spillper =
  job.getfloat(jobcontext.map_sort_spill_percent, (float)0.8);
   // 获取mapreduce.task.io.sort.mb,如果没设置,就是100mb
   final int sortmb = job.getint(jobcontext.io_sort_mb, 100);
   indexcachememorylimit = job.getint(jobcontext.index_cache_memory_limit,
           index_cache_memory_limit_default);
   if (spillper > (float)1.0 || spillper <= (float)0.0) {
  throw new ioexception("invalid "" + jobcontext.map_sort_spill_percent +
    "": " + spillper);
   }
   if ((sortmb & 0x7ff) != sortmb) {
  throw new ioexception(
    "invalid "" + jobcontext.io_sort_mb + "": " + sortmb);
   }
// 在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引
   sorter = reflectionutils.newinstance(job.getclass("map.sort.class",
    quicksort.class, indexedsorter.class), job);
   // buffers and accounting
   int maxmemusage = sortmb << 20;
   maxmemusage -= maxmemusage % metasize;
   // 存放key-value
   kvbuffer = new byte[maxmemusage];
   bufvoid = kvbuffer.length;
  // 存储key-value的属性信息,分区号,索引等
   kvmeta = bytebuffer.wrap(kvbuffer)
   .order(byteorder.nativeorder())
   .asintbuffer();
   setequator(0);
   bufstart = bufend = bufindex = equator;
   kvstart = kvend = kvindex;
   maxrec = kvmeta.capacity() / nmeta;
   softlimit = (int)(kvbuffer.length * spillper);
   bufferremaining = softlimit;
   log.info(jobcontext.io_sort_mb + ": " + sortmb);
   log.info("soft limit at " + softlimit);
   log.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
   log.info("kvstart = " + kvstart + "; length = " + maxrec);
   // k/v serialization
  // 获取快速排序的key的比较器,排序只按照key进行排序!
   comparator = job.getoutputkeycomparator();
  // 获取key-value的序列化器
   keyclass = (class<k>)job.getmapoutputkeyclass();
   valclass = (class<v>)job.getmapoutputvalueclass();
   serializationfactory = new serializationfactory(job);
   keyserializer = serializationfactory.getserializer(keyclass);
   keyserializer.open(bb);
   valserializer = serializationfactory.getserializer(valclass);
   valserializer.open(bb);
   // output counters
   mapoutputbytecounter = reporter.getcounter(taskcounter.map_output_bytes);
   mapoutputrecordcounter =
  reporter.getcounter(taskcounter.map_output_records);
   fileoutputbytecounter = reporter
   .getcounter(taskcounter.map_output_materialized_bytes);
   // 溢写到磁盘,可以使用一个压缩格式! 获取指定的压缩编解码器
   // compression
   if (job.getcompressmapoutput()) {
  class<? extends compressioncodec> codecclass =
   job.getmapoutputcompressorclass(defaultcodec.class);
  codec = reflectionutils.newinstance(codecclass, job);
   } else {
  codec = null;
   }
   // 获取combiner组件
   // combiner
   final counters.counter combineinputcounter =
  reporter.getcounter(taskcounter.combine_input_records);
   combinerrunner = combinerrunner.create(job, gettaskid(),
             combineinputcounter,
             reporter, null);
   if (combinerrunner != null) {
  final counters.counter combineoutputcounter =
   reporter.getcounter(taskcounter.combine_output_records);
  combinecollector= new combineoutputcollector<k,v>(combineoutputcounter, reporter, job);
   } else {
  combinecollector = null;
   }
   spillinprogress = false;
   minspillsforcombine = job.getint(jobcontext.map_combine_min_spills, 3);
   // 设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程!
   spillthread.setdaemon(true);
   spillthread.setname("spillthread");
   spilllock.lock();
   try {
   // 启动线程
  spillthread.start();
  while (!spillthreadrunning) {
   spilldone.await();
  }
   } catch (interruptedexception e) {
  throw new ioexception("spill thread failed to initialize", e);
   } finally {
  spilllock.unlock();
   }
   if (sortspillexception != null) {
  throw new ioexception("spill thread failed to initialize",
    sortspillexception);
   }
  }
4. paritionner的获取
从配置中读取mapreduce.job.partitioner.class,如果没有指定,采用hashpartitioner.class
如果reducetask > 1, 还没有设置分区组件,使用hashpartitioner
@suppresswarnings("unchecked")
 public class<? extends partitioner<?,?>> getpartitionerclass()
   throws classnotfoundexception {
  return (class<? extends partitioner<?,?>>)
   conf.getclass(partitioner_class_attr, hashpartitioner.class);
 }
public class hashpartitioner<k, v> extends partitioner<k, v> {
 /** use {@link object#hashcode()} to partition. **/
 public int getpartition(k key, v value,
       int numreducetasks) {
  return (key.hashcode() & integer.max_value) % numreducetasks;
 }
}
分区号的限制:0 <= 分区号 < 总的分区数(reducetask的个数)
if (partition < 0 || partition >= partitions) {
  throw new ioexception("illegal partition for " + key + " (" +
    partition + ")");
   }
5.maptask shuffle的流程
①在map()调用context.write()
②调用mapoutputbuffer的collect()

  •                             调用分区组件partitionner计算当前这组key-value的分区号
③将当前key-value收集到mapoutputbuffer中

  •                             如果超过溢写的阀值,在后台启动溢写线程,来进行溢写!
④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!

  •                             排序并不在内存中移动key-value,而是记录排序后key-value的有序索引!
⑤ 开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中

  •                             如果没有定义combiner,直接溢写!
  •                             如果定义了combiner,使用combinerrunner.conbine()对key-value处理后再次溢写!
⑥多次溢写后,每次溢写都会产生一个临时文件
⑦最后,执行一次flush(),将剩余的key-value进行溢写
⑧mergeparts: 将多次溢写的结果,保存为一个总的文件!

  •                      在合并为一个总的文件前,会执行归并排序,保证合并后的文件,各个分区也是有序的!
  •                      如果定义了conbiner,conbiner会再次运行(前提是溢写的文件个数大于3)!
  •                      否则,就直接溢写!
⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待reducetask来拷贝相应分区的数据
6. combiner
combiner其实就是reducer类型:
class<? extends reducer<k,v,k,v>> cls =
  (class<? extends reducer<k,v,k,v>>) job.getcombinerclass();
combiner的运行时机:
maptask:

  •               ①每次溢写前,如果指定了combiner,会运行
  •               ②将多个溢写片段,进行合并为一个最终的文件时,也会运行combiner,前提是片段数>=3
reducetask:
③reducetask在运行时,需要启动shuffle进程拷贝maptask产生的数据!

  •                      数据在copy后,进入shuffle工作的内存,在内存中进行merge和sort!
  •                      数据过多,内部不够,将部分数据溢写在磁盘!
  •                      如果有溢写的过程,那么combiner会再次运行!
①一定会运行,②,③需要条件!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对CodeAE代码之家的支持。如果你想了解更多相关内容请查看下面相关链接
原文链接:https://blog.csdn.net/qq_43193797/article/details/86097451

关注下面的标签,发现更多相似文章