0
篇帖子
使用消费队列防止死锁冲突
在最后一步数据入库时,如果是多线程模式有可能会导致DB死锁,造成数据缺失。 所以依靠jdk8原生队列封装了一个阻塞队列,理论上可以用在缓冲,限流等类似消费场景。 代码不多,大致如下:
public class BlockingQueueHelper<T> {
private BlockingQueue queue;
private ThreadFactory threadFactory;
private int threadCount = 1;
private int putReTryCount = 5;
private T executeParam = null;
private BlockingQueueHelper(BlockingQueue queue,T executeParam) {
ThreadFactoryBuilder threadFactoryBuilder = ThreadUtil.createThreadFactoryBuilder();
threadFactoryBuilder.setNamePrefix("blockingQueueExecuteThread");
threadFactoryBuilder.setDaemon(true);
threadFactoryBuilder.setPriority(1);
threadFactory = threadFactoryBuilder.build();
this.queue = queue ==null ? new LinkedBlockingQueue():queue;
this.executeParam = executeParam;
}
public static <T>BlockingQueueHelper create(T executeParam){
return new BlockingQueueHelper(null, executeParam);
}
public static BlockingQueueHelper create(BlockingQueue queue){
return new BlockingQueueHelper(queue,null);
}
/**
* create BlockingQueueHelper
* @param queue 阻塞队列,如果为null,则默认使用 LinkedBlockingQueue
* @param executeParam 队列消费时传入的参数,必须为线程安全变量
* @return
*/
public static <T>BlockingQueueHelper create(BlockingQueue queue,T executeParam){
return new BlockingQueueHelper(queue,executeParam);
}
public void start(){
Assert.notNull(queue);
Assert.notNull(threadFactory);
Assert.checkBetween(threadCount,1,Integer.MAX_VALUE);
for (int i = 0; i < threadCount; i++) {
threadFactory.newThread(
()->{
for(;;){
post();
}
}
).start();
}
}
private void post(){
try {
Object postItem = queue.take();
if (postItem instanceof Execute && executeParam != null) {
((Execute) postItem).run(executeParam);
}else {
((Runnable) postItem).run();
}
} catch (Exception e) {
log.error("BlockingQueueHelper execute error ",e);
}
}
public void put(Runnable e,long putTimeOut){
putItem(e, putTimeOut);
}
public void put(Runnable e){
putItem(e,10000);
}
public void put(Execute e,long putTimeOut){
putItem(e,putTimeOut);
}
public void put(Execute e){
putItem(e,10000);
}
private void putItem(Object e, long putTimeOut) {
int putCount = 0;
while (putCount++ < putReTryCount){
try {
queue.offer(e,putTimeOut,TimeUnit.MILLISECONDS);
putCount = putReTryCount + 1;
} catch (InterruptedException e1) {}
}
}
public interface Execute<T>{
/**
* 带参数的执行方法(参数在初始化时指定)
* @param param 初始化时指定的参数
* @throws Exception 任务执行异常
*/
public abstract void run(T param) throws Exception;
}
}
将组件交付给spring管理:
@Bean
public BlockingQueueHelper blockingQueueHelper(@Qualifier("dcJdbcTemplate")JdbcTemplate dcJdbcTemplate){
BlockingQueueHelper blockingQueueHelper =
BlockingQueueHelper.create(new ArrayBlockingQueue(100), dcJdbcTemplate);
blockingQueueHelper.start();
return blockingQueueHelper;
}
这里create的第二参数可以为空,会根据是否为空调用不同的回调方法 第一参数是一个原生队列,一般常用的是LinkedBlockingQueue(链表)和ArrayBlockingQueue(数组) 默认LinkedBlockingQueue,需要注意的是 LinkedBlockingQueue 因为数据结构的原因每次都会创建和销毁节点,gc压力相对较高,内存消耗较小; ArrayBlockingQueue 是固定的内存大小,gc压力相对没前者大。 调用示例如下:
blockingQueueHelper.put((dcJdbcTemplate) ->
((JdbcTemplate) dcJdbcTemplate).batchUpdate(
"INSERT IGNORE dc_log_condition_course " +
"(" +
"id, largeClass, middleClass, " +
"smallClass, courseId, courseName, " +
"hourId, hourName, extraField," +
"createTime" +
") " +
"VALUES " +
"(" +
"?,?,?," +
"?,?,?," +
"?,?,?," +
"?" +
")"
, courseList
));
本博客内所有原创和翻译的文章的版权归本人所有,允许第三方转载,但转载时请务必保留作者名,并注明出处链接,否则本人将保留追究其法律责任的权利。
「人生在世,留句话给我吧」