Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Avoid hdfs fs manager interrupting the thread when exception occurs (backport #48403) #48695

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
Expand Down Expand Up @@ -1200,6 +1201,10 @@ public List<FileStatus> listFileMeta(String path, Map<String, String> properties
} catch (FileNotFoundException e) {
LOG.info("file not found: " + path, e);
throw new UserException("file not found: " + path, e);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while get file status: " + path, e);
throw new UserException("Failed to get file status: " + path, e); // throw unified user exception
} catch (Exception e) {
LOG.error("errors while get file status ", e);
throw new UserException("Fail to get file status: " + e.getMessage(), e);
Expand Down Expand Up @@ -1246,6 +1251,10 @@ public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<S
"the arguments like region, IAM, instance profile and so on.");
throw new UserException("The arguments of blob store(S3/Azure) may be wrong. " +
"You can check the arguments like region, IAM, instance profile and so on.", e);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while list path: " + path, e);
throw new UserException("Failed to list path: " + path, e); // throw unified user exception
} catch (Exception e) {
LOG.error("errors while get file status ", e);
throw new UserException("Fail to get file status: " + e.getMessage(), e);
Expand All @@ -1259,6 +1268,10 @@ public void deletePath(String path, Map<String, String> loadProperties) throws U
Path filePath = new Path(pathUri.getPath());
try {
fileSystem.getDFSFileSystem().delete(filePath, true);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while delete path: " + path, e);
throw new UserException("Failed to delete path: " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while delete path " + path, e);
throw new UserException("delete path " + path + "error", e);
Expand Down Expand Up @@ -1288,6 +1301,11 @@ public void renamePath(String srcPath, String destPath, Map<String, String> load
if (!isRenameSuccess) {
throw new UserException("failed to rename path from " + srcPath + " to " + destPath);
}
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while rename path from " + srcPath + " to " + destPath, e);
// throw unified user exception
throw new UserException("Failed to rename path from " + srcPath + " to " + destPath, e);
} catch (IOException e) {
LOG.error("errors while rename path from " + srcPath + " to " + destPath, e);
throw new UserException("errors while rename " + srcPath + "to " + destPath, e);
Expand All @@ -1300,6 +1318,10 @@ public boolean checkPathExist(String path, Map<String, String> loadProperties) t
Path filePath = new Path(pathUri.getPath());
try {
return fileSystem.getDFSFileSystem().exists(filePath);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while check path exist: " + path, e);
throw new UserException("Failed to check path exist: " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while check path exist: " + path, e);
throw new UserException("errors while check if path " + path + " exist", e);
Expand All @@ -1317,6 +1339,10 @@ public TBrokerFD openReader(String path, long startOffset, Map<String, String> l
TBrokerFD fd = parseUUIDToFD(uuid);
ioStreamManager.putNewInputStream(fd, fsDataInputStream, fileSystem);
return fd;
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while open file " + path, e);
throw new UserException("Failed to open file " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new UserException("could not open file " + path, e);
Expand All @@ -1329,6 +1355,11 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException
long currentStreamOffset;
try {
currentStreamOffset = fsDataInputStream.getPos();
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while get file pos from output stream", e);
// throw unified user exception
throw new UserException("Failed to get file pos from output stream", e);
} catch (IOException e) {
LOG.error("errors while get file pos from output stream", e);
throw new UserException("errors while get file pos from output stream");
Expand All @@ -1340,6 +1371,11 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException
+ offset + " seek to it");
try {
fsDataInputStream.seek(offset);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while seek file pos from output stream", e);
// throw unified user exception
throw new UserException("Failed to seek file pos from output stream", e);
} catch (IOException e) {
throw new UserException("current read offset " + currentStreamOffset + " is not equal to "
+ offset + ", and could not seek to it");
Expand Down Expand Up @@ -1367,6 +1403,10 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException
System.arraycopy(buf, 0, smallerBuf, 0, readLength);
return smallerBuf;
}
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while read data from stream", e);
throw new UserException("Failed to read data from stream", e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while read data from stream", e);
throw new UserException("errors while read data from stream", e);
Expand All @@ -1383,6 +1423,10 @@ public void closeReader(TBrokerFD fd) throws UserException {
synchronized (fsDataInputStream) {
try {
fsDataInputStream.close();
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while close file input stream", e);
throw new UserException("Failed to close file input stream", e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while close file input stream", e);
throw new UserException("errors while close file input stream", e);
Expand All @@ -1404,6 +1448,10 @@ public TBrokerFD openWriter(String path, Map<String, String> loadProperties) thr
LOG.info("finish a open writer request. fd: " + fd);
ioStreamManager.putNewOutputStream(fd, fsDataOutputStream, fileSystem);
return fd;
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while open file " + path, e);
throw new UserException("Failed to open file " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new UserException("could not open file " + path, e);
Expand All @@ -1420,6 +1468,11 @@ public void pwrite(TBrokerFD fd, long offset, byte[] data) throws UserException
}
try {
fsDataOutputStream.write(data);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while write file " + fd + " to output stream", e);
// throw unified user exception
throw new UserException("Failed to write file " + fd + " to output stream", e);
} catch (IOException e) {
LOG.error("errors while write file " + fd + " to output stream", e);
throw new UserException("errors while write data to output stream", e);
Expand All @@ -1433,6 +1486,11 @@ public void closeWriter(TBrokerFD fd) throws UserException {
try {
fsDataOutputStream.hsync();
fsDataOutputStream.close();
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while close file " + fd + " output stream", e);
// throw unified user exception
throw new UserException("Failed to close file " + fd + " output stream", e);
} catch (IOException e) {
LOG.error("errors while close file " + fd + " output stream", e);
throw new UserException("errors while close file output stream", e);
Expand Down
Loading