这篇文章主要介绍“MapReduce Map Join怎么使用”,在日常操作中,相信很多人在MapReduce Map Join怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”MapReduce Map Join怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
创新互联是专业的绵阳网站建设公司,绵阳接单;提供网站建设、成都做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行绵阳网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
1. 样例数据
011990-99999 SIHCCAJAVRI 012650-99999 TYNSET-HANSMOEN
012650-99999 194903241200 111 012650-99999 194903241800 78 011990-99999 195005150700 0 011990-99999 195005151200 22 011990-99999 195005151800 -11
2. 需求
3. 思路、代码
将足够小的关联文件(即气象台信息)添加到分布式缓存,然后在每个 Mapper 端读取被缓存到本地的全量气象台信息,再与天气信息相关联。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class MapJoin {
static class RecordMapper extends Mapper {
private Map stationMap = new HashMap();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//预处理,把要关联的文件加载到缓存中
Path[] paths = context.getLocalCacheFiles();
//新的检索缓存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被弃用
//然而 context.getCacheFiles() 返回的是 HDFS 路径; context.getLocalCacheFiles() 返回的才是本地路径
//这里只缓存了一个文件,所以取第一个即可
BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));
String line = null;
try {
while ((line = reader.readLine()) != null) {
String[] vals = line.split("\\t");
if (vals.length == 2) {
stationMap.put(vals[0], vals[1]);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
reader.close();
}
super.setup(context);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] vals = value.toString().split("\\t");
if (vals.length == 3) {
String stationName = stationMap.get(vals[0]); //Join
stationName = stationName == null ? "" : stationName;
context.write(new Text(vals[0]),
new Text(stationName + "\t" + vals[1] + "\t" + vals[2]));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Parameter number is wrong, please enter three parameters: 4. 运行结果
到此,关于“MapReduce Map Join怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!