1. 概述

AWS 提供了大量服务,我们可以通过其官方 SDK 在 Java 中调用。但在早期版本中,该 SDK 并不支持响应式操作,异步能力也非常有限。

随着 AWS SDK for Java 2.0 的发布,这一切发生了改变。该版本基于 Reactive Streams 规范,✅ 现在我们可以使用完全非阻塞的 I/O 模式来操作 AWS 服务。

本文将通过一个 Spring Boot 实现的简单 blob 存储 REST 接口,后端使用 AWS S3,带你掌握这套响应式编程实践。

2. AWS S3 操作概览

我们的目标是构建一个典型的 blob 存储服务,支持前端应用上传、列出、下载和删除文件(如图片、音频、文档等),即标准的 CRUD 操作。

⚠️ 传统实现的一大痛点是:如何高效处理大文件或慢速网络连接

在 Servlet 3.0 之前,JavaEE 只提供阻塞式 API,每个客户端连接都需要一个独立线程。这种模型资源消耗大,容易导致:

  • 需要更多服务器资源(成本更高)
  • 更容易受到 DoS 类攻击

reactive upload thread per client

而使用响应式栈后,情况大为改观:

  • 使用少量线程,通过 I/O 事件驱动(如数据可读、写完成)
  • 同一线程可处理多个客户端的事件
  • 极大减少上下文切换开销
  • 资源利用率更高,同等并发下更轻量

3. 项目搭建

示例项目是一个标准的 Spring Boot WebFlux 应用,包含 Lombok、JUnit 等常用依赖。

核心是引入 AWS SDK for Java 2.x 的相关依赖:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.24.9</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <artifactId>netty-nio-client</artifactId>
        <groupId>software.amazon.awssdk</groupId>
        <scope>compile</scope>
    </dependency>
</dependencies>

关键点:

  • 使用 AWS 提供的 BOM(Bill of Materials)统一管理版本,✅ 避免手动指定每个组件版本
  • s3 依赖包含 S3 客户端核心功能
  • netty-nio-client 是异步非阻塞通信的基础,必须引入

更多传输层配置详见 AWS 官方文档

4. S3 客户端创建

S3 操作的入口是 S3AsyncClient 类,所有异步调用都从它发起。

我们通过 Spring 的 @Configuration 创建一个单例 Bean,便于注入使用:

@Configuration
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
public class S3ClientConfiguration {
    @Bean
    public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props, 
      AwsCredentialsProvider credentialsProvider) {
        SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
          .writeTimeout(Duration.ZERO)
          .maxConcurrency(64)
          .build();
        S3Configuration serviceConfiguration = S3Configuration.builder()
          .checksumValidationEnabled(false)
          .chunkedEncodingEnabled(true)
          .build();
        S3AsyncClientBuilder b = S3AsyncClient.builder().httpClient(httpClient)
          .region(s3props.getRegion())
          .credentialsProvider(credentialsProvider)
          .serviceConfiguration(serviceConfiguration);

        if (s3props.getEndpoint() != null) {
            b = b.endpointOverride(s3props.getEndpoint());
        }
        return b.build();
    }
}

配置说明

S3ClientConfigurarionProperties 包含以下关键配置项:

  • region:AWS 区域,如 us-east-1
  • accessKeyId / secretAccessKey:API 凭据
  • endpoint:可选,用于覆盖默认 S3 接口地址(对接 MinIO、LocalStack 等兼容服务)
  • bucket:文件存储的桶名称

客户端配置要点

  • ❌ 禁用写超时(writeTimeout(Duration.ZERO)):避免低带宽下上传中断
  • ✅ 提高最大并发(maxConcurrency(64)):提升吞吐
  • ❌ 禁用校验和(checksumValidationEnabled(false)):减少 CPU 开销
  • ✅ 启用分块编码(chunkedEncodingEnabled(true)):支持流式上传

凭据管理

使用自定义 AwsCredentialsProvider,支持从 Spring 环境中读取配置,✅ 便于对接 Vault、Config Server 等外部配置中心:

@Bean
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
    if (StringUtils.isBlank(s3props.getAccessKeyId())) {
        return DefaultCredentialsProvider.create();
    } else {
        return () -> {
            return AwsBasicCredentials.create(
              s3props.getAccessKeyId(),
              s3props.getSecretAccessKey());
        };
    }
}

5. 上传服务设计

上传接口路径为 /inbox,支持两种方式:

  • 普通上传POST,请求体为文件原始数据
  • 多部分上传POSTPUTmultipart/form-data 格式

我们创建 @RestController 处理两种请求:

@RestController
@RequestMapping("/inbox")
@Slf4j
public class UploadResource {
    private final S3AsyncClient s3client;
    private final S3ClientConfigurarionProperties s3config;

    public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
        this.s3client = s3client;
        this.s3config = s3config;        
    }
    
    @PostMapping
    public Mono<ResponseEntity<UploadResult>> uploadHandler(
      @RequestHeader HttpHeaders headers, 
      @RequestBody Flux<ByteBuffer> body) {
      // ... 单文件上传逻辑
    }

    @RequestMapping(
      consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
      method = {RequestMethod.POST, RequestMethod.PUT})
    public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(
      @RequestHeader HttpHeaders headers,
      @RequestBody Flux<Part> parts ) {
      // ... 多文件上传逻辑
    }
}

返回 UploadResult 对象,包含操作结果和生成的文件 key,供后续下载使用。

6. 单文件上传

客户端发送原始数据,我们通过 @RequestBody Flux<ByteBuffer> 接收流式数据。

核心逻辑简单粗暴:

  1. 生成唯一文件 key
  2. 构建 PutObjectRequest
  3. Flux<ByteBuffer> 转为 AsyncRequestBody 传给 SDK
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
  @RequestBody Flux<ByteBuffer> body) {
    String fileKey = UUID.randomUUID().toString();
    MediaType mediaType = headers.getContentType();
    if (mediaType == null) {
        mediaType = MediaType.APPLICATION_OCTET_STREAM;
    }
    CompletableFuture future = s3client
      .putObject(PutObjectRequest.builder()
        .bucket(s3config.getBucket())
        .contentLength(headers.getContentLength()) // 必须!
        .key(fileKey)
        .contentType(mediaType.toString())
        .metadata(Collections.singletonMap("filename", "original.jpg"))
        .build(), 
      AsyncRequestBody.fromPublisher(body));

    return Mono.fromFuture(future)
      .map((response) -> {
        return ResponseEntity
          .status(HttpStatus.CREATED)
          .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
        });
}

踩坑提醒

  • ⚠️ contentLength 必须指定,否则 SDK 会抛异常
  • ✅ 使用 AsyncRequestBody.fromPublisher(body) 直接桥接 Reactor 与 AWS SDK
  • 异步结果通过 Mono.fromFuture() 转为响应式流

7. 多文件上传

处理 multipart/form-data 上传,不能简单复用单文件逻辑,否则会:

  • ❌ 需要缓存整个文件才能获取大小
  • ❌ 消耗额外磁盘 I/O 和内存

✅ 正确做法:使用 AWS 多部分上传(Multipart Upload) 功能,支持:

  • 分块上传(最小 5MB/块,最后一块除外)
  • 并行上传
  • 断点续传

7.1 外层处理流程

return parts
  .ofType(FilePart.class)           // 只处理文件部分
  .flatMap((part)-> saveFile(part)) // 并发上传每个文件
  .collect(Collectors.toList())     // 收集所有生成的 key
  .map((keys)-> new UploadResult(HttpStatus.CREATED, keys)));

7.2 单文件上传实现

由于 S3 要求每块 ≥5MB,需本地缓冲。我们用 UploadState 记录上传状态:

class UploadState {
    String bucket;
    String filekey;
    String uploadId;
    int partCounter;
    Map<Integer, CompletedPart> completedParts = new HashMap<>();
    int buffered = 0;
    UploadState(String bucket, String filekey) {
        this.bucket = bucket;
        this.filekey = filekey;
    }
}

完整上传流程:

Mono<String> saveFile(HttpHeaders headers, String bucket, FilePart part) {
    String filekey = UUID.randomUUID().toString();
    Map<String, String> metadata = Collections.singletonMap("filename", part.filename());
    MediaType mt = part.headers().getContentType();
    if (mt == null) mt = MediaType.APPLICATION_OCTET_STREAM;
    
    UploadState uploadState = new UploadState(bucket, filekey);
    
    CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
      .createMultipartUpload(CreateMultipartUploadRequest.builder()
        .contentType(mt.toString())
        .key(filekey)
        .metadata(metadata)
        .bucket(bucket)
        .build());

    return Mono
      .fromFuture(uploadRequest)
      .flatMapMany((response) -> {
          uploadState.uploadId = response.uploadId();
          return part.content(); // 转为 Flux<DataBuffer>
      })
      .bufferUntil((buffer) -> { // 缓冲至 5MB
          uploadState.buffered += buffer.readableByteCount();
          if (uploadState.buffered >= s3config.getMultipartMinPartSize()) {
              uploadState.buffered = 0;
              return true;
          }
          return false;
      })
      .map((buffers) -> concatBuffers(buffers)) // 合并为 ByteBuffer
      .flatMap((buffer) -> uploadPart(uploadState, buffer))
      .reduce(uploadState, (state, completedPart) -> {
          state.completedParts.put(completedPart.partNumber(), completedPart);
          return state;
      })
      .flatMap((state) -> completeUpload(state))
      .map((response) -> uploadState.filekey);
}

7.3 上传分块

private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
    final int partNumber = ++uploadState.partCounter;
    CompletableFuture<UploadPartResponse> request = s3client.uploadPart(
        UploadPartRequest.builder()
            .bucket(uploadState.bucket)
            .key(uploadState.filekey)
            .partNumber(partNumber)
            .uploadId(uploadState.uploadId)
            .contentLength((long) buffer.capacity())
            .build(), 
        AsyncRequestBody.fromPublisher(Mono.just(buffer)));
    
    return Mono.fromFuture(request)
      .map((result) -> CompletedPart.builder()
        .eTag(result.eTag())
        .partNumber(partNumber)
        .build());
}

7.4 完成上传

private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {        
    CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
        .parts(state.completedParts.values())
        .build();
    return Mono.fromFuture(s3client.completeMultipartUpload(
        CompleteMultipartUploadRequest.builder()
            .bucket(state.bucket)
            .uploadId(state.uploadId)
            .multipartUpload(multipartUpload)
            .key(state.filekey)
            .build()));
}

8. 文件下载

相比上传,下载简单得多。核心是 getObject() 方法,接收:

  • GetObjectRequest:指定桶和 key
  • AsyncResponseTransformer:将响应流转换为目标类型

⚠️ 注意:SDK 自带的 toPublisher() 实现会在内存中缓冲整个文件,不适合大文件。

但我们无需担心,自定义转换器非常简单。

8.1 下载接口

@GetMapping(path="/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {    
    GetObjectRequest request = GetObjectRequest.builder()
      .bucket(s3config.getBucket())
      .key(filekey)
      .build();
    
    return Mono.fromFuture(s3client.getObject(request, AsyncResponseTransformer.toPublisher()))
      .map(response -> {
        String filename = getMetadataItem(response.response(), "filename", filekey);            
        return ResponseEntity.ok()
          .header(HttpHeaders.CONTENT_TYPE, response.response().contentType())
          .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.response().contentLength()))
          .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
          .body(Flux.from(response));
      });
}

踩坑提醒

  • AsyncResponseTransformer.toPublisher() 返回 Publisher<ByteBuffer>,可直接转为 Flux
  • ⚠️ S3 元数据头 不区分大小写(RFC 7230),MinIO 等兼容服务可能返回不同大小写,需做归一化处理

9. 总结

本文展示了如何在 Spring WebFlux 中使用 AWS SDK for Java 2.x 的响应式特性,实现高效、低资源消耗的 S3 文件服务。

核心优势:

  • ✅ 非阻塞 I/O,高并发下资源占用低
  • ✅ 流式处理,支持大文件上传下载
  • ✅ 无缝集成 Reactor 与 AWS SDK

该模式同样适用于 DynamoDB 等其他支持响应式的 AWS 服务。

完整代码见 GitHub 仓库


原始标题:AWS S3 with Java - Reactive Support