1. 目标
在上一篇关于 S3 上传的文章中,我们探讨了如何使用 jclouds 的通用 Blob API 上传内容到 S3。本文将使用 jclouds 提供的 S3 专用异步 API,结合 S3 原生支持的分块上传功能实现更高效的文件传输。
2. 准备工作
2.1. 配置自定义 API
上传流程的第一步是创建 jclouds API——这是一个专为 Amazon S3 定制的接口:
public AWSS3AsyncClient s3AsyncClient() {
String identity = "your-access-key";
String credentials = "your-secret-key";
BlobStoreContext context = ContextBuilder.newBuilder("aws-s3")
.credentials(identity, credentials).buildView(BlobStoreContext.class);
RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
return providerContext.getAsyncApi();
}
2.2. 计算分块数量
Amazon S3 要求每个分块至少 5 MB。因此我们需要先计算合理的分块数量,避免出现小于 5 MB 的分块:
public static int getMaximumNumberOfParts(byte[] byteArray) {
int numberOfParts = byteArray.length / fiveMB; // 5*1024*1024
if (numberOfParts == 0) {
return 1;
}
return numberOfParts;
}
2.3. 分割内容为分块
将字节数组分割成指定数量的分块:
public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
int fullSize = byteArray.length;
long dimensionOfPart = fullSize / maxNumberOfParts;
for (int i = 0; i < maxNumberOfParts; i++) {
int previousSplitPoint = (int) (dimensionOfPart * i);
int splitPoint = (int) (dimensionOfPart * (i + 1));
if (i == (maxNumberOfParts - 1)) {
splitPoint = fullSize;
}
byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
parts.add(partBytes);
}
return parts;
}
验证分割逻辑:生成测试数据,分割后用 Guava 重组,确保与原始数据一致:
@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
byte[] byteArray = randomByteData(16);
int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);
assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
equalTo(byteArray.length));
byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
assertThat(byteArray, equalTo(unmultiplexed));
}
测试数据生成方法:
byte[] randomByteData(int mb) {
byte[] randomBytes = new byte[mb * 1024 * 1024];
new Random().nextBytes(randomBytes);
return randomBytes;
}
2.4. 创建 Payload 对象
确定分块数量并完成分割后,需要为 jclouds API 生成 Payload 对象:
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
List<Payload> payloads = Lists.newArrayList();
for (byte[] filePart : fileParts) {
byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
Payload partPayload = Payloads.newByteArrayPayload(filePart);
partPayload.getContentMetadata().setContentLength((long) filePart.length);
partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
payloads.add(partPayload);
}
return payloads;
}
3. 上传流程
分块上传是一个灵活的多阶段流程,特点如下:
- ✅ 无需等待全部数据:可在数据生成过程中启动上传
- ✅ 分块传输:单个分块失败可独立重试
- ✅ 并行上传:显著提升大文件传输速度
3.1. 初始化上传
第一步是初始化上传流程。向 S3 发送的请求需包含标准 HTTP 头,特别是 Content-MD5
头(使用 Guava 计算完整文件的 MD5):
Hashing.md5().hashBytes(byteArray).asBytes();
使用之前创建的异步 API 初始化上传:
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();
⚠️ 注意:
key
是对象的唯一标识符,需由客户端指定- 虽使用异步 API,但此处阻塞等待结果,因为后续步骤依赖返回的
uploadId
S3 返回的 uploadId
将贯穿整个上传生命周期,后续所有操作都需要此标识符。
3.2. 上传分块
下一步是并行上传分块。这是上传流程的核心耗时操作:
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
ListenableFuture<String> future = s3AsyncApi.uploadPart(
container, key, partNumber + 1, uploadId, payloads.get(partNumber));
ongoingOperations.add(future);
}
分块编号需连续,但发送顺序无关紧要。
提交所有分块上传请求后,等待响应并收集 ETag:
Function<ListenableFuture<String>, String> getEtagFromOp =
new Function<ListenableFuture<String>, String>() {
public String apply(ListenableFuture<String> ongoingOperation) {
try {
return ongoingOperation.get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);
若分块上传失败,可独立重试直到成功(示例代码未包含重试逻辑,但实现起来很简单)。
3.3. 完成上传
最后一步是完成分块上传。S3 要求提供所有分块的 ETag 映射:
Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
parts.put(i + 1, etagsOfParts.get(i));
}
发送完成请求:
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
此操作返回最终对象的 ETag,标志着整个上传流程结束。
4. 总结
本文使用 jclouds 的 S3 专用 API 实现了全并行分块上传功能。该方案可直接使用,但仍有优化空间:
- ✅ 添加重试机制:增强上传操作的容错能力
- ✅ 动态分块大小:根据网络状况调整分块大小
- ✅ 断点续传:记录已上传分块,支持中断后恢复
对于大文件上传场景,这种分块并行方案能显著提升效率,是生产环境中的推荐做法。