在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HD
在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示:
package org.shirdrn.hadoop.hdfs;
import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;public class HdfsFileReader { public static void main(String[] args) { String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log"; Path path = new Path(file); Configuration conf = new Configuration(); FileSystem fs; FSDataInputStream in; BufferedReader reader = null; try { fs = FileSystem.get(conf); in = fs.open(path); // 打开文件path,返回一个FSDataInputStream流对象 reader = new BufferedReader(new InputStreamReader(in)); String line = null; while((line = reader.readLine()) != null) { // 读取文件行内容 System.out.println("Record: " + line); } <strong>本文来源gaodai#ma#com搞@@代~&码网</strong> } catch (IOException e) { e.printStackTrace(); } finally { try { if(reader != null) reader.close(); } catch (IOException e) { e.printStackTrace(); } } }}
基于上面代码,我们可以看到,通过一个FileSystem对象可以打开一个Path文件,返回一个FSDataInputStream文件输入流对象,然后从该FSDataInputStream对象就能够读取出文件的内容。所以,我们从FSDataInputStream入手,详细分析从HDFS读取文件内容的过程,在实际地读取物理数据块之前,首先要获取到文件对应的Block列表元数据信息,整体流程如下图所示:
下面,详细说明整个流程:
创建FSDataInputStream流对象
从一个Path路径对象,能够获取到一个FileSystem对象,然后通过调用FileSystem的open方法打开一个文件流:
public FSDataInputStream open(Path f) throws IOException { return open(f, getConf().getInt("io.file.buffer.size", 4096)); }
由于FileSystem是抽象类,将具体的打开操作留给具体子类实现,例如FTPFileSystem、HarFileSystem、WebHdfsFileSystem等,不同的文件系统具有不同打开文件的行为,我们以DistributedFileSystem为例,open方法实现,代码如下所示:
public FSDataInputStream open(Path f, int bufferSize) throws IOException { statistics.incrementReadOps(1); return new DFSClient.DFSDataInputStream( dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); }
statistics对象用来收集文件系统操作的统计数据,这里使读取文件操作的计数器加1。然后创建了一个DFSClient.DFSDataInputStream对象,该对象的参数是通过DFSClient dfs客户端对象打开一个这个文件从而返回一个DFSInputStream对象,下面,我们看DFSClient的open方法实现,代码如下所示:
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, FileSystem.Statistics stats) throws IOException { checkOpen(); // Get block info from namenode return new DFSInputStream(src, buffersize, verifyChecksum); }