Hadoop系列007-HDFS客户端操作

本人微信公众号,欢迎扫码关注!

HDFS客户端操作

1 环境准备

1.1 Jar包准备

1)解压hadoop-2.7.2.tar.gz到非中文目录

注意1:如果使用WinRAR解压报错的话,就使用超级管理员权限打开DOS窗口,然后cd到解压包所在位置,执行start winrar x -y xxx.tar.gz命令,即可成功

注意2:使用对应平台下编译后的hadoop源码包,即win7系统使用在win7下编译后的源码包,win10同理。

2)进入share文件夹,查找所有jar包,并把jar包拷贝到_lib文件夹下

3)在全部jar包中查找.source.jar,并剪切到_source文件夹。

4)在全部jar包中查找tests.jar,并剪切到_test文件夹。

1.2 IDEA准备

1)配置HADOOP_HOME环境变量

2)建立工程并且添加依赖

注意:Eclipse全选Jar包右键Add Build Path

3)编写代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
//1 加载配置
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://hadoop102:8020");

FileSystem fileSystem = null;
try {
//2 获取文件系统
fileSystem = FileSystem.get(configuration);

//3 上传文件到HDFS指定位置
fileSystem.copyFromLocalFile(new Path("D:/test/asd.txt"), new Path("/user/intflag/input/asd.txt"));
} catch (IOException e) {
e.printStackTrace();
} finally {
closeFileSystem(fileSystem);
}
}

4)执行程序

客户端去操作hdfs时,是有一个用户身份的。默认情况下,hdfs客户端api会从jvm中获取一个参数来作为自己的用户身份:-DHADOOP_USER_NAME=atguigu,atguigu为用户名称。

5)验证

2 通过API操作HDFS

2.1 HDFS获取文件系统
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void getFileSystem() {
// 1 创建配置对象
Configuration conf = new Configuration();
// 2 获取文件系统
FileSystem fs = null;
try {
fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), conf, "intflag");

// 3 打印文件系统
System.out.println(fs);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
} finally {
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

2.2 HDFS文件上传
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void putFileToHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 上传文件
try {
fs.copyFromLocalFile(true,new Path("D:/test/x3.000"), new Path("/user/intflag/input/x3.000"));
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}
2.3 HDFS文件下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void getFileFromHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 下载文件
try {
fs.copyToLocalFile(new Path("/user/intflag/input/x3.000"),new Path("D:/test/x3.000"));
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}
2.4 HDFS目录创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 在HDFS上创建文件夹
*/
@Test
public void mkdirAtHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 创建文件夹
try {
fs.mkdirs(new Path("/user/intflag/test"));
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}
2.5 HDFS文件夹删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 删除HDFS上的文件夹
*/
@Test
public void deleteDirAtHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 删除文件夹
try {
//fs.delete(new Path("/user/intflag/test"));
fs.delete(new Path("/user/intflag/test"),true);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}
2.6 HDFS文件名更改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 重命名HDFS上的文件夹
*/
@Test
public void renameDirAtHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 重命名文件夹
try {
fs.rename(new Path("/user/intflag/test"), new Path("/user/intflag/test22"));
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}
2.7 HDFS文件详情查看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 读取HDFS上的文件信息
*/
@Test
public void readFileAtHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 读取文件信息
try {
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus status = listFiles.next();
System.out.println("-----------------------------------");
System.out.println("文件名称:"+status.getPath().getName());
System.out.println("块的大小:"+status.getBlockSize());
System.out.println("内容长度:"+status.getLen());
System.out.println("文件权限:"+status.getPermission());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-----------------------------------
文件名称:asd.txt
块的大小:134217728
内容长度:34
文件权限:rw-r--r--
-----------------------------------
文件名称:hadoop-2.7.2.tar.gz
块的大小:134217728
内容长度:197657687
文件权限:rw-r--r--
-----------------------------------
文件名称:liugx.txt
块的大小:134217728
内容长度:64
文件权限:rw-r--r--
-----------------------------------
文件名称:x3.000
块的大小:134217728
内容长度:592
文件权限:rw-r--r--

Process finished with exit code 0
2.8 HDFS文件夹查看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 读取文件夹信息
*/
@Test
public void readFfolderAtHDFS() {
// 1 获取文件系统
FileSystem fs = getFileSystem();
// 2 读取文件夹信息
try {
FileStatus[] listStatus = fs.listStatus(new Path("/user/intflag/"));
for (FileStatus status : listStatus) {
if (status.isFile()) {
System.out.println("f----"+status.getPath().getName());
} else {
System.out.println("d----"+status.getPath().getName());
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 3 关闭资源
closeFileSystem(fs);
}
}

3 通过IO流操作HDFS

3.1 HDFS文件上传
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 文件上传到HDFS
*/
@Test
public void putFileToHDFS() {
// 1 获取HDFS
FileSystem fs = getFileSystem();

try {
// 2 获取输出流
FSDataOutputStream fos = fs.create(new Path("/user/intflag/input/x3.000"));
// 3 获取输入流
FileInputStream fis = new FileInputStream(new File("D:/test/x3.000"));
// 4 流对接
IOUtils.copyBytes(fis, fos, new Configuration());
// 5 关闭流
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 5 关闭资源
closeFileSystem(fs);
}
}
3.2 HDFS文件下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 下载文件
*/
@Test
public void getFileFromHDFS() {
// 1 获取HDFS
FileSystem fs = getFileSystem();

try {
// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/user/intflag/input/liugx.txt"));

// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D:/test/liugx.txt"));

// 4 流对接
IOUtils.copyBytes(fis, fos, new Configuration());
// 5 关闭流
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 5 关闭资源
closeFileSystem(fs);
}
}
3.3 定位文件读取

下载第一块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 下载大文件-下载第一块
*/
@Test
public void getFileFromHDFSSeek1() {
// 1 获取HDFS
FileSystem fs = getFileSystem();

try {
// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/user/intflag/input/hadoop-2.7.2.tar.gz"));

// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D:/test/hadoop-2.7.2.tar.gz.part1"));

// 4 流对接(只读取128m)
byte[] buff = new byte[1024];
//1024 * 1024 * 128
int len = 1024 * 128;
for (int i = 0; i < len; i++) {
fis.read(buff);
fos.write(buff);
}
// 5 关闭流
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 5 关闭资源
closeFileSystem(fs);
}
}

下载第二块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 下载大文件-下载第二块
*/
@Test
public void getFileFromHDFSSeek2() {
// 1 获取HDFS
FileSystem fs = getFileSystem();

try {
// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/user/intflag/input/hadoop-2.7.2.tar.gz"));

// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D:/test/hadoop-2.7.2.tar.gz.part2"));

// 4 流对接(只读取128m)
// 定位到128m
int len = 1024 * 1024 * 128;
fis.seek(len);
IOUtils.copyBytes(fis,fos,new Configuration());

// 5 关闭流
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 5 关闭资源
closeFileSystem(fs);
}
}

合并文件

1
2
3
4
打开DOS窗口,定位到下载后的位置,输入以下命令合并文件
type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
然后重命名文件hadoop-2.7.2.tar.gz.part1,将文件.part1去掉
打开文件验证
-------------本文结束感谢您的阅读-------------