1. 简介
在本教程中,我们将探讨如何将 Play Framework 和 Akka Actors 与 Lagom Framework 进行集成。我们还会解释为什么需要这样的集成,以及实现这些集成的几种可能方式。
此外,我们将构建一个基于 Lagom 的示例应用,用于演示这些集成。
2. 理解 Lagom
Lagom 是一个高度固执己见的框架,用于在 Java 和 Scala 中构建灵活、弹性和响应式系统。
它是由 Lightbend 维护的开源框架,提供了库和开发环境,以最佳实践为基础来 构建基于响应式微服务的系统。
Lagom 通过利用 Lightbend 的其他响应式框架(如 Play 和 Akka)来支持从开发到部署的多个方面:
我们设计微服务时希望它们是隔离且自治的,并通过松耦合的方式进行通信。Lagom 支持通过 HTTP 或 WebSocket 实现同步或异步通信。此外,它还通过 Kafka 等消息代理提供基于消息的通信,确保至少一次投递语义。
我们还希望微服务能够独占并直接控制其数据,这遵循了“有界上下文”(Bounded Context)原则。Lagom 通过像事件溯源(Event Sourcing)和命令查询职责分离(CQRS)等知名的设计模式来支持数据持久化。Lagom 通过异步 API 将事件流持久化到数据库中,默认使用 Cassandra。
在开发松耦合微服务的其他关键部分中,服务发现和服务网关也非常重要。我们需要它们来提供服务之间及外部客户端通信的位置透明性。
Lagom 的开发环境中嵌入了一个服务定位器,允许服务相互发现和通信。同时,也嵌入了服务网关,用于允许外部客户端连接到 Lagom 服务。
3. Lagom 中的一个工作示例
为了探索将 Lagom 与 Play 或 Akka 集成的选项,我们首先需要一个可用的示例。在本节中,我们将构建一个简单的基于微服务的应用程序,利用 Lagom 框架。
此外,我们将使用这个示例来理解将 Lagom 与 Play 或 Akka 集成的方式。请注意,该示例基于 Lagom 官方文档中的示例,但足以覆盖我们所需的基础知识。
3.1. 项目设置
通常,Lagom 在我们拥有完整的微服务架构时更有用,因为可以从中受益于它提供的功能。然而,由于本教程的目标是展示如何在 Lagom 中集成 Play 和 Akka API,我们将保持简单。我们将定义一个没有持久化的单一微服务,然后通过 Akka 和 Play 进行扩展。
Lagom 提供了 Java 和 Scala 的 API;但在本教程中,我们将使用 Scala。此外,Lagom 在 Java 中可以选择使用 Maven 或 sbt,但 Scala 只能使用 sbt。
启动 Lagom 项目的最简单方法是使用 Lagom 提供的启动工具。或者,我们可以定义项目结构并让 sbt 生成引导代码:
organization in ThisBuild := "com.baeldung"
version in ThisBuild := "1.0-SNAPSHOT"
scalaVersion in ThisBuild := "2.13.0"
val macwire = "com.softwaremill.macwire" %% "macros" % "2.3.3" % "provided"
val scalaTest = "org.scalatest" %% "scalatest" % "3.1.1" % Test
lazy val `hello` = (project in file(".")).aggregate(`hello-api`, `hello-impl`)
lazy val `hello-api` = (project in file("hello-api"))
.settings(
libraryDependencies ++= Seq(
lagomScaladslApi
)
)
lazy val `hello-impl` = (project in file("hello-impl"))
.enablePlugins(LagomScala)
.settings(
libraryDependencies ++= Seq(
lagomScaladslTestKit,
macwire,
scalaTest
)
)
.settings(lagomForkedTestSettings)
.dependsOn(`hello-api`)
这是我们在 SBT 构建文件中可以定义的简单项目结构。
请注意,Lagom 建议为每个微服务的接口和实现分别定义项目。因此,如上所示,我们将 hello-world 微服务定义在 “hello-api” 和 “hello-impl” 两个项目中。此外,“hello-impl” 项目依赖于 “hello-api” 项目。
除此之外,还有一些常规依赖项,如 Cassandra 和 Kafka,但这些是可选的,不是每个微服务都需要。
3.2. 定义消息
首先,我们需要定义服务将消费和生产的消息。我们还需要确保提供隐式或自定义的消息序列化器,以便 Lagom 能够序列化和反序列化请求和响应消息。
让我们快速定义消息:
case class Job(jobId: String, task: String, payload: String)
object Job {
implicit val format: Format[Job] = Json.format
}
case class JobAccepted(jobId: String)
object JobAccepted {
implicit val format: Format[JobAccepted] = Json.format
}
现在让我们更好地理解这些消息:
- 我们定义了两个 case 类
Job
和JobAccepted
以及它们的伴生对象 - 我们添加了隐式 JSON 序列化;默认情况下,Lagom 使用 Play JSON 来完成这项工作
- 这些消息代表服务的请求和对应的响应
3.3. 定义服务
如前所述,Lagom 倾向于将微服务拆分为服务接口及其具体实现。因此,下一步是定义服务接口:
trait HelloService extends Service {
def submit(): ServiceCall[Job, JobAccepted]
override final def descriptor: Descriptor = {
import Service._
named("hello")
.withCalls(
pathCall("/api/submit", submit _)
).withAutoAcl(true)
}
}
这是一个简单的 Scala trait,在 Lagom 中被称为服务描述符。服务描述符定义了如何实现和调用服务。
让我们理解其中几个重要的点:
- 我们定义了一个调用
*/api/submit*
,映射到submit
函数 - 该函数返回一个
ServiceCall
的句柄,接受Job
和JobAccepted
参数 - 这些参数是消息类型,可以是严格或流式的
- 我们可以使用这个句柄来执行调用,从而完成实际的工作
- 我们使用基于路径的标识符,通过路径和查询字符串来路由调用
- 其他可能的标识符包括基于名称的标识符和 REST 标识符
3.4. 实现服务
接下来,我们需要为刚刚定义的服务接口提供实现。这必须包括描述符中指定的每个调用的实现:
class HelloServiceImpl()(implicit ec: ExecutionContext)
extends HelloService {
override def submit(): ServiceCall[Job, JobAccepted] = ServiceCall {
job =>
Future[JobAccepted] { JobAccepted(job.jobId) }
}
}
这是我们在服务描述符中定义的函数的基本实现。不过,有几点需要注意:
- 该方法并不执行调用,而是返回一个待执行的 lambda
- 这为函数式组合提供了便利
- 调用本身*不返回值,而是返回一个 Future,即一个承诺*
- 这为我们提供了构建异步、非阻塞、响应式应用的强大方式
3.5. 创建 Lagom 应用
现在,我们需要将服务和其实现整合到一个 Lagom 应用中。Lagom 使用编译时依赖注入来连接这个 Lagom 应用。Lagom 偏好使用 Macwire,它提供轻量级宏来定位组件的依赖项。
让我们快速看看如何创建我们的 LagomApplication
:
abstract class HelloApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with AhcWSComponents {
override lazy val lagomServer: LagomServer =
serverFor[HelloService](wire[HelloServiceImpl])
}
这几行代码背后发生了一些有趣的事情:
HelloApplication
通过 Macwire 混合了AhcWSComponents
- 此外,我们可以混合来自 Scala 或第三方的许多其他组件
- 我们实现了
lagomServer
方法,Lagom 使用它来发现服务绑定 - 我们可以使用 Macwire 的
wire
宏将其他依赖项注入到HelloServiceImpl
中 - 这个类仍然是抽象的,因为它需要实现
serviceLocator
方法
最后,我们需要编写一个应用加载器,以便应用能够自举。在 Lagom 中,我们可以通过继承 LagomApplicationLoader
来方便地完成:
class HelloLoader extends LagomApplicationLoader {
override def load(context: LagomApplicationContext): LagomApplication =
new HelloApplication(context) {
override def serviceLocator: ServiceLocator = NoServiceLocator
}
override def loadDevMode(context: LagomApplicationContext): LagomApplication =
new HelloApplication(context) with LagomDevModeComponents
override def describeService = Some(readDescriptor[HelloService])
}
让我们看看这段代码中需要注意的几个重要点:
- 我们实现了两个必需的方法:
load
和loadDevMode
- 这是我们将合适的
serviceLocator
混合到HelloApplication
中的地方 describeService
方法是可选的,但可以帮助配置服务网关等组件
3.6. 配置
可以通过在配置文件 application.conf
中提供的值来配置 Lagom 服务的许多不同部分。
然而,在我们的简单示例中,唯一需要配置的是我们的应用加载器:
play.application.loader = com.baeldung.hello.impl.HelloLoader
3.7. 运行示例
现在,我们已经完成了创建一个简单可用的 Lagom 示例所需的所有工作。所以,我们终于可以运行这个应用了。
使用命令提示符和 sbt 工具,只需一条命令即可运行整个 Lagom 应用:
sbt runAll
一旦服务器成功启动,我们应该能够使用像 cURL 这样的工具向该服务提交任务:
curl --location --request POST 'http://localhost:9000/api/submit' \
--header 'Content-Type: application/json' \
--data-raw '{
"jobId":"jobId",
"task":"task",
"payload":"payload"
}'
4. 集成 Lagom 与 Play
Lagom 是构建在 Play 框架之上的。在构建 Lagom 服务时,我们不必关心这个细节,正如我们在示例中所看到的那样。
然而,Play 提供了许多强大的功能,我们可能需要直接访问它们。可以直接从 Lagom 调用一些 Play API。
4.1. Play Framework 是什么?
Play 是一个由 Lightbend 维护的高生产力 Java 和 Scala Web 应用框架。它基于轻量、无状态和对 Web 友好的架构。Play 框架还提供了简洁和函数式编程模式。它内部利用 Akka 和 Akka Stream 来提供响应式模型和自然的可扩展性。
我们可以使用 Play 提供的组件(如 HTTP 服务器、强大的路由机制等)来构建 Web 应用和 REST 服务。Play 对数据库访问没有固执己见,能够与多种持久化工具集成。
4.2. 为什么需要从 Lagom 访问 Play?
虽然完全可以从头开始构建 Lagom 服务,但我们经常会遇到需要将 Lagom 添加到现有应用的情况。这些应用可能已经使用 Play 框架处理各种用例。这为我们提供了一个强大的工具,可以在这些应用之上定义的 Lagom 服务中加以利用。
让我们理解一些需要直接从 Lagom 服务访问 Play API 的典型用例:
- 将 Lagom 与现有的 Play 路由器(如 Play gRPC Router)集成
- 访问 Play 组件提供的功能,如
AhcWSComponents
4.3. 从 Lagom 访问 Play API
让我们看看如何实现将简单的 Play 路由器与 Lagom 集成的目标。我们将从定义一个简单的 Play 路由器开始:
class SimplePlayRouter(action: DefaultActionBuilder, parser: PlayBodyParsers) {
val router = Router.from {
case GET(p"/api/play") =>
action(parser.default) { request =>
Results.Ok("Response from Play Simple Router")
}
}
}
这是一个非常简单的路由器,没有实际用途,但有助于理解如何将其与 Lagom 集成。下一步是将其连接到 Lagom 应用加载器中,并附加到 Lagom 服务器:
override lazy val lagomServer: LagomServer =
serverFor[HelloService](wire[HelloServiceImpl])
.additionalRouter(wire[SimplePlayRouter].router)
在这里,我们使用 Macwire 的 wire
宏注入我们的 Play 路由器,并使用 additionalRouter
方法将其附加到 Lagom 服务器。
虽然这足以实现我们的目标,但有一个小问题。由于这个附加路由器不是服务描述符的一部分,服务网关不会自动将其发布为接口。
不过,我们可以在服务描述符中快速添加一个针对该路由器的 ACL(访问控制列表):
named("hello")
.withCalls(
pathCall("/api/hello/:id", hello _),
)
.withAutoAcl(true)
.withAcls(ServiceAcl(pathRegex = Some("/api/play")))
这已经足够了,现在我们可以通过服务网关访问 Play 路由器中的接口:
curl http://localhost:9000/api/play
5. 集成 Lagom 与 Akka
就像 Play 一样,在构建 Lagom 服务时,我们不必意识到 Lagom 是构建在 Akka 之上的。但是,Akka 提供了丰富的功能,我们可能需要直接利用。可以将 Akka 与 Lagom 集成,并直接从 Lagom 服务调用 Akka API,反之亦然。
5.1. Akka 是什么?
Akka 是一组由 Lightbend 维护的开源库,可用于设计可扩展和弹性的系统。Akka 使用 Actor 模型为开发并发、并行、容错和分布式系统提供了一层抽象。Actor 模型帮助它避免内存可见性问题,同时提供位置透明性。
Lagom 内部使用 Akka 库来提供多种功能。例如,Lagom 的持久化和发布-订阅模块是构建在 Akka 之上的。Lagom 通过 Akka Cluster 为微服务提供集群支持。Lagom 还使用 Akka Streams 提供异步流服务。
5.2. 为什么需要集成 Akka 和 Lagom?
Akka 通过 Actor 模型提供了编写高可用、弹性和响应式应用的独特方式。在使用 Lagom 时,我们可能需要直接访问一些 Akka API 来精确控制如何在应用中实现这些属性。同样,我们可能有机会从 Akka Actor 向 Lagom 服务发送信号。
让我们看看一些可能促使我们集成 Akka 和 Lagom 的用例:
- 以自定义方式将工作负载分发到 Akka 集群中的节点
- 从 Akka Actor 向 Lagom 服务实现传递消息
5.3. 从 Lagom 访问 Akka API?
将 Akka 与 Lagom 集成非常简单。Akka 中几乎所有内容都可以通过 ActorSystem
访问。
我们可以利用 Lagom 的依赖注入将当前的 ActorSystem
注入到 Lagom 服务实现或持久化实体中:
class HelloServiceImpl(system: ActorSystem)(implicit ec: ExecutionContext)
在这里,Lagom 的默认依赖注入将负责将当前的 ActorSystem
注入到我们的服务实现中。
假设我们需要根据请求消息的某些参数将传入请求路由到集群中的节点。
首先,我们需要定义一个 Actor
,用于处理传入请求:
class Worker() extends Actor {
private val log = Logging.getLogger(context.system, this)
override def receive = {
case job @ Job(id, task, payload) =>
log.info("Working on job: {}", job)
sender ! JobAccepted(id)
// perform the work...
}
}
这个简单的 Actor
只是记录任务详情并向发送者确认。接下来,我们可以在集群中的某些或所有节点上选择性地启动一个工作 Actor:
if (Cluster.get(system).selfRoles("worker-node")) {
system.actorOf(Worker.props, "worker")
}
现在,我们需要创建一个路由器,用于将任务有选择性地发送到集群中的工作节点:
val workerRouter = {
val paths = List("/user/worker")
val groupConf = ConsistentHashingGroup(paths, hashMapping = {
case Job(_, task, _) => task
})
val routerProps = ClusterRouterGroup(
groupConf,
ClusterRouterGroupSettings(
totalInstances = 1000,
routeesPaths = paths,
allowLocalRoutees = true,
useRoles = Set("worker-node")
)
).props
system.actorOf(routerProps, "workerRouter")
}
在这里,我们使用了基于 ConsistentHashingGroup
的 ClusterRouterGroup
。它使用 Job
的 task
属性来分组和路由任务到工作节点。Akka 提供了多种分组策略,包括 ClusterRouterGroup
、BroadcastGroup
、RandomGroup
和 RoundRobinGroup
等。
最后,我们准备好以自定义方式接受并将任务路由到集群节点。我们将修改之前的服务实现以适应这一点:
override def submit(): ServiceCall[Job, JobAccepted] = ServiceCall {
job =>
implicit val timeout = Timeout(5.seconds)
(workerRouter ? job).mapTo[JobAccepted]
}
在这里,我们只是使用之前创建的路由器将传入任务路由到集群节点,并设置五秒超时。
现在,我们应该能够像以前一样提交任务,这些任务将按照我们定义的方式进行路由。
5.4. 从 Akka 访问 Lagom API
现在,我们将看看如何轻松地从 Akka 访问 Lagom API。
我们将扩展示例从我们的简单 Actor
发送工作负载执行进度的信号。我们将利用发布-订阅(Publish-Subscribe)这一知名的消息传递模式来实现这种通信。Lagom 通过 PubSubRegistry
提供对此的支持,它提供了 PubSubRef
,我们可以使用它来发布和订阅主题。
我们将首先在模块 “hello-impl” 的 sbt 构建文件中添加所需的依赖项:
libraryDependencies ++= Seq(
lagomScaladslTestKit,
lagomScaladslPubSub,
macwire,
scalaTest
像以前一样,我们将使用 Lagom 的依赖注入将 PubSubRegistry
的实例注入到我们的服务实现中:
class HelloServiceImpl(system: ActorSystem, pubSub: PubSubRegistry)(implicit ec: ExecutionContext)
我们还需要定义将要发布和订阅的消息:
case class JobStatus(jobId: String, jobStatus: String)
object JobStatus {
implicit val format: Format[JobStatus] = Json.format
}
现在,我们将修改之前定义的 Actor
,以利用 PubSubRegistry
并将 JobStatus
类型的消息发布到主题:
class Worker(pubSub: PubSubRegistry) extends Actor {
private val log = Logging.getLogger(context.system, this)
override def receive = {
case job @ Job(id, task, payload) =>
log.info("Working on job: {}", job)
sender ! JobAccepted(id)
val topic = pubSub.refFor(TopicId[JobStatus]("job-status"))
topic.publish(JobStatus(job.jobId,"started"))
// perform the work...
topic.publish(JobStatus(job.jobId,"completed"))
}
}
在这里,我们使用注入的 PubSubRegistry
获取一个名为 topic
的 PubSubRef
。然后,我们使用 PubSubRef#publish
发送更新。
现在,我们可以通过另一个服务调用来流式传输状态更新。让我们在服务接口中添加一个新的调用:
def hello(id: String): ServiceCall[NotUsed, String]
def doWork(): ServiceCall[Job, JobAccepted]
def status(): ServiceCall[NotUsed, Source[JobStatus, NotUsed]]
override final def descriptor: Descriptor = {
import Service._
named("hello")
.withCalls(
pathCall("/api/hello/:id", hello _),
pathCall("/api/dowork", doWork _),
pathCall("/api/status", status _)
)
.withAutoAcl(true)
}
注意,这个新方法的返回类型是 Source
,实际上是 Akka Stream API,用于启用异步流。我们还需要为这里引入的新方法提供实现:
override def status(): ServiceCall[NotUsed, Source[JobStatus, NotUsed]] = ServiceCall {
_ =>
val topic = pubSub.refFor(TopicId[JobStatus]("job-status"))
Future.successful(topic.subscriber)
}
现在,剩下的唯一事情是更新 Lagom 应用并引入一个名为 PubSubComponents
的新组件:
abstract class HelloApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with PubSubComponents
with AhcWSComponents {
// Provide service bindings as before
}
现在,我们已经准备好访问我们的服务调用,提交工作负载并获取实时更新。我们需要 一个 WebSocket 客户端 来访问以下接口以获取实时更新:
ws://localhost:9000/api/status
6. 版本说明
Lagom 内部是构建在 Play 和 Akka 之上的,因此对它们库的版本有特定的依赖。在 Lagom 应用中,我们可能需要升级一个或多个库以满足特定需求。
Lagom 提供了一种方便的方式来覆盖和升级这些库的版本。我们可以通过在应用的 sbt 构建文件中提供 dependencyOverrides
来实现。
此外,Lagom 会直接或间接添加许多依赖项。但是,它并不包含 Akka 提供的所有库。我们可以选择添加任何我们需要但尚未包含的依赖项。
不过,我们必须小心不要混合不兼容的版本。Akka 有严格的二进制兼容性规则,我们必须确保不违反这些规则。此外,我们应该记住,sbt 在解析依赖项时会获取直接声明到传递的最新版本。工具 sbt-dependency-graph 在分析项目依赖项时非常有用。
7. 结论
总结一下,在本教程中,我们讨论了 Lagom 框架,并创建了一个非常简单的 Lagom 应用。我们还理解了 Lagom 是如何构建在 Play 和 Akka 之上的。
此外,我们探讨了直接从 Lagom 访问 Play 和 Akka 部分的原因和方式。我们扩展了简单的应用以理解这些集成。
一如既往,本文的源代码可以在 GitHub 上找到。