HDFS(二)Java API

HDFS Java API

Posted by Spencer on January 15, 2017

一. 基本操作

  • 连接HDFS时,需要将Hadoop配置文件夹下的core-site.xml和hdfs-site.xml拷贝到工程的src目录下

  • 连接/关闭HDFS
    /**
     * JUnit fixture that holds an HDFS client: {@link #connect()} is annotated
     * with {@code @Before} and creates the client, {@link #close()} is annotated
     * with {@code @After} and releases it.
     */
    public class HDFSClientDemo {
      /** Hadoop configuration; assigned in {@link #connect()}. */
      private Configuration conf;
      /** File system client built from {@link #conf}; stays null until {@link #connect()} runs. */
      private FileSystem fs;

      /**
       * Builds the configuration and obtains the file system client from it.
       *
       * @throws IOException if the file system client cannot be obtained
       */
      @Before
      public void connect() throws IOException {
          // Passing true asks Configuration to load the default resources. The static
          // initializer of the Configuration class contains these two lines:
          //   addDefaultResource("core-default.xml");
          //   addDefaultResource("core-site.xml");
          this.conf = new Configuration(true);
          // Client instance used for all file system operations.
          this.fs = FileSystem.get(this.conf);
      }

      /**
       * Closes the file system client if one was created.
       *
       * @throws IOException if closing the client fails
       */
      @After
      public void close() throws IOException {
          if (this.fs == null) {
              // connect() never assigned a client, so there is nothing to release.
              return;
          }
          this.fs.close();
      }
    }
    
  • 上传文件到HDFS中
      /**
       * Uploads the local file {@code /Users/spencer/derby.log} to
       * {@code /dir4test/derby.log} using {@code FileSystem.copyFromLocalFile}.
       *
       * @throws IOException if the copy fails
       */
      @Test
      public void uploadFile() throws IOException {
          final Path localSource = new Path("/Users/spencer/derby.log");
          final Path remoteTarget = new Path("/dir4test/derby.log");
          fs.copyFromLocalFile(localSource, remoteTarget);
      }
      // 除了使用封装好的copyFromLocalFile方法,也可以用更为底层的IO流方式实现
      /**
       * Uploads the local file {@code /Users/spencer/derby.log} to
       * {@code /dir4test/derby.log} by copying bytes between explicit streams
       * instead of calling {@code copyFromLocalFile}.
       *
       * <p>The local file is opened first: if it is missing,
       * {@link FileInputStream} throws {@link java.io.FileNotFoundException}
       * before {@code fs.create} is called, so no output stream is opened on
       * the target path in that case.
       *
       * @throws IOException if the local file cannot be opened, the target
       *         cannot be created, or the copy or close fails
       */
      @Test
      public void upLoadFile() throws IOException {
          Path outputFile = new Path("/dir4test/derby.log");
          // try-with-resources closes both streams on every exit path, including
          // the case where fs.create() throws after the local file is already open.
          try (InputStream input =
                   new BufferedInputStream(new FileInputStream(new File("/Users/spencer/derby.log")));
               FSDataOutputStream outputStream = fs.create(outputFile)) {
              // close=false: stream lifetime is owned by the try-with-resources above.
              IOUtils.copyBytes(input, outputStream, conf, false);
          }
      }
    
  • 从HDFS中下载文件
      /**
       * Downloads {@code /dir4test/derby.log} to the local path
       * {@code /private/tmp/myDerby.log} using {@code FileSystem.copyToLocalFile}.
       *
       * @throws IOException if the copy fails
       */
      @Test
      public void downloadFile() throws IOException {
          final Path remoteSource = new Path("/dir4test/derby.log");
          final Path localTarget = new Path("/private/tmp/myDerby.log");
          fs.copyToLocalFile(remoteSource, localTarget);
      }
      // 使用IO流实现
      /**
       * Downloads {@code /dir4test/derby.log} to the local file
       * {@code /private/tmp/myDerby.log} by copying bytes between explicit
       * streams instead of calling {@code copyToLocalFile}.
       *
       * <p>NOTE(review): this file also shows a {@code copyToLocalFile}-based
       * method with the identical signature {@code downloadFile()}; the two
       * cannot be declared in the same class without renaming one of them.
       *
       * @throws IOException if the source cannot be opened, the local file
       *         cannot be created, or the copy or close fails
       */
      @Test
      public void downloadFile() throws IOException {
          Path filePath = new Path("/dir4test/derby.log");
          // The source is opened before the local file, so a failure in fs.open()
          // happens before FileOutputStream is constructed. try-with-resources
          // also closes the input stream if creating the local output stream fails.
          try (FSDataInputStream inputStream = fs.open(filePath);
               OutputStream outputStream =
                   new BufferedOutputStream(new FileOutputStream(new File("/private/tmp/myDerby.log")))) {
              // close=false: stream lifetime is owned by the try-with-resources above.
              IOUtils.copyBytes(inputStream, outputStream, conf, false);
          }
      }
    
  • 获得文件信息
      /**
       * Lists the entries returned by {@code fs.listFiles} for {@code /} and
       * prints block size, owner, replication, permission and name for each,
       * followed by a separator line.
       *
       * @throws IOException if listing or iterating the entries fails
       */
      @Test
      public void fileStatus() throws IOException {
          final Path root = new Path("/");
          // The second argument selects whether sub-directories are searched recursively.
          for (RemoteIterator<LocatedFileStatus> entries = fs.listFiles(root, true);
               entries.hasNext(); ) {
              final LocatedFileStatus status = entries.next();
              System.out.println("blockSize: " + status.getBlockSize());
              System.out.println("owner: " + status.getOwner());
              System.out.println("Replication: " + status.getReplication());
              System.out.println("Permission: " + status.getPermission());
              System.out.println("Name: " + status.getPath().getName());
              System.out.println("-----------------------");
          }
      }
    
  • 获得Block信息
      /**
       * Prints, for every block location of {@code /dir4test/test.csv}, one
       * line containing the block's hosts, each followed by a single space.
       *
       * @throws IOException if the file status or block locations cannot be read
       */
      @Test
      public void fileLocation() throws IOException {
          // FileStatus wraps the metadata of a file or directory in the file system.
          final FileStatus status = fs.getFileStatus(new Path("/dir4test/test.csv"));
          // Request the locations covering the whole file: offset 0, length = file length.
          final BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
          for (BlockLocation block : blocks) {
              // Assemble the line first, then emit it with one println.
              final StringBuilder line = new StringBuilder();
              for (String host : block.getHosts()) {
                  line.append(host).append(" ");
              }
              System.out.println(line);
          }
      }