Skip to content

Commit

Permalink
Enhance the functionality of s3filesystem (#5208)
Browse files Browse the repository at this point in the history
* 1. Enhance the functionality of s3filesystem to support multipart uploads.
2. Support the use of s3 storage for BML materials and workspaces.

* format code
  • Loading branch information
sjgllgh authored Dec 4, 2024
1 parent 2d0b2ad commit 6bc388f
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import org.apache.linkis.storage.domain.FsPathListWithError;
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.fs.FileSystem;
import org.apache.linkis.storage.fs.stream.S3OutputStream;
import org.apache.linkis.storage.utils.StorageConfiguration;
import org.apache.linkis.storage.utils.StorageUtils;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,6 +103,7 @@ public String rootUserName() {
public FsPath get(String dest) throws IOException {
FsPath ret = new FsPath(dest);
if (exists(ret)) {
ret.setIsdir(isDir(buildKey(ret.getPath())));
return ret;
} else {
logger.warn("File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)");
Expand All @@ -111,21 +116,31 @@ public FsPath get(String dest) throws IOException {
/**
 * Opens the content stream of the S3 object stored for {@code dest} in the configured bucket.
 *
 * <p>NOTE(review): this span is taken from a rendered diff, so the removed {@code buildPrefix}
 * based lookup and the {@code buildKey} based lookup that replaces it appear on consecutive
 * lines.
 *
 * @param dest path whose object should be read
 * @return the object's content stream; presumably the caller is expected to close it
 * @throws IOException when the S3 client raises {@code AmazonS3Exception}; the message always
 *     reports a permission problem and the original exception is not attached as the cause
 */
@Override
public InputStream read(FsPath dest) throws IOException {
try {
return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent();
return s3Client.getObject(bucket, buildKey(dest.getPath())).getObjectContent();
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + dest.getPath());
}
}

/**
 * Returns an output stream that writes to the S3 object addressed by {@code dest}.
 *
 * <p>NOTE(review): this span is taken from a rendered diff. The first three body lines are the
 * removed try-with-resources header; the lines after them are the replacement implementation.
 *
 * @param dest path of the object to write
 * @param overwrite when {@code false}, the object's current content is copied into the returned
 *     stream first, so subsequent writes are appended to it
 * @return the stream to write to; it is returned open
 * @throws IOException on any I/O failure; the message always reports a permission problem and
 *     the original exception is not attached as the cause
 */
@Override
public OutputStream write(FsPath dest, boolean overwrite) throws IOException {
try (InputStream inputStream = read(dest);
OutputStream outputStream =
new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) {
InputStream inputStream = null;
try {
// Make sure an object exists for dest before it is read back or written.
if (!exists(dest)) {
create(dest.getPath());
}

OutputStream outputStream = new S3OutputStream(s3Client, bucket, buildKey(dest.getPath()));

// Append mode: seed the new stream with whatever the object currently holds.
if (!overwrite) {
inputStream = read(dest);
IOUtils.copy(inputStream, outputStream);
}
return outputStream;
} catch (IOException e) {
// NOTE(review): outputStream is not closed on this path; only inputStream is released below.
throw new IOException("You have not permission to access path " + dest.getPath());
} finally {
IOUtils.closeQuietly(inputStream);
}
}

Expand All @@ -134,24 +149,39 @@ public boolean create(String dest) throws IOException {
if (exists(new FsPath(dest))) {
return false;
}
s3Client.putObject(bucket, dest, "");
s3Client.putObject(bucket, buildKey(dest), "");
return true;
}

@Override
public List<FsPath> list(FsPath path) throws IOException {
try {
if (!StringUtils.isEmpty(path.getPath())) {
ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath());
List<S3ObjectSummary> s3ObjectSummaries = listObjectsV2Result.getObjectSummaries();
return s3ObjectSummaries.stream()
.filter(summary -> !isInitFile(summary))
.map(
summary -> {
FsPath newPath = new FsPath(buildPath(summary.getKey()));
return fillStorageFile(newPath, summary);
})
.collect(Collectors.toList());
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(buildKey(path.getPath()) + "/")
.withDelimiter("/");
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
List<String> commonPrefixes = dirResult.getCommonPrefixes();
List<FsPath> fsPaths =
s3ObjectSummaries.stream()
.filter(summary -> !isInitFile(summary))
.map(
summary -> {
FsPath newPath = new FsPath(buildPath(summary.getKey()));
return fillStorageFile(newPath, summary);
})
.collect(Collectors.toList());
if (commonPrefixes != null) {
for (String dir : commonPrefixes) {
FsPath newPath = new FsPath(buildPath(dir));
newPath.setIsdir(true);
fsPaths.add(newPath);
}
}
return fsPaths;
}
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + path.getPath());
Expand All @@ -173,7 +203,7 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(buildPrefix(path.getPath()))
.withPrefix(buildKey(path.getPath()) + "/")
.withDelimiter("/");
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
Expand Down Expand Up @@ -204,25 +234,15 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile
@Override
public boolean exists(FsPath dest) throws IOException {
try {
if (new File(dest.getPath()).getName().contains(".")) {
return existsFile(dest);
if (dest == null) {
return false;
}
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request
.withBucketName(bucket)
.withPrefix(buildPrefix(dest.getPath()))
.withDelimiter("/");
return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
+ s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
> 0;
} catch (AmazonS3Exception e) {
return false;
}
}

public boolean existsFile(FsPath dest) {
try {
return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false));
.withPrefix(buildKey(dest.getPath()))
.withMaxKeys(1);
return !s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty();
} catch (AmazonS3Exception e) {
return false;
}
Expand All @@ -231,25 +251,41 @@ public boolean existsFile(FsPath dest) {
/**
 * Deletes the object or directory addressed by {@code dest}.
 *
 * <p>NOTE(review): this span is taken from a rendered diff. The first statements (prefix listing
 * and a {@code DeleteObjectsRequest} built for the literal bucket name "test") are the removed
 * implementation; the statements starting at {@code deleteKeys} are its replacement, which
 * gathers keys through {@code delete(dest, deleteKeys)} and targets the configured bucket.
 *
 * @param dest path to delete
 * @return {@code true} whenever no exception is thrown, including when no key was collected
 * @throws IOException when the S3 client raises {@code AmazonS3Exception}; the message always
 *     reports a permission problem and the original exception is not attached as the cause
 */
@Override
public boolean delete(FsPath dest) throws IOException {
try {
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false));
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
String[] keyList =
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
DeleteObjectsRequest deleteObjectsRequest =
new DeleteObjectsRequest("test").withKeys(keyList);
s3Client.deleteObjects(deleteObjectsRequest);
List<String> deleteKeys = new ArrayList<>();
delete(dest, deleteKeys);
// Skip the request entirely when nothing was collected.
// NOTE(review): all keys go into a single request; S3 multi-object delete presumably caps the
// number of keys per call (1000) — confirm large directories are handled.
if (!deleteKeys.isEmpty()) {
DeleteObjectsRequest deleteObjectsRequest =
new DeleteObjectsRequest(bucket).withKeys(deleteKeys.toArray(new String[0]));
s3Client.deleteObjects(deleteObjectsRequest);
}
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + dest.getPath());
}
}

/**
 * Recursively collects the object keys that have to be removed in order to delete {@code dest}.
 *
 * <p>When {@code dest} is a directory according to {@code isDir}, every entry returned by
 * {@code listPathWithError(dest, false)} is visited recursively; otherwise the key of
 * {@code dest} itself is appended to {@code keys}.
 *
 * @param dest path to delete
 * @param keys accumulator receiving the keys to delete; it is mutated in place
 * @throws IOException if listing {@code dest} or any nested directory fails
 */
public void delete(FsPath dest, List<String> keys) throws IOException {
  String key = buildKey(dest.getPath());
  if (!isDir(key)) {
    keys.add(key);
    return;
  }
  // A plain loop (instead of forEach with a lambda) lets IOException propagate as declared,
  // rather than reaching callers wrapped in an unchecked RuntimeException.
  // ignoreInitFile is passed as false, presumably so that init marker files are listed and
  // therefore deleted together with the directory — confirm against listPathWithError.
  for (FsPath child : listPathWithError(dest, false).getFsPaths()) {
    delete(child, keys);
  }
}

@Override
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
try {
String newOriginPath = buildPrefix(oldDest.getPath(), false);
String newDestPath = buildPrefix(newDest.getPath(), false);
String newOriginPath = buildKey(oldDest.getPath());
String newDestPath = buildKey(newDest.getPath());
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
Expand Down Expand Up @@ -281,8 +317,8 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
@Override
public boolean copy(String origin, String dest) throws IOException {
try {
String newOrigin = buildPrefix(origin, false);
String newDest = buildPrefix(dest, false);
String newOrigin = buildKey(origin);
String newDest = buildKey(dest);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
Expand All @@ -305,8 +341,16 @@ public boolean copy(String origin, String dest) throws IOException {
}
}

private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) {
return s3ObjectSummary.getKey().substring(prefix.length()).contains("/");
/**
 * Tells whether {@code key} behaves like a directory, i.e. whether at least one object or
 * common prefix exists under {@code key + "/"} in the configured bucket.
 *
 * @param key object key without a trailing slash
 * @return {@code true} if the listing under {@code key + "/"} is not empty
 */
private boolean isDir(String key) {
  ListObjectsV2Request request =
      new ListObjectsV2Request()
          .withBucketName(bucket)
          .withPrefix(key + "/")
          .withDelimiter("/")
          .withMaxKeys(1);

  // List once and inspect both parts of the same response: this halves the number of S3 round
  // trips and guarantees that prefixes and summaries come from one consistent snapshot.
  ListObjectsV2Result result = s3Client.listObjectsV2(request);
  return !result.getCommonPrefixes().isEmpty() || !result.getObjectSummaries().isEmpty();
}

private boolean isInitFile(S3ObjectSummary s3ObjectSummary) {
Expand All @@ -318,6 +362,13 @@ public String listRoot() {
return "/";
}

/**
* s3没有目录概念,无法直接创建目录 S3 lacks the concept of directories and cannot create directories directly.
*
* @param dest
* @return
* @throws IOException
*/
@Override
public boolean mkdir(FsPath dest) throws IOException {
String path = new File(dest.getPath(), INIT_FILE_NAME).getPath();
Expand All @@ -339,7 +390,7 @@ private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) {
fsPath.setOwner(owner.getDisplayName());
}
try {
fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
fsPath.setIsdir(isDir(s3ObjectSummary.getKey()));
} catch (Throwable e) {
logger.warn("Failed to fill storage file:" + fsPath.getPath(), e);
}
Expand All @@ -359,7 +410,7 @@ public boolean canRead(FsPath dest) {

/**
 * Reports whether {@code user} may read {@code dest}.
 *
 * <p>No permission lookup happens in the visible lines: both arguments are ignored and a
 * constant is returned. NOTE(review): this span is taken from a rendered diff, where
 * {@code return false;} is the removed line and {@code return true;} is its replacement.
 *
 * @param dest path to check (unused)
 * @param user user name to check (unused)
 * @return a constant, independent of the arguments
 * @throws IOException declared by the signature; nothing in the visible body throws it
 */
@Override
public boolean canRead(FsPath dest, String user) throws IOException {
return false;
return true;
}

@Override
Expand All @@ -384,7 +435,10 @@ public long getUsableSpace(FsPath dest) {

/**
 * Returns the content length of the S3 object stored for {@code dest}, as reported by the
 * object's metadata.
 *
 * <p>NOTE(review): this span is taken from a rendered diff; {@code return 0;} is the removed
 * line and the chained call is its replacement.
 *
 * <p>NOTE(review): the object returned by {@code getObject} is never closed here. Fetching only
 * the metadata (e.g. {@code s3Client.getObjectMetadata(bucket, key)}) would presumably avoid
 * opening the content stream — confirm against the AWS SDK documentation.
 *
 * @param dest path whose object length is requested
 * @return the length taken from the object metadata
 * @throws IOException declared by the signature; S3 client exceptions are not translated here
 */
@Override
public long getLength(FsPath dest) throws IOException {
return 0;
return s3Client
.getObject(bucket, buildKey(dest.getPath()))
.getObjectMetadata()
.getContentLength();
}

@Override
Expand Down Expand Up @@ -418,7 +472,9 @@ public boolean setPermission(FsPath dest, String permission) {
}

/**
 * Closes this file system by shutting down the underlying S3 client.
 *
 * <p>NOTE(review): this span is taken from a rendered diff; the empty one-line {@code close()}
 * is the removed version and the three-line method is its replacement.
 *
 * <p>NOTE(review): if the same client instance is shared with other objects, shutting it down
 * here affects them as well — verify who owns {@code s3Client}.
 *
 * @throws IOException declared by the signature; nothing in the visible body throws it
 */
@Override
public void close() throws IOException {}
public void close() throws IOException {
s3Client.shutdown();
}

public String getLabel() {
return label;
Expand All @@ -429,46 +485,22 @@ public void setLabel(String label) {
}

/**
 * Turns an object key or path into a path prefixed with {@code StorageUtils.S3_SCHEMA()}.
 *
 * <p>A "/" separator is inserted between the schema and {@code path} unless {@code path}
 * already starts with one.
 *
 * <p>NOTE(review): this span is taken from a rendered diff; the two consecutive null/empty
 * guards are the removed line and its replacement.
 *
 * @param path key or path to prefix; may be {@code null}
 * @return the prefixed path, or an empty string when {@code path} is {@code null} or empty
 */
public String buildPath(String path) {
if (path == null || "".equals(path)) return "";
if (path == null || path.isEmpty()) return "";
if (path.startsWith("/")) {
return StorageUtils.S3_SCHEMA() + path;
}
return StorageUtils.S3_SCHEMA() + "/" + path;
}

/**
 * Converts a file-system style path into an S3 key or prefix.
 *
 * <p>NOTE(review): this span is taken from a rendered diff and interleaves two variants:
 *
 * <ul>
 *   <li>removed {@code buildPrefix(path, addTail)}: drops the first "/" when {@code path}
 *       starts with one and, if {@code addTail} is set and {@code path} has no trailing "/",
 *       appends one;
 *   <li>added {@code buildKey(path)}: drops the first "/" when {@code path} starts with one
 *       and removes one trailing "/" when {@code path} ends with one and the intermediate
 *       result is not empty.
 * </ul>
 *
 * @param path path to convert; may be {@code null}
 * @return the converted key, or an empty string when {@code path} is {@code null} or empty
 */
public String buildPrefix(String path, boolean addTail) {
public String buildKey(String path) {
String res = path;
if (path == null || "".equals(path)) return "";
if (path == null || path.isEmpty()) return "";
if (path.startsWith("/")) {
res = path.replaceFirst("/", "");
}
if (!path.endsWith("/") && addTail) {
res = res + "/";
if (path.endsWith("/") && !res.isEmpty()) {
res = res.substring(0, res.length() - 1);
}
return res;
}

/**
 * Builds the prefix for {@code path} by delegating to the two-argument overload with
 * {@code addTail} set to {@code true}.
 *
 * @param path path to convert
 * @return whatever {@code buildPrefix(path, true)} returns
 */
public String buildPrefix(String path) {
  final boolean addTail = true;
  return buildPrefix(path, addTail);
}
}

/**
 * In-memory output stream that uploads everything written to it as a single S3 object when it
 * is closed.
 *
 * <p>All bytes are buffered by the {@link ByteArrayOutputStream} superclass; nothing is sent to
 * S3 before {@link #close()} is called.
 */
class S3OutputStream extends ByteArrayOutputStream {
  private final AmazonS3 s3Client;
  private final String bucket;
  private final String path;

  /** Set once the upload has succeeded, so that repeated close() calls do not upload again. */
  private boolean uploaded;

  /**
   * @param s3Client client used for the upload
   * @param bucket name of the target bucket
   * @param path key of the object to create or replace
   */
  public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
    this.s3Client = s3Client;
    this.bucket = bucket;
    this.path = path;
  }

  /**
   * Uploads the buffered bytes to {@code bucket}/{@code path}.
   *
   * <p>Calling this method again after a successful upload has no effect. If the upload fails,
   * a later call tries again.
   *
   * @throws IOException if closing the temporary input stream fails
   */
  @Override
  public synchronized void close() throws IOException {
    if (uploaded) {
      return;
    }
    // Declare the length up front so the SDK does not have to buffer the stream to compute it.
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(count);
    // Read straight from the internal buffer instead of copying it with toByteArray().
    try (InputStream in = new ByteArrayInputStream(buf, 0, count)) {
      s3Client.putObject(bucket, path, in, metadata);
    }
    uploaded = true;
  }
}
Loading

0 comments on commit 6bc388f

Please sign in to comment.