详解Redis和队列

下面由redis教程栏目给大家详解redis和队列,希望对需要的朋友有所帮助!

详解Redis和队列

概要

Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

详解Redis和队列

由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

普通队列实现

所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单示例如下:

存放消息端(消息生产者):

package org.yamikaze.redis.messsage.queue;  import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;  import Java.util.concurrent.TimeUnit;  /**  * 消息生产者  * @author yamikaze */public class Producer extends Thread {      public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count;      public Producer(String name) {        this.producerName = name;         init();     }      private void init() {         jedis = MyJedisFactory.getLocalJedis();     }      public void putMessage(String message) {         Long size = jedis.lpush(MESSAGE_KEY, message);         System.out.println(producerName + ": 当前未被处理消息条数为:" + size);         count++;     }      public int getCount() {        return count;     }       @Override    public void run() {        try {            while (true) {                 putMessage(StringUtils.generate32Str());                 TimeUnit.SECONDS.sleep(1);             }         } catch (InterruptedException e) {           } catch (Exception e) {             e.printStackTrace();         }     }      public static void main(String[] args) throws InterruptedException{         Producer producer = new Producer("myProducer");         producer.start();          for(; ;) {             System.out.println("main : 已存储消息条数:" + producer.getCount());             TimeUnit.SECONDS.sleep(10);         }     } }

消息处理端(消息消费者):

package org.yamikaze.redis.messsage.queue;  import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;  /**  * 消息消费者  * @author yamikaze */public class Customer extends Thread{      private String customerName;    private volatile int count;    private Jedis jedis;      public Customer(String name) {        this.customerName = name;         init();     }      private void init() {         jedis = MyJedisFactory.getLocalJedis();     }      public void processMessage() {         String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != NULL) {             count++;             handle(message);         }     }      public void handle(String message) {         System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");     }       @Override    public void run() {        while (true) {             processMessage();         }     }      public static void main(String[] args) {         Customer customer = new Customer("yamikaze");         customer.start();     } }

貌似还不错,但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:

1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processMessage可以改为这样:

public void processMessage() {    /**      * brpop支持多个列表(队列)      * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。      * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY      * 0表示不限制等待,会一直阻塞在这儿     */     List<string> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由于该指令可以监听多个Key,所以返回的是一个列表        //列表由2项组成,1) 列表名,2)数据         String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息         if(Producer.MESSAGE_KEY.equals(keyName)) {             String message = messages.get(1);             handle(message);         }       }     System.out.println("======================="); }</string>

然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前有两个连接。

一次生产多次消费的队列

Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。

1)发布
    PUBLISH指令可用于发布一条消息,格式 PUBLISH channel message

    返回值表示订阅了该消息的数量。
    2)订阅
    SUBSCRIBE指令用于接收一条消息,格式 SUBSCRIBE channel

    可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:
    1、如果为subscribe,第二个值表示订阅的频道,第三个值表示是第几个订阅的频道?(理解成序号?) 
    2、如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
    3、如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

    可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
   
    Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:

   再试试推送消息会得到以下结果:

   可以看到publish指令返回的是2,而订阅端这边接收了两次消息。这是因为PSUBSCRIBE指令可以重复订阅频道。而使用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。同时PUNSUBSCRIBE指令通配符不会展开。
例如:PUNSUBSCRIBE * 不会匹配到 channel.*, 所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*。

代码示范如下:

package org.yamikaze.redis.messsage.subscribe;  import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;  /**  * 消息发布方  * @author yamikaze */public class Publisher {      public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis;      public Publisher() {         jedis = MyJedisFactory.getLocalJedis();     }      public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;         }         jedis.publish(CHANNEL_KEY, message);     }      public static void main(String[] args) {         Publisher publisher = new Publisher();         publisher.publishMessage("Hello Redis!");     } }

简单的发送一个消息。

消息订阅方:

package org.yamikaze.redis.messsage.subscribe;  import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;  import java.util.concurrent.TimeUnit;  /**  * 消息订阅方客户端  * @author yamikaze */public class SubscribeClient {      private Jedis jedis;    private static final String EXIT_COMMAND = "exit";      public SubscribeClient() {         jedis = MyJedisFactory.getLocalJedis();     }      public void subscribe(String ...channel) {        if(channel == null || channel.length <p>先运行client,再运行Publisher进行消息发送,输出结果:</p><p><img src="https://img.php.cn/upload/article/000/000/020/17a7d7dbee3e472ecb977109a62f0631-1.png" alt=""></p><p><span   style="max-width:90%">Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。</span></p><h2>延时队列</h2><h3>背景</h3><p>在业务发展过程中,会出现一些需要延时处理的场景,比如:</p><p>a.订单下单之后超过30分钟用户未支付,需要取消订单<br>b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论<br>c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。<br>处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。</p><h3>几种延时队列</h3><p>延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:</p><h4>1.Java中java.util.concurrent.DelayQueue</h4><p>优点:JDK自身实现,使用方便,量小适用<br>缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化</p><h4>2.rocketmq延时队列</h4><p>优点:消息持久化,分布式<br>缺点:不支持任意时间精度,只支持特定level的延时消息</p><h4>3.rabbitmq延时队列(TTL+DLX实现)</h4><p>优点:消息持久化,分布式<br>缺点:延时相同的消息必须扔在同一个队列</p><h3>Redis实现的延时消息队列适合的项目特点:</h3>
  • spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

Redis实现的延时消息队列思路

Redis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?

试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。

Zset的排列效果如下图:

详解Redis和队列

java代码实现如下:

package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**  * @program: test  * @description: redis实现延时队列  * @author: xingcheng  * @create: 2018-08-19  **/public class AppTest {    private static final String ADDR = "127.0.0.1";    private static final int PORT = 6379;    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);    private static CountDownLatch cdl = new CountDownLatch(10);    public static Jedis getJedis() {        return jedisPool.getResource();     }    /**      * 生产者,生成5个订单     */     public void productionDelayMessage() {        for (int i = 0; i  order = jedis.zrangeWithScores("orderId", 0, 0);            if (order == null || order.isEmpty()) {                 System.out.println("当前没有等待的任务");                try {                     TimeUnit.MICROSECONDS.sleep(500);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                continue;             }             Tuple tuple = (Tuple) order.toArray()[0];            double score = tuple.getScore();             Calendar instance = Calendar.getInstance();            long nowTime = instance.getTimeInMillis() / 1000;            if (nowTime &gt;= score) {                 String element = tuple.getElement();                 Long orderId = jedis.zrem("orderId", element);                if (orderId &gt; 0) {                     System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element);                 }             }         }     }    static class DelayMessage implements Runnable{         @Override        public void run() {            try {                 cdl.await();                 consumerDelayMessage();             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     }         public static void main(String[] args) {         AppTest appTest = new AppTest();         appTest.productionDelayMessage();        for (int i = 0; i <p>实现效果如下:</p><p><img src="https://img.php.cn/upload/article/000/000/020/513545df87e39a5f0af8877d9b3550f2-3.jpg" alt=""    style="max-width:90%"  style="max-width:90%"></p>

© 版权声明
THE END
喜欢就支持一下吧
点赞9 分享