• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

地图reduce读取mysql

mysql 搞代码 7年前 (2018-06-06) 317次浏览 已收录 0个评论

mapreduce读取mysql

package com.sun.mysql; import java.io.DataInput;   import java.io.DataOutput;   import java.io.IOException;   import java.sql.PreparedStatement;   import java.sql.ResultSet;   import java.sql.SQLException;   import java.util.Iterator;      import org.apache.hadoop.conf.Configuration;   import org.apache.hadoop.fs.Path;   import org.apache.hadoop.io.LongWritable;   import org.apache.hadoop.io.Text;   import org.apache.hadoop.io.Writable;   import org.apache.hadoop.mapreduce.Job;   import org.apache.hadoop.mapreduce.Mapper;   import org.apache.hadoop.mapreduce.Reducer;   import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;   import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;   import org.apache.hadoop.mapreduce.lib.db.DBWritable;   import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;    /**   * 从mysql中读数据(结果存放在HDFS中)然后经mapreduce处理   * @author asheng   */ public class ReadDataFromMysql {       /**      * 重写DBWritable      * @author asheng      * TblsRecord需要从mysql读取数据      */     public static class TblsRecord implements Writable, DBWritable      {               String tbl_name;               String tbl_type;               public TblsRecord()              {                 }               @Override               public void write(PreparedStatement statement) throws SQLException              {                     statement.setString(1, this.tbl_name);                       statement.setString(2, this.tbl_type);               }               @Override               public void readFields(ResultSet resultSet) throws SQLException              {                       this.tbl_name = resultSet.getString(1);                       this.tbl_type = resultSet.getString(2);               }               @Override               public void write(DataOutput out) throws IOException              {                       Text.writeString(out, this.tbl_name);                       Text.writeString(out, this.tbl_type);               }               @Override               public void readFields(DataInput in) throws IOException              {                       this.tbl_name = Text.readString(in);                       this.tbl_type = Text.readString(in);               }               public String toString()              {                   return new String(this.tbl_name + " " + this.tbl_type);               }       }       /**      * Mapper      * @author asheng      * 下面的类中的Mapper一定是包org.apache.hadoop.mapreduce.Mapper;下的      */ public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text>  //TblsRecord是自定义的类型,也就是上面重写的DBWritable类 {           public void map(LongWritable key,TblsRecord values,Context context)throws IOException,                                                                                                                 InterruptedException          {                  //只是将从数据库读取进来数据转换成Text类型然后输出给reduce                 context.write(new Text(values.tbl_name), new Text(values.tbl_type));           }   }   /**  * Reducer  * @author asheng  * 下面的类中的Reducer一定是包org.apache.hadoop.mapreduce.Reducer;下的  */ public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> {           public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,                                                                                                                 InterruptedException          {                  //循环遍历并写入相应的指定文件中                 for(Iterator<Text> itr = values.iterator();itr.hasNext();)                  {                           context.write(key, itr.next());                   }           }   }   public static void main(String[] args) throws Exception  {           Configuration conf = new Configuration();           DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduce_test",                                                                                                                                                "root", "root");            Job job = new Job(conf,"test mysql connection");           job.setJarByClass(ReadDataFromMysql.class);                      job.setMapperClass(ConnMysqlMapper.class);           job.setReducerClass(ConnMysqlReducer.class);                      job.setOutputKeyClass(Text.class);           job.setOutputValueClass(Text.class);                      job.setInputFormatClass(DBInputFormat.class);           FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/user/lxw/output/"));                      //对应数据库中的列名           String[] fields = { "TBL_NAME", "TBL_TYPE" };            //setInput方法六个参数分别的含义:           //1.Job;2.Class<? extends DBWritable>按照什么类型读取的           //3.表名;4.where条件           //5.order by语句;6.列名所组成的数组          DBInputFormat.setInput(job, TblsRecord.class,"lxw_tabls", "TBL_NAME like 'lxy%'", "TBL_NAME", fields);             System.exit(job.waitForCompletion(true) ? 0 : 1);         //本程序表示从mysql数据库mapreduce_test的表lxw_tabls中查询处列TAB_NAME为lxy开头的数据并放入hdfs中         //执行完后的查看bin/hadoop fs -cat /user/lxw/output/part-r-00000 /*结果 lxyae lxyaccg lxybf          */ }   } /* mysql> select * from lxw_tabls; +----------+----------+ | TBL_NAME | TBL_TYPE | +----------+----------+ | zhao     | a        | | qian     | b        | | sun      | c        | | li       | d        | | lxya     | e        | | lxyb     | f        | | lxyacc   | g        | +----------+----------+ 7 rows in set (0.00 sec) */

欢迎大家阅读《地图reduce读取mysql》,跪求各位点评,by 搞代码

 


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:地图reduce读取mysql
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址