打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
javaWeb 使用线程池+队列解决“订单并发”问题

遇到问题:

最近做微信支付,项目上线一阵,发现一个问题。有一条订单流水居然在数据库的出现两次。这个问题非常严重。

查看微信回调系统的接口代码发现代码是没错的(正常情况下),而这次遇到非正常情况了

原因:微信支付成功后回调我们系统接口在极短时间回调了2次,微信官方文档说明了,是最短15s回调一次。

前几天微信支付抽风了,可能业务出现了波动。

简单来说就是在并发情况下没有做数据唯一性处理,不管怎么样这类并发情况都是有必要的处理。

 

 

解决方式:使用线程池+队列

项目基于Spring,如果不用spring需要自己把

ThreadPoolManager.java

改成单例模式

 

1.写一个Controller(Spring mvc)

/** * @author HeyS1 * @date 2016/12/1 * @description */@Controllerpublic class ThreadPoolController {    @Autowired    ThreadPoolManager tpm;    @RequestMapping("/pool")    public    @ResponseBody    Object test() {        for (int i = 0; i < 500; i++) {            //模拟并发500条记录            tpm.processOrders(Integer.toString(i));        }        return "ok";    }}

 

2.线程池管理

/** * @author HeyS1 * @date 2016/12/1 * @description threadPool订单线程池, 处理订单 * scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单 */@Componentpublic class ThreadPoolManager implements BeanFactoryAware {    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);    private BeanFactory factory;//用于从IOC里取对象    // 线程池维护线程的最少数量    private final static int CORE_POOL_SIZE = 2;    // 线程池维护线程的最大数量    private final static int MAX_POOL_SIZE = 10;    // 线程池维护线程所允许的空闲时间    private final static int KEEP_ALIVE_TIME = 0;    // 线程池所使用的缓冲队列大小    private final static int WORK_QUEUE_SIZE = 50;    // 消息缓冲队列    Queue<Object> msgQueue = new LinkedList<Object>();    //用于储存在队列中的订单,防止重复提交    Map<String, Object> cacheMap = new ConcurrentHashMap<>();    //由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {        @Override        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {            //System.out.println("太忙了,把该订单交给调度线程池逐一处理" + ((DBThread) r).getMsg());            msgQueue.offer(((DBThread) r).getMsg());        }    };    // 订单线程池    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,            TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);    // 调度线程池。此线程池支持定时以及周期性执行任务的需求。    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);    // 访问消息缓存的调度线程,每秒执行一次    // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            if (!msgQueue.isEmpty()) {                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {                    System.out.print("调度:");                    String orderId = (String) msgQueue.poll();                    DBThread accessDBThread = (DBThread) factory.getBean("dBThread");                    accessDBThread.setMsg(orderId);                    threadPool.execute(accessDBThread);                }                // while (msgQueue.peek() != null) {                // }            }        }    }, 0, 1, TimeUnit.SECONDS);    //终止订单线程池+调度线程池    public void shutdown() {        //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止        System.out.println(taskHandler.cancel(false));        scheduler.shutdown();        threadPool.shutdown();    }    public Queue<Object> getMsgQueue() {        return msgQueue;    }    //将任务加入订单线程池    public void processOrders(String orderId) {        if (cacheMap.get(orderId) == null) {            cacheMap.put(orderId,new Object());            DBThread accessDBThread = (DBThread) factory.getBean("dBThread");            accessDBThread.setMsg(orderId);            threadPool.execute(accessDBThread);        }    }    //BeanFactoryAware    @Override    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {        factory = beanFactory;    }}

3.线程池中工作的线程

//线程池中工作的线程@Component@Scope("prototype")//spring 多例public class DBThread implements Runnable {    private String msg;    private Logger log = LoggerFactory.getLogger(DBThread.class);    @Autowired    SystemLogService systemLogService;    @Override    public void run() {        //模拟在数据库插入数据        Systemlog systemlog = new Systemlog();        systemlog.setTime(new Date());        systemlog.setLogdescribe(msg);        //systemLogService.insert(systemlog);        log.info("insert->" + msg);    }    public String getMsg() {        return msg;    }    public void setMsg(String msg) {        this.msg = msg;    }}

 

浏览器输入地址127.0.0.1/pool

几秒后关闭tomcat。

模拟500条数据,订单线程池处理了117条。调度线程池处理5条

关闭tomcat,后还有378条未处理(这里的实现需要用到spring监听器)。加起来一共500

OK。完毕

spring监听器,监听tomcat关闭事件:

public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {    @Autowired    ThreadPoolManager threadPoolManager;    @Override    public void onApplicationEvent(ApplicationEvent event) {        if (event instanceof ContextClosedEvent) {            XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource();            //防止执行两次。root application context 没有parent,他就是老大            if (x.getDisplayName().equals("Root WebApplicationContext")) {                threadPoolManager.shutdown();                Queue q = threadPoolManager.getMsgQueue();                System.out.println("关闭了服务器,还有未处理的信息条数:" + q.size());            }        } else if (event instanceof ContextRefreshedEvent) {//            System.out.println(event.getClass().getSimpleName()+" 事件已发生!");        } else if (event instanceof ContextStartedEvent) {//            System.out.println(event.getClass().getSimpleName()+" 事件已发生!");        } else if (event instanceof ContextStoppedEvent) {//            System.out.println(event.getClass().getSimpleName()+" 事件已发生!");        } else {//            System.out.println("有其它事件发生:"+event.getClass().getName());        }    }}

spring配置一下

<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
使用java.util.concurrent实现的线程池、消息队列功能-Java技术文档 ...
Android 之 handler 学习 1
handler.post 为什么要将thread对象post到handler中执行呢?
Android Looper和Handler分析
线程池 - Be happy with Java - BlogJava
springboot2.0+线程池+Jmeter以模拟高并发
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服