1. 概述
在并发编程中,Future 和 Promise 是两个高级异步抽象。本教程将带你深入了解它们在 Scala 编程语言中的作用和用法。
2. Future
Future 是一个只读的占位符,用于表示正在进行的计算结果。它充当了一个尚未存在的实际值的代理。比如 IO 或 CPU 密集型操作,这些通常需要较长时间才能完成。
2.1. 创建 Future
要创建异步计算任务,我们可以将我们的计算逻辑放入 Future 的 apply 方法中:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val result = Future {
println("Long running computation started.")
val result = {
Thread.sleep(5000)
5
}
println("Our computation, finally finished.")
result
}
要运行 Future,我们需要一个 ExecutionContext,它可以让我们将业务逻辑(代码)与执行环境解耦。由于 ExecutionContext 是隐式参数,我们可以在作用域中导入或创建一个并标记为 implicit
。这里为了简单起见,我们使用全局 ExecutionContext:
implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
val r1 = Future{ /* computation */ }(ec) // 显式传入 ExecutionContext
val r2 = Future{ /* computation */ } // 隐式传递
2.2. 组合 Future
使用 Future 的好处在于它支持 map
和 flatMap
操作,因此我们可以使用惯用的 for-comprehension 来链式组合多个 Future,实现良好的并发组合能力。
以下是一个 User 类的例子,其中 createUser()
函数依赖于其他异步函数的结果,如 notExist()
和 avatar()
。我们来看看如何组合这些操作:
import java.math.BigInteger
import java.net.URL
import java.security.MessageDigest
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
type Name = String
type Email = String
type Password = String
type Avatar = URL
case class User(name: Name, email: Email, password: Password, avatar: Avatar)
def notExist(email: Email): Future[Boolean] = Future {
Thread.sleep(100)
true
}
def md5hash(str: String): String =
new BigInteger(1,
MessageDigest
.getInstance("MD5")
.digest(str.getBytes)
).toString(16)
def avatar(email: Email): Future[Avatar] = Future {
Thread.sleep(200)
new Avatar("http://avatar.example.com/user/23k520f23f4.png")
}
def createUser(name: Name, email: Email, password: Password): Future[User] =
for {
_ <- notExist(email)
avatar <- avatar(email)
hashedPassword = md5hash(password)
} yield User(name, email, hashedPassword, avatar)
在这个例子中,createUser()
接收 name、email 和 password 参数,并返回一个包含对应 User 实例的 Future(即 Future[User]
)。
创建用户时需要执行一些涉及 IO 的步骤。首先检查数据库中是否已存在该用户;其次从公共头像服务获取其头像。
我们使用 Thread.sleep()
来模拟数据库操作的延迟。
因为 avatar()
和 notExist()
都是 Future,而每个 Future 都支持 flatMap 操作,所以我们可以通过 for-comprehension 将它们组合起来。
2.3. 获取值
到目前为止,我们已经学会了如何创建 Future 并将其串联起来。那么,该如何获取 Future 的最终结果呢?
在深入访问 Future 值的方式之前,先了解一下 Future 的状态机制:
✅ Future 有两种阶段状态:
- 未完成(Not Completed):计算仍在进行中。
- 已完成(Completed):计算结束,结果可能是成功(Success)或失败(Failure)。
也就是说,在计算结果就绪前,Future 处于未完成状态;完成后则进入成功或失败状态之一。
下面介绍两种获取 Future 结果的方法:
方法一:使用 onComplete 回调
✅ Future 提供了 onComplete
方法,接收一个 Try[T] => U
类型的回调函数,允许我们在完成的不同状态下分别处理:
val userFuture: Future[User] =
createUser("John", "john.doe@example.com", "secret")
userFuture.onComplete {
case Success(user) =>
println(s"User created: $user")
case Failure(exception) =>
println(s"Creating user failed due to the exception: $exception")
}
通过向 onComplete()
提供回调函数,一旦 Future 完成,该回调就会被触发。
方法二:阻塞等待结果(不推荐)
⚠️ 另一种方式是通过阻塞线程来等待 Future 完成,但这通常不是推荐做法。Await.result()
和 Await.ready()
会阻塞当前线程直到计算完成。它们的签名如下:
def result[T](awaitable: Awaitable[T], atMost: Duration): T
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type
- 第一个参数是要等待的对象(所有 Future 都是 Awaitable 的子类)
- 第二个参数是指定等待的最大时间
可以这样获取 Future 的值:
val user: User = Await.result(userFuture, Duration.Inf)
⚠️ 注意:使用 result()
时需小心,因为它可能在计算完成前抛出异常。
✅ 如果只是想等待 Future 完成而不提取值,可以使用 ready()
方法:
val completedFuture: Future[User] = Await.ready(userFuture, Duration.Inf)
此时再调用 value.get
一定不会是 None
,而是 Some(Success(t))
或 Some(Failure(error))
:
completedFuture.value.get match {
case Success(user) =>
println(s"User created: $user")
case Failure(exception) =>
println(s"Creating user failed due to the exception: $exception")
}
3. Promise
✅ Promise 是一个可写的、单次赋值的容器,用于完成一个 Future。虽然 Promise 和 Future 很相似,但它们的角色不同:
- ✅ Future 负责“读”异步操作的结果
- ✅ Promise 负责“写”异步操作的结果
换句话说,Promise 是对将来某个时刻可用值的写入句柄。它允许我们将已完成的异步操作的结果写入 Future,从而改变 Future 的状态(从未完成变为完成)。调用 success()
方法即可完成这一过程。
⚠️ 一旦 Promise 被完成,就不能再次调用 success()
。
现在来看一下如何从 Promise 创建 Future:
✅ 步骤如下:
- 为想要返回的类型创建一个 Promise。
- 在 ExecutionContext 中运行计算代码块。如果执行成功,则调用 Promise 的
success()
方法设置结果;如果失败,则调用failure()
方法。 - 返回与 Promise 关联的 Future。
来看一个示例函数,它接收一个计算块 block: => T
,并返回一个包含 Promise 值的 Future:
def runByPromise[T](block: => T)(implicit ec: ExecutionContext): Future[T] = {
val p = Promise[T]()
ec.execute { () =>
try {
p.success(block)
} catch {
case NonFatal(e) => p.failure(e)
}
}
p.future
}
✅ Promise 是连接传统 Java 回调风格 API 和 Future 的桥梁。
4. 总结
本文介绍了 Scala 中用于编写非阻塞、异步代码的两种核心构造:
- ✅ 使用 Future 处理异步操作的读取端
- ✅ 使用 Promise 处理异步操作的写入端
完整源码见 GitHub。