学习HDFS有一段时间了,现在把自己总结的HDFS的相关操作代码展示给大家。
主要有HDFS的增删改查、文件的追加、Windows本地文件的上传、HDFS文件的下载、文件重命名、创建目录、判断文件是否存在等操作。
准备工作:我是用Maven搭建的环境,下面用到了单元测试注解@Test,需要在pom.xml文件内添加junit的依赖(以及代码中用到的Hadoop客户端依赖)。
1 package hdfs;
2
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
14
15
16 public class OperatingFiles {
17 // initialization
18 //读取配置文件
19 static Configuration conf = new Configuration();
20 static FileSystem hdfs;
21
22 static { //root是你主节点虚机的用户名
23 UserGroupInformation ugi = UserGroupInformation
24 .createRemoteUser("root");
25 try {
26 ugi.doAs(new PrivilegedExceptionAction() {
27 public Void run() throws Exception {
28 Configuration conf = new Configuration(); //"hdfs://lyz01:9000/"对应的是你自己的网址
29 conf.set("fs.default.name", "hdfs://lyz01:9000/");
30 //conf.set("hadoop.job.ugi", "root");
31 //以下两行是支持 hdfs的追加 功能的:hdfs.append()
32 conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
33 conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
34 Path path = new Path("hdfs://lyz01:9000/");
35 //如果在本地测试,需要使用此种方法获取文件系统
36 hdfs = FileSystem.get(path.toUri(), conf);
37 //hdfs = path.getFileSystem(conf); // 这个也可以
38 //如果在Hadoop集群下运行,使用此种方法可以直接获取默认文件系统
39 //hdfs = FileSystem.get(conf); //这个不行,这样得到的hdfs所有操作都是针对本地文件系统,而不是针对hdfs的,原因不太清楚
40 return null;
41 }
42 });
43 } catch (IOException e) {
44 // TODO Auto-generated catch block
45 e.printStackTrace();
46 } catch (InterruptedException e) {
47 // TODO Auto-generated catch block
48 e.printStackTrace();
49 }
50 }
51
52 // 创建hdfs目录
53 @Test
54 public void createDir() throws IOException {
55 String dir = "/test2/";
56 Path path = new Path(dir);
57 if (hdfs.exists(path)) {
58 System.out.println("dir \t" + conf.get("fs.default.name") + dir
59 + "\t already exists");
60 return;
61 }
62 hdfs.mkdirs(path);
63 System.out.println("new dir \t" + conf.get("fs.default.name") + dir);
64 }
65
66 // 文件重命名
67 @Test
68 public void renameFile() throws IOException{
69 String oldName = "/reduceJoin/2.txt";
70 String newName = "/reduceJoin/tb_b.txt";
71 Path oldPath = new Path(oldName);
72 Path newPath = new Path(newName);
73 if (hdfs.exists(oldPath)){
74 hdfs.rename(oldPath,newPath);
75 System.out.println("rename成功!");
76 }else{
77 System.out.println("文件不存在!rename失败!");
78 }
79 }
80
81 // 读取文件
82 @Test
83 public void readFile() throws IOException{
84 String uri = "/output2017_11_12_12_57_04/part-r-00000";
85 //判断文件是否存在
86 if(!hdfs.exists(new Path(uri))){
87 System.out.println("Error ; the file not exists.");
88 return;
89 }
90 InputStream in = null;
91 try {
92 in = hdfs.open(new Path(uri));
93 //BufferedReader bf =new BufferedReader(new InputStreamReader(in,"GB2312"));//防止中文乱码
94 //复制到标准输出流
95 IOUtils.copyBytes(in, System.out, 4096,false);
96 /*String line = null;
97 while((line = bf.readLine()) != null){
98 System.out.println(line);
99 }*/
100 } catch (Exception e) {
101 e.printStackTrace();
102 }finally{
103 IOUtils.closeStream(in);
104 }
105 }
106
107 // 从本地往HDFS上传文件
108 @Test
109 public void copyFile() throws IOException {
110 String localSrc = "D:/group_max.txt";
111 String hdfsDst = "/group/";
112 Path src = new Path(localSrc);
113 Path dst = new Path(hdfsDst);
114 //本地文件不存在
115 if (!(new File(localSrc)).exists()) {
116 System.out.println("Error: local dir \t" + localSrc
117 + "\t not exists.");
118 return;
119 }
120 //hdfs路径不存在
121 if (!hdfs.exists(dst)) {
122 System.out.println("Error: dest dir \t" + dst.toUri()
123 + "\t not exists.");
124 return;
125 }
126 String dstPath = dst.toUri() + "/" + src.getName();
127 //System.out.println(dstPath);// "/test1/3931.jpg"
128 //判断上传的文件 hdfs的目录下是否存在
129 if (hdfs.exists(new Path(dstPath))) {
130 System.out.println("Warn: dest file \t" + dstPath
131 + "\t already exists.");
132 }else{
133 //本地文件上传hdfs
134 hdfs.copyFromLocalFile(src, dst);
135 // list all the files in the current direction
136 //遍历文件
137 FileStatus files[] = hdfs.listStatus(dst);
138 System.out.println("Upload to \t" + conf.get("fs.default.name")
139 + hdfsDst);
140 for (FileStatus file : files) {
141 System.out.println(file.getPath());
142 }
143 }
144 }
145
146 // 从HDFS 下载文件 到本地
147 @Test
148 public void downloadFile() throws IllegalArgumentException,IOException{
149 String hdfsDst = "/test2/2_1";
150 String localSrc = "D:/hadfs";
151 Path dst = new Path(hdfsDst);
152 Path src = new Path(localSrc);
153 //本地的路径 + hdfs下载的文件名
154 String localFile = localSrc + "/" + dst.getName();
155 //如果HDFS路径不存在
156 if(!hdfs.exists(dst.getParent())){
157 System.out.println("Error : the HDFS directory:\t" + dst.getParent() + "\tdoes not exist. Please check it!");
158 return;
159 }
160 //如果本地目录不存在,则创建
161 if(!new File(localSrc).exists()){
162 new File(localSrc).mkdirs();
163 System.out.println("Warn : The local directory does not exist. It has been automatically created for you!");
164 }
165 // 如果本地文件存在
166 if(new File(localFile).exists()){
167 System.out.println("Error : the localSrc: \t" + localFile + "\t already exists.");
168 return;
169 }
170 //如果HDFS文件不存在
171 if(!hdfs.exists(new Path(hdfsDst))){
172 System.out.println("Error : the HDFS file: \t" + hdfsDst + "\t not exists.");
173 }else{
174 //HDFS下载文件到本地
175 hdfs.copyToLocalFile(false,dst,src,true);
176 System.out.println("successful :download successful! please look at: \t" + localSrc);
177 }
178 }
179
180
181 // create a new file
182 @Test
183 public void createFile()
184 throws IOException {
185 String fileName = "/test3/b.txt";
186 String fileContent = "";
187 Path dst = new Path(fileName);
188 //判断 新建的文件在hdfs上是否存在
189 if(hdfs.exists(dst)){
190 System.out.println("Error : the hdfs file exists.");
191 }else {
192 byte[] bytes = fileContent.getBytes();
193 FSDataOutputStream output = hdfs.create(dst);
194 output.write(bytes);
195 System.out.println("new file \t" + conf.get("fs.default.name")
196 + fileName);
197 }
198 }
199
200 // 追加内容到文件
201 @Test
202 public void appendFile()
203 throws IOException {
204 String fileName = "/test2/file2.txt";
205 String fileContent = "你好 世界";
206 Path dst = new Path(fileName);
207 byte[] bytes = fileContent.getBytes();
208 //如果文件不存在
209 if (!hdfs.exists(dst)) {
210 System.out.println("Error : the file not exists");
211 return;
212 }
213 FSDataOutputStream output = hdfs.append(dst);
214 output.write(bytes);
215 System.out.println("successful: append to file \t" + conf.get("fs.default.name")
216 + fileName);
217 }
218
219
220 // 列出所有文件
221 @Test
222 public void listFiles() throws IOException {
223 String dirName = "/test1";
224 Path f = new Path(dirName);
225 FileStatus[] status = hdfs.listStatus(f);
226 System.out.println(dirName + " has all files:");
227 if (status.length == 0) {
228 System.out.println("nothing !");
229 } else {
230 for (int i = 0; i < status.length; i++) {
231 System.out.println(status[i].getPath().toString());
232 }
233 }
234 }
235
236 // 判断文件是否存在,存在即删除
237 @Test
238 public void deleteFile() throws IOException {
239 String fileName = "/test2";
240 Path f = new Path(fileName);
241 boolean isExists = hdfs.exists(f);
242 if (isExists) { // if exists, delete
243 boolean isDel = hdfs.delete(f, true);
244 System.out.println(fileName + " delete? \t" + isDel);
245 } else {
246 System.out.println(fileName + " exist? \t" + notExists);
247 }
248 }
249 }
Original: https://www.cnblogs.com/createboke/p/7824785.html
Author: CREATE_17
Title: 利用JAVA API远程进行HDFS的相关操作
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/565833/
转载文章受原作者版权保护。转载请注明原作者出处!