利用JAVA API远程进行HDFS的相关操作

学习HDFS有一段时间了,现在把自己总结的HDFS的相关操作代码展示给大家。

主要有HDFS的增删改查,文件的追加,windows本地文件的上传,hdfs文件的下载,文件重命名,创建目录,文件是否存在等操作。

准备工作:我是用maven搭配的环境,下面用到了单元测试@Test,需要在pom.xml文件内添加junit的依赖

1 package hdfs;
  2
  3 import java.io.*;
  4 import java.security.PrivilegedExceptionAction;
  5
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.fs.FSDataOutputStream;
  8 import org.apache.hadoop.fs.FileStatus;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.IOUtils;
 12 import org.apache.hadoop.security.UserGroupInformation;
 13 import org.junit.Test;
 14
 15
 16 public class OperatingFiles {
 17     // initialization
 18     //读取配置文件
 19     static Configuration conf = new Configuration();
 20     static FileSystem hdfs;
 21
 22     static {        //root是你主节点虚机的用户名
 23         UserGroupInformation ugi = UserGroupInformation
 24                 .createRemoteUser("root");
 25         try {
 26             ugi.doAs(new PrivilegedExceptionAction() {
 27                 public Void run() throws Exception {
 28                     Configuration conf = new Configuration();               //"hdfs://lyz01:9000/"对应的是你自己的网址
 29                     conf.set("fs.default.name", "hdfs://lyz01:9000/");
 30                     //conf.set("hadoop.job.ugi", "root");
 31                     //以下两行是支持 hdfs的追加 功能的:hdfs.append()
 32                     conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
 33                     conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
 34                     Path path = new Path("hdfs://lyz01:9000/");
 35                     //如果在本地测试,需要使用此种方法获取文件系统
 36                     hdfs = FileSystem.get(path.toUri(), conf);
 37                     //hdfs = path.getFileSystem(conf); // 这个也可以
 38                     //如果在Hadoop集群下运行,使用此种方法可以直接获取默认文件系统
 39                     //hdfs = FileSystem.get(conf); //这个不行,这样得到的hdfs所有操作都是针对本地文件系统,而不是针对hdfs的,原因不太清楚
 40                     return null;
 41                 }
 42             });
 43         } catch (IOException e) {
 44             // TODO Auto-generated catch block
 45             e.printStackTrace();
 46         } catch (InterruptedException e) {
 47             // TODO Auto-generated catch block
 48             e.printStackTrace();
 49         }
 50     }
 51
 52     // 创建hdfs目录
 53     @Test
 54     public void createDir() throws IOException {
 55         String dir = "/test2/";
 56         Path path = new Path(dir);
 57         if (hdfs.exists(path)) {
 58             System.out.println("dir \t" + conf.get("fs.default.name") + dir
 59                     + "\t already exists");
 60             return;
 61         }
 62         hdfs.mkdirs(path);
 63         System.out.println("new dir \t" + conf.get("fs.default.name") + dir);
 64     }
 65
 66     // 文件重命名
 67     @Test
 68     public void renameFile() throws IOException{
 69         String oldName = "/reduceJoin/2.txt";
 70         String newName = "/reduceJoin/tb_b.txt";
 71         Path oldPath = new Path(oldName);
 72         Path newPath = new Path(newName);
 73         if (hdfs.exists(oldPath)){
 74             hdfs.rename(oldPath,newPath);
 75             System.out.println("rename成功!");
 76         }else{
 77             System.out.println("文件不存在!rename失败!");
 78         }
 79     }
 80
 81     // 读取文件
 82     @Test
 83     public void readFile() throws IOException{
 84         String uri = "/output2017_11_12_12_57_04/part-r-00000";
 85         //判断文件是否存在
 86         if(!hdfs.exists(new Path(uri))){
 87             System.out.println("Error ; the file not exists.");
 88             return;
 89         }
 90         InputStream in = null;
 91         try {
 92             in = hdfs.open(new Path(uri));
 93             //BufferedReader bf =new BufferedReader(new InputStreamReader(in,"GB2312"));//防止中文乱码
 94             //复制到标准输出流
 95             IOUtils.copyBytes(in, System.out, 4096,false);
 96             /*String line = null;
 97             while((line = bf.readLine()) != null){
 98                 System.out.println(line);
 99             }*/
100         } catch (Exception e) {
101             e.printStackTrace();
102         }finally{
103             IOUtils.closeStream(in);
104         }
105     }
106
107     // 从本地往HDFS上传文件
108     @Test
109     public void copyFile() throws IOException {
110         String localSrc = "D:/group_max.txt";
111         String hdfsDst = "/group/";
112         Path src = new Path(localSrc);
113         Path dst = new Path(hdfsDst);
114         //本地文件不存在
115         if (!(new File(localSrc)).exists()) {
116             System.out.println("Error: local dir \t" + localSrc
117                     + "\t not exists.");
118             return;
119         }
120         //hdfs路径不存在
121         if (!hdfs.exists(dst)) {
122             System.out.println("Error: dest dir \t" + dst.toUri()
123                     + "\t not exists.");
124             return;
125         }
126         String dstPath = dst.toUri() + "/" + src.getName();
127         //System.out.println(dstPath);//   "/test1/3931.jpg"
128         //判断上传的文件 hdfs的目录下是否存在
129         if (hdfs.exists(new Path(dstPath))) {
130             System.out.println("Warn: dest file \t" + dstPath
131                     + "\t already exists.");
132         }else{
133             //本地文件上传hdfs
134             hdfs.copyFromLocalFile(src, dst);
135             // list all the files in the current direction
136             //遍历文件
137             FileStatus files[] = hdfs.listStatus(dst);
138             System.out.println("Upload to \t" + conf.get("fs.default.name")
139                     + hdfsDst);
140             for (FileStatus file : files) {
141                 System.out.println(file.getPath());
142             }
143         }
144     }
145
146     // 从HDFS 下载文件 到本地
147     @Test
148     public void downloadFile() throws IllegalArgumentException,IOException{
149         String hdfsDst = "/test2/2_1";
150         String localSrc = "D:/hadfs";
151         Path dst = new Path(hdfsDst);
152         Path src = new Path(localSrc);
153         //本地的路径 + hdfs下载的文件名
154         String localFile = localSrc + "/" + dst.getName();
155         //如果HDFS路径不存在
156         if(!hdfs.exists(dst.getParent())){
157             System.out.println("Error : the HDFS directory:\t" + dst.getParent() + "\tdoes not exist. Please check it!");
158             return;
159         }
160         //如果本地目录不存在,则创建
161         if(!new File(localSrc).exists()){
162             new File(localSrc).mkdirs();
163             System.out.println("Warn : The local directory does not exist. It has been automatically created for you!");
164         }
165         // 如果本地文件存在
166         if(new File(localFile).exists()){
167             System.out.println("Error : the localSrc: \t" + localFile + "\t already exists.");
168             return;
169         }
170         //如果HDFS文件不存在
171         if(!hdfs.exists(new Path(hdfsDst))){
172             System.out.println("Error : the HDFS file: \t" + hdfsDst + "\t not exists.");
173         }else{
174             //HDFS下载文件到本地
175             hdfs.copyToLocalFile(false,dst,src,true);
176             System.out.println("successful :download successful! please look at: \t" + localSrc);
177         }
178     }
179
180
181     // create a new file
182     @Test
183     public void createFile()
184             throws IOException {
185         String fileName = "/test3/b.txt";
186         String fileContent = "";
187         Path dst = new Path(fileName);
188         //判断 新建的文件在hdfs上是否存在
189         if(hdfs.exists(dst)){
190             System.out.println("Error : the hdfs file exists.");
191         }else {
192             byte[] bytes = fileContent.getBytes();
193             FSDataOutputStream output = hdfs.create(dst);
194             output.write(bytes);
195             System.out.println("new file \t" + conf.get("fs.default.name")
196                     + fileName);
197         }
198     }
199
200     // 追加内容到文件
201     @Test
202     public void appendFile()
203             throws IOException {
204         String fileName = "/test2/file2.txt";
205         String fileContent = "你好 世界";
206         Path dst = new Path(fileName);
207         byte[] bytes = fileContent.getBytes();
208         //如果文件不存在
209         if (!hdfs.exists(dst)) {
210             System.out.println("Error : the file not exists");
211             return;
212         }
213         FSDataOutputStream output = hdfs.append(dst);
214         output.write(bytes);
215         System.out.println("successful: append to file \t" + conf.get("fs.default.name")
216                 + fileName);
217     }
218
219
220     // 列出所有文件
221     @Test
222     public void listFiles() throws IOException {
223         String dirName = "/test1";
224         Path f = new Path(dirName);
225         FileStatus[] status = hdfs.listStatus(f);
226         System.out.println(dirName + " has all files:");
227         if (status.length == 0) {
228             System.out.println("nothing !");
229         } else {
230             for (int i = 0; i < status.length; i++) {
231                 System.out.println(status[i].getPath().toString());
232             }
233         }
234     }
235
236     // 判断文件是否存在,存在即删除
237     @Test
238     public void deleteFile() throws IOException {
239         String fileName = "/test2";
240         Path f = new Path(fileName);
241         boolean isExists = hdfs.exists(f);
242         if (isExists) { // if exists, delete
243             boolean isDel = hdfs.delete(f, true);
244             System.out.println(fileName + "  delete? \t" + isDel);
245         } else {
246             System.out.println(fileName + "  exist? \t" + notExists);
247         }
248     }
249 }

Original: https://www.cnblogs.com/createboke/p/7824785.html
Author: CREATE_17
Title: 利用JAVA API远程进行HDFS的相关操作

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/565833/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球