Java 操作 HDFS API

avatar 2020年6月26日14:56:32 评论 37 views

本文介绍 Java 操作 HDFS API。

一、Hadoop 命令行操作HDFS文件

先介绍一下通过 hadoop 命令来操作文件

进入 hadoop 安装位置的 bin 目录

hadoop fs -ls /   查看 HDFS 根目录文件

hadoop fs -put README.txt /     将本地文件上传到 HDFS

hadoop fs -copyFromLocal LICENSE.txt /  将本地文件拷贝到 HDFS

hadoop fs -cat /README.txt  读取文件

hadoop fs -text /README.txt  读取文件

hadoop fs -get /README.txt .   从HDFS下载文件到本地

hadoop fs -mkdir /test 创建文件夹

hadoop fs -mv  /README.txt /test/ 移动文件到文件夹

hadoop fs -cp  /README.txt  /README.txt-back 复制文件

hadoop fs -getmerge  /test ./t.txt 将HDFS里的/test里的文件下载合并到本地的 t.txt中

hadoop fs -rm /README.txt  删除文件

hadoop fs -rmdir /test  删除文件夹(只能删除空文件夹)
hadoop fs -rm -r /test  删除文件夹

二、Java操作 HDFS API

下面介绍通过 Java 代码来操作 HDFS 的文件

1、新建一个 Maven 项目

项目名称自定义

文件结构如下

2、pom.xml

为了方便大家参考,我这里直接把完整的 pom.xml 贴出来了

主要是添加了 hadoop-client 依赖,配置了 cloudera 仓库。

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>hadoop-train</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>hadoop-train</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- CDH build of Hadoop; resolved from the Cloudera repository below. -->
    <hadoop.version>2.6.0-cdh5.15.1</hadoop.version>
  </properties>


  <!-- CDH artifacts (e.g. 2.6.0-cdh5.15.1) are not on Maven Central. -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>

    <!-- HDFS client API (FileSystem, Path, IOUtils, ...). -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- FIX: 4.11 is affected by CVE-2020-15250 (TemporaryFolder local
         information disclosure); 4.13.2 is a drop-in replacement for JUnit 4. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

</project>

 

3、单元测试

完整代码如下

package org.liuyanzhao.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Exercises the HDFS filesystem through the Java API.
 * <p>
 * Pattern used by every test:
 * 1) build a {@link Configuration}
 * 2) obtain a {@link FileSystem}
 * 3) perform the operation
 *
 * @author 言曌
 * @date 2020/6/25 10:08 下午
 */

public class HDFSApp {

    public static final String HDFS_PATH = "hdfs://localhost:8020";

    FileSystem fileSystem = null;
    Configuration configuration = null;

    /**
     * Connects to HDFS before each test as user "liuyanzhao".
     */
    @Before
    public void before() throws URISyntaxException, IOException, InterruptedException {
        System.out.println("------before-------");
        configuration = new Configuration();
        // Write new files with a replication factor of 1 (single-node dev cluster).
        configuration.set("dfs.replication", "1");

        fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, "liuyanzhao");
    }

    /**
     * Releases the HDFS connection after each test.
     */
    @After
    public void after() {
        System.out.println("------after-------");
        // FIX: actually close the FileSystem (sockets, worker threads) instead of
        // only dropping the reference. closeStream swallows close() errors, so the
        // method signature stays unchanged.
        IOUtils.closeStream(fileSystem);
        configuration = null;
        fileSystem = null;
    }


    /**
     * Creates a directory (and any missing parents) on HDFS.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void mkdir() throws IOException {
        Path path = new Path("/hdfsapi/test2");
        boolean result = fileSystem.mkdirs(path);
        System.out.println(result);
    }

    /**
     * Prints the content of an HDFS file to stdout.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void text() throws IOException {
        Path path = new Path("/LICENSE.txt");
        FSDataInputStream in = fileSystem.open(path);
        try {
            IOUtils.copyBytes(in, System.out, 1024);
        } finally {
            // FIX: the input stream was never closed before.
            IOUtils.closeStream(in);
        }
    }

    /**
     * Creates a file on HDFS and writes a short string into it.
     * Note: writeUTF prepends a 2-byte length to the payload.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void create() throws IOException {
        Path path = new Path("/hdfsapi/test/a.txt");
        // try-with-resources guarantees the stream is closed even if the write fails.
        try (FSDataOutputStream out = fileSystem.create(path)) {
            out.writeUTF("hello hdfs");
            out.flush();
        }
    }

    /**
     * Renames (moves) a file within HDFS.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void rename() throws IOException {
        Path srcPath = new Path("/hdfsapi/test/a.txt");
        Path destPath = new Path("/hdfsapi/test/b.txt");
        boolean result = fileSystem.rename(srcPath, destPath);
        System.out.println(result);
    }

    /**
     * Uploads a local file to an HDFS directory.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void copyFromLocalFile() throws IOException {
        Path srcPath = new Path("/Users/liuyanzhao/Desktop/category.sql");
        Path descPath = new Path("/hdfsapi/test/");
        fileSystem.copyFromLocalFile(srcPath, descPath);
    }


    /**
     * Uploads a large local file to HDFS, printing a dot per progress callback.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void copyFromLocalBigFile() throws IOException {

        InputStream in = new BufferedInputStream(new FileInputStream(new File("/Users/liuyanzhao/Desktop/ForestShop.war")));

        FSDataOutputStream out = fileSystem.create(new Path("/hdfsapi/test/ForestShop.war"), new Progressable() {
            @Override
            public void progress() {
                System.out.print(".");
            }
        });

        // FIX: the 3-arg overload leaves both streams open; passing close=true
        // makes IOUtils close them when the copy finishes (or fails).
        IOUtils.copyBytes(in, out, 4096, true);

    }

    /**
     * Downloads an HDFS file to the local filesystem.
     */
    @Test
    public void copyToLocalFile() throws IOException {
        Path srcPath = new Path("/hdfsapi/test/b.txt");
        Path descPath = new Path("/Users/liuyanzhao/Desktop");
        fileSystem.copyToLocalFile(srcPath, descPath);
    }

    /**
     * Lists the immediate children (files and directories) of an HDFS directory.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void listFiles() throws IOException {
        FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfsapi/test"));
        for (FileStatus file : statuses) {
            String isDir = file.isDirectory() ? "文件夹" : "文件";
            String permission = file.getPermission().toString();
            short replication = file.getReplication();
            long length = file.getLen();
            String path = file.getPath().toString();
            System.out.println(isDir + "\t" + permission + "\t" + replication + "\t" + length + "\t" + path);
        }
    }

    /**
     * Recursively lists every file under an HDFS directory.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void listFilesRecursive() throws IOException {
        // listFiles(path, recursive=true) returns files only, never directories.
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hdfsapi"), true);
        while (files.hasNext()) {
            LocatedFileStatus file = files.next();
            String isDir = file.isDirectory() ? "文件夹" : "文件";
            String permission = file.getPermission().toString();
            short replication = file.getReplication();
            long length = file.getLen();
            String path = file.getPath().toString();
            System.out.println(isDir + "\t" + permission + "\t" + replication + "\t" + length + "\t" + path);
        }
    }


    /**
     * Prints the block locations (host:port, offset, length) of an HDFS file.
     *
     * @throws IOException on RPC/filesystem failure
     */
    @Test
    public void getFileBlockLocations() throws IOException {
        FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfsapi/test/ForestShop.war"));
        BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for (BlockLocation block : blockLocations) {
            String[] names = block.getNames();
            for (String name : names) {
                System.out.println(name + " : " + block.getOffset() + " : " + block.getLength());
            }
        }
    }

    /**
     * Deletes an HDFS path; recursive=true also removes non-empty directories.
     */
    @Test
    public void delete() throws IOException {
        Path path = new Path("/hdfsapi/test/ForestShop.war");
        boolean result = fileSystem.delete(path, true);
        System.out.println(result);
    }

}

 

 

  • 微信
  • 交流学习,有偿服务
  • weinxin
  • 博客/Java交流群
  • 资源分享,问题解决,技术交流。群号:590480292
  • weinxin
avatar

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: