Skip to content
This repository was archived by the owner on Feb 26, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Complete settings lists:
* s3fs_amazon_s3_factory
* s3fs_signer_override
* s3fs_path_style_access
* s3fs_kms_key_id

##### Set endpoint to reduce data latency in your applications

Expand Down Expand Up @@ -160,6 +161,7 @@ private FileSystem s3FileSystem;
* Works with virtual s3 folders (not really exists and are element's subkeys)
* List buckets for the client
* Multi endpoint fileSystem
* Support for encryption with a user-specified AWS KMS CMK ID

#### Roadmap:

Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/upplication/s3fs/S3FileChannel.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.upplication.s3fs;

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.util.IOUtils;
import org.apache.tika.Tika;

Expand Down Expand Up @@ -160,10 +163,14 @@ protected void sync() throws IOException {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(Files.size(tempFile));
metadata.setContentType(new Tika().detect(stream, path.getFileName().toString()));

String bucket = path.getFileStore().name();
String key = path.getKey();
path.getFileSystem().getClient().putObject(bucket, key, stream, metadata);
PutObjectRequest request = new PutObjectRequest(bucket, key, stream, metadata);
String keyId = path.getFileSystem().getKmsKeyId();
if (keyId != null) {
request.withSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(keyId));
}
path.getFileSystem().getClient().putObject(request);
}
}
}
12 changes: 11 additions & 1 deletion src/main/java/com/upplication/s3fs/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ public class S3FileSystem extends FileSystem implements Comparable<S3FileSystem>
private final String key;
private final AmazonS3 client;
private final String endpoint;
private final String kmsKeyId;
private int cache;

public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint) {
this(provider, key, client, endpoint, null);
}

public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint, String kmsKeyId) {
this.provider = provider;
this.key = key;
this.client = client;
this.endpoint = endpoint;
this.cache = 60000; // 1 minute cache for the s3Path
this.kmsKeyId = kmsKeyId;
}

@Override
Expand Down Expand Up @@ -176,4 +182,8 @@ public int compareTo(S3FileSystem o) {
public int getCache() {
return cache;
}
}

public String getKmsKeyId() {
return kmsKeyId;
}
}
8 changes: 5 additions & 3 deletions src/main/java/com/upplication/s3fs/S3FileSystemProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ public class S3FileSystemProvider extends FileSystemProvider {

public static final String CHARSET_KEY = "s3fs_charset";
public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";
public static final String KMS_KEY_ID = "s3fs_kms_key_id";

private static final ConcurrentMap<String, S3FileSystem> fileSystems = new ConcurrentHashMap<>();
private static final List<String> PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY, REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN,
PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT, SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT,
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS);
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS, KMS_KEY_ID);

private S3Utils s3Utils = new S3Utils();
private Cache cache = new Cache();
Expand Down Expand Up @@ -550,7 +551,8 @@ public void setAttribute(Path path, String attribute, Object value, LinkOption..
* @return S3FileSystem never null
*/
public S3FileSystem createFileSystem(URI uri, Properties props) {
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost());
String keyId = props.getProperty(KMS_KEY_ID);
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost(), keyId);
}

protected AmazonS3 getAmazonS3(URI uri, Properties props) {
Expand Down Expand Up @@ -634,4 +636,4 @@ public Cache getCache() {
public void setCache(Cache cache) {
this.cache = cache;
}
}
}
11 changes: 9 additions & 2 deletions src/main/java/com/upplication/s3fs/S3SeekableByteChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.apache.tika.Tika;

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;

public class S3SeekableByteChannel implements SeekableByteChannel {

Expand Down Expand Up @@ -109,7 +111,12 @@ protected void sync() throws IOException {

String bucket = path.getFileStore().name();
String key = path.getKey();
path.getFileSystem().getClient().putObject(bucket, key, stream, metadata);
PutObjectRequest request = new PutObjectRequest(bucket, key, stream, metadata);
String keyId = path.getFileSystem().getKmsKeyId();
if (keyId != null) {
request.withSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(keyId));
}
path.getFileSystem().getClient().putObject(request);
}
}

Expand Down Expand Up @@ -142,4 +149,4 @@ public SeekableByteChannel position(long newPosition) throws IOException {
public long position() throws IOException {
return seekable.position();
}
}
}
18 changes: 16 additions & 2 deletions src/test/java/com/upplication/s3fs/S3FileChannelTest.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.upplication.s3fs;

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.upplication.s3fs.util.AmazonS3ClientMock;
import com.upplication.s3fs.util.AmazonS3MockFactory;
import com.upplication.s3fs.util.S3EndpointConstant;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -98,7 +100,19 @@ public void writeNeedsToCloseChannel() throws IOException {
channel.close();

verify(channel, times(1)).implCloseChannel();
verify(client, times(1)).putObject(eq("buck"), eq("file1"), any(InputStream.class), any(ObjectMetadata.class));
verify(client, times(1)).putObject(argThat(new ArgumentMatcher<PutObjectRequest>() {
@Override
public boolean matches(Object argument) {
PutObjectRequest request = (PutObjectRequest) argument;
if (!request.getKey().equals("file1")) {
return false;
}
if (!request.getBucketName().equals("buck")) {
return false;
}
return true;
}
}));
}

@Test(expected = FileAlreadyExistsException.class)
Expand Down Expand Up @@ -128,4 +142,4 @@ public void tempFileDisappeared() throws IOException, NoSuchFieldException, Secu
Files.delete(tempFile);
channel.close();
}
}
}
13 changes: 11 additions & 2 deletions src/test/java/com/upplication/s3fs/util/AmazonS3ClientMock.java
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,16 @@ public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonC

@Override
public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException {
throw new UnsupportedOperationException();
InputStream inputStream = putObjectRequest.getInputStream();
String bucket = putObjectRequest.getBucketName();
String keyName = putObjectRequest.getKey();
S3Element elem = parse(inputStream, bucket, keyName);

persist(bucket, elem);

PutObjectResult putObjectResult = new PutObjectResult();
putObjectResult.setETag("3a5c8b1ad448bca04584ecb55b836264");
return putObjectResult;
}

@Override
Expand Down Expand Up @@ -1346,4 +1355,4 @@ public URL getUrl(String bucketName, String key) {
public AmazonS3Waiters waiters() {
throw new UnsupportedOperationException();
}
}
}