• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

【Spark八十五】Spark Streaming分析结果坠地到MySQL_mysql

mysql 搞代码 7年前 (2018-06-08) 172次浏览 已收录 0个评论

【Spark八十五】Spark Streaming分析结果落地到mysql

几点总结:

1. DStream.foreachRDD是一个Output Operation,类似于RDD的action,会触发Job的提交。DStream.foreachRDD是数据落地很常用的方法

2. 获取MySQL Connection的操作应该放在foreachRDD的参数(是一个RDD[T]=>Unit的函数类型),这样,当foreachRDD方法在每个Worker上执行时,连接是在Worker上创建。如果Connection的获取放到dstream.foreachRDD之前,那么

Connection的获取动作将发生在Driver端,然后通过序列化的方式发送到各个Worker(Connection的序列化通常是无法正确序列化的)

3. Connection的获取在foreachRDD的参数中获取,同时还要在遍历RDD之前获取(调用RDD的foreach方法前获取),如果遍历中获取,那么RDD中的每个record都要打开关闭连接,这对于数据库连接资源将是极大的考验

4. 业务逻辑处理定义在func中,它是在foreachRDD的方法参数体中定义的,如果把func的定义放到外面,即Driver中,貌似也是可以的,Spark会对计算方法通过Broadcast进行广播到各个计算节点。

 

 

package spark.examples.streaming  import java.sql.{PreparedStatement, Connection, DriverManager} import java.util.concurrent.atomic.AtomicInteger  import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._   //No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?  object SparkStreamingForPartition {   def main(args: Array[String]) {     val conf = new SparkConf().setAppName("NetCatWordCount")     conf.setMaster("local[3]")     val ssc = new StreamingContext(conf, Seconds(5))     //This dstream object represents the stream of data that will be received from the data     //server. Each record in this DStream is a line of text     //The DStream is a collection of RDD, which makes the method foreachRDD reasonable     val dstream = ssc.socketTextStream("192.168.26.140", 9999)     dstream.foreachRDD(rdd => {       //embedded function       def func(records: Iterator[String]) {         var conn: Connection = null         var stmt: PreparedStatement = null         try {           val url = "jdbc:mysql://192.168.26.140:3306/person";           val user = "root";           val password = ""           conn = DriverManager.getConnection(url, user, password)           records.flatMap(_.split(" ")).foreach(word => {             val sql = "insert into TBL_WORDS(word) values (?)";             stmt = conn.prepareStatement(sql);             stmt.setString(1, word)             stmt.executeUpdate();           })         } catch {           case e: Exception => e.printStackTrace()         } finally {           if (stmt != null) {             stmt.close()           }           if (conn != null) {             conn.close()           }         }       }        val repartitionedRDD = rdd.repartition(3)       repartitionedRDD.foreachPartition(func)     })     ssc.start()     ssc.awaitTermination()   } } 

 

欢迎大家阅读《【Spark八十五】Spark Streaming分析结果坠地到MySQL_mysql》,跪求各位点评,by 搞代码


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:【Spark八十五】Spark Streaming分析结果坠地到MySQL_mysql

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址