网站内容更新机制(广告管理系统与缓存服务系统实时交互的桥梁-极客慧 )
优采云 发布时间: 2022-04-12 21:14网站内容更新机制(广告管理系统与缓存服务系统实时交互的桥梁-极客慧
)
借助缓存架构的消息中间件RabbitMQ实时更新Redis缓存实战演练一、背景介绍
前面我们花了很多时间介绍消息中间件RabbitMQ,讲了它的基本使用和可靠传输。这些对于我们的缓存架构有什么用,我们直接在上面分析一下:
要实现这部分功能,我们需要用到两个系统:
这两个独立的系统是密切相关的。一个是生产者,另一个是消费者。我们如何建立这两个系统之间的连接,如何通知您及时获取我们制作的广告?
通过RabbitMQ,我们在广告管理系统和缓存服务系统之间建立了实时交互的桥梁。
二、核心功能介绍1、广告管理系统
功能:制作广告,将制作信息实时同步到RabbitMQ
1)添加依赖
org.springframework.boot spring-boot-starter-amqp ${spring-boot.version} org.codehaus.janino janino 2.7.8 javax.mail mail 1.4.7
2)基本配置
@Configuration public class RabbitConfig { public final static String queueName = "ad_queue"; }
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
3)生产者消息确认机制
# 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回 spring.rabbitmq.publisher-returns=true
4)发送消息
@Component public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ private static Map map = new ConcurrentHashMap(); private final Logger emailLogger = LoggerFactory.getLogger("emailLogger"); @Autowired private RabbitTemplate rabbitTemplate; public void send(String routingKey, String content) { this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.setRoutingKey(routingKey); //这样我们就能知道,发送失败的是哪条消息了 this.rabbitTemplate.correlationConvertAndSend(content, new CorrelationData(content)); // this.rabbitTemplate.convertAndSend(routingKey, content); } /** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { /** * 我们这里仅通过打印日志、发送邮件来预警,并没有实现自动重试机制: * 1、将发送失败重新发送到一个队列中:fail-queue,然后可以定时对这些消息进行重发 * 2、在本地定义一个缓存map对象,定时进行重发 * 3、为了更安全,可以将所有发送的消息保存到db中,并设置一个状态(是否发送成功),定时扫描检查是否存在未成功发送的信息 * 这块知识,我们后期讲"分布式事务"的时候,在深入讲解这块内容 */ emailLogger.error("send ack fail, cause = {}, correlationData = {}", cause, correlationData.getId()); } else { System.out.println("send ack success"); } } /** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { emailLogger.error("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); String str = new String(message.getBody()); retrySend(str, 3); } private void retrySend(String content, int retryTime){ if(map.containsKey(content)){ int count = map.get(content); count++; map.put(content, count); } else { map.put(content, 1); } if(map.get(content) = 3) { //当有多次更新失败的时候,发送邮件通知: emailLogger.error("处理MQ[" + content + "]失败[" + retryTimes + "]次"); } try { if (retryTimes >= 5) { //当有很多次更新失败的时候,丢弃这条消息或者发送到死信队列中 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); }else { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } } catch (Exception e){ logger.error("消息确认失败", e); } logger.info("handle msg finished = {}", content); } }
三、实战演练
代码库:
1、广告管理系统:生产者向RabbitMQ发送消息通知
场景分析:创建/更新广告时,消息是否正常发送?项目名称:spring-boot-ad
1)正常
在控制台查看RabbitMQ是否正常接收消息
2)异常2、广告缓存服务系统:consumer收到消息刷新到redis
场景分析:创建/更新广告时,消息是否正常接收?项目名称:spring-boot-rabbitmq-reliability-redis
1)正常2)异常四、注意
我们在本课中谈到的异常处理并不完美。在下一课中,我们将使用延迟队列来处理异常消息。
延迟队列应用场景也很广,下次继续关注分享
更多内容请关注:头条号:极客辉
个人网站:极客辉更多信息分享,请加群讨论:375412858