背景
实时数据需要批量入历史表,频率高,数据量大。无法保障数据重复问题。之前我们一直用MongoDB,解决重复问题可用唯一索引解决,MongoDB在批插操作时,可以跳过批插过程中失败的数据继续插入。本次系统建设用的是达梦库,当前遇到的问题是批插时,一条失败,无法执行整个批插。网上找了一些资料,Mysql有insert ignor into …做为相应的解决方案。达梦库未到类似操作,网上建议用merge into,即在执行插入时通过判断是否存在违反唯一性原则 ,从而决定用upsert。由 于系统特点历史表数据量大,批插操作频繁,考虑到性能问题,该方法不打算采用。
待解决问题
考虑到数据量问题,历史表禁有重复数据
难点
1.历史表数据量大,按月做了分区,每月量估测在千万级别
2.批插操作频繁,不高于20s一次
思路
引用本地缓存 ,缓存历史数据,每次在入库前通过缓存数据判断去掉重复数据。我们唯一主键是编码类型和实时时间。实时时间会重复。
(1)缓存设 计如 下:
key:code_type;value:realTime 。每次存历史之前可以先去map中找到realTime,如果时间小于等于realTime,则这条移除不保存。
(2)考虑到数据量比较大,可以做个二级缓存。不说了,被催工作了,先上代码供参考吧。
SpringBoot集成 JetCahce
POM引依赖
这里有点小坑坑,注意版本。Jetcahce2.7之后的版本不能用@CreateCach注解了。上官网地址,自己去看看。
<dependency>
<groupId>com.alicp.jetcache</groupId>
<artifactId>jetcache-starter-redis</artifactId>
<version>2.6.7</version>
</dependency>
配置文件配置
jetcache:
# 统计间隔,默认0:表示不统计
statIntervalMinutes: 0
# areaName是否作为缓存key前缀,默认True
areaInCacheName: false
local:
default:
# 已支持可选:linkedhashmap、caffeine
type: caffeine
# key转换器的全局配置,当前只有:fastjson, @see com.alicp.jetcache.support.FastjsonKeyConvertor
keyConvertor: fastjson
# 每个缓存实例的最大元素的全局配置,仅local类型的缓存需要指定
limit: 100
# jetcache2.2以上,以毫秒为单位,指定多长时间没有访问,就让缓存失效,当前只有本地缓存支持。0表示不使用这个功能
# expireAfterAccessInMillis: 0
remote:
default:
# 已支持可选:redis、tair
type: redis
# 连接格式@see:https://github.com/lettuce-io/lettuce-core/wiki/Redis-URI-and-connection-details
host: 127.0.0.1
port: 6379
password: Abd123
keyConvertor: fastjson
# 下面都是redis的配置
poolConfig:
minIdle: 5
maxIdle: 20
maxTotal: 50
# 序列化器的全局配置。仅remote类型的缓存需要指定,可选java和kryo
valueEncoder: java
valueDecoder: java
# 以毫秒为单位指定超时时间的全局配置
# expireAfterWriteInMillis: 5000
开启注解
在SpringBoot启动类上加,开启jetCahce注解
@EnableMethodCache(basePackages = "com.test.gas.zhanglu.cache.*")
@EnableCreateCacheAnnotation
public class ZLCollectApplication {
public static void main(String[] args) {
SpringApplication.run(ZLCollectApplication.class, args);
}
}
缓存 实体类要进行序列化
public class ZLRealtimeValueHistory implements Serializable {
private static final long serialVersionUID = 806617303765589022L;
//值
private String value;
//状态
private String status;
//实时
private String realTime;
}
缓存类
import cn.hutool.core.date.DateUtil;
import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CreateCache;
import com.ceic.gas.management.generator.cache.GasRealtimeMonitorCacheService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class ZLMonitorCacheServiceImpl {
/**
*
* 更新缓存方法,把最新时间更新到缓存中。
* expire:指定远程缓存过期时间,
* localLimit:本地缓存数量
*/
@CreateCache(name = "WSCC_FFRealTimeCache_", expire = 1, timeUnit = TimeUnit.DAYS, localLimit = 100, cacheType = CacheType.BOTH)
public Cache<String, Date> ffRealTimeCache;
public Date getDate(String key) {
if (null != ffRealTimeCache) {
return ffRealTimeCache.get(key);
}
return null;
}
/**
* 数据的缓存
*
* @param list
*/
public void putData(List<ZLRealtimeValueHis> list) {
if (null == list || list.isEmpty()) {
return;
}
for (ZLRealtimeValueHis bean : list) {
String key = this.getKey(bean);
String value = bean.getRealTime();
if (StringUtils.isEmpty(value)) {
return;
}
Date v1 = DateUtil.parse(value);
if (null != v1) {
ffRealTimeCache.put(key, v1);
}
}
}
@Override
public String getKey(ZLRealtimeValueHis bean) {
return bean.getMineCode() + "@" + bean.getDeviceType() +
"@" + bean.getDeviceNo();
}
总结
1.jetCache如果用2.7.0之后的版本,可以用CacheManager 创建Cache
2.JetCache 可以实现二级缓存,可以本地缓存,可 以远程结合Redis等进行缓存
3.JetCache 可以设定缓存过期时间和淘汰策略,以后有时间再补充相关知识。如果希望他一直有效不过期,可以不做全局配置和任何TTL 时间相关配置
4.缓存的实体对象别忘记要进行序列化