InfoQ 推荐 ( ) • 2021-06-06 10:15

消息驱动的微服务

我们已经讨论了Spring Cloud 提供的是基于微服务架构的许多功能。但是,我们一直在考虑的其实都是基于RESTful的同步通信服务。在“微服务简介”中曾经提到过,还有其他一些流行的通信方式,如发布/订阅或异步、事件驱动的点对点消息传递等,后者就是本章将要介绍的一种微服务实现方法,它和前面章节所介绍的基于RESTful的同步通信服务有所不同。

本章还将详细讨论如何使用SpringCloudStream来构建消息驱动的微服务。

本章将要讨论的主题包括:

口与Spring Cloud Stream相关的主要术语和概念。

口使用RabbitMQ和Apache Kafka消息代理作为绑定器。

口Spring Cloud Stream编程模型。

口绑定、生成器和使用者的高级配置。

口扩展、分组和分区机制的实现。

口多个绑定器支持。

了解Spring Cloud Stream

Spring Cloud Stream构建于Spring Boot之上。它允许开发人员创建独立的、生产级的Spring应用程序,并使用Spring Integration来帮助实现与消息代理的通信。使用SpringCloud Stream创建的每个应用程序都可以通过输入和输出通道与其他微服务集成。这些通道通过与中间件相关的绑定器( Binder)实现连接到外部消息代理。有两种内置的绑定器实现一Kafka 和Rabbit MQ.

Spring. Integration 扩展了Spring 编程模型,以支持众所周知的企业集成模式(Enterprise Integration Pttens, EIP)。EIP定义了许多通常用于分布式系统中的协作的组件。读者可能已经听说过消息通道、路由器、聚合器或端点等模式。Spring Integration框架的主要目标是提供一个基于EIP构建Spring应用程序的简单模型。如果读者对有关EIP的更多详细信息感兴趣,请访问网站ht:/www enterpriseintegrationpatterms com/patterns/messaging/toc .html.

构建消息传递系统

我们认为引入主要Spring Cloud Stream功能的最合适方式是通过基于微服务的示例系统。我们将轻松修改前面章节中讨论过的系统架构。这里不妨简要回顾一下这种架构。我们的系统负责处理订单。它由4个独立的微服务组成。order-service 微服务首先与product-service服务进行通信,以便收集所选产品的详细信息,然后通过customer-service服务来检索有关客户及其账户的信息。现在,发送到order-service服务的订单将被异步处理。此时仍有一个公开的RESTful HTTP API端点用于客户端提交新订单,但应用程序不会处理它们。它只保存新订单,将其发送到消息代理,然后给客户端发送响应,表示订单已被批准处理。当前讨论的示例的主要目标是显示点对点通信,因而消息将仅由一个应用程序(account-service 服务)接收。图11.1说明了这个示例系统的架构。

收到新消息之后,account-service服务会调用product-service公开的方法以查找其价格。它从账户中提取资金,然后将响应发送回order-service服务(包含当前订单的状态)。该消息也通过消息代理发送。order-service 微服务将接收消息并更新订单状态。如果外部客户端想要检查当前订单状态,它可以调用公开find方法的端点,查找订单的详细信息。该示例应用程序的源代码可在GitHub ( htp:itb co/inin/sampl-spring- cloud-messaging.git)上获得。

 启用Spring Cloud Stream

在项目中包含Spring Cloud Stream的推荐方法是使用依赖项管理系统。Spring CloudStream具有与整个Spring Cloud 框架相关的独立版本列车管理。但是,如果在dependencyManagement部分的Edgware.RELEASE版本中声明了spring-cloud-dependencies,那么就不必在porm.xml中声明任何其他内容。如果开发人员只想使用Spring Cloud Stream项目,则应定义以下部分。

org.springframework.cloudspring-cloud-st ream-dependenciesDitmars。SR2pom import

下一步是将spring-cloud-steam添加到项目依赖项中。此外,建议开发人员至少包含spring- cloud-sleuth库,以提供发送消息功能和traceld,这个traceld与通过Zuul网关传入order- service服务的源请求的traceld相同。

org . springframework. cloudspring-cloud-streamorg. springframework. cloudcartifactId>spring-cloud-sleuth

要为应用程序启用与消息代理的连接,请使用@EnableBinding注解主类。

@EnableBinding注解可以将一个或多 个接口作为参数。可以在Spring Cloud Stream提供

的3个接口之间选择。

口Sink: 用于标记从入站通道接收消息的服务。

口Source:用于向出站频道发送消息。

口Processor:可用于需要入站通道和出站通道的情况,因为它扩展了Source 和Sink接口。由于order-service 服务发送消息以及接收消息,因而其主类已使用@EnableBinding ( Prossosrcass)进行注解。

以下是支持Spring Cloud Stream绑定的主要order-service服务类。

@SpringBootApplication@EnableDiscoveryClient@EnableBinding (Processor.class)public class OrderApplication {public static void main(String[] args) {newSpringApplicationBuilder (OrderApplication.class) .web(true) .run (args);}}

声明和绑定频道

由于使用了Spring Integration,因而该应用程序独立于项目中包含的消息代理实现。Spring Cloud Stream将自动检测并使用类路径中找到的绑定器,这意味着开发人员可以选择不同类型的中间件,并配合相同的代码使用。所有与中间件相关的设置都可以通过外部配置属性覆盖,并且采用Spring Boot 支持的形式,如应用程序参数、环境变量或application.yml文件。

如前文所述,Spring Cloud Stream为Kafka和Rabbit MQ提供了绑定器实现。要包含对Kafka的支持,请将以下依赖项添加到项目中。

org。springframework.cloudspring-cloud-starter-stream- kafka

就个人而言,笔者更喜欢RabbitMQ,本章将为RabbitMQ和Kafka各创建一个示例。 由于前面的章节已经讨论过RabbitMQ的功能,所以现在就先从基于RabbitMQ的示例开始。

org. spr ingf ramework.cloud spring-cloud-starter -stream rabbit

启用Spring Cloud Stream并包含绑定器实现后,即可创建发送消息者(Sender) 和侦听消息者(Listener) 。现在可以从负责向代理发送新订单消息的生产者(Producer) 开始。这是通过order-service中的OrderSender 实现的,它使用Output bean发送消息。

@Servicepublic class OrderSender {@Autowiredprivate Source source;public boolean send (Order order)returnthis.source.output() .send (MessageBuilder .withPayload (order) .build( ) );}}

该bean由控制器调用,它公开允许提交新订单的HTTP方法。

@RestControllerpublic class OrderController {private static final Logger LOGGER =LoggerFactory.getLogger (OrderController.class);private ObjectMapper mapper new ObjectMapper();@AutowiredOrderRepository repository;@AutowiredOrderSender sender ;@PostMappingpublic order process (0RequestBody order order) throwsJsonProcess ingException (Order O = repository.add(order);LOGGER.info("Order saved: }", mapper .writeValueAsString (order));boolean isSent 一sender 。send(o) ;LOGGER. info("order sent:{ } ",mapper.writeValueAsstring (Collections.singletonMap ("issent", isSent) ) );return o;}}

包含订单信息的消息已发送到消息代理。现在,它应该通过account-service服务接收。要完成这一操作, 必须声明接收器,接收器将侦听传入队列的消息,这个消息是在消息代理上创建的。要接收带有订单数据的消息,只需要使用@Sreaml istener注解让该方法采用Order对象作为参数。

@SpringBootApplication@EnableDiscoveryClient@EnableBinding (Processor.class)public class AccountApplication {@AutowiredAccountService service;public static void main(String[] args) {newSpringAppl icationBuilder (AccountApplication.class) .web (true) . run(args);}@BeanastreamListener (Processor。INPUT)public void receiveOrder (Order order) throws JsonProcessingException {service.process (order);}}

现在可以启动示例应用程序。但是,这里还有一个尚未提及的重要细节。这两个应用程序都尝试连接在localhost上运行的RabbitMQ,并且它们都将相同的交换( Exchange )信息视为输入或输出。这是一个问题,因为order-service服务将消息发送到输出交换信息,而account-service服务又将侦听传入其输入交换消息。这些是不同的交换信息,但先者恒先。接下来不妨就从运行消息代理开始。

本文给大家讲解的内容是消息驱动的微服务

下篇文章给大家讲解的是自定义与RabbitMQ代理的连接;觉得文章不错的朋友可以转发此文关注小编,有需要的可以私信小编获取;感谢大家的支持!

原文链接:https://www.toutiao.com/a6969440002333884931/?log_from=b4d1a2e3c2e6c_1622784284296