ome/img/nav3_on.png">
APP
系统平台
  • 建站知识
  • 联系我们
  • 咨询热线 :
    028-86922220

    疆括仕网站建设,新征程启航

    为企业提供网站建设、域名注册、服务器等服务

    SpringAOP+RabbitMQ+WebSocket的示例分析

    这篇文章将为大家详细讲解有关SpringAOP+RabbitMQ+WebSocket的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

    10年积累的成都做网站、网站制作、成都外贸网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先制作网站后付款的网站建设流程,更有定陶免费网站建设让你可以放心的选择与我们合作。

    背景

    最近公司的客户要求,分配给员工的任务除了有微信通知外,还希望PC端的网页也能实时收到通知。管理员分配任务是在我们的系统A,而员工接受任务是在系统B。两个系统都是现在已投入使用的系统。

    技术选型

    根据需求我们最终选用SpringAOP+RabbitMQ+WebSocket。

    SpringAOP可以让我们不修改原有代码,直接将原有service作为切点,加入切面。RabbitMQ可以让A系统和B系统解耦。WebSocket则可以达到实时通知的要求。

    SpringAOP+RabbitMQ+WebSocket的示例分析

    SpringAOP

    AOP称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等待。是Spring的核心模块,底层是通过动态代理来实现(动态代理将在之后的文章重点介绍)。

    基本概念

    Aspect(切面):通常是一个类,里面可以定义切入点和通知。

    JointPoint(连接点):程序执行过程中明确的点,一般是方法的调用。

    Advice(通知):AOP在特定的切入点上执行的增强处理,有before,after,afterReturning,afterThrowing,around。

    Pointcut(切入点):就是带有通知的连接点,在程序中主要体现为书写切入点表达式。

    通知类型

    Before:在目标方法被调用之前做增强处理。

    @Before只需要指定切入点表达式即可

    AfterReturning:在目标方法正常完成后做增强。

    @AfterReturning除了指定切入点表达式后,还可以指定一个返回值形参名returning,代表目标方法的返回值

    AfterThrowing:主要用来处理程序中未处理的异常。

    @AfterThrowing除了指定切入点表达式后,还可以指定一个throwing的返回值形参名,可以通过该形参名

    来访问目标方法中所抛出的异常对象

    After:在目标方法完成之后做增强,无论目标方法时候成功完成。

    @After可以指定一个切入点表达式

    Around:环绕通知,在目标方法完成前后做增强处理,环绕通知是最重要的通知类型,像事务,日志等都是环绕通知,注意编程中核心是一个ProceedingJoinPoint。

    RabbitMQ

    SpringAOP+RabbitMQ+WebSocket的示例分析

    从图中我们可以看到RabbitMQ主要的结构有:Routing、Binding、Exchange、Queue。

    Queue

    Queue(队列)RabbitMQ的作用是存储消息,队列的特性是先进先出。

    Exchange

    生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,还会将不符合路由规则的消息丢弃。

    Routing

    用于标记或生产者寻找Exchange。

    Binding

    用于Exchange和Queue做关联。

    Exchange Type fanout

    fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

    direct

    direct会把消息路由到那些binding key与routing key完全匹配的Queue中。

    topic

    direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。

    headers

    headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

    WebSocket

    了解websocket必须先知道几个常用的web通信技术及其区别。

    短轮询

    短轮询的基本思路就是浏览器每隔一段时间向浏览器发送http请求,服务器端在收到请求后,不论是否有数据更新,都直接进行响应。这种方式实现的即时通信,本质上还是浏览器发送请求,服务器接受请求的一个过程,通过让客户端不断的进行请求,使得客户端能够模拟实时地收到服务器端的数据的变化。

    这种方式的优点是比较简单,易于理解,实现起来也没有什么技术难点。缺点是显而易见的,这种方式由于需要不断的建立http连接,严重浪费了服务器端和客户端的资源。尤其是在客户端,距离来说,如果有数量级想对比较大的人同时位于基于短轮询的应用中,那么每一个用户的客户端都会疯狂的向服务器端发送http请求,而且不会间断。人数越多,服务器端压力越大,这是很不合理的。

    因此短轮询不适用于那些同时在线用户数量比较大,并且很注重性能的Web应用。

    长轮询/ comet

    comet指的是,当服务器收到客户端发来的请求后,不会直接进行响应,而是先将这个请求挂起,然后判断服务器端数据是否有更新。如果有更新,则进行响应,如果一直没有数据,则到达一定的时间限制(服务器端设置)后关闭连接。

    长轮询和短轮询比起来,明显减少了很多不必要的http请求次数,相比之下节约了资源。长轮询的缺点在于,连接挂起也会导致资源的浪费。

    SSE

    SSE是HTML5新增的功能,全称为Server-Sent Events。它可以允许服务推送数据到客户端。SSE在本质上就与之前的长轮询、短轮询不同,虽然都是基于http协议的,但是轮询需要客户端先发送请求。而SSE最大的特点就是不需要客户端发送请求,可以实现只要服务器端数据有更新,就可以马上发送到客户端。

    SSE的优势很明显,它不需要建立或保持大量的客户端发往服务器端的请求,节约了很多资源,提升应用性能。并且SSE的实现非常简单,不需要依赖其他插件。

    WebSocket

    WebSocket是Html5定义的一个新协议,与传统的http协议不同,该协议可以实现服务器与客户端之间全双工通信。简单来说,首先需要在客户端和服务器端建立起一个连接,这部分需要http。连接一旦建立,客户端和服务器端就处于平等的地位,可以相互发送数据,不存在请求和响应的区别。

    WebSocket的优点是实现了双向通信,缺点是服务器端的逻辑非常复杂。现在针对不同的后台语言有不同的插件可以使用。

    四种Web即时通信技术比较

    从兼容性角度考虑,短轮询>长轮询>长连接SSE>WebSocket;

    从性能方面考虑,WebSocket>长连接SSE>长轮询>短轮询。

    实战

    项目使用SpringBoot搭建。RabbitMQ的安装这里不讲述。

    RabbitMQ配置

    两个系统A、B都需要操作RabbitMQ,其中A生产消息,B消费消息。故都需要配置。

    1、首先引入RabbitMQ的dependency:

    
      org.springframework.boot
      spring-boot-starter-amqp
    

    这个dependency中包含了RabbitMQ相关dependency。

    2、在项目的配置文件里配置为使用rabbitmq及其参数。

    application-pro.yml

    #消息队列
    message.queue.type: rabbitmq
    ## rabbit mq properties
    rabbitmq:
     host: localhost
     port: 5672
     username: guest
     password: guest

    application.properties

    #将要使用的队列名
    rabbitmq.websocket.msg.queue=websocket_msg_queue

    3、创建配置文件。队列的创建交给spring。

    RabbitMQConfig.java

    @Configuration
    @EnableRabbit
    public class RabbitMQConfig {
    
      @Value("${rabbitmq.host}")
      private String host;
      @Value("${rabbitmq.port}")
      private String port;
      @Value("${rabbitmq.username}")
      private String username;
      @Value("${rabbitmq.password}")
      private String password;
      @Value("${rabbitmq.websocket.msg.queue}")
      private String webSocketMsgQueue;
    
      @Bean
      public ConnectionFactory connectionFactory() throws IOException {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
    //    factory.setVirtualHost("test");
        factory.setHost(host);
        factory.setPort(Integer.valueOf(port));
        factory.setPublisherConfirms(true);
    
        //设置队列参数,是否持久化、队列TTL、队列消息TTL等
        factory.createConnection().createChannel(false).queueDeclare(webSocketMsgQueue, true, false, false, null);
        return factory;
      }
    
      @Bean
      public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
      }
    
      @Bean
      @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
      // 必须是prototype类型
      public RabbitTemplate rabbitTemplate() throws IOException {
        return new RabbitTemplate(connectionFactory());
      }
    
      @Bean
      public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
      }
    }

    4、系统B中创建队列监听,当队列有消息时,发送websocket通知。

    RabbitMQListener.java

    @Component
    public class RabbitMQListener {
    
      @Autowired
      private RabbitMQService mqService;
    
      /**
       * WebSocket推送监听器
       * @param socketEntity
       * @param deliveryTag
       * @param channel
       */
      @RabbitListener(queues = "websocket_msg_queue")
      public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel);
      }
    
    }

    RabbitMQService.java

    public class RabbitMQService {
      @Autowired
      private MessageWebSocketHandler messageWebSocketHandler;
    
      /**
       * @param socketMsgEntity
       * @param deliveryTag
       * @param channel
       * @throws IOException
       */
      void handleWebSocketMsg(WebSocketMsgEntity socketMsgEntity, long deliveryTag, Channel channel) throws IOException {
        try {
          messageWebSocketHandler.sendMessageToUsers(socketMsgEntity.toJsonString(), socketMsgEntity.getToUserIds());
          channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
          channel.basicNack(deliveryTag, false, false);
        }
      }
    }

    WebSocketMsgEntity为MQ中传送的实体。

    public class WebSocketMsgEntity implements Serializable {
      public enum OrderType{
        repair("维修"),
        maintain("保养"),
        measure("计量");
    
        OrderType(String value){
          this.value = value;
        }
        String value;
    
        public String getValue() {
          return value;
        }
      }
      //设备名称
      private String EquName;
      //设备编号
      private String EquId;
      //工单类型
      private OrderType orderType;
      //工单单号
      private String orderId;
      //工单状态
      private String orderStatus;
      //创建时间
      private Date createTime;
      //消息接收人ID
      private List toUserIds;
    
      public String getEquName() {
        return EquName;
      }
    
      public void setEquName(String equName) {
        EquName = equName;
      }
    
      public String getOrderId() {
        return orderId;
      }
    
      public void setOrderId(String orderId) {
        this.orderId = orderId;
      }
    
      public String getEquId() {
        return EquId;
      }
    
      public void setEquId(String equId) {
        EquId = equId;
      }
    
      public String getOrderStatus() {
        return orderStatus;
      }
    
      public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
      }
    
    
      public OrderType getOrderType() {
        return orderType;
      }
    
      public void setOrderType(OrderType orderType) {
        this.orderType = orderType;
      }
    
      public Date getCreateTime() {
        return createTime;
      }
    
      public void setCreateTime(Date createTime) {
        this.createTime = createTime;
      }
    
      public List getToUserIds() {
        return toUserIds;
      }
    
      public void setToUserIds(List toUserIds) {
        this.toUserIds = toUserIds;
      }
    
      public String toJsonString(){
        return JSON.toJSONString(this);
      }
    }

    SpringAOP

    1、系统A中创建一个切面类DataInterceptor.java

    @Aspect
    @Component
    public class DataInterceptor {
      @Autowired
      private MessageQueueService queueService;
    
    
      //维修工单切点
      @Pointcut("execution(* com.zhishang.hes.common.service.impl.RepairServiceImpl.executeFlow(..))")
      private void repairMsg() {
      }
    
      /**
       * 返回通知,方法执行正常返回时触发
       *
       * @param joinPoint
       * @param result
       */
      @AfterReturning(value = "repairMsg()", returning = "result")
      public void afterReturning(JoinPoint joinPoint, Object result) {
        //此处可以获得切点方法名
        //String methodName = joinPoint.getSignature().getName();
        EquipmentRepair equipmentRepair = (EquipmentRepair) result;
        WebSocketMsgEntity webSocketMsgEntity = this.generateRepairMsgEntity(equipmentRepair);
        if (webSocketMsgEntity == null) {
          return;
        }
        queueService.send(webSocketMsgEntity);
      }
    
      /**
       * 生成发送到MQ的维修消息
       *
       * @param equipmentRepair
       * @return
       */
      private WebSocketMsgEntity generateRepairMsgEntity(EquipmentRepair equipmentRepair) {
        WebSocketMsgEntity webSocketMsgEntity = generateRepairMsgFromTasks(equipmentRepair);
        return webSocketMsgEntity;
      }
    
      /**
       * 从任务中生成消息
       *
       * @param equipmentRepair
       * @return
       */
      private WebSocketMsgEntity generateRepairMsgFromTasks(EquipmentRepair equipmentRepair) {
        //业务代码略
      }
    
    }

    2、发送消息到MQ。这里只贴了发送的核心代码

    public class RabbitMessageQueue extends AbstractMessageQueue {
    
      @Value("${rabbitmq.websocket.msg.queue}")
      private String webSocketMsgQueue;
    
      @Autowired
      private RabbitTemplate rabbitTemplate;
    
      @Override
      public void send(WebSocketMsgEntity entity) {
        //没有指定exchange,则使用默认名为“”的exchange,binding名与queue名相同
        rabbitTemplate.convertAndSend(webSocketMsgQueue, entity);
      }
    }

    WebSocket

    1、 系统B中引入websocket服务端dependency

    
      org.springframework
      spring-websocket
      4.3.10.RELEASE
    

    2、 配置websocket,添加处理类

    WebSocketConfigurer.java

    @Configuration
    @EnableWebSocket
    public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
    
      private static Logger logger = LoggerFactory.getLogger(WebSocketConfig.class);
    
      @Override
      public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        //配置webSocket路径
        registry.addHandler(messageWebSocketHandler(),"/msg-websocket").addInterceptors(new MyHandshakeInterceptor()).setAllowedOrigins("*");
        //配置webSocket路径 支持前端使用socketJs
        registry.addHandler(messageWebSocketHandler(), "/sockjs/msg-websocket").setAllowedOrigins("*").addInterceptors(new MyHandshakeInterceptor()).withSockJS();
      }
    
      @Bean
      public MessageWebSocketHandler messageWebSocketHandler() {
        logger.info("......创建MessageWebSocketHandler......");
        return new MessageWebSocketHandler();
      }
    
    }

    MessageWebSocketHandler.java 主要用于websocket连接及消息发送处理。配置中还使用了连接握手时的处理,主要是取用户登陆信息,这里不多讲述。

    public class MessageWebSocketHandler extends TextWebSocketHandler {
      private static Logger logger = LoggerFactory.getLogger(SystemWebSocketHandler.class);
      private static ConcurrentHashMap> users = new ConcurrentHashMap<>();
    
      @Override
      public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
        logger.info("......AfterConnectionEstablished......");
        logger.info("session.getId:" + session.getId());
        logger.info("session.getLocalAddress:" + session.getLocalAddress().toString());
        logger.info("userId:" + userId);
        //websocket连接后记录连接信息
        if (users.keySet().contains(userId)) {
          CopyOnWriteArraySet webSocketSessions = users.get(userId);
          webSocketSessions.add(session);
        } else {
          CopyOnWriteArraySet webSocketSessions = new CopyOnWriteArraySet<>();
          webSocketSessions.add(session);
          users.put(userId, webSocketSessions);
        }
      }
    
      @Override
      public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
        removeUserSession(session);
        if (session.isOpen()) {
          session.close();
        }
        logger.info("异常出现handleTransportError" + throwable.getMessage());
      }
    
      @Override
      public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        removeUserSession(session);
        logger.info("关闭afterConnectionClosed" + closeStatus.getReason());
      }
    
      @Override
      public boolean supportsPartialMessages() {
        return false;
      }
    
      /**
       * 给符合要求的在线用户发送消息
       *
       * @param message
       */
      public void sendMessageToUsers(String message, List userIds) throws IOException{
        if (StringUtils.isEmpty(message) || CollectionUtils.isEmpty(userIds)) {
          return;
        }
        if (users.isEmpty()) {
          return;
        }
        for (String userId : userIds) {
          if (!users.keySet().contains(userId)) {
            continue;
          }
          CopyOnWriteArraySet webSocketSessions = users.get(userId);
          if (webSocketSessions == null) {
            continue;
          }
          for (WebSocketSession webSocketSession : webSocketSessions) {
            if (webSocketSession.isOpen()) {
              try {
                webSocketSession.sendMessage(new TextMessage(message));
              } catch (IOException e) {
                logger.error(" WebSocket server send message ERROR " + e.getMessage());
                try {
                  throw e;
                } catch (IOException e1) {
                  e1.printStackTrace();
                }
              }
            }
          }
        }
      }
    
      /**
       * websocket清除连接信息
       *
       * @param session
       */
      private void removeUserSession(WebSocketSession session) {
        String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
        if (users.keySet().contains(userId)) {
          CopyOnWriteArraySet webSocketSessions = users.get(userId);
          webSocketSessions.remove(session);
          if (webSocketSessions.isEmpty()) {
            users.remove(userId);
          }
        }
      }
    }

    整个功能完成后,A系统分配任务时,系统B登陆用户收到的消息如图:

    SpringAOP+RabbitMQ+WebSocket的示例分析

    关于“SpringAOP+RabbitMQ+WebSocket的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。


    网站栏目:SpringAOP+RabbitMQ+WebSocket的示例分析
    文章转载:http://tyhkzb.com/article/ppdchi.html
    在线咨询
    服务热线
    服务热线:028-86922220
    TOP