一. MR中的join的两种方式:
1.reduce side join(面试题)
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value对,对每条数据打一个标签(tag),比如:tag=1表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签,在shuffle阶段已经自然按key分组.
在reduce阶段,reduce函数获取相同k2的v2 list(v2来自File1和File2), 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
这种方法有2个问题:
1, map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。2, reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
我关于reduce side join的博文总结地址:
2.map side join(面试题)
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
Map side join是针对以下场景进行的优化:
两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),
然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。Job在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个Container的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
这种方法的局限性:
这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。这种方法有明显的局限性:有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。
3.针对Map Side Join 局限的解决方法:
①使用内存服务器,扩大节点的内存空间
针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接
②使用BloomFilter过滤空连接的数据
对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。
③使用mapreduce专为join设计的包
在mapreduce包里看到有专门为join设计的包,对这些包还没有学习,不知道怎么使用,只是在这里记录下来,作个提醒。
jar: mapreduce-client-core.jarpackage: org.apache.hadoop.mapreduce.lib.join
4.具体Map Side Join的使用
有客户数据customer和订单数据orders。
customer
客户编号 | 姓名 | 地址 | 电话 |
---|---|---|---|
1 | hanmeimei | ShangHai | 110 |
2 | leilei | BeiJing | 112 |
3 | lucy | GuangZhou | 119 |
** order**
订单编号 | 客户编号 | 其它字段被忽略 |
---|---|---|
1 | 1 | 50 |
2 | 1 | 200 |
3 | 3 | 15 |
4 | 3 | 350 |
5 | 3 | 58 |
6 | 1 | 42 |
7 | 1 | 352 |
8 | 2 | 1135 |
9 | 2 | 400 |
10 | 2 | 2000 |
11 | 2 | 300 |
要求对customer和orders按照客户编号进行连接,结果要求对客户编号分组,对订单编号排序,对其它字段不作要求
客户编号 | 订单编号 | 订单金额 | 姓名 | 地址 | 电话 |
---|---|---|---|---|---|
1 | 1 | 50 | hanmeimei | ShangHai | 110 |
1 | 2 | 200 | hanmeimei | ShangHai | 110 |
1 | 6 | 42 | hanmeimei | ShangHai | 110 |
1 | 7 | 352 | hanmeimei | ShangHai | 110 |
2 | 8 | 1135 | leilei | BeiJing | 112 |
2 | 9 | 400 | leilei | BeiJing | 112 |
2 | 10 | 2000 | leilei | BeiJing | 112 |
2 | 11 | 300 | leilei | BeiJing | 112 |
3 | 3 | 15 | lucy | GuangZhou | 119 |
3 | 4 | 350 | lucy | GuangZhou | 119 |
3 | 5 | 58 | lucy | GuangZhou | 119 |
- 在提交job的时候,把小数据通过DistributedCache分发到各个节点。
- map端使用DistributedCache读到数据,在内存中构建映射关系--如果使用专门的内存服务器,就把数据加载到内存服务器,map()节点可以只保留一份小缓存;如果使用BloomFilter来加速,在这里就可以构建;
- map()函数中,对每一对<key,value>,根据key到第2)步构建的映射里面中找出数据,进行连接,输出。
上代码:
1 public class MapSideJoin extends Configured implements Tool { 2 // customer文件在hdfs上的位置。 3 private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt"; 4 //客户数据表对应的实体类 5 private static class CustomerBean { 6 private int custId; 7 private String name; 8 private String address; 9 private String phone; 10 11 public CustomerBean() { 12 } 13 14 public CustomerBean(int custId, String name, String address,String phone) { 15 super(); 16 this.custId = custId; 17 this.name = name; 18 this.address = address; 19 this.phone = phone; 20 } 21 22 public int getCustId() { 23 return custId; 24 } 25 26 public String getName() { 27 return name; 28 } 29 30 public String getAddress() { 31 return address; 32 } 33 34 public String getPhone() { 35 return phone; 36 } 37 } 38 //客户订单对应的实体类 39 private static class CustOrderMapOutKey implements WritableComparable{ 40 private int custId; 41 private int orderId; 42 43 public void set(int custId, int orderId) { 44 this.custId = custId; 45 this.orderId = orderId; 46 } 47 48 public int getCustId() { 49 return custId; 50 } 51 52 public int getOrderId() { 53 return orderId; 54 } 55 56 @Override 57 public void write(DataOutput out) throws IOException { 58 out.writeInt(custId); 59 out.writeInt(orderId); 60 } 61 62 @Override 63 public void readFields(DataInput in) throws IOException { 64 custId = in.readInt(); 65 orderId = in.readInt(); 66 } 67 68 @Override 69 public int compareTo(CustOrderMapOutKey o) { 70 int res = Integer.compare(custId, o.custId); 71 return res == 0 ? Integer.compare(orderId, o.orderId) : res; 72 } 73 74 @Override 75 public boolean equals(Object obj) { 76 if (obj instanceof CustOrderMapOutKey) { 77 CustOrderMapOutKey o = (CustOrderMapOutKey)obj; 78 return custId == o.custId && orderId == o.orderId; 79 } else { 80 return false; 81 } 82 } 83 84 @Override 85 public String toString() { 86 return custId + "\t" + orderId; 87 } 88 } 89 90 private static class JoinMapper extends Mapper { 91 private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey(); 92 private final Text outputValue = new Text(); 93 /** 94 * 把表中每一行的客户信息封装成一个Map,存储在内存中 95 * Map的key是客户的id,value是封装的客户bean对象 96 */ 97 private static final Map CUSTOMER_MAP = new HashMap (); 98 @Override 99 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {100 // 格式: 订单编 客户编号 订单金额101 String[] cols = value.toString().split("\t"); 102 if (cols.length < 3) {103 return;104 }105 106 int custId = Integer.parseInt(cols[1]);// 取出客户编号107 CustomerBean customerBean = CUSTOMER_MAP.get(custId);108 109 if (customerBean == null) { // 没有对应的customer信息可以连接110 return;111 }112 113 StringBuffer sb = new StringBuffer();114 sb.append(cols[2]).append("\t")115 .append(customerBean.getName()).append("\t")116 .append(customerBean.getAddress()).append("\t")117 .append(customerBean.getPhone());118 outputValue.set(sb.toString());119 outputKey.set(custId, Integer.parseInt(cols[0]));120 context.write(outputKey, outputValue);121 }122 123 //在Mapper方法执行前执行124 @Override125 protected void setup(Context context) throws IOException, InterruptedException {126 FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());127 FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));128 129 BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));130 String line = null;131 String[] cols = null;132 133 // 格式:客户编号 姓名 地址 电话134 while ((line = reader.readLine()) != null) {135 cols = line.split("\t");136 if (cols.length < 4) { // 数据格式不匹配,忽略137 continue;138 }139 CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);140 CUSTOMER_MAP.put(bean.getCustId(), bean);141 }142 }143 }144 145 /**146 * reduce147 */148 private static class JoinReducer extends Reducer {149 @Override150 protected void reduce(CustOrderMapOutKey key, Iterable values, Context context) throws IOException, InterruptedException {151 // 什么事都不用做,直接输出152 for (Text value : values) {153 context.write(key, value);154 }155 }156 }157 /**158 * @param args159 * @throws Exception160 */161 public static void main(String[] args) throws Exception {162 if (args.length < 2) {163 new IllegalArgumentException("Usage: ");164 return;165 }166 ToolRunner.run(new Configuration(), new Join(), args);167 }168 169 @Override170 public int run(String[] args) throws Exception {171 Configuration conf = getConf();172 Job job = Job.getInstance(conf, Join.class.getSimpleName());173 job.setJarByClass(SecondarySortMapReduce.class);174 175 // 添加customer cache文件176 job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));177 178 FileInputFormat.addInputPath(job, new Path(args[0]));179 FileOutputFormat.setOutputPath(job, new Path(args[1]));180 181 // map settings182 job.setMapperClass(JoinMapper.class);183 job.setMapOutputKeyClass(CustOrderMapOutKey.class);184 job.setMapOutputValueClass(Text.class);185 186 // reduce settings187 job.setReducerClass(JoinReducer.class);188 job.setOutputKeyClass(CustOrderMapOutKey.class);189 job.setOutputKeyClass(Text.class);190 191 boolean res = job.waitForCompletion(true);192 return res ? 0 : 1;193 }194 }
上面的代码没有使用DistributedCache类:
5.Map Side Join的再一个例子:
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.HashMap; 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.filecache.DistributedCache; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.Tool; 17 import org.apache.hadoop.util.ToolRunner; 18 import org.slf4j.Logger; 19 import org.slf4j.LoggerFactory; 20 /** 21 * 用途说明: 22 * Map side join中的left outer join 23 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 24 * table1(左表):tb_dim_city 25 * (id int,name string,orderid int,city_code int,is_show int), 26 * 假设tb_dim_city文件记录数很少 27 * tb_dim_city.dat文件内容,分隔符为"|": 28 * id name orderid city_code is_show 29 * 0 其他 9999 9999 0 30 * 1 长春 1 901 1 31 * 2 吉林 2 902 1 32 * 3 四平 3 903 1 33 * 4 松原 4 904 1 34 * 5 通化 5 905 1 35 * 6 辽源 6 906 1 36 * 7 白城 7 907 1 37 * 8 白山 8 908 1 38 * 9 延吉 9 909 1 39 * -------------------------风骚的分割线------------------------------- 40 * table2(右表):tb_user_profiles 41 * (userID int,userName string,network string,flow double,cityID int) 42 * tb_user_profiles.dat文件内容,分隔符为"|": 43 * userID network flow cityID 44 * 1 2G 123 1 45 * 2 3G 333 2 46 * 3 3G 555 1 47 * 4 2G 777 3 48 * 5 3G 666 4 49 * .................................. 50 * .................................. 51 * -------------------------风骚的分割线------------------------------- 52 * 结果: 53 * 1 长春 1 901 1 1 2G 123 54 * 1 长春 1 901 1 3 3G 555 55 * 2 吉林 2 902 1 2 3G 333 56 * 3 四平 3 903 1 4 2G 777 57 * 4 松原 4 904 1 5 3G 666 58 */ 59 public class MapSideJoinMain extends Configured implements Tool{ 60 private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); 61 62 public static class LeftOutJoinMapper extends Mapper
6.SemiJoin
SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.ArrayList; 5 import java.util.HashSet; 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.conf.Configured; 8 import org.apache.hadoop.filecache.DistributedCache; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.util.Tool; 20 import org.apache.hadoop.util.ToolRunner; 21 import org.slf4j.Logger; 22 import org.slf4j.LoggerFactory; 23 /** 24 * @author zengzhaozheng 25 * 26 * 用途说明: 27 * reudce side join中的left outer join 28 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 29 * table1(左表):tb_dim_city 30 * (id int,name string,orderid int,city_code,is_show) 31 * tb_dim_city.dat文件内容,分隔符为"|": 32 * id name orderid city_code is_show 33 * 0 其他 9999 9999 0 34 * 1 长春 1 901 1 35 * 2 吉林 2 902 1 36 * 3 四平 3 903 1 37 * 4 松原 4 904 1 38 * 5 通化 5 905 1 39 * 6 辽源 6 906 1 40 * 7 白城 7 907 1 41 * 8 白山 8 908 1 42 * 9 延吉 9 909 1 43 * -------------------------风骚的分割线------------------------------- 44 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 45 * tb_user_profiles.dat文件内容,分隔符为"|": 46 * userID network flow cityID 47 * 1 2G 123 1 48 * 2 3G 333 2 49 * 3 3G 555 1 50 * 4 2G 777 3 51 * 5 3G 666 4 52 * .................................. 53 * .................................. 54 * -------------------------风骚的分割线------------------------------- 55 * joinKey.dat内容: 56 * city_code 57 * 1 58 * 2 59 * 3 60 * 4 61 * -------------------------风骚的分割线------------------------------- 62 * 结果: 63 * 1 长春 1 901 1 1 2G 123 64 * 1 长春 1 901 1 3 3G 555 65 * 2 吉林 2 902 1 2 3G 333 66 * 3 四平 3 903 1 4 2G 777 67 * 4 松原 4 904 1 5 3G 666 68 */ 69 public class SemiJoin extends Configured implements Tool{ 70 private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class); 71 public static class SemiJoinMapper extends Mapper
这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。
二、总结
blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。