Redis
Redis
一、Redis数据结构及常用命令
1、通用命令
keys
查看符合模板的所有key(不建议在生产环境中使用)
DEL
删除一个指定key
exists
判断key是否存在
expire
给key设置一个有效期,到期key自动删除
ttl
查看一个key的剩余有效期:为 -2 则key已经过期,为 -1则key有效期为永久
2、String类型相关命令
2.1、简介:String类型,也就是字符串类型,是Redis中最简单的存储类型。其value是字符串,不过根据字符串的格式不同,又可以分为3类:
- string:普通字符串。
- int:整数类型,可以做自增、自减操作。
- float:浮点类型,可以做自增、自减操作。
- 不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m。
2.2、命令
SET:
添加或者修改已经存在的一个String类型的键值对
GET:
根据key获取String类型的value
MSET:
批量添加多个String类型的键值对
MGET:
根据多个key获取多个String类型的value
INCR:
让一个整型的key自增1
INCRBY:
让一个整型的key自增并指定步长,例如:incrby num 2让num值自增2
INCRBYFLOAT:
让一个浮点类型的数字自增并指定步长
SETNX:
添加一个String类型的键值对,前提是这个key不存在,否则不执行
SETEX:
添加一个String类型的键值对,并且指定有效期
3、Hash数据结构
3.1、简介:Hash数据结构,其数据结构为 Hash,也称散列,可简单理解为 Java 中的 HashMap。在Redis中Hash结构有 '大Key'、'小Key',顾名思义就是有 两个Key,大key中是包含多个小key,而每个小key都有自己的value。在我们修改的时候可以单独对大Key中的小Key进行修改,并不会像String类型会覆 盖全部value数据。
3.2、常用命令,ps:和String类型差不多,在前边加H即可: key为(大Key) field为(小Key)
HSET value
添加或者修改hash类型key的field的值
HGET
获取一个hash类型key的field的值
HMSET
批量添加多个hash类型key的field的值
HMGET
批量获取多个hash类型key的field的值
HGETALL
获取一个hash类型的key中的所有的field和value
HKEYS
获取一个hash类型的key中的所有的field
HVALS
获取一个hash类型的key中的所有的value
HINCRBY
让一个hash类型key的字段值自增并指定步长
HSETNX
添加一个hash类型的key的field值,前提是这个field不存在,否则不执行
4、List类型
4.1、简介:Redis中的List类型,和 Java中的 LinkedList 类似,都可以看做是双向链表,既可以支持正向检索,也支持反向检索,特征也和 LinkedList 类似:
有序
元素可以重复
插入和删除元素快
查询速度一般
可以用来储存有序数据,如:点赞列表、评论列表等
4.2、常用命令
LPUSH key element...
向列表左侧插入一个或多个元素
LPOP key
移除并返回列表左侧的第一个元素,没有则返回null
RPUSH key element...
向列表右侧插入一个或多个元素
RPOP key
移除并返回列表右侧的第一个元素
LRANGE key star end
返回一段角标范围内的所有元素
BLPOP和BRPOP
与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回null
5、Set类型
5.1、简介:Redis中的Set类型类似于 Java 中的HashSet,可以看做Value是Null的HashMap,因为是Hash表,也具备HashSet的部分特征:
无序
元素不可重复
查找快
支持交集、并集、差集等功能
5.2、常用命令、
SADD key member...
向set中添加一个或多个元素
SREM key member...
移除set中的指定元素
SCARD key
返回set中元素的个数
SISMEMBER key member
判断一个元素是否存在于set中
SMEMBERS
获取set中的所有元素
SINTER key1 key2…
求key1与key2的交集
SDIFF key1 key2....
求key1与key2的差集
6、SortedSet类型
6.1、简介:SortedSet类型Redis的SortedSet是一个可排序的set集合,与Java中的Treeset有些类似,但底层数据结构却差别很大。SortedSet中 的每一个元 素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加hash表。SortedSet具备下列特性:
可排序
元素不重复
查询速度快
因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。
6.2、常用命令
命令 | 参数 | 描述 |
---|---|---|
ZADD | key score member | 添加一个或多个元素到sorted set,如果已经存在则更新其score值 |
ZREM | key member | 删除sorted set中的一个指定元素 |
ZSCORE | key member | 获取sorted set中的指定元素的score值 |
ZRANK | key member | 获取sorted set中的指定元素的排名 |
ZCARD | key | 获取sorted set中的元素个数 |
ZCOUNT | key min max | 统计score值在给定范围内的所有元素的个数 |
ZINCRBY | key increment member | 让sorted set中的指定元素自增,步长为指定的increment值 |
ZRANGE | key min max | 按照score排序后,获取指定排名范围内的元素 |
ZRANGEBYSCORE | key min max | 按照score排序后,获取指定score范围内的元素 |
ZDIFF、ZINTER、ZUNION | 求差集、交集、并集 |
7、Stream数据类型
简介:Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
用Stream做消息队列的优缺点
优点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
缺点:
- 有消息漏读的风险
8、GEO数据结构
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮 助我们根据经纬度来检索数据。(底层还是SortedSet 数据类型,是将经纬度通过某种算法转换为字符串作为score存储)
常见的命令有:
-
GEOADD:
-
添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
-
-
GEODIST:
-
计算指定的两个点之间的距离并返回
-
-
GEOHASH:
-
将指定member的坐标转为hash字符串形式并返回
-
-
GEOPOS:
-
返回指定member的坐标
-
-
GEORADIUS:
-
指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.2以后已废弃
-
-
GEOSEARCH:
-
在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
-
-
GEOSEARCHSTORE:
-
与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。6.2.新功能
-
二、Redis常用客户端
SpringDataRedis :在SpringDataRedis中整合了下面两种
jedis
以Redis命令作为方法名称,学习成本低,简单实用。但是Jedis实例是线程不安全的,多线程环境下需要基于连接池来使用
lettuce
Lettuce是基于Netty实现的,支持同步、异步和响应式编程方式,并且是线程安全的。支持Redis的哨兵模式、集群模式和管道模式。
Redisson
Redisson是一个基于Redis实现的分布式、可伸缩的Java数据结构集合。包含了诸如Map、Queue、Lock、Semaphore、AtomicLong等强大功能
1、Jedis 快速入门
-
引入依赖
<!--jedis--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> </dependency> <!--单元测试--> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency>
-
建立连接
新建一个单元测试类,内容如下:
private Jedis jedis; @BeforeEach void setUp() { // 1.建立连接 // jedis = new Jedis("192.168.150.101", 6379); jedis = JedisConnectionFactory.getJedis(); // 2.设置密码 jedis.auth("123321"); // 3.选择库 jedis.select(0); }
3.测试:
@Test void testString() { // 存入数据 String result = jedis.set("name", "虎哥"); System.out.println("result = " + result); // 获取数据 String name = jedis.get("name"); System.out.println("name = " + name); } @Test void testHash() { // 插入hash数据 jedis.hset("user:1", "name", "Jack"); jedis.hset("user:1", "age", "21"); // 获取 Map<String, String> map = jedis.hgetAll("user:1"); System.out.println(map); }
4、释放资源
@AfterEach void tearDown() { if (jedis != null) { jedis.close(); } }
2、 Jedis连接池
Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用Jedis连接池代替Jedis的直连方式
有关池化思想,并不仅仅是这里会使用,很多地方都有,比如说我们的数据库连接池,比如我们tomcat中的线程池,这些都是池化思想的体现。
2.1.创建Jedis的连接池
public class JedisConnectionFacotry {
private static final JedisPool jedisPool;
static {
//配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(8);
poolConfig.setMaxIdle(8);
poolConfig.setMinIdle(0);
poolConfig.setMaxWaitMillis(1000);
//创建连接池对象
jedisPool = new JedisPool(poolConfig,
"192.168.150.101",6379,1000,"123321");
}
public static Jedis getJedis(){
return jedisPool.getResource();
}
}
代码说明:
-
JedisConnectionFacotry:工厂设计模式是实际开发中非常常用的一种设计模式,我们可以使用工厂,去降低代的耦合,比如Spring中的Bean的创建,就用到了工厂设计模式
-
静态代码块:随着类的加载而加载,确保只能执行一次,我们在加载当前工厂类的时候,就可以执行static的操作完成对 连接池的初始化
-
最后提供返回连接池中连接的方法.
3、SpringDataRedis
SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis
官网地址:https://spring.io/projects/spring-data-redis
-
提供了对不同Redis客户端的整合(Lettuce和Jedis)
-
提供了RedisTemplate统一API来操作Redis
-
支持Redis的发布订阅模型
-
支持Redis哨兵和Redis集群
-
支持基于Lettuce的响应式编程
-
支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化
-
支持基于Redis的JDKCollection实现
SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:
![](https://blog-1312110814.cos.ap-shanghai.myqcloud.com/images/2022-10-29 01%3A10%3A391666976859647.png)
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.heima</groupId>
<artifactId>redis-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--common-pool-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Jackson依赖-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
配置文件
spring:
redis:
host: 192.168.150.101
port: 6379
password: 123321
lettuce:
pool:
max-active: 8 #最大连接
max-idle: 8 #最大空闲连接
min-idle: 0 #最小空闲连接
max-wait: 100ms #连接等待时间
4、RedisTemplate两种序列化实践方案:
-
方案一:
-
自定义RedisTemplate
-
修改RedisTemplate的序列化器为GenericJackson2JsonRedisSerializer
-
-
方案二:(推荐使用)
- 使用StringRedisTemplate
- 写入Redis时,手动把对象序列化为JSON
- 读取Redis时,手动把读取到的JSON反序列化为对象
三、缓存更新策略
业务场景
低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
-
读操作:
- 缓存命中则直接返回
- 缓存未命中则查询数据库,并写入缓存,设定超时时间
-
写操作:
- 先写数据库,然后再删除缓存
- 要确保数据库与缓存操作的原子性
四、缓存穿透、雪崩、击穿
1、缓存穿透
- 提示:布隆过滤器是指将数据库中某种数据的标识以byte二进制位的数据进行存储
2、缓存雪崩
3、缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
原因:
解决方案:
-
不同的解决方案:
解决思路:
互斥锁:多个线程在并行运行中,只能有一个线程成功,其余线程失败,而成功或失败的逻辑都要我们自己去写,所不能使用 synchronized、Lock 锁来完成。
可以采用Redis中String类型 的 setnx来完成
setnx:类似于set命令,但不同的是若操作时key已经有值,不会成功,在java中API为:
-
stringRedisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit) //建议设置存活时间很短,并且,在代码逻辑方面使用 try catch finally 来释放锁(删除key) try { ..... } catch (Exception e) { throw new RuntimeException(e); } finally { stringRedisTemplate.delete(key); }
-
逻辑过期:
五、秒杀业务
1、全局Id生成,采用Redis形式
全局id需要满足以下特征:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
- 可以拼接一些其他的信息:
- 符号位:1bit,永远为0,永远为正数
- 时间戳:31bit,以秒为单位,可以用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
- 可以拼接一些其他的信息:
2、秒杀优惠券业务
在本业务中优惠券分为两种:
-
普通优惠券(库存多,没有秒杀时间)
-
秒杀优惠券(有固定的优惠券,并且有开始抢购时间,和结束抢购时间)[并发高]
-
表的设计为:
- 两张表,一张为优惠卷表(包含普通优惠券,秒杀优惠券),秒杀优惠卷表(也可以任务是优惠券的分表,字段有秒杀优惠券对应的字段【开始抢购时间,和结束抢购时间....等】)
基础业务逻辑:
- 下单是需要判断是否到达开始时间
- 判断是否还有库存
4、超卖问题
- 在并发的情况下会产生线程安全问题,具体如下图所示:
- 线程安全:多个线程对某个数据进行写操作
解决方案:
悲观锁:
- 认为线程安全一定会出现,采用获取锁的方式让让线程串行执行。
- 如:synchronized、Lock,都属于悲观锁。
乐观锁:
- 顾名思义就是非常乐观,因此认为线程安全不一定会发生,因此不会加锁,只会在更新数据的时候,判断其他线程有没有对数据进行了修改。
- 实现方案:
- 版本号机制:在数据库表字段中,添加 version 字段,每当一个线程对数据更新时,就让 version 加 1,而其他线程来更新数据时就会先判断自己获取的 version 值和 数据库中的是否一致,若一致,则修改,不一致,则自旋。
- CAS机制:
- 如:上问题所示:我们可以使用库存字段当做 version 来使用
3、一人一单
每个用户只能对秒杀业务中的商品下一单
- 实现思路:
- ==有线程安全问题,待解决:==
若采用集群模式的话,那 jdk自带的锁就无法完成,需要使用分布式锁:
-
单体架构:使用 synchronized 锁来解决多个线程共享同一个数据是可以解决的。
-
- 原理:synchronized 底层在JVM中会创建一个 锁监听器 来记录 多个线程的执行,当第一个个线程过来时,会拿到线程的名字,进行记录,此时如果其他线程来执行,若锁监听器中有线程,则其他线程阻塞,指定 锁监听器中的线程得到释放。
-
分布式架构:synchronized 无法解决,因为用的是不同的JVM 锁监听器也不同、导致多个线程并行运行。
- 解决方案:必须要在分布式系统中多个线程使用同一把锁、跨JVM锁、跨进程的锁。
- **分布式锁 :**满足分布式系统或集群模式下多进程可见并且互斥的锁。
- 为什么使用Redis锁呢? Redis 性能好,高并发,安全性,高可用(不容易宕机)。
- setnx:往Redis中set数据,只有数据不存在时,才能set,如果已经存在就会set失败。
- 基于setnx
==注意,在释放锁的时候一定要进行判断,防止线程一 删掉线程二 的锁==
/**
* @ClassName: RedisSetNxLock
* @author: anqin
* @Description:
* @date: 2022/11/6 0:52
*/
public class RedisSetNxLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private static final String NAME_PREFIX = "Lock:";
private final String ID_PREFIX = java.util.UUID.randomUUID().toString().replace("-", "") + "-";
private final String name;
public RedisSetNxLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
@Override
public boolean tryLock(Long milliseconds) {
String id = ID_PREFIX + Thread.currentThread().getId();
// set NAME_PREFIX + name id NX EX
Boolean bool = stringRedisTemplate.opsForValue()
.setIfAbsent(NAME_PREFIX + name, id, milliseconds, TimeUnit.MILLISECONDS);
// 因为要拆箱,防止出现空指针,使用常量判断
return Boolean.TRUE.equals(bool);
}
@Override
public void unLock() {
// 获取线程表示
String id = ID_PREFIX + Thread.currentThread().getId();
// 查询Redis,相同才可以删除,防止线程一 删掉线程二 的锁
String uid = stringRedisTemplate.opsForValue().get(NAME_PREFIX + name);
if(id.equals(uid)){
stringRedisTemplate.delete(NAME_PREFIX + name);
}
}
}
4、Redisson 分布式锁
- 基于NIO的Netty框架,生产环境使用分布式锁
- redisson加锁:lua脚本加锁(其他客户端自旋)
- 自动延时机制:启动watch dog,后台线程,每隔10秒检查一下客户端1还持有锁key,会不断的延长锁key的生存时间
- 可重入锁机制:第二个if判断 ,myLock :{“8743c9c0-0795-4907-87fd-6c719a6b4586:1”:2 }
- 释放锁:无锁直接返回;有锁不是我加的,返回;有锁是我加的,执行hincrby -1,当重入锁减完才执行del操作
- Redis使用同一个Lua解释器来执行所有命令,Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。 Redisson在基于NIO的Netty框架上,生产环境使用分布式锁。
加入jar包的依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.7.0</version>
</dependency>
配置Redisson
public class RedissonManager {
private static Config config = new Config();
//声明redisso对象
private static Redisson redisson = null;
//实例化redisson
static{
config.useClusterServers()
// 集群状态扫描间隔时间,单位是毫秒
.setScanInterval(2000)
//cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
.addNodeAddress("redis://127.0.0.1:6379" )
.addNodeAddress("redis://127.0.0.1:6380")
.addNodeAddress("redis://127.0.0.1:6381")
.addNodeAddress("redis://127.0.0.1:6382")
.addNodeAddress("redis://127.0.0.1:6383")
.addNodeAddress("redis://127.0.0.1:6384");
//得到redisson对象
redisson = (Redisson) Redisson.create(config);
}
//获取redisson对象的方法
public static Redisson getRedisson(){
return redisson;
}
}
锁的获取和释放
public class DistributedRedisLock {
//从配置类中获取redisson对象
private static Redisson redisson = RedissonManager.getRedisson();
private static final String LOCK_TITLE = "redisLock_";
//加锁
public static boolean acquire(String lockName){
//声明key对象
String key = LOCK_TITLE + lockName;
//获取锁对象
RLock mylock = redisson.getLock(key);
//加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
mylock.lock(2,3,TimeUtil.SECOND);
//加锁成功
return true;
}
//锁的释放
public static void release(String lockName){
//必须是和加锁时的同一个key
String key = LOCK_TITLE + lockName;
//获取所对象
RLock mylock = redisson.getLock(key);
//释放锁(解锁)
mylock.unlock();
业务逻辑中使用分布式锁
public String discount() throws IOException{
String key = "lock001";
//加锁
DistributedRedisLock.acquire(key);
//执行具体业务逻辑
dosoming
//释放锁
DistributedRedisLock.release(key);
//返回结果
return soming;
}
1 、加锁机制
- 如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。
- 发送lua脚本到redis服务器上,脚本如下:
//exists',KEYS[1])==0 不存在,没锁
"if (redis.call('exists',KEYS[1])==0) then "+ --看有没有锁
// 命令:hset,1:第一回
"redis.call('hset',KEYS[1],ARGV[2],1) ; "+ --无锁 加锁
// 配置锁的生命周期
"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
"return nil; end ;" +
//可重入操作,判断是不是我加的锁
"if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+ --我加的锁
//hincrby 在原来的锁上加1
"redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+ --重入锁
"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
"return nil; end ;" +
//否则,锁存在,返回锁的有效期,决定下次执行脚本时间
"return redis.call('pttl',KEYS[1]) ;" --不能加锁,返回锁的时间
lua的作用:保证这段复杂业务逻辑执行的原子性。
lua的解释:
- KEYS[1]) : 加锁的key
- ARGV[1] : key的生存时间,默认为30秒
- ARGV[2] : 加锁的客户端ID (UUID.randomUUID()) + “:” + threadId)
第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。 如何加锁呢?很简单,用下面的命令:
- hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
- myLock :{“8743c9c0-0795-4907-87fd-6c719a6b4586:1”:1 } 上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。
接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。
锁互斥机制
那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢? 很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。 接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。
所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。
此时客户端2会进入一个while循环,不停的尝试加锁。
自动延时机制
只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
可重入锁机制
第一个if判断 肯定不成立,“exists myLock”会显示锁key已经存在了。
第二个if判断 会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此时就会执行可重入加锁的逻辑,他会用:incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通过这个命令,对客户端1的加锁次数,累加1。数据结构会变成:myLock :{“8743c9c0-0795-4907-87fd-6c719a6b4586:1”:2 }
2、 释放锁机制
执行lua脚本如下:
# 如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息(无锁,直接返回)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
# key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。 不是我加的锁 不能解锁 (有锁不是我加的,返回)
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
# 将value减1 (有锁是我加的,进行hincrby -1 )
"local counter = redis.call('hincrby', KEYS[1], ARGV[3],-1); " +
# 如果counter>0说明锁在重入,不能删除key
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
# 删除key并且publish 解锁消息
# 可重入锁减完了,进行del操作
"else " +
"redis.call('del', KEYS[1]); " + #删除锁
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
- – KEYS[1] :需要加锁的key,这里需要是字符串类型。
- – KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName: “redisson_lockchannel{” + getName() + “}”
- – ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合 redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
- – ARGV[2] :锁的超时时间,防止死锁
- – ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
- “del myLock”命令,从redis里删除这个key。
- 然后呢,另外的客户端2就可以尝试完成加锁了。
分布式锁特性
- 互斥性 任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁。
- 同一性 锁只能被持有该锁的客户端删除,不能由其它客户端删除。
- 可重入性 持有某个锁的客户端可继续对该锁加锁,实现锁的续租
- 容错性 锁失效后(超过生命周期)自动释放锁(key失效),其他客户端可以继续获得该锁,防止死锁
分布式锁的实际应用
- 数据并发竞争 利用分布式锁可以将处理串行化,前面已经讲过了。
- 防止库存超卖 订单1下单前会先查看库存,库存为10,所以下单5本可以成功; 订单2下单前会先查看库存,库存为10,所以下单8本可以成功; 订单1和订单2 同时操作,共下单13本,但库存只有10本,显然库存不够了,这种情况称为库存超卖。 可以采用分布式锁解决这个问题。
- ) 订单1和订单2都从Redis中获得分布式锁(setnx),谁能获得锁谁进行下单操作,这样就把订单系统下单的顺序串行化了,就不会出现超卖的情况了。伪码如下:
//加锁并设置有效期
if(redis.lock("RDL",200)){
//判断库存
if (orderNum<getCount()){
//加锁成功 ,可以下单
order(5);
//释放锁
redis,unlock("RDL");
}
}
注意此种方法会降低处理效率,这样不适合秒杀的场景,秒杀可以使用CAS和Redis队列的方式。
5、Redis秒杀优化(提高并发性能)
由上文可见:目前我们也业务也已经差不多完成,但是由于性能低下,不适于秒杀的场景。
在整个业务中我们的线程都是串行执行的,而且我们的读写操作都是直接在数据完成的,导致性能下降。
我们可以这样优化,如下图所示:将我们需要判断的逻辑在Redis中执行,而其余的读写操作使用异步的方式(在开一个线程)去执行。
将数据库中优惠券数据同步到Redis中(可以是String数据结构,如set id 库存)
在Redis中判断一人一单,可以使用 set数据结构(key不能重复,value可以有多个值)
场景:当用户抢购时,我们对Redis中库存预减一,在判断一人一单时,将用户的id存入set数据结构的value中。
如图所示:我们第一个线程执行完后可以直接判断出该用户有没有资格进行抢购,而异步操作只需判断即可。
为了保证我们的第一个线程原子性,可以使用lua脚本实现,如图:
具体需求如下:
新增秒杀优惠券的同时,将优惠券信息保存到Redis中。
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by Administrator. --- DateTime: 2022/11/8 1:43 --- -- 优惠券id local orderId = ARGV[1] -- 用户id local userId = ARGV[2] -- 库存key local stockKey = "seckill:stock" .. orderId -- 订单key local orderKey = "seckill:stock" .. userId -- 业务判断 -- 判断库存是否充足 tonumber: 将字符串转为 数字类型 if(tonumber(redis.call('get',stockKey)) <= 0)then return 2 end -- 判断用户是否已经下过单 sismember orderKey userId 判断orderKey KEY 中有么有 userId if(redis.call('sismember',orderKey,userId) == 1)then -- 存在userId,该用户以下过单 return 1 end -- 扣库存 incrby stockKey -1 ,incrby 是增的意思,增-1就是 减一 redis.call('incrby',stockKey, -1) -- 下订单,添加用户id 到 set数据结构中 redis.call('sadd',orderKey,userId) return 0
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。
private static final DefaultRedisScript<Long> REDIS_SCRIPT; /* * 加载lua脚本 */ static { REDIS_SCRIPT = new DefaultRedisScript<>(); REDIS_SCRIPT.setLocation(new ClassPathResource("seckillVoucher.lua")); // 在 resources路径下 REDIS_SCRIPT.setResultType(Long.class); } @Override public Result<?> seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 执行lua脚本 Long result = stringRedisTemplate.execute(REDIS_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString()); // 返回结果 为1:库存不足,2:不能重复下单,0:进入阻塞队列执行异步操作 int i = result.intValue(); if(i != 0){ return Result.fail(i == 2 ? "库存不足!" : "每个用户只能抢购一次!"); } // TODO 保存阻塞队列 Long orderId = redisIdAlgorithm.getId("order"); return Result.ok(orderId); }
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能。
基于Redis有三种消息队列模式
消息队列模式 优点 缺点 List 1、利用Redis存储,不受限于JVM内存上限
2、基于Redis的持久化机制,数据安全性有保证
3、可以满足消息有序性1、无法避免消息丢失
2、只支持单消费者pubSub 1、采用发布订阅模型,支持多生产、多消费 1、不支持数据持久化
2、无法避免消息丢失
3、消息堆积有上限,超出时数据丢失Stream(数据类型) 1、消息可回溯
2、一个消息可以被多个消费者读取
3、可以阻塞读取1、有消息漏读的风险
==TODO== 未完成
六、点赞功能
-
需求:
- 每个用户只能对一个博客点一次赞,再次点赞则是取消赞。
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端实现,判断Blog类中isLike属性)。
- 每个用户只能对一个博客点一次赞,再次点赞则是取消赞。
-
实现思路:
-
Redis数据结构选择:因为前端需要显示某个博客的点赞用户排序(根据点赞时间升序)‘
-
由上图可见,我们的需求可以使用 SortedSet 类型(该类型有score属性可做排序)
-
利用Redis的SortedSet(ZSet) 集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1,代码如下
/** * 根据 key 和 value获取 score * * @param key key * @param value value * @return 若key不存在则返回true,反之false */ public boolean isScore(String key, String value) { Double score = stringRedisTemplate.opsForZSet().score(key, value); return null == score; }
-
给Blog类中添加一个isLike字段,标示是否被当前用户点赞。
-
修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
-
修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
-
七、好友关注
(做的比较简单)
指用户关注博客作者,在数据中有博客和用户id的关系表:
关注和取消关注
- 则添加用户和该博客的关系,取消关注则删除该关系即可(同时添加到Redis中 使用Set 类型,key:userid ,value:目标用户id)。
共同关注查询
- 因为我们之前将关注信息是存在Redis的Set数据类型中而 Set类型有 intersect 命令,也就是求两个key的交集,可以直接查出数据。
推送到粉丝收件箱
关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
-
传统模式:
- 类似于百度搜索,用户需要自己寻找自己想要的东西。
-
Feed模式:
-
类似于抖音、快手、今日头条,用户只需要下拉搜索 系统就可以根据算法将用户感兴趣的文章或视频推给用户。
-
Feed流有常见的两种模式:
Timeline:
- 不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
- 不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
智能排序:
- 利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
- 利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用
-
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
- 拉模式:也叫读扩散
- 推模式:也叫读扩散
- 推拉结合:也叫做读写混合,兼具推和拉两种模式的优点。
- 总结:
- 在本案例中 由于是点评项目(并不会出现某种大V)所有使用的是推模式。
- 拉模式:也叫读扩散
-
推送到粉丝收件箱业务功能比较简单:
- 只需要在发布博客后查询出当前用户的所有粉丝
- 使用SortedSet数据类型 key 为 粉丝的id,value为博客的id,score 为 时间戳即可,最后返回博客的id。
- 为什么使用SortedSet 而不使用List呢?
- List使用角标来排序,SortedSet使用score来排序
- 若使用List做分页的时候别的线程来插入数据,可能会造成数据多次展示。
- 而SortedSet做分页为滚动分页,所以不会产生上述情况。
- 为什么使用SortedSet 而不使用List呢?
-
滚动分页
- 使用Redis SortedSet类型中的 ZREVRANGEBYSCORE 来完成 (我们的score存的是时间戳,所以一下示列为时间戳)
- **key:**指定key。
- max:若是第一次查询,则是 当前的时间戳(最大时间)| 若不是第一次查询 则是 上一次查询的最小时间戳。
- **min:**0 (最小时间戳,不可能为负)。
- offset:若是第一次查询,则为0 | 若不是第一次查询 则是 在上一次的结果中,与最小值一样的元素个数。
- count: 与前端统一 (每页显示的条数)。
本项目中实现该功能代码如下:
/**
* 实现滚动分页查询
*
* @param max 0 (最小时间戳,不可能为负)
* @param offset 若是第一次查询,则为0 | 若不是第一次查询 则是 在上一次的结果中,与最小值一样的元素个数。
* @return BlogOfFollowDTO
*/
@Override
public Result<?> blogOfFollow(Long max, Integer offset) {
// 获取当前用户
UserDTO user = UserHolder.getUser();
Long uId = user.getId();
if (user == null || uId == null) {
return Result.fail("请先登入!");
}
String key = FEED_KEY + uId;
/*
* key 指定key。
* min 若是第一次查询,则是 当前的时间戳(最大时间)| 若不是第一次查询 则是 上一次查询的最小时间戳
* max 0 (最小时间戳,不可能为负)
* offset 若是第一次查询,则为0 | 若不是第一次查询 则是 在上一次的结果中,与最小值一样的元素个数。
* count 与前端统一 (每页显示的条数)
*/
// 滚动分页查询
Set<ZSetOperations.TypedTuple<String>> typedTuples = redisUtil
.zReverseRangeByScoreWithScores(key, 0, max, offset, 3);
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok(Collections.emptyList());
}
List<String> ids = new ArrayList<>(typedTuples.size());
// 获取最后一个值的分数,也就是for 循环的最后一次
long score = 0;
// 获取 offset
int off = 1;
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
// 博客id
ids.add(tuple.getValue());
long sco = tuple.getScore().intValue();
if (sco == sco) {
off++;
} else {
score = sco;
off = 1;
}
}
// 根据id 查询Blog
List<Long> idLongs = ids.stream().map(Long::valueOf).collect(Collectors.toList());
String strIds = StrUtil.join(",", idLongs);
// FIELD : 自定义排序
List<Blog> blogs = this.query()
.in("id", idLongs).last("ORDER BY FIELD(id," + strIds + ")").list();
blogs.forEach(blog -> {
this.queryBlogUser(blog);
this.isBlogs(blog);
});
BlogOfFollowDTO dto = BlogOfFollowDTO.builder().list(blogs).offset(off).minTime(score).build();
return Result.ok(dto);
}
八、附近商家
Redis数据结构选择参考 第一章节的Redis数据结构和常用命令中的GEO数据结构
九、用户签到功能(bitMap)
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0
Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。
BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT:获取指定位置(offset)的bit值
- BITCOUNT:统计BitMap中值为1的bit位的数量
- BITFIELD:操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO:获取BitMap中bit数组,并以十进制形式返回
- BITOP:将多个BitMap的结果做位运算(与、或、异或)
- BITPOS:查找bit数组中指定范围内第一个0或1出现的位置
十、UV PV 统计
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV 采用Redis HypreLoglog
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法
原理大家可以参考:https://juejin.cn/post/684490378574405633#heading-0
Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果 是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。
单个Redis会出现的问题
十一、Redis 持久化机制
RDB
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所 有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。
命令:save bgsave
- RDB方式bgsave的基本流程?
- fork主进程得到一个子进程,共享内存空间
- 子进程读取内存数据并写入新的RDB文件
- 用新RDB文件替换旧的RDB文件。
- RDB会在什么时候执行?save 60 1000代表什么含义?
- 默认是服务停止时。
- 代表60秒内至少执行1000次修改则触发RDB
- RDB的缺点?
- RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
- fork子进程、压缩、写出RDB文件都比较耗时
AOF
AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文 件。
-
AOF默认是关闭的,需要修改Redis.conf 文件来开启:
-
AOF的命令记录频率也可以在 redis.conf 中修改:
-
因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才 有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。
-
Redis也会在触发國值时自动去重写AOF文件。间值也可以在redis.conf中配置:
RDB 和 AOF 的区别
RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会==结合==两者来使用。
据说:在后续的Redis 的高版本中会将两者结合!
- 感谢你赐予我前进的力量