
Serializing Objects to HDFS in Spark, in Detail


This article walks through serializing Java objects in Spark and storing them on HDFS; readers who need to persist objects from a Spark job may find it a useful reference.


Abstract: A recurring requirement in Spark applications is to serialize a Java object and store it on HDFS, in particular models computed with MLlib, so that a model can be reused later instead of being retrained. The example below reads data from HBase in a Spark job, trains a word2vec model, and stores the model on HDFS.
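The core of the trick is small: the streams returned by Hadoop's FileSystem.create and FileSystem.open can be wrapped in Java's standard ObjectOutputStream and ObjectInputStream, so any object that implements Serializable can be written to and read back from HDFS. Here is a minimal sketch of just that pattern, assuming a reachable HDFS filesystem and a Serializable object; the object name HdfsObjectIO and its methods are placeholders for illustration, not part of the example that follows:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsObjectIO {
  // Write any Serializable object to the given HDFS path.
  def save(obj: AnyRef, fs: FileSystem, path: Path): Unit = {
    val oos = new ObjectOutputStream(fs.create(path)) // create() returns an FSDataOutputStream
    try oos.writeObject(obj) finally oos.close()
  }

  // Read the object back and cast it to the expected type.
  def load[T](fs: FileSystem, path: Path): T = {
    val ois = new ObjectInputStream(fs.open(path))    // open() returns an FSDataInputStream
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}

The full example below follows exactly this pattern, with the Word2VecModel produced by MLlib as the serialized object.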

Without further ado, here is the code. Environment: Spark 1.4 + HBase 0.98.

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

  // Serialize a Scan into the Base64 string that TableInputFormat expects.
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)

    // Read the source articles from HBase.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )

    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))

    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Tokenize each article with ansj and keep the word sequences.
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // --------------------------- The important part starts here ---------------------------
    // Store the model trained above on HDFS.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns an FSDataOutputStream, so it can be wrapped directly.
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Another program can read the serialized object straight from HDFS and use the model.
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()

    /*
     * // You can also copy the serialized file from HDFS to the local file system
     * // and use the model from a plain Scala program:
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close
     */
    // ---------------------------------------------------------------------------------------
  }
}
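Once deserialized, the model behaves like a freshly trained Word2VecModel. As a quick sanity check you can query it for nearest neighbours; the query word below is only an illustration and must exist in the training vocabulary:

// sample_model is the Word2VecModel read back from HDFS above
val synonyms = sample_model.findSynonyms("大数据", 10) // 10 closest words with cosine similarities
synonyms.foreach { case (word, similarity) =>
  println(s"$word\t$similarity")
}

As an aside, MLlib's Word2VecModel also ships its own model.save(sc, path) and Word2VecModel.load(sc, path) methods since Spark 1.4. Plain Java serialization as shown in this article produces a single self-contained file and, as the commented block above illustrates, can be read back without a running SparkContext.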

That concludes this detailed look at serializing objects and storing them on HDFS in Spark.

