cache组件

前言适用场景本地缓存选型常见的多级缓存方案角色流程优点缺点Cache组件方案1、整体设计优点缺点2、组件详细设计3、介绍命令生命周期4、具体代码

前言在《Cache组件-热key收集》中,我们已经介绍过热key是什么,热key的危害,如何收集统计;最后引出了一些热key防护的方案,本文为多级缓存解决方案。

适用场景

长期不修改,如配置类数据查询流量大,如秒杀商品信息等非强一致性,允许分布式情况下的修改与查询结果不匹配(毫秒级)

本地缓存选型| 开源组件 | 优点 | 缺点 |

| —- | —- | —- |

| EHCache | Encache是一个纯Java的进程内缓存框架,是Hibernate中默认de CacheProvider。同Caffeine和Guava Cache相比,Encache的功能更加丰富,扩展性更强,支持多种缓存淘汰算法,包括LRU、LFU和FIFO,缓存支持堆内存储、堆外存储、磁盘存储(支持持久化)三种,支持多种集群方案,解决数据共享问题。 | Encache在性能上不及Caffeine和Guava Cache |

| Guava Cache | Guava Cache继承了ConcurrentHashMap的思路,使用多个segments方式的细粒度锁,在保证线程安全的同时,支持高并发场景需求。 | 无明显缺点,性能不及Caffeine |

| Caffeine | Caffeine内部采用了一种结合LRU、LFU优点的算法:W-TinyLFU,在性能上比Guava cache更加优秀,Caffeine的API基本和Guava cache一样。 | 无明显缺点 |

综合考虑,使用 Caffeine

常见的多级缓存方案

角色

podworker注册中心

流程POD集群服务与worker集群启动时,将自身信息注册到注册中心;

POD集群服务与worker集群直连;RedisClient客户端收集redis热key统计信息,上报给worker;worker对热key进行分析,如果触发阈值,通知所有POD,将热key进行本地缓存;如果热key有修改,同样的方式通知所有服务修改;

优点能解决热key问题,并能第一时间通知所有POD进行本地缓存;

缺点需要额外引入worker集群与注册中心,复杂度飙升;

Cache组件方案

1、整体设计

优点

组件基于Redis的发布订阅实现分布式缓存的通知,降低整体复杂度;组件内部集成本地缓存,使用更简单;

缺点将高可用完全寄托于Redis,如果redis异常,则依赖本地缓存的淘汰策略,因此超时时间不能设置太长;

使用者需要区分哪些数据结构的命令支持本地缓存,哪些不支持;

2、组件详细设计

3、介绍

命令生命周期

通过CacheExecutor执行器调用命令,执行器通过handle调度器调用,handle调度器将命令通过本地缓存工厂 RedisLocalCacheFactory包装重写;

重写后的命令,优先判断key是否需要本地缓存,然后根据数据类型(目前只支持String、Hash、Set)选择本地缓存实现类,再根据本地缓存执行策略调用:

NONE:直接调用命令的apply进行redis操作并返回GET:通过本地缓存执行器执行get操作,判断是否需要通知其他POD,需要则通过发布订阅通知;SET:通过本地缓存执行器执行set操作,并通知其他POD;

4、具体代码工厂 RedisLocalCacheFactory

```java

/**

@author JerryLong@version V1.0@Title: RedisLocalCacheFactory@Description: 本地缓存工厂@date 2021/8/24 5:20 PM

/

public class RedisLocalCacheFactory {

/*支持本地缓存设置最大数量

/

private static final int KEY_MAX_SIZE = 200;

/*短暂的本地缓存key,只支持5.1秒

/

private static final int LOCAL_TRANSIENCE_KEY_MILLISECONDS = 5100;

/*需要长期本地缓存的key列表

/

private static Map> longTimesKeys = new ConcurrentHashMap<>();

/*短暂本地缓存的key前缀

/

private static final String TRANSIENCE_KEY_PREFIX = “hot-key-“;

/*数值1,需要推送

/

public static final int KEY_STATUS_NEED_PUBLISH = 1;

/*数值2,已经推送

/

public static final int KEY_STATUS_ALREADY_PUBLISH = 2;

/*热key本地缓存

*/

private static Cache transienceLocalKeys = Caffeine.newBuilder()

//设置cache的初始大小为10,要合理设置该值

.initialCapacity(10)

//最大值

.maximumSize(2000)

//在写入后开始计时,在指定的时间后过期。

.expireAfterWrite(LOCAL_TRANSIENCE_KEY_MILLISECONDS, TimeUnit.MILLISECONDS)

//构建cache实例

.build();

private static Map localCacheHandleMap = new ConcurrentHashMap<>();

private static AbstractLocalCacheHandle defaultHandle = new DefaultLocalCacheHandle();

static {

localCacheHandleMap.put(CommandsDataTypeEnum.STRING, new StringLocalCacheHandle());

localCacheHandleMap.put(CommandsDataTypeEnum.SET, new SetLocalCacheHandle());

localCacheHandleMap.put(CommandsDataTypeEnum.HASH, new HashLocalCacheHandle());

localCacheHandleMap.put(CommandsDataTypeEnum.LIST, new ListLocalCacheHandle());

localCacheHandleMap.put(CommandsDataTypeEnum.ZSET, new ZsetLocalCacheHandle());

}

public static void registe(CommandsDataTypeEnum dataType, AbstractLocalCacheHandle handle) {

localCacheHandleMap.put(dataType, handle);

}

/**

本地缓存key发布订阅channel

*/

public static final String LOCAL_CACHE_KEY_PUBSUB_CHANNEL = “Lcache:local:channel:”;

/**

获取本地缓存执行器

*@param dataType@param executor@return

*/

public static AbstractLocalCacheHandle getLocalCacheHandle(CommandsDataTypeEnum dataType, BaseCacheExecutor executor) {

return localCacheHandleMap.getOrDefault(dataType, defaultHandle);

}

/**

添加本地缓存的key

*@param key

*/

public static void addLocalCacheKey(BaseCacheExecutor executor, String key) {

Set sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>());

CacheExceptionFactory.throwException(sets.size() + 1 < KEY_MAX_SIZE, “本地缓存的key数量超出” + KEY_MAX_SIZE);

sets.add(key);

}

/** * 添加本地缓存的key * * @param keys */public static void addLocalCacheKey(BaseCacheExecutor executor, Set keys) { Set sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>()); CacheExceptionFactory.throwException(sets.size() + keys.size() < KEY_MAX_SIZE, "本地缓存的key数量超出" + KEY_MAX_SIZE); sets.addAll(keys);}/** * 是否是本地缓存的key * * @param key * @return 1 需要推送,2 不需要推送 */public static int isLocalCacheKey(BaseCacheExecutor executor, String key) { if (StringUtils.isBlank(key)) { return -1; } Set sets = longTimesKeys.computeIfAbsent(CacheConfigUtils.modelToHashKey(executor.getCacheConfigModel()), e -> new HashSet<>()); if (sets.contains(key)) { return KEY_STATUS_ALREADY_PUBLISH; } return (int) transienceLocalKeys.get(TRANSIENCE_KEY_PREFIX + key, e -> -1);}/** * 提交一个短暂本地缓存的key * * @param key * @param isNeedPublish 是否需要推送 */public static void addTransienceLocalCacheKey(String key, boolean isNeedPublish) { transienceLocalKeys.put(TRANSIENCE_KEY_PREFIX + key, isNeedPublish ? KEY_STATUS_NEED_PUBLISH : KEY_STATUS_ALREADY_PUBLISH);}/** * 获取命令与key本地缓存执行策略 * * @param commands * @param key * @return */public static RedisLocalCachePubHandle getRedisLocalCachePubHandle(BaseCacheExecutor executor, String commands, String key) { int isNeedPublish = isLocalCacheKey(executor, key); //不需要本地缓存的key,直接返回none if (isNeedPublish < 0) { return null; } return new RedisLocalCachePubHandle(isNeedPublish, CommandsDataTypeUtil.getHotKeyHandleType(commands));}/** * 本地缓存通知处理方式 */public static class RedisLocalCachePubHandle { public RedisLocalCachePubHandle() { } public RedisLocalCachePubHandle(int isNeedPublish, LocalCacheHandleTypeEnum handleType) { this.isNeedPublish = isNeedPublish; this.handleType = handleType; } private int isNeedPublish; private LocalCacheHandleTypeEnum handleType; public int getIsNeedPublish() { return isNeedPublish; } public LocalCacheHandleTypeEnum getHandleType() { return handleType; }}}

**本地缓存执行器抽象 AbstractLocalCacheHandle**

```java

/**

* @author JerryLong

* @version V1.0

* @Title: AbstractLocalCacheHandle

* @Description: 本地缓存执行器抽象类

* @date 2021/11/9 2:34 PM

*/

public abstract class AbstractLocalCacheHandle {

private static Cache LOCAL_CACHE = Caffeine.newBuilder()

//设置cache的初始大小为10,要合理设置该值

.initialCapacity(100)

//最大值

.maximumSize(10000)

//在写入后开始计时,在指定的时间后过期。保留500毫秒

.expireAfterWrite(500, TimeUnit.MILLISECONDS)

//构建cache实例

.build();

/**

* 获取,如果存在

*

* @param key

* @return

*/

public LocalCacheLifeCycle getIfPresent(BaseCacheExecutor executor, String key) {

return LOCAL_CACHE.getIfPresent(getLocalKey(executor, key));

}

/**

* 删除

*

* @param key

*/

public static void del(String key) {

LOCAL_CACHE.invalidate(key);

}

public static void del(BaseCacheExecutor executor, String key) {

LOCAL_CACHE.invalidate(getLocalKey(executor.getCacheConfigModel(), key));

}

/**

* 实现类需要指定自身数据类型

*

* @return

*/

protected abstract CommandsDataTypeEnum getDataType();

@PostConstruct

public void regist() {

RedisLocalCacheFactory.registe(getDataType(), this);

}

/**

* 获取

*

* @param function

* @param key

* @param fields

* @return

*/

protected abstract Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields);

protected Object set(BaseCacheExecutor executor, CacheFunction function, String key) {

try {

return function.apply();

} finally {

//删除本地缓存

LOCAL_CACHE.invalidate(key);

}

}

/**

* 缓存方法执行

*

* @param function

* @param key

* @param fields

* @return

*/

public Object doCacheFunc(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {

if (executor.getCacheConfigModel().isLocalCache()) {

//这一次命令是否不走本地缓存

if (Boolean.TRUE.equals(executor.getNoLocalCacheOnce())) {

try {

return function.apply();

} finally {

executor.removeNoLocalCacheOnce();

}

}

RedisLocalCacheFactory.RedisLocalCachePubHandle localCacheHandle = RedisLocalCacheFactory.getRedisLocalCachePubHandle(executor, function.fnToFnName(), key);

if (null != localCacheHandle) {

Object res = null;

switch (localCacheHandle.getHandleType()) {

case NONE:

res = function.apply();

break;

case GET:

res = this.get(executor, function, key, fields);

//判断是否需要通知其他pod

if (localCacheHandle.getIsNeedPublish() == RedisLocalCacheFactory.KEY_STATUS_NEED_PUBLISH) {

//修改状态

RedisLocalCacheFactory.addTransienceLocalCacheKey(key, false);

LocalCachePublisher.publish(executor, key, true);

}

break;

case SET:

res = this.set(executor, function, getLocalKey(executor, key));

//通知其他pod,key是拼接后的

LocalCachePublisher.publish(executor, getLocalKey(executor, key), false);

break;

default:

res = function.apply();

break;

}

return res;

}

}

return function.apply();

}

/**

* 获取本地缓存的key

*

* @param key

* @return

*/

private String getLocalKey(BaseCacheExecutor executor, String key) {

return getLocalKey(executor.getCacheConfigModel(), key);

}

protected static String getLocalKey(CacheConfigModel cacheConfigModel, String key) {

return "l" + CacheConfigUtils.modelToHashKey(cacheConfigModel) + key;

}

/**

* 获取cache

*

* @param key

* @param check

* @param loadData

* @param

* @return

*/

protected Optional> getLocalCache(BaseCacheExecutor executor, String key, Function check, Function loadData) {

key = getLocalKey(executor, key);

@Nullable LocalCacheLifeCycle data = LOCAL_CACHE.getIfPresent(key);

if (null == data) {

loadAsync(key, check, loadData);

}

return Optional.ofNullable(data);

}

/**

* 异步加载,如果在加载期间有修改,则放弃

*

* @param key

*/

protected static void loadAsync(String key, Function checkLength, Function loadData) {

CompletableFuture.runAsync(() -> {

//判断是否能被加载

LocalCacheLifeCycle cycle = LOCAL_CACHE.getIfPresent(key);

if (!isCanLoad(cycle)) {

return;

}

//判断长度

if (null != checkLength) {

if (!(boolean) checkLength.apply(null)) {

LOCAL_CACHE.put(key, new LocalCacheLifeCycle(LocalCacheStatus.CAN_NOT, null));

return;

}

}

Object data = loadData.apply(null);

//最后再判断一遍是否能被加载,并且判断版本号是否是最新的

LocalCacheLifeCycle cycle1 = LOCAL_CACHE.getIfPresent(key);

if (isCanLoad(cycle1)) {

//判断版本号

if (null != cycle && null != cycle1 && cycle1.getTimestamp() > cycle.getTimestamp()) {

return;

}

//缓存

LOCAL_CACHE.put(key, new LocalCacheLifeCycle(LocalCacheStatus.CACHE, data));

}

});

}

/**

* 是否可以被重新加载,

* true : 初始化Null、已删除、已缓存

*

* @param cycle

* @return

*/

private static boolean isCanLoad(LocalCacheLifeCycle cycle) {

return null == cycle || cycle.getLocalCacheStatus().equals(LocalCacheStatus.CACHE) || cycle.getLocalCacheStatus().equals(LocalCacheStatus.DEL);

}

/**

* 本地缓存生命周期

*/

public static class LocalCacheLifeCycle {

public LocalCacheLifeCycle() {

}

public LocalCacheLifeCycle(LocalCacheStatus localCacheStatus, T data) {

this.localCacheStatus = localCacheStatus;

this.timestamp = System.currentTimeMillis();

this.data = data;

}

/**

* 缓存状态

*/

private LocalCacheStatus localCacheStatus;

/**

* 版本号,毫秒时间戳

*/

private Long timestamp;

/**

* 缓存数据

*/

private T data;

public LocalCacheStatus getLocalCacheStatus() {

return localCacheStatus;

}

public Long getTimestamp() {

return timestamp;

}

public T getData() {

return data;

}

}

/**

* key缓存状态

*/

protected enum LocalCacheStatus {

/**

* 不能本地缓存

*/

CAN_NOT,

/**

* 删除

*/

DEL,

/**

* 加载中

*/

LOADING,

/**

* 缓存中

*/

CACHE;

}

}

本地缓存热key发布 LocalCachePublisher

/**

* @author JerryLong

* @version V1.0

* @Title: LocalCachePublisher

* @Description: 热key消息发送方

* @date 2021/8/24 7:06 PM

*/

public class LocalCachePublisher {

/**

* 通过pubsub通知其他节点

*

* @param executor

* @param key

* @param isNewKey true是新的key,false是修改的key

*/

public static void publish(BaseCacheExecutor executor, String key, boolean isNewKey) {

if (!MonitorFactory.isOpenHotKeyLocalCache()) {

return;

}

if (null == executor) {

return;

}

//通知其他pod

executor.publishAsync(RedisLocalCacheFactory.LOCAL_CACHE_KEY_PUBSUB_CHANNEL + executor.getCacheConfigModel().getCacheType(), JSON.toJSONString(new HotKeySubscriptData(key, isNewKey ? -1 : 1)));

}

}

本地缓存热key消费 LocalCacheSubscriber

/**

* @author JerryLong

* @version V1.0

* @Title: LocalCacheSubscriber

* @Description: 热key消息监听方

* @date 2021/8/24 7:05 PM

*/

public class LocalCacheSubscriber {

private static Map subscribeInfo = new ConcurrentHashMap<>();

/**

* 增加监听

*

* @param executor

*/

public static void addSubscriber(BaseCacheExecutor executor) {

subscribeInfo.computeIfAbsent(CacheConfigUtils.modelToHashKeyNoUseType(executor.getCacheConfigModel()), e -> {

executor.subscribe((message) -> manageMessage(JSON.parseObject(message, HotKeySubscriptData.class)), RedisLocalCacheFactory.LOCAL_CACHE_KEY_PUBSUB_CHANNEL + executor.getCacheConfigModel().getCacheType());

return 1;

});

}

/**

* 管理消息

*

* @param hotKeySubscriptData

*/

public static void manageMessage(HotKeySubscriptData hotKeySubscriptData) {

if (!MonitorFactory.isOpenHotKeyLocalCache()) {

return;

}

//如果是当前pod发的消息,不处理

if (hotKeySubscriptData.isLocalHost()) {

return;

}

if (hotKeySubscriptData.isNewKey()) {

//新的key,本地缓存中注册这个key

RedisLocalCacheFactory.addTransienceLocalCacheKey(hotKeySubscriptData.getKey(), false);

} else {

//修改,需要删除本地缓存

AbstractLocalCacheHandle.del(hotKeySubscriptData.getKey());

}

}

}

本地缓存处理器实现(String、Hash、Set)

public class StringLocalCacheHandle extends AbstractLocalCacheHandle {

@Override

protected CommandsDataTypeEnum getDataType() {

return CommandsDataTypeEnum.STRING;

}

@Override

public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {

return this.getLocalCache(executor, key, null, e -> executor.noLocalCacheOnce().get(key)).map(

e -> {

//判断缓存状态

if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {

return function.apply();

}

switch (function.fnToFnName()) {

case "get":

return e.getData();

case "substr":

if (null == e.getData()) {

return "";

}

return substr(e.getData().toString(), Integer.parseInt(fields[0].toString()), Integer.parseInt(fields[1].toString()));

case "strlen":

if (null == e.getData()) {

return 0L;

}

return Long.parseLong(String.valueOf(e.getData().toString().length()));

default:

return function.apply();

}

}

).orElse(function.apply());

}

public static String substr(String v, int s, int e) {

if (StringUtils.isBlank(v)) {

return "";

}

return v.substring(s, javaEnd(v.length(), e));

}

public static int javaEnd(int length, int end) {

if (0 < end) {

int l = end + 1;

return l > length ? length : l;

} else if (0 == end) {

return 1;

} else {

int l = length + 1 + end;

return l < 1 ? 1 : l;

}

}

}

public class HashLocalCacheHandle extends AbstractLocalCacheHandle {

@Override

protected CommandsDataTypeEnum getDataType() {

return CommandsDataTypeEnum.HASH;

}

private static final Integer HASH_MAX_LEN = 5000;

/**

* 如果key不存在,需要先load

*

* @param function

* @param fields

* @return

*/

@Override

public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {

return this.getCacheMap(executor, key).map(e -> {

//判断缓存状态

if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {

return function.apply();

}

switch (function.fnToFnName()) {

case "hget":

return e.getData().get(fields[0]);

case "hmget":

List list = new ArrayList<>();

for (int i = 0; i < fields.length; i++) {

list.add(e.getData().get(fields[i]));

}

return list;

case "hmgetToMap":

Map map = new HashMap<>(8);

Object o = null;

for (int i = 0; i < fields.length; i++) {

o = e.getData().get(fields[i]);

if (null != o) {

map.put(fields[i].toString(), o);

}

}

return map;

case "hmgetToMapCanNull":

Map map1 = new HashMap<>(8);

for (int i = 0; i < fields.length; i++) {

map1.put(fields[i].toString(), e.getData().get(fields[i]));

}

return map1;

case "hgetAll":

return e.getData();

case "hkeys":

return e.getData().keySet();

case "hlen":

return Long.valueOf(e.getData().size());

case "hvals":

return new ArrayList<>(e.getData().values());

case "hexists":

return e.getData().containsKey(fields[0]);

case "hstrlen":

return e.getData().get(fields[0]).toString().length();

default:

return function.apply();

}

}).orElse(function.apply());

}

/**

* 获取cache

*

* @param key

* @return

*/

private Optional>> getCacheMap(BaseCacheExecutor executor, String key) {

return getLocalCache(executor, key, e -> executor.noLocalCacheOnce().hlen(key) <= HASH_MAX_LEN, e -> executor.noLocalCacheOnce().hgetAll(key));

}

}

public class SetLocalCacheHandle extends AbstractLocalCacheHandle {

@Override

protected CommandsDataTypeEnum getDataType() {

return CommandsDataTypeEnum.SET;

}

private static final Integer SET_MAX_LEN = 5000;

/**

* 如果key不存在,需要先load

*

* @param function

* @param fields

* @return

*/

@Override

public Object get(BaseCacheExecutor executor, CacheFunction function, String key, Object[] fields) {

return this.getCacheMap(executor, key).map(e -> {

//判断缓存状态

if (!e.getLocalCacheStatus().equals(LocalCacheStatus.CACHE)) {

return function.apply();

}

switch (function.fnToFnName()) {

case "sismember":

return e.getData().contains(fields[0]);

case "smembers":

return e.getData();

case "scard":

return Long.valueOf(e.getData().size());

default:

return function.apply();

}

}).orElse(function.apply());

}

/**

* 获取cache

*

* @param key

* @return

*/

private Optional>> getCacheMap(BaseCacheExecutor executor, String key) {

return getLocalCache(executor, key, e -> executor.noLocalCacheOnce().scard(key) <= SET_MAX_LEN, e -> executor.noLocalCacheOnce().smembers(key));

}

}

Copyright © 2022 世界杯进球_国足进世界杯了吗 - fulitb.com All Rights Reserved.