nginx+lua+redis实现高并发接口(三)

/ 浏览:218 次

使用消费队列防止死锁冲突

在最后一步数据入库时,如果是多线程模式有可能会导致DB死锁,造成数据缺失。 所以依靠jdk8原生队列封装了一个阻塞队列,理论上可以用在缓冲,限流等类似消费场景。 代码不多,大致如下:

public class BlockingQueueHelper<T> {

    private BlockingQueue queue;
    private ThreadFactory threadFactory;
    private int threadCount = 1;
    private int putReTryCount = 5;
    private T executeParam = null;

    private BlockingQueueHelper(BlockingQueue queue,T executeParam) {
        ThreadFactoryBuilder threadFactoryBuilder = ThreadUtil.createThreadFactoryBuilder();
        threadFactoryBuilder.setNamePrefix("blockingQueueExecuteThread");
        threadFactoryBuilder.setDaemon(true);
        threadFactoryBuilder.setPriority(1);
        threadFactory = threadFactoryBuilder.build();
        this.queue = queue ==null ? new LinkedBlockingQueue():queue;
        this.executeParam = executeParam;
    }

    public static <T>BlockingQueueHelper create(T executeParam){
        return new BlockingQueueHelper(null, executeParam);
    }
    public static BlockingQueueHelper create(BlockingQueue queue){
        return new BlockingQueueHelper(queue,null);
    }

    /**
     * create BlockingQueueHelper
     * @param queue 阻塞队列,如果为null,则默认使用 LinkedBlockingQueue
     * @param executeParam 队列消费时传入的参数,必须为线程安全变量
     * @return
     */
    public static <T>BlockingQueueHelper create(BlockingQueue queue,T executeParam){
        return new BlockingQueueHelper(queue,executeParam);
    }

    public void start(){
        Assert.notNull(queue);
        Assert.notNull(threadFactory);
        Assert.checkBetween(threadCount,1,Integer.MAX_VALUE);
        for (int i = 0; i < threadCount; i++) {
            threadFactory.newThread(
                    ()->{
                        for(;;){
                            post();
                        }
                    }
            ).start();
        }
    }

    private void post(){
        try {
            Object postItem = queue.take();
            if (postItem instanceof Execute && executeParam != null) {
                ((Execute) postItem).run(executeParam);
            }else {
                ((Runnable) postItem).run();
            }
        } catch (Exception e) {
            log.error("BlockingQueueHelper execute error ",e);
        }
    }

    public void put(Runnable e,long putTimeOut){
        putItem(e, putTimeOut);
    }
    public void put(Runnable e){
        putItem(e,10000);
    }
    public void put(Execute e,long putTimeOut){
        putItem(e,putTimeOut);
    }
    public void put(Execute e){
        putItem(e,10000);
    }
    private void putItem(Object e, long putTimeOut) {
        int putCount = 0;
        while (putCount++ < putReTryCount){
            try {
                queue.offer(e,putTimeOut,TimeUnit.MILLISECONDS);
                putCount = putReTryCount + 1;
            } catch (InterruptedException e1) {}
        }
    }

    public interface Execute<T>{
        /**
         * 带参数的执行方法(参数在初始化时指定)
         * @param param 初始化时指定的参数
         * @throws Exception 任务执行异常
         */
        public abstract void run(T param) throws Exception;
    }

}

将组件交付给spring管理:

    @Bean
    public BlockingQueueHelper blockingQueueHelper(@Qualifier("dcJdbcTemplate")JdbcTemplate dcJdbcTemplate){
        BlockingQueueHelper blockingQueueHelper =
                BlockingQueueHelper.create(new ArrayBlockingQueue(100), dcJdbcTemplate);
        blockingQueueHelper.start();
        return blockingQueueHelper;
    }

这里create的第二参数可以为空,会根据是否为空调用不同的回调方法 第一参数是一个原生队列,一般常用的是LinkedBlockingQueue(链表)和ArrayBlockingQueue(数组) 默认LinkedBlockingQueue,需要注意的是 LinkedBlockingQueue 因为数据结构的原因每次都会创建和销毁节点,gc压力相对较高,内存消耗较小; ArrayBlockingQueue 是固定的内存大小,gc压力相对没前者大。 调用示例如下:

blockingQueueHelper.put((dcJdbcTemplate) ->
                    ((JdbcTemplate) dcJdbcTemplate).batchUpdate(
                            "INSERT IGNORE dc_log_condition_course " +
                                    "(" +
                                    "id,           largeClass,       middleClass, " +
                                    "smallClass,   courseId,         courseName, " +
                                    "hourId,       hourName,         extraField," +
                                    "createTime" +
                                    ") " +
                                    "VALUES " +
                                    "(" +
                                    "?,?,?," +
                                    "?,?,?," +
                                    "?,?,?," +
                                    "?" +
                                    ")"
                            , courseList
                    ));

如果你想转载,请注明来源或者出处