• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

详解HDFS多文件Join操作的实例

java 搞代码 4年前 (2022-01-05) 30次浏览 已收录 0个评论

这篇文章主要介绍了详解HDFS多文件Join操作的实例的相关资料,希望通过本文能帮助到大家,让大家理解掌握这部分内容,需要的朋友可以参考下

详解HDFS多文件Join操作的实例

最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,

下面是个简单的例子;采用两个表来做left join其中数据结构如下:

A 文件:

a|1b|2|c

B文件:

a|b|1|2|c

即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键

代码如下:

 import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.eshore.traffic.hadoop.util.CommUtil; import cn.eshore.traffic.hadoop.util.StringUtil; /** * @ClassName: DataJoin * @Description: HDFS JOIN操作 * @author hadoop * @date 2012-12-18 下午5:51:32 */ public class InstallJoin extends Configured implements Tool { private String static enSplitCode = "\\|"; private String static splitCode = "|"; // 自定义Reducer public static class ReduceClass extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { String joinedStr = ""; //该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】 //去掉则为All Join if (tags.length == 1 && tags[0].toString().contains("install")) { return null; } Map map = new HashMap(); for (int i = 0; i <values.length; i++) { TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(enSplitCode, 8); String groupValue = tokens[6]; String type = tokens[7]; map.put(type, groupValue); } joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30")); TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } // 自定义Mapper public static class MapClass extends DataJoinMapperBase { //自定义Key【类似数据库中的主键/外键】 @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(CommUtil.enSplitCode); String key = ""; String type = tokens[7]; //由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据 if ("7".equals(type)) { key = tokens[0]+"|"+tokens[1]; }else if ("30".equals(type)) { key = tokens[0]+"|"+tokens[2]; } return new Text(key); } @Override protected Text generateInputTag(String inputFile) { return new Text(inputFile); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; // 自定义 public TaggedWritable() { this.tag = new Text(""); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz<div style="color:transparent">来源gaodai^.ma#com搞#代!码网</div> = in.readUTF(); if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(dataClz), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } /** * job运行 */ @Override public int run(String[] paths) throws Exception { int no = 0; try { Configuration conf = getConf(); JobConf job = new JobConf(conf, InstallJoin.class); FileInputFormat.setInputPaths(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); job.setJobName("join_data_test"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", CommUtil.splitCode); JobClient.runJob(job); no = 1; } catch (Exception e) { throw new Exception(); } return no; } //测试 public static void main(String[] args) { String[] paths = { "hdfs://master...:9000/home/hadoop/traffic/join/newtype", "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" } int res = 0; try { res = ToolRunner.run(new Configuration(), new InstallJoin(), paths); } catch (Exception e) { e.printStackTrace(); } System.exit(res); } } 

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

以上就是详解HDFS多文件Join操作的实例的详细内容,更多请关注gaodaima搞代码网其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:详解HDFS多文件Join操作的实例

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址