Sample data: student.txt
1,yaoshuya,25
2,yaoxiaohua,29 3,yaoyuanyie,15 4,yaoshupei,26
Sample data:score.txt
1,yuwen,100
1,shuxue,99 2,yuwen,99 2,shuxue,88 3,yuwen,99 3,shuxue,56 4,yuwen,33 4,shuxue,99
输出文件内容:
1 [yaoshuya,25,yuwen,100]
1 [yaoshuya,25,shuxue,99] 2 [yaoxiaohua,29,yuwen,99] 2 [yaoxiaohua,29,shuxue,88] 3 [yaoyuanyie,15,yuwen,99] 3 [yaoyuanyie,15,shuxue,56] 4 [yaoshupei,26,yuwen,33] 4 [yaoshupei,26,shuxue,99]
参数:
args= "-Dio.sort.mb=10
-r 1
-inFormat org.apache.hadoop.mapred.KeyValueTextInputFormat
-outFormat org.apache.hadoop.mapred.TextOutputFormat
-outKey org.apache.hadoop.io.Text
-outValue org.apache.hadoop.mapred.join.TupleWritable
hdfs://namenode:9000/user/hadoop/student/student.txt
hdfs://namenode:9000/user/hadoop/student/score2.txt
hdfs://namenode:9000/user/hadoop/joinout".split(" ");
需要注意的是我使用的输出格式是TextOutputFormat(完全是为了方便观察输出后的数据)
输出的valuetype是org.apache.hadoop.mapred.join.TupleWritable ,这个 类型非常方便,类似于数组类型,可以接受多值。
在源码中添加的一句代码,是用来配置我的数据源文件的keyvalue分隔符是,(comma).
jobConf.set("key.value.separator.in.input.line", ",");
关键代码简析:
job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(op, inputFormatClass, plist.toArray(new Path[0])));
使用CompositeInputFormat来进行join操作。此类的说明:
/**
* An InputFormat capable of performing joins over a set of data sources sorted * and partitioned the same way. * * A user may define new join types by setting the property * <tt>mapreduce.join.define.<ident></tt> to a classname. * In the expression <tt>mapreduce.join.expr</tt>, the identifier will be * assumed to be a ComposableRecordReader. * <tt>mapreduce.join.keycomparator</tt> can be a classname used to compare * keys in the join. * @see #setFormat * @see JoinRecordReader * @see MultiFilterRecordReader */
通过op来指定连接类型:inner,outer,tbl等,有其他需要也可以实现。
具体是怎么连接的呢?根据两个source进入mapper的key进行归并连接。所以要求数据源是根据key值有序的。此连接是在map端实现的。
测试中我使用KeyValueTextInputFormat来处理,其默认格式是key\tValue,所以我使用了上面的代码来进行重置这个格式。但如果你的文件不是key放在第一个位置,你就需要自己写FileInputFormat啦。
但明显需要你要处理的数据源都是使用同样的FileInputFormat去读取。
还有一点,这里支持多文件连接,示例中我只使用了两个示例文件,可以添加更多的文件,路径添加到outputdir之前即可。