Skip to content

Commit

Permalink
Fix a series of problems in the s3 file system
Browse files Browse the repository at this point in the history
  • Loading branch information
sjgllgh committed Jan 17, 2024
1 parent e79b59d commit e3d6406
Showing 1 changed file with 112 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,7 +111,7 @@ public FsPath get(String dest) throws IOException {
@Override
public InputStream read(FsPath dest) throws IOException {
try {
return s3Client.getObject(bucket, dest.getPath()).getObjectContent();
return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent();
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + dest.getPath());
}
Expand All @@ -123,8 +120,9 @@ public InputStream read(FsPath dest) throws IOException {
@Override
public OutputStream write(FsPath dest, boolean overwrite) throws IOException {
try (InputStream inputStream = read(dest);
OutputStream outputStream = new S3OutputStream(s3Client, bucket, dest.getPath())) {
if (overwrite) {
OutputStream outputStream =
new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) {
if (!overwrite) {
IOUtils.copy(inputStream, outputStream);
}
return outputStream;
Expand Down Expand Up @@ -164,20 +162,37 @@ public List<FsPath> list(FsPath path) throws IOException {

@Override
public FsPathListWithError listPathWithError(FsPath path) throws IOException {
return listPathWithError(path, true);
}

public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile)
throws IOException {
List<FsPath> rtn = new ArrayList<>();
try {
if (!StringUtils.isEmpty(path.getPath())) {
ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath());
List<S3ObjectSummary> s3ObjectSummaries = listObjectsV2Result.getObjectSummaries();
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(buildPrefix(path.getPath()))
.withDelimiter("/");
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
List<String> commonPrefixes = dirResult.getCommonPrefixes();
if (s3ObjectSummaries != null) {
List<FsPath> rtn = new ArrayList();
String message = "";
for (S3ObjectSummary summary : s3ObjectSummaries) {
if (isDir(summary, path.getPath()) || isInitFile(summary)) continue;
if (isInitFile(summary) && ignoreInitFile) continue;
FsPath newPath = new FsPath(buildPath(summary.getKey()));
rtn.add(fillStorageFile(newPath, summary));
}
return new FsPathListWithError(rtn, message);
}
if (commonPrefixes != null) {
for (String dir : commonPrefixes) {
FsPath newPath = new FsPath(buildPath(dir));
newPath.setIsdir(true);
rtn.add(newPath);
}
}
return new FsPathListWithError(rtn, "");
}
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + path.getPath());
Expand All @@ -189,8 +204,25 @@ public FsPathListWithError listPathWithError(FsPath path) throws IOException {
@Override
public boolean exists(FsPath dest) throws IOException {
try {
int size = s3Client.listObjectsV2(bucket, dest.getPath()).getObjectSummaries().size();
return size > 0;
if (new File(dest.getPath()).getName().contains(".")) {
return existsFile(dest);
}
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request
.withBucketName(bucket)
.withPrefix(buildPrefix(dest.getPath()))
.withDelimiter("/");
return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
+ s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
> 0;
} catch (AmazonS3Exception e) {
return false;
}
}

public boolean existsFile(FsPath dest) {
try {
return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false));
} catch (AmazonS3Exception e) {
return false;
}
Expand All @@ -199,7 +231,14 @@ public boolean exists(FsPath dest) throws IOException {
@Override
public boolean delete(FsPath dest) throws IOException {
try {
s3Client.deleteObject(bucket, dest.getPath());
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false));
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
String[] keyList =
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
DeleteObjectsRequest deleteObjectsRequest =
new DeleteObjectsRequest("test").withKeys(keyList);
s3Client.deleteObjects(deleteObjectsRequest);
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + dest.getPath());
Expand All @@ -209,8 +248,25 @@ public boolean delete(FsPath dest) throws IOException {
@Override
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
try {
s3Client.copyObject(bucket, oldDest.getPath(), bucket, newDest.getPath());
s3Client.deleteObject(bucket, oldDest.getPath());
String newOriginPath = buildPrefix(oldDest.getPath(), false);
String newDestPath = buildPrefix(newDest.getPath(), false);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
List<String> keyList =
result.getObjectSummaries().stream()
.map(S3ObjectSummary::getKey)
.collect(Collectors.toList());
List<String> newKeyList =
keyList.stream()
.map(key -> key.replaceFirst(newOriginPath, newDestPath))
.collect(Collectors.toList());
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
String newKey = newKeyList.get(i);
s3Client.copyObject(bucket, key, bucket, newKey);
s3Client.deleteObject(bucket, key);
}
return true;
} catch (AmazonS3Exception e) {
s3Client.deleteObject(bucket, newDest.getPath());
Expand All @@ -225,7 +281,24 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
@Override
public boolean copy(String origin, String dest) throws IOException {
try {
s3Client.copyObject(bucket, origin, bucket, dest);
String newOrigin = buildPrefix(origin, false);
String newDest = buildPrefix(dest, false);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
List<String> keyList =
result.getObjectSummaries().stream()
.map(S3ObjectSummary::getKey)
.collect(Collectors.toList());
List<String> newKeyList =
keyList.stream()
.map(key -> key.replaceFirst(newOrigin, newDest))
.collect(Collectors.toList());
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
String newKey = newKeyList.get(i);
s3Client.copyObject(bucket, key, bucket, newKey);
}
return true;
} catch (AmazonS3Exception e) {
throw new IOException("You have not permission to access path " + origin + " or " + dest);
Expand Down Expand Up @@ -261,7 +334,10 @@ public boolean mkdirs(FsPath dest) throws IOException {

private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) {
fsPath.setModification_time(s3ObjectSummary.getLastModified().getTime());
fsPath.setOwner(s3ObjectSummary.getOwner().getDisplayName());
Owner owner = s3ObjectSummary.getOwner();
if (owner != null) {
fsPath.setOwner(owner.getDisplayName());
}
try {
fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
} catch (Throwable e) {
Expand Down Expand Up @@ -344,6 +420,22 @@ public String buildPath(String path) {
}
return StorageUtils.S3_SCHEMA + "/" + path;
}

public String buildPrefix(String path, boolean addTail) {
String res = path;
if (path == null || "".equals(path)) return "";
if (path.startsWith("/")) {
res = path.replaceFirst("/", "");
}
if (!path.endsWith("/") && addTail) {
res = res + "/";
}
return res;
}

public String buildPrefix(String path) {
return buildPrefix(path, true);
}
}

class S3OutputStream extends ByteArrayOutputStream {
Expand Down

0 comments on commit e3d6406

Please sign in to comment.