Java 操作 HDFS API

avatar 2020年06月26日14:56:32 0 999 views
本文介绍 Java 操作 HDFS API。

一、Hadoop 命令行操作HDFS文件


先介绍一下通过 hadoop 命令来操作文件

进入 hadoop 安装位置的 bin 目录
hadoop fs -ls /   查看 HDFS 根目录文件

hadoop fs -put README.txt /     将本地文件上传到 HDFS

hadoop fs -copyFromLocal LICENSE.txt /  将本地文件拷贝到 HDFS

hadoop fs -cat /README.txt  读取文件

hadoop fs -text /README.txt  读取文件

hadoop fs -get /README.txt .   从HDFS下载文件到本地

hadoop fs -mkdir /test 创建文件夹

hadoop fs -mv  /README.txt /test/ 移动文件到文件夹

hadoop fs -cp  /README.txt  /README.txt-back 复制文件

hadoop fs -getmerge  /test ./t.txt 将HDFS里的/test里的文件下载合并到本地的 t.txt中

hadoop fs -rm /README.txt 删除文件

hadoop fs -rmdir /test 删除文件夹(只能删除空文件夹)
hadoop fs -rm -r /test 删除文件夹



二、Java操作 HDFS API


下面介绍通过 Java 代码来操作 HDFS 的文件

1、新建一个 Maven 项目



项目名称自定义

文件结构如下



2、pom.xml

为了方便大家参考,我这里直接把完整的 pom.xml 贴出来了

主要是添加了 hadoop-client 依赖,配置了 cloudera 仓库。
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>hadoop-train</artifactId>
<version>1.0-SNAPSHOT</version>

<name>hadoop-train</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- CDH build of Hadoop; artifacts for this version are only published in the Cloudera repo below -->
<hadoop.version>2.6.0-cdh5.15.1</hadoop.version>
</properties>


<!-- Cloudera repository: required to resolve the *-cdh* Hadoop artifacts (not on Maven Central) -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
</repositories>

<dependencies>

<!-- Client-side HDFS/YARN/MapReduce API (FileSystem, Configuration, IOUtils, ...) -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<!-- JUnit 4 for the unit tests; test scope only -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>

 

3、单元测试

完整代码如下
package org.liuyanzhao.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Exercises the HDFS file-system Java API.
 *
 * <p>General pattern for every operation:
 * <ol>
 *   <li>build a {@link Configuration}</li>
 *   <li>obtain a {@link FileSystem} for the cluster URI</li>
 *   <li>perform the operation</li>
 * </ol>
 *
 * <p>NOTE(review): all tests assume a NameNode reachable at {@code localhost:8020}
 * and a user named {@code liuyanzhao} — confirm before running.
 *
 * @author 言曌
 * @date 2020/6/25 10:08 下午
 */
public class HDFSApp {

    /** NameNode URI of the target cluster. */
    public static final String HDFS_PATH = "hdfs://localhost:8020";

    FileSystem fileSystem = null;
    Configuration configuration = null;

    @Before
    public void before() throws URISyntaxException, IOException, InterruptedException {
        System.out.println("------before-------");
        configuration = new Configuration();
        // Single-node setup: create new files with a replication factor of 1.
        configuration.set("dfs.replication", "1");

        // Connect as the given HDFS user.
        fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, "liuyanzhao");
    }

    @After
    public void after() throws IOException {
        System.out.println("------after-------");
        // Release the client connection instead of just dropping the reference
        // (the previous version leaked the FileSystem handle).
        if (fileSystem != null) {
            fileSystem.close();
        }
        configuration = null;
        fileSystem = null;
    }

    /**
     * Creates a directory (and any missing parents) on HDFS.
     *
     * @throws IOException if the RPC to the NameNode fails
     */
    @Test
    public void mkdir() throws IOException {
        Path path = new Path("/hdfsapi/test2");
        boolean result = fileSystem.mkdirs(path);
        System.out.println(result);
    }

    /**
     * Prints the content of an HDFS file to stdout.
     *
     * @throws IOException if the file cannot be opened or read
     */
    @Test
    public void text() throws IOException {
        Path path = new Path("/LICENSE.txt");
        // try-with-resources: the original leaked the input stream.
        try (FSDataInputStream in = fileSystem.open(path)) {
            IOUtils.copyBytes(in, System.out, 1024);
        }
    }

    /**
     * Creates a new file on HDFS and writes a short UTF string into it.
     *
     * @throws IOException if the file cannot be created or written
     */
    @Test
    public void create() throws IOException {
        Path path = new Path("/hdfsapi/test/a.txt");
        // try-with-resources flushes and closes even if writeUTF throws.
        try (FSDataOutputStream out = fileSystem.create(path)) {
            out.writeUTF("hello hdfs");
            out.flush();
        }
    }

    /**
     * Renames (moves) a file within HDFS.
     *
     * @throws IOException if the rename RPC fails
     */
    @Test
    public void rename() throws IOException {
        Path srcPath = new Path("/hdfsapi/test/a.txt");
        Path destPath = new Path("/hdfsapi/test/b.txt");
        boolean result = fileSystem.rename(srcPath, destPath);
        System.out.println(result);
    }

    /**
     * Uploads a local file to HDFS.
     *
     * @throws IOException if the upload fails
     */
    @Test
    public void copyFromLocalFile() throws IOException {
        Path srcPath = new Path("/Users/liuyanzhao/Desktop/category.sql");
        Path descPath = new Path("/hdfsapi/test/");
        fileSystem.copyFromLocalFile(srcPath, descPath);
    }

    /**
     * Uploads a large local file to HDFS, printing a dot per progress callback.
     *
     * @throws IOException if the upload fails
     */
    @Test
    public void copyFromLocalBigFile() throws IOException {
        // try-with-resources: the original leaked both streams; the output
        // stream in particular must be closed for HDFS to finalize the file.
        try (InputStream in = new BufferedInputStream(
                new FileInputStream(new File("/Users/liuyanzhao/Desktop/ForestShop.war")));
             FSDataOutputStream out = fileSystem.create(
                new Path("/hdfsapi/test/ForestShop.war"), new Progressable() {
                    @Override
                    public void progress() {
                        System.out.print(".");
                    }
                })) {
            IOUtils.copyBytes(in, out, 4096);
        }
    }

    /**
     * Downloads a file from HDFS to the local file system.
     *
     * @throws IOException if the download fails
     */
    @Test
    public void copyToLocalFile() throws IOException {
        Path srcPath = new Path("/hdfsapi/test/b.txt");
        Path descPath = new Path("/Users/liuyanzhao/Desktop");
        fileSystem.copyToLocalFile(srcPath, descPath);
    }

    /**
     * Lists the immediate children (files and directories) of an HDFS directory.
     *
     * @throws IOException if the listing RPC fails
     */
    @Test
    public void listFiles() throws IOException {
        FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfsapi/test"));
        for (FileStatus file : statuses) {
            printStatus(file);
        }
    }

    /**
     * Recursively lists all files (not directories) under an HDFS path.
     *
     * @throws IOException if the listing RPC fails
     */
    @Test
    public void listFilesRecursive() throws IOException {
        // listFiles(path, true) walks the tree and yields files only.
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hdfsapi"), true);
        while (files.hasNext()) {
            printStatus(files.next());
        }
    }

    /** Prints one status line: type, permission, replication, length, path. */
    private void printStatus(FileStatus file) {
        String isDir = file.isDirectory() ? "文件夹" : "文件";
        String permission = file.getPermission().toString();
        short replication = file.getReplication();
        long length = file.getLen();
        String path = file.getPath().toString();
        System.out.println(isDir + "\t" + permission + "\t" + replication + "\t" + length + "\t" + path);
    }

    /**
     * Prints the block locations of an HDFS file (datanode address, offset, length).
     *
     * @throws IOException if the metadata RPC fails
     */
    @Test
    public void getFileBlockLocations() throws IOException {
        FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfsapi/test/ForestShop.war"));
        BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for (BlockLocation block : blockLocations) {
            String[] names = block.getNames();
            for (String name : names) {
                System.out.println(name + " : " + block.getOffset() + " : " + block.getLength());
            }
        }
    }

    /**
     * Deletes an HDFS path; {@code recursive=true} so directories are removed too.
     *
     * @throws IOException if the delete RPC fails
     */
    @Test
    public void delete() throws IOException {
        Path path = new Path("/hdfsapi/test/ForestShop.war");
        boolean result = fileSystem.delete(path, true);
        System.out.println(result);
    }

}

 

 

  • 微信
  • 交流学习,有偿服务
  • weinxin
  • 博客/Java交流群
  • 资源分享,问题解决,技术交流。群号:590480292
  • weinxin
avatar

发表评论

avatar 登录者:匿名
可以匿名评论或者登录后台评论,评论回复后会有邮件通知

  

已通过评论:0   待审核评论数:0