RabbitMQ Development in Practice

 

Summary of RabbitMQ Practice Methodology

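  1. Understand the AMQP protocol architecture in depth
    1. The AMQP protocol directly defines RabbitMQ's internal structure and external behavior
    2. Using RabbitMQ is, in essence, using the AMQP protocol
    3. AMQP can be implemented by multiple message brokers, so what you learn here carries over to them
  1. Understand the message flow in depth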
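  1. Set up exchanges and queues sensibly
    1. Do not create too many exchanges; as a rule, one business domain, or one category of business, shares one exchange
    2. Choose the number of queues sensibly; as a rule, one microservice listens on one queue, or one business function of a microservice listens on one queue
    3. Choose the exchange type sensibly; when using Topic mode, set the binding keys carefully
  1. Prefer automated configuration
    1. Declare exchanges and queues in application code, which avoids complex manual operations work
    2. As a rule, both sides declare the exchange, while the receiver declares the queue and configures its bindings
    3. Exchange and queue parameters must be agreed on by the development teams on both sides; otherwise a repeated declaration with different parameters will fail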
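
To make the advice about Topic binding keys concrete, here is a minimal sketch of the AMQP topic matching rule: keys are words separated by dots, `*` stands for exactly one word and `#` for zero or more words. The class `TopicMatcher` is a hypothetical helper written for these notes, not part of the RabbitMQ client, and it only imitates what the broker does on the server side.

```java
/** Hypothetical helper that imitates AMQP topic-exchange matching. */
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if routingKey would match the given binding key. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    // An empty key has zero words; otherwise split on dots.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;          // both exhausted: match
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;                      // pattern still needs a word
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

For example, `TopicMatcher.matches("order.*", "order.created.v2")` is false because `*` covers only one word, while the binding key `order.#` accepts the same routing key. An overly broad `#` binding is a common way for a queue to receive messages it was never meant to see.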
 
 

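Architecture Design

  1. Use a microservice system so that components are fully decoupled
  1. Use message middleware to decouple business logic
  1. Use a database to persist business data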

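What Is a Microservice Architecture

  1. The application is built as a set of loosely coupled, independently deployable services
    1. Service: a single, independently deployable software component that implements some useful functionality
    2. Loose coupling: the implementation details of a service are encapsulated, and it is accessed only through its API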

Microservice Decomposition

  1. Decompose by system operation
  1. Decompose by business capability
  1. Decompose by subdomain

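Database Design Principles for Microservices

  1. Each microservice uses its own database
  1. Do not use a shared database as a means of communication
  1. Do not use foreign keys; on tables that hold little data, use indexes sparingly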
 

Project Architecture Design in Practice

  1. Microservice decomposition
  1. Architecture design
  1. Business flow
  1. Interface decomposition
    1. List the interfaces
  1. Database design
      SET NAMES utf8mb4;
      SET FOREIGN_KEY_CHECKS = 0;

      -- ----------------------------
      -- Table structure for deliveryman
      -- ----------------------------
      DROP TABLE IF EXISTS `deliveryman`;
      CREATE TABLE `deliveryman` (
        `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'deliveryman id',
        `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'name',
        `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'status',
        `date` datetime(0) NULL DEFAULT NULL COMMENT 'time',
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

      -- ----------------------------
      -- Records of deliveryman
      -- ----------------------------
      INSERT INTO `deliveryman` VALUES (1, 'wangxiaoer', 'AVALIABLE', '2020-06-10 20:30:17');

      -- ----------------------------
      -- Table structure for order_detail
      -- ----------------------------
      DROP TABLE IF EXISTS `order_detail`;
      CREATE TABLE `order_detail` (
        `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'order id',
        `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'status',
        `address` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'order address',
        `account_id` int(0) NULL DEFAULT NULL COMMENT 'user id',
        `product_id` int(0) NULL DEFAULT NULL COMMENT 'product id',
        `deliveryman_id` int(0) NULL DEFAULT NULL COMMENT 'deliveryman id',
        `settlement_id` int(0) NULL DEFAULT NULL COMMENT 'settlement id',
        `reward_id` int(0) NULL DEFAULT NULL COMMENT 'reward id',
        `price` decimal(10, 2) NULL DEFAULT NULL COMMENT 'price',
        `date` datetime(0) NULL DEFAULT NULL COMMENT 'time',
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 403 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

      -- ----------------------------
      -- Table structure for product
      -- ----------------------------
      DROP TABLE IF EXISTS `product`;
      CREATE TABLE `product` (
        `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'product id',
        `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'name',
        `price` decimal(9, 2) NULL DEFAULT NULL COMMENT 'unit price',
        `restaurant_id` int(0) NULL DEFAULT NULL COMMENT 'restaurant id',
        `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'status',
        `date` datetime(0) NULL DEFAULT NULL COMMENT 'time',
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

      -- ----------------------------
      -- Records of product
      -- ----------------------------
      INSERT INTO `product` VALUES (2, 'eqwe', 23.25, 1, 'AVALIABLE', '2020-05-06 19:19:04');

      -- ----------------------------
      -- Table structure for restaurant
      -- ----------------------------
      DROP TABLE IF EXISTS `restaurant`;
      CREATE TABLE `restaurant` (
        `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'restaurant id',
        `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'name',
        `address` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'address',
        `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'status',
        `settlement_id` int(0) NULL DEFAULT NULL COMMENT 'settlement id',
        `date` datetime(0) NULL DEFAULT NULL COMMENT 'time',
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

      -- ----------------------------
      -- Records of restaurant
      -- ----------------------------
      INSERT INTO `restaurant` VALUES (1, 'qeqwe', '2weqe', 'OPEN', 1, '2020-05-06 19:19:39');

      -- ----------------------------
      -- Table structure for reward
      -- ----------------------------
      DROP TABLE IF EXISTS `reward`;
      CREATE TABLE `reward` (
        `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'reward id',
        `order_id` int(0) NULL DEFAULT NULL COMMENT 'order id',
        `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT 'reward points',
        `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'status',
        `date` datetime(0) NULL DEFAULT NULL COMMENT 'time',
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

      -- ----------------------------
      -- Table structure for settlement
      -- ----------------------------
      DROP TABLE IF EXISTS `settlement`;
      CREATE TABLE `settlement` (
        `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'settlement id',
        `order_id` int(0) NULL DEFAULT NULL COMMENT 'order id',
        `transaction_id` int(0) NULL DEFAULT NULL COMMENT 'transaction id',
        `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT 'amount',
        `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'status',
        `date` datetime(0) NULL DEFAULT NULL COMMENT 'time',
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 1168 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

      SET FOREIGN_KEY_CHECKS = 1;
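  1. Project setup
    1. Enter the project name and package name
      Select components such as Lombok, Spring Web, MyBatis, and MySQL Driver
  1. Interface design
    1. REST-style interface: a REST-style interface is an HTTP interface that uses a URL to represent a resource, e.g. spring.com/v1/users/345, and an HTTP method to represent the verb, e.g. GET, POST, DELETE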
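
The split between "URL names the resource" and "HTTP method names the verb" can be sketched with a few lines of plain Java. `RestRequest` and its action names (`read`, `create`, `delete`) are hypothetical and exist only for illustration; a real project would let Spring Web do this mapping. The sketch assumes paths of the form `<version>/<resource>[/<id>]`, as in the `/v1/users/345` example.

```java
import java.util.Locale;

/** Hypothetical illustration: splits a REST call into verb and resource. */
final class RestRequest {
    final String action;    // derived from the HTTP method (the verb)
    final String resource;  // collection named in the URL (the noun)
    final String id;        // resource id, or null when the URL names the collection

    private RestRequest(String action, String resource, String id) {
        this.action = action;
        this.resource = resource;
        this.id = id;
    }

    static RestRequest parse(String method, String path) {
        // Assumed path shape: <version>/<resource>[/<id>]
        String[] parts = path.replaceFirst("^/+", "").split("/");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("expected <version>/<resource>[/<id>]: " + path);
        }
        String action;
        switch (method.toUpperCase(Locale.ROOT)) {
            case "GET":    action = "read";   break;
            case "POST":   action = "create"; break;
            case "DELETE": action = "delete"; break;
            default: throw new IllegalArgumentException("unsupported method: " + method);
        }
        return new RestRequest(action, parts[1], parts.length == 3 ? parts[2] : null);
    }
}
```

With this sketch, `RestRequest.parse("GET", "/v1/users/345")` yields the action `read` on resource `users` with id `345`, while `POST /v1/users` yields `create` on the collection itself.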
 

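Implementation

Implementation with the native RabbitMQ Java client

Order microservice

Sending a message to the restaurant microservice
  1. Create a ConnectionFactory
  1. Obtain a connection: connectionFactory.newConnection()
  1. Create a channel
  1. Serialize the message
  1. Call basicPublish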
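    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    import java.util.concurrent.TimeoutException;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    // OrderDetailDao, OrderDetailPO, OrderCreateVO, OrderMessageDTO and OrderStatus
    // come from the project's own packages.

    @Slf4j
    @Service
    public class OrderService {

        @Autowired
        private OrderDetailDao orderDetailDao;
        @Autowired
        RabbitTemplate rabbitTemplate;

        @Value("${rabbitmq.exchange}")
        public String exchangeName;
        @Value("${rabbitmq.restaurant-routing-key}")
        public String restaurantRoutingKey;
        @Value("${rabbitmq.deliveryman-routing-key}")
        public String deliverymanRoutingKey;

        ObjectMapper objectMapper = new ObjectMapper();

        public void createOrder(OrderCreateVO orderCreateVO) throws IOException, TimeoutException {
            log.info("createOrder:orderCreateVO:{}", orderCreateVO);

            // Persist the order in its initial state
            OrderDetailPO orderPO = new OrderDetailPO();
            orderPO.setAddress(orderCreateVO.getAddress());
            orderPO.setAccountId(orderCreateVO.getAccountId());
            orderPO.setProductId(orderCreateVO.getProductId());
            orderPO.setStatus(OrderStatus.ORDER_CREATING);
            orderPO.setDate(new Date());
            orderDetailDao.insert(orderPO);

            // Build the message that travels between the microservices
            OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
            orderMessageDTO.setOrderId(orderPO.getId());
            orderMessageDTO.setProductId(orderPO.getProductId());
            orderMessageDTO.setAccountId(orderCreateVO.getAccountId());

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                // Send the order information to the restaurant microservice
                channel.basicPublish("exchange.order.restaurant", "key.restaurant", null,
                        messageToSend.getBytes(StandardCharsets.UTF_8));
            }
        }
    }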
basicPublish
    /**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a href="https://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            throws IOException;
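
Because the `body` parameter is a raw `byte[]`, publisher and consumer have to agree on how text is encoded. `String.getBytes()` without an argument uses the JVM's default charset, which on older Java versions can differ between machines, so naming the charset explicitly is the safer habit. The sketch below uses only the JDK; `MessageBytes` is a hypothetical helper name introduced for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts message text to and from a basicPublish body. */
final class MessageBytes {
    private MessageBytes() {}

    /** Encodes the serialized message (for example a JSON string) as UTF-8 bytes. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received body with the same charset the publisher used. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

A message such as `{"orderId":1}` survives `decode(encode(...))` unchanged on any machine, including when it contains non-ASCII text such as an address written in Chinese.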
 
Listening for and consuming messages
    @Slf4j
    @Service
    public class OrderMessageService {

        @Value("${rabbitmq.exchange}")
        public String exchangeName;
        @Value("${rabbitmq.deliveryman-routing-key}")
        public String deliverymanRoutingKey;
        @Value("${rabbitmq.settlement-routing-key}")
        public String settlementRoutingKey;
        @Value("${rabbitmq.reward-routing-key}")
        public String rewardRoutingKey;

        @Autowired
        private OrderDetailDao orderDetailDao;

        ObjectMapper objectMapper = new ObjectMapper();

        @Async
        public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        }
    }
    @Slf4j
    @Configuration
    public class RabbitConfig {

        @Autowired
        OrderMessageService orderMessageService;

        // Spring invokes @Autowired methods during initialization,
        // which starts the listener when the application boots.
        @Autowired
        public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
            orderMessageService.handleMessage();
        }
    }
Asynchronous listening in the OrderMessageService class
    @Async
    public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        log.info("start listening message");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            /*---------------------restaurant---------------------*/
            channel.exchangeDeclare("exchange.order.restaurant", BuiltinExchangeType.DIRECT, true, false, null);
            channel.queueDeclare("queue.order", true, false, false, null);
            channel.queueBind("queue.order", "exchange.order.restaurant", "key.order");

            /*---------------------deliveryman---------------------*/
            channel.exchangeDeclare("exchange.order.deliveryman", BuiltinExchangeType.DIRECT, true, false, null);
            channel.queueBind("queue.order", "exchange.order.deliveryman", "key.order");

            /*---------------------settlement---------------------*/
            channel.exchangeDeclare("exchange.settlement.order", BuiltinExchangeType.FANOUT, true, false, null);
            channel.queueBind("queue.order", "exchange.settlement.order", "key.order");

            /*---------------------reward---------------------*/
            channel.exchangeDeclare("exchange.order.reward", BuiltinExchangeType.TOPIC, true, false, null);
            channel.queueBind("queue.order", "exchange.order.reward", "key.order");

            // autoAck = true: a message counts as acknowledged as soon as it is delivered
            channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> { });

            // Keep this thread alive. Leaving the try block would close the
            // channel and the connection, which stops the consumer.
            while (true) {
                Thread.sleep(100000);
            }
        }
    }
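
The declarations above bind a single queue to exchanges of different types. As a rough mental model of what the broker does with them, the following in-memory sketch routes a message through direct and fanout exchanges. `MiniBroker` is a hypothetical toy written for these notes; it is not the RabbitMQ API, it leaves out topic exchanges, queues' contents, durability and acknowledgements, and it only imitates the routing decision. Its `exchangeDeclare` also imitates the rule that redeclaring an exchange with different parameters fails (the real broker answers with a PRECONDITION_FAILED channel error).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model of exchange-to-queue routing (direct and fanout only). */
final class MiniBroker {
    enum ExchangeType { DIRECT, FANOUT }

    private static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    private final Map<String, ExchangeType> exchanges = new HashMap<>();
    private final Map<String, List<Binding>> bindings = new HashMap<>();

    /** Idempotent when the type matches; fails when it differs. */
    void exchangeDeclare(String name, ExchangeType type) {
        ExchangeType existing = exchanges.putIfAbsent(name, type);
        if (existing != null && existing != type) {
            throw new IllegalStateException(
                    "exchange '" + name + "' already declared as " + existing);
        }
    }

    void queueBind(String queue, String exchange, String bindingKey) {
        if (!exchanges.containsKey(exchange)) {
            throw new IllegalArgumentException("no such exchange: " + exchange);
        }
        bindings.computeIfAbsent(exchange, k -> new ArrayList<>())
                .add(new Binding(queue, bindingKey));
    }

    /** Returns the queues that would receive a message published to the exchange. */
    Set<String> route(String exchange, String routingKey) {
        ExchangeType type = exchanges.get(exchange);
        if (type == null) {
            throw new IllegalArgumentException("no such exchange: " + exchange);
        }
        Set<String> queues = new LinkedHashSet<>();
        for (Binding b : bindings.getOrDefault(exchange, Collections.<Binding>emptyList())) {
            // FANOUT ignores the routing key; DIRECT requires an exact match.
            if (type == ExchangeType.FANOUT || b.bindingKey.equals(routingKey)) {
                queues.add(b.queue);
            }
        }
        return queues;
    }
}
```

In this model, publishing to `exchange.order.restaurant` with `key.restaurant` reaches only `queue.restaurant`, and the reply published with `key.order` reaches only `queue.order`. On the fanout exchange the binding key plays no role, which is why every bound queue receives every message.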
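The DeliverCallback updates the order status
  1. On a message from the restaurant service, set the order status to RESTAURANT_CONFIRMED and send a message to the deliveryman microservice
  1. On a message from the deliveryman service, set the order status to DELIVERYMAN_CONFIRMED and send a message to the settlement microservice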
    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("deliverCallback:messageBody:{}", messageBody);

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try {
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody, OrderMessageDTO.class);
            OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());

            // The current order status tells us which service this reply came from
            switch (orderPO.getStatus()) {
                case ORDER_CREATING:
                    // Reply from the restaurant service
                    if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
                        orderPO.setStatus(OrderStatus.RESTAURANT_CONFIRMED);
                        orderPO.setPrice(orderMessageDTO.getPrice());
                        orderDetailDao.update(orderPO);
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.deliveryman", "key.deliveryman", null,
                                    messageToSend.getBytes(StandardCharsets.UTF_8));
                        }
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case RESTAURANT_CONFIRMED:
                    // Reply from the deliveryman service
                    if (null != orderMessageDTO.getDeliverymanId()) {
                        orderPO.setStatus(OrderStatus.DELIVERYMAN_CONFIRMED);
                        orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
                        orderDetailDao.update(orderPO);
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.settlement", "key.settlement", null,
                                    messageToSend.getBytes(StandardCharsets.UTF_8));
                        }
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case DELIVERYMAN_CONFIRMED:
                    // Reply from the settlement service
                    if (null != orderMessageDTO.getSettlementId()) {
                        orderPO.setStatus(OrderStatus.SETTLEMENT_CONFIRMED);
                        orderPO.setSettlementId(orderMessageDTO.getSettlementId());
                        orderDetailDao.update(orderPO);
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.reward", "key.reward", null,
                                    messageToSend.getBytes(StandardCharsets.UTF_8));
                        }
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case SETTLEMENT_CONFIRMED:
                    // Reply from the reward service: the order is complete
                    if (null != orderMessageDTO.getRewardId()) {
                        orderPO.setStatus(OrderStatus.ORDER_CREATED);
                        orderPO.setRewardId(orderMessageDTO.getRewardId());
                        orderDetailDao.update(orderPO);
                    } else {
                        orderPO.setStatus(OrderStatus.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                default:
            }
        } catch (JsonProcessingException | TimeoutException e) {
            log.error("deliverCallback failed", e);
        }
    };
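
Stripped of the messaging and database calls, the callback above is a small state machine: each reply either advances the order to the next status or marks it as failed. The sketch below isolates just those transitions. `OrderFlow` and the `stepConfirmed` flag are hypothetical names; the flag stands for "the reply carried the data this step needs" (a confirmed price, a deliveryman id, a settlement id, or a reward id).

```java
/** Hypothetical sketch of the order status transitions driven by the callback. */
final class OrderFlow {
    private OrderFlow() {}

    enum OrderStatus {
        ORDER_CREATING,
        RESTAURANT_CONFIRMED,
        DELIVERYMAN_CONFIRMED,
        SETTLEMENT_CONFIRMED,
        ORDER_CREATED,
        FAILED
    }

    /**
     * Returns the status after one reply has been processed.
     * Terminal states (ORDER_CREATED, FAILED) are left unchanged,
     * mirroring the empty default branch of the callback.
     */
    static OrderStatus next(OrderStatus current, boolean stepConfirmed) {
        switch (current) {
            case ORDER_CREATING:
                return stepConfirmed ? OrderStatus.RESTAURANT_CONFIRMED : OrderStatus.FAILED;
            case RESTAURANT_CONFIRMED:
                return stepConfirmed ? OrderStatus.DELIVERYMAN_CONFIRMED : OrderStatus.FAILED;
            case DELIVERYMAN_CONFIRMED:
                return stepConfirmed ? OrderStatus.SETTLEMENT_CONFIRMED : OrderStatus.FAILED;
            case SETTLEMENT_CONFIRMED:
                return stepConfirmed ? OrderStatus.ORDER_CREATED : OrderStatus.FAILED;
            default:
                return current;
        }
    }
}
```

Keeping the transitions in one pure function like this makes them easy to unit-test without a broker or a database, and it makes visible that the design relies on replies arriving in order: the current status alone decides how a message is interpreted.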

Restaurant microservice

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    // DAO, PO, DTO and status enum classes come from the project's own packages.

    @Slf4j
    @Service
    public class OrderMessageService {

        ObjectMapper objectMapper = new ObjectMapper();

        @Autowired
        ProductDao productDao;
        @Autowired
        RestaurantDao restaurantDao;

        @Async
        public void handleMessage() throws IOException, TimeoutException, InterruptedException {
            log.info("start listening message");
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.exchangeDeclare("exchange.order.restaurant", BuiltinExchangeType.DIRECT, true, false, null);
                channel.queueDeclare("queue.restaurant", true, false, false, null);
                channel.queueBind("queue.restaurant", "exchange.order.restaurant", "key.restaurant");

                channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> { });

                // Keep the thread alive so the connection stays open
                while (true) {
                    Thread.sleep(100000);
                }
            }
        }

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("deliverCallback:messageBody:{}", messageBody);

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");

            try {
                OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody, OrderMessageDTO.class);

                ProductPO productPO = productDao.selsctProduct(orderMessageDTO.getProductId());
                log.info("onMessage:productPO:{}", productPO);
                RestaurantPO restaurantPO = restaurantDao.selsctRestaurant(productPO.getRestaurantId());
                log.info("onMessage:restaurantPO:{}", restaurantPO);

                // Confirm only if the product is available and the restaurant is open
                if (ProductStatus.AVALIABIE == productPO.getStatus()
                        && RestaurantStatus.OPEN == restaurantPO.getStatus()) {
                    orderMessageDTO.setConfirmed(true);
                    orderMessageDTO.setPrice(productPO.getPrice());
                } else {
                    orderMessageDTO.setConfirmed(false);
                }
                log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);

                // Reply to the order service
                try (Connection connection = connectionFactory.newConnection();
                     Channel channel = connection.createChannel()) {
                    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                    channel.basicPublish("exchange.order.restaurant", "key.order", null,
                            messageToSend.getBytes(StandardCharsets.UTF_8));
                }
            } catch (JsonProcessingException | TimeoutException e) {
                log.error("deliverCallback failed", e);
            }
        };
    }

Deliveryman microservice

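    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.concurrent.TimeoutException;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    // DAO, PO, DTO and status enum classes come from the project's own packages.

    @Slf4j
    @Service
    public class OrderMessageService {

        @Autowired
        DeliverymanDao deliverymanDao;

        ObjectMapper objectMapper = new ObjectMapper();

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("deliverCallback:messageBody:{}", messageBody);

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");

            try {
                OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody, OrderMessageDTO.class);

                List<DeliverymanPO> deliverymanPOS =
                        deliverymanDao.selectAvaliableDeliveryman(DeliverymanStatus.AVALIABIE);
                // Leave deliverymanId unset when nobody is available; the order
                // service then marks the order as FAILED.
                if (!deliverymanPOS.isEmpty()) {
                    orderMessageDTO.setDeliverymanId(deliverymanPOS.get(0).getId());
                }
                log.info("onMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);

                // Reply on this service's own exchange; queue.order is bound to it with key.order
                try (Connection connection = connectionFactory.newConnection();
                     Channel channel = connection.createChannel()) {
                    String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                    channel.basicPublish("exchange.order.deliveryman", "key.order", null,
                            messageToSend.getBytes(StandardCharsets.UTF_8));
                }
            } catch (JsonProcessingException | TimeoutException e) {
                log.error("deliverCallback failed", e);
            }
        };

        @Async
        public void handleMessage() throws IOException, TimeoutException, InterruptedException {
            log.info("start listening message");
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");

            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.exchangeDeclare("exchange.order.deliveryman", BuiltinExchangeType.DIRECT, true, false, null);
                channel.queueDeclare("queue.deliveryman", true, false, false, null);
                channel.queueBind("queue.deliveryman", "exchange.order.deliveryman", "key.deliveryman");

                channel.basicConsume("queue.deliveryman", true, deliverCallback, consumerTag -> { });

                // Keep the thread alive so the connection stays open
                while (true) {
                    Thread.sleep(100000);
                }
            }
        }
    }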

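RabbitMQ Advanced Features

Publisher confirms

Message return mechanism

Consumer acknowledgements

Consumer-side rate limiting

Message expiration

Dead letter queues

Spring Boot integration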