这里举一个栗子,我们对一个资源进行加锁,可是又要进行细粒度的控制,该如何实现呢?
比如我们开了了个餐馆。餐馆有一个厨房,服务员可以通知厨房进行做菜,当前冰箱里有菜时,厨房就会开始做菜,冰箱里没菜则会等待。
/** * Created by Anur IjuoKaruKas on 6/28/2018 */@SuppressWarnings("Duplicates")public class Restaurant { private final Lock kitchen = new ReentrantLock(); private ConcurrentLinkedDequemeetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("通知厨房做肉"); if (meetFridge.isEmpty()) { try { System.out.println("冰箱没有肉了,等待冰箱有肉"); kitchen.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("进货了"); meetFridge.addLast("牛肉"); kitchen.notify(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockMeet()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); executorService.execute(restaurant.cockMeet()); }}
运行一下main方法,可以得到以下输出:
通知厨房做肉冰箱没有肉了,等待冰箱有肉通知厨房做肉冰箱没有肉了,等待冰箱有肉进货了正在炒牛肉进货了正在炒牛肉进货了通知厨房做肉正在炒牛肉
到这里是没有什么问题的。
进来了一个新需求,一个刚好可以用上Condition的新需求
现在我们既需要做肉,也需要做菜。 也就是说: 1、服务员通知了厨房,需要做一个肉和一个菜。这个时候厨房正好没库存,厨房进行了等待。 2、这时候某人去菜市场买了菜回来,厨房开始做菜。 3、过了一段时间 4、某人去菜市场买了肉回来,厨房开始做肉。
这样的一个需求,当然用其他方式实现也是可以的,但如果使用 Condition来实现,它将变得异常简单。
/** * Created by Anur IjuoKaruKas on 6/28/2018 */@SuppressWarnings("Duplicates")public class Restaurant { private final Lock kitchen = new ReentrantLock(); private final Condition waitMeet = kitchen.newCondition(); private final Condition waitVege = kitchen.newCondition(); private ConcurrentLinkedDequemeetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱 private ConcurrentLinkedDeque vegeFridge = new ConcurrentLinkedDeque<>();// 菜冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { kitchen.lock(); try { System.out.println("通知厨房做肉"); if (meetFridge.isEmpty()) { try { System.out.println("冰箱没有肉了,等待冰箱有肉"); waitMeet.await(); // 直接调用condition的wait方法 } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public Runnable cockVege() { return new Runnable() { @Override public void run() { kitchen.lock(); try { System.out.println("通知厨房做菜"); if (vegeFridge.isEmpty()) { try { System.out.println("冰箱没有菜了,等待冰箱有菜"); waitVege.await(); // 直接调用condition的wait方法 } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = vegeFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { kitchen.lock(); try { Random random = new Random(); if (random.nextBoolean()) { System.out.println("肉进货了"); meetFridge.addLast("牛肉"); waitMeet.signal(); } else { System.out.println("菜进货了"); vegeFridge.addLast("苦瓜"); waitVege.signal(); } } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockVege()); executorService.execute(restaurant.buySomething()); }}
最后输出:
通知厨房做肉冰箱没有肉了,等待冰箱有肉通知厨房做菜冰箱没有菜了,等待冰箱有菜肉进货了正在炒牛肉
可见我们可以针对情况对不同的行为进行通知,这就是condition的力量。
提高篇
这里就不瞎扯场景了,直接上代码。
这是仿kafka BufferPool的一种思路,(当然没kafka实现的那么复杂),它的思路是使用一个队列来管理等待的线程。 每次线程进来sout(),都进行等待 满足一定的条件时,mission()会通知队头的一个线程进行操作。
/** * Created by Anur IjuoKaruKas on 6/25/2018 */public class Task { private Dequewaiters = new ArrayDeque<>(); private Lock lock = new ReentrantLock(); private Integer count = 0; private void sout(String str) { this.lock.lock(); try { System.out.println("sout " + str + " get the lock"); Condition condition = this.lock.newCondition(); waiters.addLast(condition); condition.await(); Condition conditionFromWaiters = waiters.removeFirst(); if (conditionFromWaiters != condition) { System.out.println("???????"); } System.out.println("Test Task: " + str); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void mission() { this.lock.lock(); try { System.out.println("mission get the lock"); while (count < 10) { count++; } Condition condition = waiters.peekFirst(); if (condition != null) { condition.signal(); } count = 0; } finally { lock.unlock(); } } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); final Task task = new Task(); for (int i = 0; i < 1000000; i++) { final int finalI = i; executorService.execute(new Runnable() { @Override public void run() { task.sout(finalI + ""); } }); executorService.execute(new Runnable() { @Override public void run() { task.mission(); } }); } }}