Redis的缓存穿透,缓存击穿,缓存雪崩原因及解决方案
1、前言
在我们日常的开发中,通常都是使用数据库来进行数据的存储,由于一般的Web系统中通常不会存在高并发的情况,所以并没有什么问题。可是,一旦出现大并发量的数据请求,比如一些商品抢购的情景,或者是节假日访问量瞬间变大的时候,单一使用数据库来保存数据的系统会因为磁盘读/写速度比较慢的问题而存在严重的性能弊端,一瞬间成千上万的请求到来,需要系统在极短的时间内完成成千上万次的读/写操作,这个时候数据库往往不能够承受,极其容易造成数据库系统瘫痪,最终导致服务器宕机的严重生产事故。
为了克服上述的问题,开发人员通常会引入NoSQL技术,这是一种基于内存的数据库,并且提供一定的持久化功能。Redis技术就是NoSQL技术中的集大成者,但是引入Redis又有可能出现缓存穿透,缓存击穿,缓存雪崩等问题。本文就对这三种问题进行较深入剖析。
2、初认识缓存穿透,缓存击穿,缓存雪崩
缓存穿透:Key对应的数据在缓存中并不存在,每次针对此Key的请求从缓存获取不到,请求都会到数据库,从而可能压垮数据库。比如,用一个不存在的用户ID获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
缓存击穿:Key对应的数据存在,并且属于热点,热点Key在某个时间点过期的时候,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端数据库加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端数据库压垮,如同在一个屏障上凿开了一个洞,所以称之为“缓存击穿”。
缓存雪崩:当缓存服务器重启或者大量缓存Key集中在某一个时间段失效,所有的查询都落在数据库上,造成了缓存雪崩,也会给后端数据库带来很大压力。
3、缓存穿透解决方案
如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的BitMap中,一定不存在的数据会被 这个BitMap拦截掉,从而避免了对底层存储系统的查询压力。
布隆过滤器(Bloom filter)简介
Bloom filter 是由 Howard Bloom 在 1970 年提出的二进制向量数据结构,它具有很好的空间和时间效率,被用来检测一个元素是不是集合中的一个成员。
如需要判断一个元素是不是在一个集合中,我们通常做法是把所有元素保存下来,然后通过比较知道它是不是在集合内,链表、树都是基于这种思路,当集合内元素个数的变大,我们需要的空间和时间都线性变大,检索速度也越来越慢。 Bloom filter 采用的是哈希函数的方法,将一个元素映射到一个 m 长度的阵列上的一个点,当这个点是 1 时,那么这个元素在集合内,反之则不在集合内。这个方法的缺点就是当检测的元素很多的时候可能有冲突,解决方法就是使用 k 个哈希 函数对应 k 个点,如果所有点都是 1 的话,那么元素在集合内,如果有 0 的话,元素则不在集合内。备注:如果检测结果为“是”,该元素不一定在集合中,但如果检测结果为“否”,该元素一定不在集合中。
另外,也有一个更为简单粗暴的方法,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。
//伪代码,秒杀苹果手机
public Object getIPhone()
{
int cacheTime = 300;
String cacheKey = "iPhone";
String cacheValue = CacheHelper.get(cacheKey);
if (cacheValue != null)
{
return cacheValue;
}
else
{
//查询数据库
cacheValue = DBHelper.getIPhoneFromDB();
if (cacheValue == null)
{
//如果发现为空,设置个默认值,也缓存起来
cacheValue = "";
}
CacheHelper.add(cacheKey, cacheValue, cacheTime);
return cacheValue;
}
}
4、缓存击穿解决方案
业界比较常用的做法是使用Redis的互斥锁(mutex key)。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去查询数据库加载数据, 而是先去set一个mutex key,当操作返回成功时(意味着获得了互斥锁),再进行查库操作并回设缓存;否则,就重试整个get缓存的方法。
SETNX 是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现锁的效果。
public Object getIPhone()
{
int cacheTime = 300;
String cacheKey = "iPhone";
String cacheValue = CacheHelper.get(cacheKey);
if (cacheValue != null)
{
return cacheValue;
}
else
{
//查询数据库,先获取锁,需要分情况考虑
String lockKey = "lock";
//需要设置过期时间,防止此线程挂掉之后,其他线程也无法加锁
Boolean lockResult = CacheHelper.setnx(lockKey,"mylocker",30)
if (lockResult)
{//情况1:加锁成功
cacheValue = DBHelper.getIPhoneFromDB();
if (cacheValue == null)
{
//如果发现为空,设置个默认值,也缓存起来
cacheValue = "";
}
CacheHelper.add(cacheKey, cacheValue, cacheTime);
CacheHelper.delete(lockKey);
return cacheValue;
}
else
{//情况2:加锁失败
Thread.sleep(30);
cacheValue = CacheHelper.get(cacheKey);
return cacheValue;
}
}
}
上述代码看似完美,但是存在问题:虽然加锁成功了,但如果查库的时间过长,导致锁失效了,最后delete锁的时候,删掉的是其他线程加的锁。这个时候应该是“各加各锁,各删各锁”,如下所示:
//查询数据库,先获取锁,需要分情况考虑
String lockKey = "lock";
String lockValue = UUID.randomUUID().toString();
//需要设置过期时间,防止此线程挂掉之后,其他线程也无法加锁
Boolean lockResult = CacheHelper.setnx(lockKey,lockValue,30)
if (lockResult)
{//情况1:加锁成功
cacheValue = DBHelper.getIPhoneFromDB();
if (cacheValue == null)
{
//如果发现为空,设置个默认值,也缓存起来
cacheValue = "";
}
CacheHelper.add(cacheKey, cacheValue, cacheTime);
if (lockValue.equals(CacheHelper.get(lockKey)))
{
CacheHelper.delete(lockKey);
}
return cacheValue;
}
5、缓存雪崩解决方案
缓存失效时的雪崩效应对底层系统的冲击非常可怕,大多数系统设计者考虑用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。如下所示:
public Object getIPhone()
{
int cacheTime = 300;
String cacheKey = "iPhone";
String cacheValue = CacheHelper.get(cacheKey);
if (cacheValue != null)
{
return cacheValue;
}
else
{
synchronized(Object.class)
{//加锁之后,形成一个等待队列
cacheValue = CacheHelper.get(cacheKey);
if (cacheValue != null)
{
return cacheValue;
}
else
{
cacheValue = DBHelper.getIPhoneFromDB();
if (cacheValue == null)
{
//如果发现为空,设置个默认值,也缓存起来
cacheValue = "";
}
CacheHelper.add(cacheKey, cacheValue, cacheTime);
return cacheValue;
}
}
}
}
加锁排队只是为了减轻数据库的压力,并没有提高系统吞吐量。假设在高并发下,缓存重建期间,这时过来1000个请求有999个都在阻塞的。同样会导致用户等待超时,这是个治标不治本的方法!如下所示:
还有一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机时间,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
还有一种被称为“二级缓存”的解决方法,二级缓存过期时间比一级缓存的时间延长1倍,当一级缓存失效的时候,返回二级缓存的数据,并异步启动一个新线程去重建缓存。如下代码所示:
public Object getIPhone()
{
int cacheTime = 300;
String cacheKey1 = "iPhone1";
String cacheKey2 = "iPhone2";
String cacheValue1 = CacheHelper.get(cacheKey1);
String cacheValue2 = CacheHelper.get(cacheKey2);
if(cacheValue1 != null)
{//一级缓存有效,则返回一级缓存的值
return cacheValue1;
}
else
{//一级缓存失效,则返回二级缓存的值,并新启线程重建缓存
ThreadPool.QueueUserWorkItem((cacheKey1,cacheKey2,cacheTime) ->
{
cacheValue1 = DBHelper.getIPhoneFromDB();
CacheHelper.add(cacheKey1, cacheValue1, cacheTime);
CacheHelper.add(cacheKey2, cacheValue1, cacheTime * 2);
})
return cacheValue1;
}
}