• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

Flumeng和Mysql开展整合 Flumeng批量处理_mysql

mysql 搞代码 7年前 (2018-06-08) 151次浏览 已收录 0个评论

Flume-ng 与 MySQL 整合:使用自定义 Sink 将 Flume-ng 的事件批量写入 MySQL

 

 

package com.iteblog.flume;      import com.google.common.base.Preconditions;  import com.google.common.base.Throwables;  import com.google.common.collect.Lists;  import org.apache.flume.*;  import org.apache.flume.conf.Configurable;  import org.apache.flume.sink.AbstractSink;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;     import java.sql.Connection;  import java.sql.DriverManager;  import java.sql.PreparedStatement;  import java.sql.SQLException;  import java.util.List;     public class MysqlSink extends AbstractSink implements Configurable {         private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);      private String hostname;      private String port;      private String databaseName;      private String tableName;      private String user;      private String password;      private PreparedStatement preparedStatement;      private Connection conn;      private int batchSize;         public MysqlSink() {          LOG.info("MysqlSink start...");      }         @Override      public void configure(Context context) {          hostname = context.getString("hostname");          Preconditions.checkNotNull(hostname, "hostname must be set!!");          port = context.getString("port");          Preconditions.checkNotNull(port, "port must be set!!");          databaseName = context.getString("databaseName");          Preconditions.checkNotNull(databaseName, "databaseName must be set!!");          tableName = context.getString("tableName");          Preconditions.checkNotNull(tableName, "tableName must be set!!");          user = context.getString("user");          Preconditions.checkNotNull(user, "user must be set!!");          password = context.getString("password");          Preconditions.checkNotNull(password, "password must be set!!");          batchSize = context.getInteger("batchSize", 100);          Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");      }         @Override      public void start() {   
       super.start();          try {              //调用Class.forName()方法加载驱动程序              Class.forName("com.mysql.jdbc.Driver");          } catch (ClassNotFoundException e) {              e.printStackTrace();          }             String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;          //调用DriverManager对象的getConnection()方法,获得一个Connection对象             try {              conn = DriverManager.getConnection(url, user, password);              conn.setAutoCommit(false);              //创建一个Statement对象              preparedStatement = conn.prepareStatement("insert into " + tableName +                                                 " (content) values (?)");             } catch (SQLException e) {              e.printStackTrace();              System.exit(1);          }         }         @Override      public void stop() {          super.stop();          if (preparedStatement != null) {              try {                  preparedStatement.close();              } catch (SQLException e) {                  e.printStackTrace();              }          }             if (conn != null) {              try {                  conn.close();              } catch (SQLException e) {                  e.printStackTrace();              }          }      }         @Override      public Status process() throws EventDeliveryException {          Status result = Status.READY;          Channel channel = getChannel();          Transaction transaction = channel.getTransaction();          Event event;          String content;             List<String> actions = Lists.newArrayList();          transaction.begin();          try {              for (int i = 0; i < batchSize; i++) {                  event = channel.take();                  if (event != null) {                      content = new String(event.getBody());                      actions.add(content);                  } else {                      result = Status.BACKOFF;                      break;                 
 }              }                 if (actions.size() > 0) {                  preparedStatement.clearBatch();                  for (String temp : actions) {                      preparedStatement.setString(1, temp);       preparedStatement.addBatch();                  }                  preparedStatement.executeBatch();                     conn.commit();              }              transaction.commit();          } catch (Throwable e) {              try {                  transaction.rollback();              } catch (Exception e2) {                  LOG.error("Exception in rollback. Rollback might not have been" +                          "successful.", e2);              }              LOG.error("Failed to commit transaction." +                      "Transaction rolled back.", e);              Throwables.propagate(e);          } finally {              transaction.close();          }            return result;      }  }

 

 

pom文件中的依赖:

<dependencies>          <dependency>              <groupId>org.apache.flume</groupId>              <artifactId>flume-ng-core</artifactId>          </dependency>             <dependency>              <groupId>org.apache.flume</groupId>              <artifactId>flume-ng-configuration</artifactId>          </dependency>             <dependency>              <groupId>mysql</groupId>              <artifactId>mysql-connector-java</artifactId>              <version>5.1.25</version>          </dependency>             <dependency>              <groupId>org.slf4j</groupId>              <artifactId>slf4j-api</artifactId>          </dependency>             <dependency>              <groupId>org.slf4j</groupId>              <artifactId>slf4j-log4j12</artifactId>              <scope>test</scope>          </dependency>  </dependencies>

 

 

运行程序前,先在 MySQL 中创建一张表:

mysql> create table mysqltest(      -> id int(11) NOT NULL AUTO_INCREMENT,      -> content varchar(50000) NOT NULL,      -> PRIMARY KEY (`id`)      -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;   Query OK, 0 rows affected, 1 warning (0.05 sec)

 

 

然后在flume中创建以下配置

 

# Sink definition for the custom MySQL sink; one property per line.
agent.sinks.mysqlSink.type = com.iteblog.flume.MysqlSink
agent.sinks.mysqlSink.hostname=localhost
agent.sinks.mysqlSink.port=3306
agent.sinks.mysqlSink.databaseName=ngmonitor
agent.sinks.mysqlSink.tableName=mysqltest
agent.sinks.mysqlSink.user=root
agent.sinks.mysqlSink.password=123456
# Optional: agent.sinks.mysqlSink.batchSize (the sink falls back to 100 when omitted).
agent.sinks.mysqlSink.channel = c1

 

 

用下面的命令就可以启动:

bin/flume-ng agent -c conf/ -f conf/mysql_test.conf  -n agent

 

 

再看下Mysql中的情况:

mysql> select count(*) from mysqltest;  +----------+  | count(*) |  +----------+  |    98300 |  +----------+

 

 

 

欢迎大家阅读《Flumeng和Mysql开展整合 Flumeng批量处理_mysql》,跪求各位点评,by 搞代码


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Flumeng和Mysql开展整合 Flumeng批量处理_mysql

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址