springboot整合redis


一、redis介绍

Redis是当前比较热门的NOSQL系统之一,它是一个开源的使用ANSI c语言编写的key-value存储系统(区别于MySQL的二维表格的形式存储。)。和Memcache类似,但很大程度补偿了Memcache的不足。和Memcache一样,Redis数据都是缓存在计算机内存中,不同的是,Memcache只能将数据缓存到内存中,无法自动定期写入硬盘,这就表示,一断电或重启,内存清空,数据丢失。所以Memcache的应用场景适用于缓存无需持久化的数据。而Redis不同的是它会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,实现数据的持久化。

Redis的特点:

1,Redis读取的速度是110000次/s,写的速度是81000次/s;

2,原子 。Redis的所有操作都是原子性的,同时Redis还支持对几个操作全并后的原子性执行。

3,支持多种数据结构:string(字符串);list(列表);hash(哈希),set(集合);zset(有序集合)

4,持久化,集群部署

5,支持过期时间,支持事务,消息订阅

二、springboot整合Redis

1、依赖

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成redis所需common-pool2-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>

2、配置application.properties

spring:
  redis:
    host: 127.0.0.1   # 主 机 地 址
    port: 6379        # 端 口
    password:         # 认 证
    database: 0       # 选 择 数 据 库
    timeout: 1000     # 超时时间
    lettuce:        # 连 接 池 配 置
      pool:
        max-active: 20  # 连接池中的最大空闲连接
        max-wait: -1  # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-idle: 4   # 连接池中的最大空闲连接
        min-idle: 0   # 连接池中的最小空闲连接
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database= 0
spring.redis.timeout=1800000

spring.redis.lettuce.pool.max-active=20
spring.redis.lettuce.pool.max-wait=-1
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-idle=5
spring.redis.lettuce.pool.min-idle=0
#最小空闲
# REDIS (RedisProperties)
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=localhost
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=0

3、redis通用配置类

作用 处理Springboot使用 RedisTemplate过程中的编码问题

2选一即可

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }

    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    @Bean
    public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }
}

原理:设置其他的序列化方式使用json形式

RedisTemplate,默认序列化的时候,用的RedisTemplate里面的一个RedisSerializer对象的string方法

@EnableCaching //开启缓存
@Configuration  //配置类
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        //ObjectMapper 指定在转成json的时候的一些转换规则
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        //把自定义objectMapper设置到jackson2JsonRedisSerializer中(可以不设置,使用默认规则)
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}

三、RedisTemplate使用

1、通用:删除、失效时间、是否存在

//引入redisTemplate(需要有以上配置类处理编码问题)
@Autowired
private RedisTemplate<String,String> redisTemplate;

//删除数据
redisTemplate.delete(String key);
//批量删除
redisTemplate.delete(Collection<K> keys);

//设置失效时间
//TimeUnit取值:NANOSECONDS纳秒,MICROSECONDS微秒,MILLISECONDS毫秒
//SECONDS秒,MINUTES分,HOURS小时,DAYS天
redisTemplate.expire(K key, long timeout, TimeUnit unit)

//获取指定key的过期时间,单位为秒
redisTemplate.getExpire(K key);
//获取指定key的过期时间,并将结果转化未指定的时间单位
redisTemplate.getExpire(K key, final TimeUnit timeUnit)

//判断给定的key是否存在
redisTemplate.hasKey(K key)

2、opsFor和BoundOperations

提供了对key的“bound”(绑定)便捷化操作API,可以通过bound封装指定的key,然后进行一系列的操作而无须“显式”的再次指定Key,即BoundKeyOperations.

redisTemplate.opsForValue();//操作字符串
redisTemplate.opsForHash();//操作hash
redisTemplate.opsForList();//操作list
redisTemplate.opsForSet();//操作set
redisTemplate.opsForZSet();//操作有序set

redisTemplate.boundValueOps(K key);//操作字符串
redisTemplate.boundHashOps(K key);//操作hash
redisTemplate.boundListOps(K key);//操作list
redisTemplate.boundSetOps(K key);//操作set
redisTemplate.boundZSetOps(K key);//操作有序set

区别:boundValueOps绑定key的位置提前。

//1、通过redisTemplate设置值(单次操作)
redisTemplate.boundValueOps("StringKey").set("StringValue");
redisTemplate.boundValueOps("StringKey").set("StringValue",1, TimeUnit.MINUTES);

//2、通过BoundValueOperations设置值(多次操作,无需再次指定key)
BoundValueOperations stringKey = redisTemplate.boundValueOps("StringKey");
stringKey.set("StringVaule");
stringKey.set("StringValue",1, TimeUnit.MINUTES);

//3、通过ValueOperations设置值(多次操作,每次操作都要指定key)
ValueOperations ops = redisTemplate.opsForValue();
ops.set("StringKey", "StringVaule");
ops.set("StringValue","StringVaule",1, TimeUnit.MINUTES);

二者操作数据的方法基本相同,以下只演示opsForValue

3、操作字符串类型

//1 从redis获取key为phone的值,如果获取到直接返回
String code = redisTemplate.opsForValue().get(phone);

//2 存入数据并设置有效时间5分钟
redisTemplate.opsForValue().set(K key, V value, long timeout, TimeUnit unit);

//3 获取指定key的值
String str1 = (String) redisTemplate.boundValueOps("StringKey").get();

//4 递增,将key的value(字符串内容必须为数字)的整数值加1。
redisTemplate.opsForValue().increment(K key);
//递增指定数值
redisTemplate.opsForValue().increment(K key,long delta);
redisTemplate.opsForValue().increment(K key,double delta);

//5 递减,decrement用法同上

API

// 绑定key值,ops之后的操作都是针对该key的值
BoundValueOperations<String, Object> ops = redisTemplate.boundValueOps("TEST:REDIS:USER:NAME");
ops.set("张三",100, TimeUnit.HOURS);  // 设置值及其过期时间,不存在新增,存在覆盖
Boolean b = ops.setIfAbsent("王五");  //设置值,存在则不进行任何操作,返回false;不存在则新增键值对,返回true
Boolean b2 = ops.setIfPresent("赵六");    //修改值,如果该键不存在则不进行操作,返回是否修改
Integer i = ops.append("学java");    //字符串拼接,返回拼接后的长度
String s = ops.get(0, 3);   //获取指定key的值,可设置开始索引和结束索引
String s1 = (String)ops.getAndSet("李四");    //修改值,并返回旧值
Long size = ops.size();     //返回字符串值的长度

ops.set(10);
Long l1 = ops.decrement();  //减一,字符串必须为数字类型
Long l2 = ops.decrement(3);  //减指定数值,字符串必须为数字类型
Long l3 = ops.increment();  //加一
Long l4 = ops.increment(3);  //加指定数值
float f5 = ops.increment(2.3);    //float类型

RedisOperations<String, Object> operations = ops.getOperations();

4、操作Hash类型

值为map类型

//1 设置值,参数(大key,小key,值)
redisTemplate.opsForHash().put(H key, HK hashKey, HV value);

//2 将HashMap中所有数据存储到值中
HashMap<String, String> hashMap = new HashMap<>();
redisTemplate.opsForHash().putAll("KEY",hashMap);

//3 获取大key下所有的小key
Set<Object> keys = redisTemplate.opsForHash().keys("大key");
//4 获取大key下所有的小key的值
List<Object> values = redisTemplate.opsForHash().values("大key");

//5 获取指定大key下,指定小key的值
String value = (String)redisTemplate.opsForHash().get("大key","小key");  

//6 获取指定大key下所有键值对
Map<Object, Object> entries = redisTemplate.opsForHash().entries("大key");

//7 删除指定小key(删大key见通用)
redisTemplate.opsForHash().delete("大key", "小key1", "小key2");

//8 判断指定大key的指定小key是否存在(判断大key存在,见通用)
Boolean b = redisTemplate.opsForHash().hasKey("大key", "小key");

5、操作List类型

相当于LinkedList,可存储重复元素,特点:单键多值

//1、添加元素
redisTemplate.boundListOps("listKey").leftPush("listLeftValue1");
redisTemplate.boundListOps("listKey").rightPush("listRightValue2");

//2、将List放入缓存
ArrayList<String> list = new ArrayList<>();
redisTemplate.boundListOps("listKey").rightPushAll(list);
redisTemplate.boundListOps("listKey").leftPushAll(list);

//3、获取List缓存全部内容(开始索引,结束索引)
List listKey1 = redisTemplate.boundListOps("listKey").range(0, 10); 

//4、弹出元素
String listKey2 = (String) redisTemplate.boundListOps("listKey").leftPop();  
String listKey3 = (String) redisTemplate.boundListOps("listKey").rightPop(); 

//5、根据索引查询元素
String listKey4 = (String) redisTemplate.boundListOps("listKey").index(1);

//6、获取List缓存长度
Long size = redisTemplate.boundListOps("listKey").size();

//7、根据索引修改List中的某条数据(key,索引,值)
redisTemplate.boundListOps("listKey").set(3L,"listLeftValue3");

//8、移除N个值为value(key,移除个数,值)
redisTemplate.boundListOps("listKey").remove(3L,"value");

6、操作Set类型

不允许重复元素的集合

//1、设置值
redisTemplate.boundSetOps("setKey").add("setValue1", "setValue2", "setValue3");

//2、根据key获取Set中的所有值
Set set1 = redisTemplate.boundSetOps("setKey").members();

//3、根据value从一个set中查询,是否存在
Boolean isEmpty = redisTemplate.boundSetOps("setKey").isMember("setValue2");

//4、获取Set缓存的长度
Long size = redisTemplate.boundSetOps("setKey").size();

//5、移除指定的元素
Long result1 = redisTemplate.boundSetOps("setKey").remove("setValue1");

//6、移除指定的key
Boolean result2 = redisTemplate.delete("setKey");

7、操作ZSet类型

有序不重复集合

分数:每个元素都有,排序依据(从大到小)

//1、向集合中插入元素,并设置分数(double类型,用于排序)
redisTemplate.boundZSetOps("zSetKey").add("zSetVaule", 100D);

//2、向集合中插入多个元素,并设置分数
DefaultTypedTuple<String> p1 = new DefaultTypedTuple<>("zSetVaule1", 2.1D);
DefaultTypedTuple<String> p2 = new DefaultTypedTuple<>("zSetVaule2", 3.3D);
redisTemplate.boundZSetOps("zSetKey").add(new HashSet<>(Arrays.asList(p1,p2)));

//3、按照排名先后(从小到大)返回指定区间内的元素, -1为打印全部
Set<String> range = redisTemplate.boundZSetOps("zSetKey").range(key, 0, -1);

//4、获得指定元素的分数
Double score = redisTemplate.boundZSetOps("zSetKey").score("zSetVaule");

//5、返回集合内的成员个数
Long size = redisTemplate.boundZSetOps("zSetKey").size();

//6、返回集合内指定分数范围的成员个数(Double类型)
Long COUNT = redisTemplate.boundZSetOps("zSetKey").count(0D, 2.2D);

//7、返回集合内元素在指定分数范围内的排名(从小到大)
Set byScore = redisTemplate.boundZSetOps("zSetKey").rangeByScore(0D, 2.2D);

//8、带偏移量和个数,(key,起始分数,最大分数,偏移量,个数)
Set<String> ranking2 = redisTemplate.opsForZSet().rangeByScore("zSetKey", 0D, 2.2D 1, 3);

//9、返回集合内元素的排名,以及分数(从小到大)
Set<TypedTuple<String>> tuples = redisTemplate.boundZSetOps("zSetKey").rangeWithScores(0L, 3L);
for (TypedTuple<String> tuple : tuples) {
    System.out.println(tuple.getValue() + " : " + tuple.getScore());
}

//10、返回指定成员的排名
//从小到大
Long startRank = redisTemplate.boundZSetOps("zSetKey").rank("zSetVaule");
//从大到小
Long endRank = redisTemplate.boundZSetOps("zSetKey").reverseRank("zSetVaule");

//11、从集合中删除指定元素
redisTemplate.boundZSetOps("zSetKey").remove("zSetVaule");

//12、删除指定索引范围的元素(Long类型)
redisTemplate.boundZSetOps("zSetKey").removeRange(0L,3L);

//13、删除指定分数范围内的元素(Double类型)
redisTemplate.boundZSetOps("zSetKey").removeRangeByScorssse(0D,2.2D);

//14、为指定元素加分(Double类型)
Double score = redisTemplate.boundZSetOps("zSetKey").incrementScore("zSetVaule",1.1D);

注:如果要判断指定元素是否存在,则尝试去获取该元素的分数,如果返回空则不存在

8、GEO地理位置操作

https://www.runoob.com/redis/redis-geo.html

9、数字类型字符串加减

如果字符串不是数字会报错

//设值
redisTemplate.opsForValue().set("key", "10");

//加1
Long value = redisTemplate.opsForValue().increment("key");

//加4
Long value = redisTemplate.opsForValue().increment("key",4);

//减1
Long value = redisTemplate.opsForValue().decrement("key");

//减4
Long value = redisTemplate.opsForValue().decrement("key",4);    

//加2.3 increment是float类型
redisTemplate.opsForValue().increment("i",2.3);

四、key的命名规范

1.建议全部大写

2.key不能太长也不能太短,键名越长越占资源,太短可读性太差,长度不超过44字节

3.key 单词与单词之间以 : 分开

4.redis使用的时候注意命名空间,一个项目一个命名空间,项目内业务不同命名空间也不同。

格式

[业务名称]:[数据名称]:[id]

login:user:10

命名空间

  • 第一段放置项目名或缩写 如 project

  • 第二段把表名转换为key前缀 如, user:

  • 第三段放置用于区分区key的字段,对应mysql中的主键的列名,如userid

  • 第四段放置主键值,如18,16

结合起来 PRO:USER:UID:18 是不是很清晰

常见的设置登录token

key: PRO:USER:LOGINNAME:373166324

value:12kd-dsj5ce-d4445-h4sd472

在使用Redis进行数据缓存时,往往数据量是比较大的,若直接以普通键值对:key:value存储,就会显得比较乱,数据分类不明显,不易于查看和查找数据。

这时,我们可以采取以命名空间开头的方式存储数据,使不同类型的数据统一放到一个命名空间下,一目了然。

那么如何以命名空间分组呢?其实很简单,只用在存储数据时,键值对中的键命名以冒号分开即可:
命名空间:key。例如,vehicle:car1,vehicle:car2。

image-20210904004009164

如果使用了两个冒号,则会在命名空间下再创建一个无名称的“文件夹”,如下图:

image-20210904004034658

五、redis储存对象

1、String 结构存储 JSON

将对象转化为 JSON 字符串进行存储,较为简单直观

此处采用 hutoolJSONUtil 工具类

// redis中不存在菜单列表,从数据库中获取数据,并将List转化为json存储到redis中
List<SysMenuEntity> menuList = sysMenuService.list();
redisTemplate.opsForValue().set("key",JSONUtil.toJsonStr(menuList));
//从redis中获取集合数据,并转化为List类型
String menuJson = (String)redisTemplate.opsForValue().get("key");
JSONArray array = new JSONArray(menuJson);
List<SysMenuEntity> menuList = JSONUtil.toList(array, SysMenuEntity.class);

2、Hash 结构(推荐)

利用hash结构将对象的每个字段独立存储,可针对单个字段执行 CRUD,便于扩展,且占用内存小

KEY value(field) value(value)
REWIND:USER:1 name 张三
REWIND:USER:1 age 18
REWIND:USER:1 sex
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
public void test3(){
    SysUserEntity sysUserEntity = new SysUserEntity();
    sysUserEntity.setUserId(1L);
    sysUserEntity.setUsername("张三");

    Map<String, Object> userMap = BeanUtil.beanToMap(sysUserEntity);

    stringRedisTemplate.boundHashOps("user:info:1").putAll(userMap);
}

六、数据类型及使用场景

1、字符串(String)

2、列表(List)

特点:单键多值,当列表中值为空时,该键就会消失,可添加一个元素到列表头或列表尾

底层:双向链表,对双端操作性能高,但是通过索引下标操作中间性能低

6、bitmap(位图)

由01组成,可用场景为:记录在一定范围内有2个状态的数据,如在一个月内是否打卡

image-20210905211418879

get、set、count

7、geo

附近的人

Redis GEO 主要用于存储地理位置信息,并对存储的信息进行操作,该功能在 Redis 3.2 版本新增。

Redis GEO 操作方法有:

  • geoadd:添加地理位置的坐标。
  • geopos:获取地理位置的坐标。
  • geodist:计算两个位置之间的距离。
  • georadius:根据用户给定的经纬度坐标来获取指定范围内的地理位置集合。
  • georadiusbymember:根据储存在位置集合里面的某个地点获取指定范围内的地理位置集合。
  • geohash:返回一个或多个位置对象的 geohash 值。

七、事务

1、基本介绍

事务定义:Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis事务的主要作用就是串联多个命令防止别的命令插队。

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。

组队的过程中可以通过discard来放弃组队。

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

2、事务三大特性

  • 单独的隔离操作

    事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

  • 没有隔离级别的概念

    队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

  • 不保证原子性

    事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

3、代码实现

(1)相关API

// 开启事务支持,在同一个 Connection 中执行命令
redisTemplate.setEnableTransactionSupport(true);

//开启事务,开始组队,但不执行
redisTemplate.multi();

// 提交事务,组队结束,进入执行阶段,开始执行redis命令,返回每次命令执行成功与否的集合
List<Object> exec = redisTemplate.exec();

// 放弃组队
redisTemplate.discard();

// 监视某个key的值是否变化,用于乐观锁
redisTemplate.watch("stock");
//取消监视
redisTemplate.unwatch();

(2)开启事务方法一

由于 enableTransactionSupport 属性的默认值是 false,导致了每一个 RedisConnection 都是重新获取的。所以,我们刚刚执行的 MULTI 和 EXEC 这两个命令不在同一个 Connection 中。

  • 设置 enableTransactionSupport 开启事务支持

解决上述问题,最简单的办法就是让 RedisTemplate 开启事务支持,即设置 enableTransactionSupport 为 true 就可以了。测试代码如下:

// 开启事务支持,在同一个 Connection 中执行命令
stringRedisTemplate.setEnableTransactionSupport(true);

stringRedisTemplate.multi();
stringRedisTemplate.opsForValue().set("name", "qinyi");
stringRedisTemplate.opsForValue().set("gender", "male");
stringRedisTemplate.opsForValue().set("age", "19");
System.out.println(stringRedisTemplate.exec());     // [true, true, true]

(3)开启事务方法二

  • 通过 SessionCallback,保证所有的操作都在同一个 Session 中完成

更常见的写法仍是采用 RedisTemplate 的默认配置,即不开启事务支持。但是,我们可以通过使用 SessionCallback,该接口保证其内部所有操作都是在同一个Session中。测试代码如下:

/**
 * 使用 SessionCallback, 在同一个 Redis Connection 中执行事务: 成功执行事务
 * */
@Test
@SuppressWarnings("all")
public void testSessionCallback() {

    SessionCallback<Object> callback = new SessionCallback<Object>() {
        @Override
        public Object execute(RedisOperations operations) throws DataAccessException {
            operations.multi();
            operations.opsForValue().set("name", "qinyi");
            operations.opsForValue().set("gender", "male");
            operations.opsForValue().set("age", "19");
            return operations.exec();
        }
    };

    // [true, true, true]
    System.out.println(stringRedisTemplate.execute(callback));
}

4、乐观锁实现

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

主要有以下两个命令:

redisTemplate.watch("version");

在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(**或这些) key** 被其他命令所改动,那么事务将被打断。

redisTemplate.unwatch();

取消 WATCH 命令对所有 key 的监视。

如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。

案例代码

// 监视版本号
redisTemplate.watch("stock");
//开启事务,开始组队,但不执行
redisTemplate.multi();
//减库存,并未立刻执行
Long stock = redisTemplate.boundValueOps("stock").decrement();
// 提交事务,组队结束,开始执行redis命令
List<Object> exec = redisTemplate.exec();

八、发布订阅模式

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

Redis 客户端可以订阅任意数量的频道。

类似于 Rabbit MQ 的广播模式

1、 基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。

2、 消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)

3、 消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

Redis 发布/订阅应用场景

1、实时消息系统

2、即时通信,频道作为聊天室,将信息回显给订阅频道的所有人

3、订阅系统,关注系统都是 ok 的

对于复杂的场景,我们就不用考虑 redis 了,可以直接使用专业的 MQ 开源组件,例如 rabbitMQ 或者 kafka

使用 Redis 发布/订阅 需要注意的点

使用 Redis 发布/订阅是有缺陷的

1、对于消息处理可靠性要求不强

2、消费能力无需通过增加消费方进行增强

1、发布消息

redisTemplate 相关方法:

  • channel :发布订阅的通道
  • message:消息内容
public void convertAndSend(String channel, Object message)
redisTemplate.convertAndSend("testQueue", "myTestMsg");

2、订阅消息

(1)监听器

@Component
public class UserLoginMsgListener implements MessageListener {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 收到消息
     * @param message 消息内容
     * @param pattern 与通道匹配的模式(如果指定)-可以为空。
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();

        logger.info("topic: {} , 发布消息: {}", new String(channel), new String(body));

    }
}

(2)配置监听器

@Configuration
public class RedisPubSubConfig {

    @Autowired
    private UserLoginMsgListener userLoginMsgListener;

    @Bean
    RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 如果有多个监听器可以多次调用
        container.addMessageListener(userLoginMsgListener, new ChannelTopic("testQueue"));
        return container;
    }
}

九、Stream 消息队列


  目录