1. 目标

上一篇关于 S3 上传的文章中,我们探讨了如何使用 jclouds 的通用 Blob API 上传内容到 S3。本文将使用 jclouds 提供的 S3 专用异步 API,结合 S3 原生支持的分块上传功能实现更高效的文件传输。

2. 准备工作

2.1. 配置自定义 API

上传流程的第一步是创建 jclouds API——这是一个专为 Amazon S3 定制的接口:

public AWSS3AsyncClient s3AsyncClient() {
   String identity = "your-access-key";
   String credentials = "your-secret-key";

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3")
      .credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. 计算分块数量

Amazon S3 要求每个分块至少 5 MB。因此我们需要先计算合理的分块数量,避免出现小于 5 MB 的分块:

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts = byteArray.length / fiveMB; // 5*1024*1024
   if (numberOfParts == 0) {
      return 1;
   }
   return numberOfParts;
}

2.3. 分割内容为分块

将字节数组分割成指定数量的分块:

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

验证分割逻辑:生成测试数据,分割后用 Guava 重组,确保与原始数据一致:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length, 
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

测试数据生成方法:

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2.4. 创建 Payload 对象

确定分块数量并完成分割后,需要为 jclouds API 生成 Payload 对象

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. 上传流程

分块上传是一个灵活的多阶段流程,特点如下:

  • 无需等待全部数据:可在数据生成过程中启动上传
  • 分块传输:单个分块失败可独立重试
  • 并行上传:显著提升大文件传输速度

3.1. 初始化上传

第一步是初始化上传流程。向 S3 发送的请求需包含标准 HTTP 头,特别是 Content-MD5 头(使用 Guava 计算完整文件的 MD5):

Hashing.md5().hashBytes(byteArray).asBytes();

使用之前创建的异步 API 初始化上传:

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

⚠️ 注意:

  • key 是对象的唯一标识符,需由客户端指定
  • 虽使用异步 API,但此处阻塞等待结果,因为后续步骤依赖返回的 uploadId

S3 返回的 uploadId 将贯穿整个上传生命周期,后续所有操作都需要此标识符。

3.2. 上传分块

下一步是并行上传分块。这是上传流程的核心耗时操作:

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

分块编号需连续,但发送顺序无关紧要。

提交所有分块上传请求后,等待响应并收集 ETag

Function<ListenableFuture<String>, String> getEtagFromOp = 
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

若分块上传失败,可独立重试直到成功(示例代码未包含重试逻辑,但实现起来很简单)。

3.3. 完成上传

最后一步是完成分块上传。S3 要求提供所有分块的 ETag 映射:

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

发送完成请求:

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

此操作返回最终对象的 ETag,标志着整个上传流程结束。

4. 总结

本文使用 jclouds 的 S3 专用 API 实现了全并行分块上传功能。该方案可直接使用,但仍有优化空间:

  • 添加重试机制:增强上传操作的容错能力
  • 动态分块大小:根据网络状况调整分块大小
  • 断点续传:记录已上传分块,支持中断后恢复

对于大文件上传场景,这种分块并行方案能显著提升效率,是生产环境中的推荐做法。


原始标题:Multipart Upload on S3 with jclouds | Baeldung