From e3d64064d9ba10ccde76f0a8747f9a8b409e47c1 Mon Sep 17 00:00:00 2001 From: sjgllgh Date: Wed, 17 Jan 2024 08:22:04 +0000 Subject: [PATCH] Fix a series of problems in the s3 file system --- .../linkis/storage/fs/impl/S3FileSystem.java | 132 +++++++++++++++--- 1 file changed, 112 insertions(+), 20 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java index b8f6401b11..026a6f9e0e 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java @@ -38,10 +38,7 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +111,7 @@ public FsPath get(String dest) throws IOException { @Override public InputStream read(FsPath dest) throws IOException { try { - return s3Client.getObject(bucket, dest.getPath()).getObjectContent(); + return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent(); } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + dest.getPath()); } @@ -123,8 +120,9 @@ public InputStream read(FsPath dest) throws IOException { @Override public OutputStream write(FsPath dest, boolean overwrite) throws IOException { try (InputStream inputStream = read(dest); - OutputStream outputStream = new S3OutputStream(s3Client, bucket, dest.getPath())) { - if (overwrite) { + OutputStream outputStream = + new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) { + if (!overwrite) { IOUtils.copy(inputStream, outputStream); } return outputStream; @@ -164,20 +162,37 @@ public List list(FsPath path) throws IOException { @Override public FsPathListWithError listPathWithError(FsPath path) throws IOException { + return listPathWithError(path, true); + } + + public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile) + throws IOException { + List rtn = new ArrayList<>(); try { if (!StringUtils.isEmpty(path.getPath())) { - ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath()); - List s3ObjectSummaries = listObjectsV2Result.getObjectSummaries(); + ListObjectsV2Request listObjectsV2Request = + new ListObjectsV2Request() + .withBucketName(bucket) + .withPrefix(buildPrefix(path.getPath())) + .withDelimiter("/"); + ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request); + List s3ObjectSummaries = dirResult.getObjectSummaries(); + List commonPrefixes = dirResult.getCommonPrefixes(); if (s3ObjectSummaries != null) { - List rtn = new ArrayList(); - String message = ""; for (S3ObjectSummary summary : s3ObjectSummaries) { - if (isDir(summary, path.getPath()) || isInitFile(summary)) continue; + if (isInitFile(summary) && ignoreInitFile) continue; FsPath newPath = new FsPath(buildPath(summary.getKey())); rtn.add(fillStorageFile(newPath, summary)); } - return new FsPathListWithError(rtn, message); } + if (commonPrefixes != null) { + for (String dir : commonPrefixes) { + FsPath newPath = new FsPath(buildPath(dir)); + newPath.setIsdir(true); + rtn.add(newPath); + } + } + return new FsPathListWithError(rtn, ""); } } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + path.getPath()); @@ -189,8 +204,25 @@ public FsPathListWithError listPathWithError(FsPath path) throws IOException { @Override public boolean exists(FsPath dest) throws IOException { try { - int size = s3Client.listObjectsV2(bucket, dest.getPath()).getObjectSummaries().size(); - return size > 0; + if (new File(dest.getPath()).getName().contains(".")) { + return existsFile(dest); + } + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request + .withBucketName(bucket) + .withPrefix(buildPrefix(dest.getPath())) + .withDelimiter("/"); + return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size() + + s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size() + > 0; + } catch (AmazonS3Exception e) { + return false; + } + } + + public boolean existsFile(FsPath dest) { + try { + return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false)); } catch (AmazonS3Exception e) { return false; } @@ -199,7 +231,14 @@ public boolean exists(FsPath dest) throws IOException { @Override public boolean delete(FsPath dest) throws IOException { try { - s3Client.deleteObject(bucket, dest.getPath()); + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false)); + ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); + String[] keyList = + result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new); + DeleteObjectsRequest deleteObjectsRequest = + new DeleteObjectsRequest("test").withKeys(keyList); + s3Client.deleteObjects(deleteObjectsRequest); return true; } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + dest.getPath()); @@ -209,8 +248,25 @@ public boolean delete(FsPath dest) throws IOException { @Override public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { try { - s3Client.copyObject(bucket, oldDest.getPath(), bucket, newDest.getPath()); - s3Client.deleteObject(bucket, oldDest.getPath()); + String newOriginPath = buildPrefix(oldDest.getPath(), false); + String newDestPath = buildPrefix(newDest.getPath(), false); + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath); + ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); + List keyList = + result.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey) + .collect(Collectors.toList()); + List newKeyList = + keyList.stream() + .map(key -> key.replaceFirst(newOriginPath, newDestPath)) + .collect(Collectors.toList()); + for (int i = 0; i < keyList.size(); i++) { + String key = keyList.get(i); + String newKey = newKeyList.get(i); + s3Client.copyObject(bucket, key, bucket, newKey); + s3Client.deleteObject(bucket, key); + } return true; } catch (AmazonS3Exception e) { s3Client.deleteObject(bucket, newDest.getPath()); @@ -225,7 +281,24 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { @Override public boolean copy(String origin, String dest) throws IOException { try { - s3Client.copyObject(bucket, origin, bucket, dest); + String newOrigin = buildPrefix(origin, false); + String newDest = buildPrefix(dest, false); + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin); + ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request); + List keyList = + result.getObjectSummaries().stream() + .map(S3ObjectSummary::getKey) + .collect(Collectors.toList()); + List newKeyList = + keyList.stream() + .map(key -> key.replaceFirst(newOrigin, newDest)) + .collect(Collectors.toList()); + for (int i = 0; i < keyList.size(); i++) { + String key = keyList.get(i); + String newKey = newKeyList.get(i); + s3Client.copyObject(bucket, key, bucket, newKey); + } return true; } catch (AmazonS3Exception e) { throw new IOException("You have not permission to access path " + origin + " or " + dest); @@ -261,7 +334,10 @@ public boolean mkdirs(FsPath dest) throws IOException { private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) { fsPath.setModification_time(s3ObjectSummary.getLastModified().getTime()); - fsPath.setOwner(s3ObjectSummary.getOwner().getDisplayName()); + Owner owner = s3ObjectSummary.getOwner(); + if (owner != null) { + fsPath.setOwner(owner.getDisplayName()); + } try { fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath())); } catch (Throwable e) { @@ -344,6 +420,22 @@ public String buildPath(String path) { } return StorageUtils.S3_SCHEMA + "/" + path; } + + public String buildPrefix(String path, boolean addTail) { + String res = path; + if (path == null || "".equals(path)) return ""; + if (path.startsWith("/")) { + res = path.replaceFirst("/", ""); + } + if (!path.endsWith("/") && addTail) { + res = res + "/"; + } + return res; + } + + public String buildPrefix(String path) { + return buildPrefix(path, true); + } } class S3OutputStream extends ByteArrayOutputStream {