diff --git a/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java b/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java index d0e0707776bbf..e00acb615b250 100644 --- a/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java @@ -48,6 +48,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Enumeration; @@ -1199,6 +1200,10 @@ public List listFileMeta(String path, Map properties } catch (FileNotFoundException e) { LOG.info("file not found: " + path, e); throw new UserException("file not found: " + path, e); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while get file status: " + path, e); + throw new UserException("Failed to get file status: " + path, e); // throw unified user exception } catch (Exception e) { LOG.error("errors while get file status ", e); throw new UserException("Fail to get file status: " + e.getMessage(), e); @@ -1245,6 +1250,10 @@ public List listPath(String path, boolean fileNameOnly, Map loadProperties) throws U Path filePath = new Path(pathUri.getPath()); try { fileSystem.getDFSFileSystem().delete(filePath, true); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while delete path: " + path, e); + throw new UserException("Failed to delete path: " + path, e); // throw unified user exception } catch (IOException e) { LOG.error("errors while delete path " + path, e); throw new UserException("delete path " + path + "error", e); @@ -1287,6 +1300,11 @@ public void renamePath(String srcPath, String destPath, Map load if (!isRenameSuccess) { throw new UserException("failed to rename path from " + srcPath + " to " + destPath); } + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while rename path from " + srcPath + " to " + destPath, e); + // throw unified user exception + throw new UserException("Failed to rename path from " + srcPath + " to " + destPath, e); } catch (IOException e) { LOG.error("errors while rename path from " + srcPath + " to " + destPath, e); throw new UserException("errors while rename " + srcPath + "to " + destPath, e); @@ -1299,6 +1317,10 @@ public boolean checkPathExist(String path, Map loadProperties) t Path filePath = new Path(pathUri.getPath()); try { return fileSystem.getDFSFileSystem().exists(filePath); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while check path exist: " + path, e); + throw new UserException("Failed to check path exist: " + path, e); // throw unified user exception } catch (IOException e) { LOG.error("errors while check path exist: " + path, e); throw new UserException("errors while check if path " + path + " exist", e); @@ -1316,6 +1338,10 @@ public TBrokerFD openReader(String path, long startOffset, Map l TBrokerFD fd = parseUUIDToFD(uuid); ioStreamManager.putNewInputStream(fd, fsDataInputStream, fileSystem); return fd; + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while open file " + path, e); + throw new UserException("Failed to open file " + path, e); // throw unified user exception } catch (IOException e) { LOG.error("errors while open path", e); throw new UserException("could not open file " + path, e); @@ -1328,6 +1354,11 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException long currentStreamOffset; try { currentStreamOffset = fsDataInputStream.getPos(); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while get file pos from output stream", e); + // throw unified user exception + throw new UserException("Failed to get file pos from output stream", e); } catch (IOException e) { LOG.error("errors while get file pos from output stream", e); throw new UserException("errors while get file pos from output stream"); @@ -1339,6 +1370,11 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException + offset + " seek to it"); try { fsDataInputStream.seek(offset); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while seek file pos from output stream", e); + // throw unified user exception + throw new UserException("Failed to seek file pos from output stream", e); } catch (IOException e) { throw new UserException("current read offset " + currentStreamOffset + " is not equal to " + offset + ", and could not seek to it"); @@ -1366,6 +1402,10 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException System.arraycopy(buf, 0, smallerBuf, 0, readLength); return smallerBuf; } + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while read data from stream", e); + throw new UserException("Failed to read data from stream", e); // throw unified user exception } catch (IOException e) { LOG.error("errors while read data from stream", e); throw new UserException("errors while read data from stream", e); @@ -1382,6 +1422,10 @@ public void closeReader(TBrokerFD fd) throws UserException { synchronized (fsDataInputStream) { try { fsDataInputStream.close(); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while close file input stream", e); + throw new UserException("Failed to close file input stream", e); // throw unified user exception } catch (IOException e) { LOG.error("errors while close file input stream", e); throw new UserException("errors while close file input stream", e); @@ -1403,6 +1447,10 @@ public TBrokerFD openWriter(String path, Map loadProperties) thr LOG.info("finish a open writer request. fd: " + fd); ioStreamManager.putNewOutputStream(fd, fsDataOutputStream, fileSystem); return fd; + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while open file " + path, e); + throw new UserException("Failed to open file " + path, e); // throw unified user exception } catch (IOException e) { LOG.error("errors while open path", e); throw new UserException("could not open file " + path, e); @@ -1419,6 +1467,11 @@ public void pwrite(TBrokerFD fd, long offset, byte[] data) throws UserException } try { fsDataOutputStream.write(data); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while write file " + fd + " to output stream", e); + // throw unified user exception + throw new UserException("Failed to write file " + fd + " to output stream", e); } catch (IOException e) { LOG.error("errors while write file " + fd + " to output stream", e); throw new UserException("errors while write data to output stream", e); @@ -1432,6 +1485,11 @@ public void closeWriter(TBrokerFD fd) throws UserException { try { fsDataOutputStream.hsync(); fsDataOutputStream.close(); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while close file " + fd + " output stream", e); + // throw unified user exception + throw new UserException("Failed to close file " + fd + " output stream", e); } catch (IOException e) { LOG.error("errors while close file " + fd + " output stream", e); throw new UserException("errors while close file output stream", e);