本篇内容主要讲解“MapReduce的output输出过程是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce的output输出过程是什么”吧!
创新互联建站是创新、创意、研发型一体的综合型网站建设公司,自成立以来公司不断探索创新,始终坚持为客户提供满意周到的服务,在本地打下了良好的口碑,在过去的十余年时间我们累计服务了上千家以及全国政企客户,如成都VR全景等企业单位,完善的项目管理流程,严格把控项目进度与质量监控加上过硬的技术实力获得客户的一致赞美。//--------------------------ReduceTask.java public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); if (this.isMapOrReduce()) { this.copyPhase = this.getProgress().addPhase("copy"); this.sortPhase = this.getProgress().addPhase("sort"); this.reducePhase = this.getProgress().addPhase("reduce"); } TaskReporter reporter = this.startReporter(umbilical); boolean useNewApi = job.getUseNewReducer(); //reducetask初始化工作 this.initialize(job, this.getJobID(), reporter, useNewApi); if (this.jobCleanup) { this.runJobCleanupTask(umbilical, reporter); } else if (this.jobSetup) { this.runJobSetupTask(umbilical, reporter); } else if (this.taskCleanup) { this.runTaskCleanupTask(umbilical, reporter); } else { this.codec = this.initCodec(); RawKeyValueIterator rIter = null; ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class combinerClass = this.conf.getCombinerClass(); CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null; Class extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class); shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job); LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles); shuffleConsumerPlugin.init(shuffleContext); rIter = shuffleConsumerPlugin.run(); this.mapOutputFilesOnDisk.clear(); this.sortPhase.complete(); this.setPhase(Phase.REDUCE); this.statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); //开始运行reducetask if (useNewApi) { this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } shuffleConsumerPlugin.close(); this.done(umbilical, reporter); }
和MapTask类似,主要有 this.initialize() 以及 this.runNewReducer() 这两个方法。做了初始化以及开始运行task的操作。
//----------------------------------------ReduceTask.java public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException { //创建上下文对象 this.jobContext = new JobContextImpl(job, id, reporter); this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter); //修改reducetask的状态为运行中 if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) { this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING); } if (useNewApi) { if (LOG.isDebugEnabled()) { LOG.debug("using new api for output committer"); } //反射获取outputformat类对象。getOutputFormatClass这个方法在JobContextImpl中。 //默认是TextOutputFormat.class this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job); this.committer = this.outputFormat.getOutputCommitter(this.taskContext); } else { this.committer = this.conf.getOutputCommitter(); } //获取输出路径 Path outputPath = FileOutputFormat.getOutputPath(this.conf); if (outputPath != null) { if (this.committer instanceof FileOutputCommitter) { FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext)); } else { FileOutputFormat.setWorkOutputPath(this.conf, outputPath); } } this.committer.setupTask(this.taskContext); Class extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class); this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf); LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree); if (this.pTree != null) { this.pTree.updateProcessTree(); this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime(); } }
主要就是初始化上下文对象,获取outputformat对象。
//-----------------------------------------------ReduceTask.java privatevoid runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException { //匿名内部类,用于构建key,value的迭代器 rIter = new RawKeyValueIterator() { public void close() throws IOException { rIter.close(); } public DataInputBuffer getKey() throws IOException { return rIter.getKey(); } public Progress getProgress() { return rIter.getProgress(); } public DataInputBuffer getValue() throws IOException { return rIter.getValue(); } public boolean next() throws IOException { boolean ret = rIter.next(); reporter.setProgress(rIter.getProgress().getProgress()); return ret; } }; TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); //反射获取Reducer对象 org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job); //获取RecordWriter对象,用于将结果写入到文件中 org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext); job.setBoolean("mapred.skip.on", this.isSkipping()); job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); //创建reduceContext对象,用于reduce任务 org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass); //开始运行reduce try { reducer.run(reducerContext); } finally { //关闭输出流 trackedRW.close(reducerContext); } }
可以看到,主要做了以下工作:
1)获取reducer对象,用于运行run() ,也就是运行reduce方法
2)创建 RecordWriter对象
3)创建reduceContext
4)开始运行reducer中的run
//--------------------------------------NewTrackingRecordWriter.java static class NewTrackingRecordWriterextends org.apache.hadoop.mapreduce.RecordWriter { private final org.apache.hadoop.mapreduce.RecordWriter real; private final org.apache.hadoop.mapreduce.Counter outputRecordCounter; private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter; private final List fsStats; NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException { this.outputRecordCounter = reduce.reduceOutputCounter; this.fileOutputByteCounter = reduce.fileOutputByteCounter; List matchedStats = null; if (reduce.outputFormat instanceof FileOutputFormat) { matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration()); } this.fsStats = matchedStats; long bytesOutPrev = this.getOutputBytes(this.fsStats); //通过outputFormat创建RecordWriter对象 this.real = reduce.outputFormat.getRecordWriter(taskContext); long bytesOutCurr = this.getOutputBytes(this.fsStats); this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } ..................... }
重点的就是通过outputFormat.getRecordWriter来创建 RecordWriter 对象。
上面也说到,outputFormat默认就是 TextOutputFormat,所以下面看看
TextOutputFormat.getRecordWriter()
public class TextOutputFormatextends FileOutputFormat { public TextOutputFormat() { } //可以看到,返回的是静态内部类TextOutputFormat.LineRecordWriter public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); //key和value的分隔符,默认是 \t String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); //分为压缩和非压缩输出 if (!isCompressed) { //获取输出路径 Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); //创建输出流 FSDataOutputStream fileOut = fs.create(file, progress); return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator); } else { Class extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job); Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); //返回LineRecordWriter对象 return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } } //这里就是 LineRecordWriter 类 protected static class LineRecordWriter implements RecordWriter { private static final byte[] NEWLINE; protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text)o; this.out.write(to.getBytes(), 0, to.getLength()); } else { this.out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } //将KV输出 public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (!nullKey || !nullValue) { //先写key if (!nullKey) { this.writeObject(key); } //接着写入key和value之间的分隔符 if (!nullKey && !nullValue) { this.out.write(this.keyValueSeparator); } //最后写入value if (!nullValue) { this.writeObject(value); } //接着写入新的一行 this.out.write(NEWLINE); } } public synchronized void close(Reporter reporter) throws IOException { this.out.close(); } static { NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); } } }
可以看到,最终返回的RecordWriter对象是 LineRecordWriter 类型的。
接着回到3中,看 reduceContext这个对象的类
protected staticReducer .Context createReduceContext(Reducer reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException { ReduceContext reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); Reducer .Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext); return reducerContext; }
可以看到reducerContext是一个ReduceContextImpl类对象。
下面看看ReduceContextImpl 这个类的构造方法
//---------------------------------ReduceContextImpl.java public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriteroutput, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws InterruptedException, IOException { //父类是 TaskInputOutputContextImpl,把outputformat对象传递进去了 super(conf, taskid, output, committer, reporter); this.input = input; this.inputKeyCounter = inputKeyCounter; this.inputValueCounter = inputValueCounter; this.comparator = comparator; this.serializationFactory = new SerializationFactory(conf); this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass); this.keyDeserializer.open(this.buffer); this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass); this.valueDeserializer.open(this.buffer); this.hasMore = input.next(); this.keyClass = keyClass; this.valueClass = valueClass; this.conf = conf; this.taskid = taskid; }
这里面,它继续调用了父类的构造方法,把outputformat对象传递进去了。
继续看看父类 TaskInputOutputContextImpl
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriteroutput, OutputCommitter committer, StatusReporter reporter) { //可以看到这里的output就是recordWriter对象 super(conf, taskid, reporter); this.output = output; this.committer = committer; } //这里的逻辑其实就是先读取KV到 this.key和this.value中,如果没有KV就返回false,否则返回true public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; //调用recordWriter的write方法,将KV输出,默认是LineRecordWriter这个类 public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { this.output.write(key, value);
可以看到,这里有3个抽象方法(在子类ReduceContextImpl中实现了逻辑,和RecordWriter无关),以及write这个具体方法。分别用于获取KV以及将结果KV写入。write这个写入方法,就是调用的 recordWriter的write方法,也就是5中创建的LineRecordWriter对象中的write方法,将KV输出。
public void run(Reducer.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKey()) { this.reduce(context.getCurrentKey(), context.getValues(), context); Iterator iter = context.getValues().iterator(); if (iter instanceof ValueIterator) { ((ValueIterator)iter).resetBackupStore(); } } } finally { this.cleanup(context); } }
可以看到,这里就是调用6中创建的 reduceContext中的方法来获取KV。而且在reduce方法中,我们会通过 context.write(key,value)来将结果KV输出。调用的其实就是 LineRecordWriter对象中的write方法。
到此,相信大家对“MapReduce的output输出过程是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。