`
taog
  • 浏览: 1407 次
最近访客 更多访客>>
文章分类
社区版块
存档分类
最新评论

多线程中生产者消费者模式

    博客分类:
  • Java
阅读更多
0 引言
    多线程之间的并发主要涉及到两大问题:互斥与同步。通常来说,互斥的基本是通过锁机制实现的,而同步或者说线程间的协同工作可以通过若干种方式实现。然后究其竟,其本质都是在某个线程执行完之后通知其他线程继续执行。本文将首先简要介绍锁机制,然后介绍Java中实现并发线程间同步的几种方式。
1 互斥
1.1 锁和监视器
在JDK1.5之前,互斥是通过synchronized关键来标识的,线程获取该关键字修饰的对象的互斥排它锁,其本质含义是在线程在访问共享数据前先获取对象监视器 的所有权,然后执行监视区域的代码。而一个对象监视器在一个时间点最多只有被一个线程占有,如果另一个线程试图获取某个正在被占用的监视器,则必须等待直到该监视器被释放后,然后再和其他同样在等待的线程竞争,竞争的结果是只能有一个线程成功获取所有权。需要注意的是在外面等待的线程并不是在先进先出的队列中,而是随机挑选,因此下文称为等待集合 。
代码示例:以下代码是演示通过synchronized关键字实现的线程间的互斥,说明仅当多个线程竞争同一个对象的监视器的才会产生互斥效果。
MutexTest.java
package tt.lab.thread.lock;

import static tt.lab.thread.Util.log;
import static tt.lab.thread.Util.sleep;
import java.util.Date;

public class MutexTest {
    private Object lock;

    public MutexTest(Object lock) {
        this.lock = lock;
    }

    public void test1() {
        synchronized (this) {
            log(new Date()+" test1");
            sleep(5000);
        }
    }

    public synchronized void test2() {
        log(new Date()+" test2");
    }

    public void test3() {
        synchronized (lock) {
            log(new Date()+" test3");
        }
    }
}

上面的Mutex类的构造函数接受一个对象作为锁的宿主对象,并提供三个方法,其中test1和test2指定的是this对象,test3指定的对象是lock,因此test1和test2会产生互斥效果,test3则不与他们竞争锁。
Main.java
package tt.lab.thread.lock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Main {
	
	public static void main(String[] args) {
	
		//final SyncTest s = new SyncTest(new Object());
		final LockTest s = new LockTest();
		ExecutorService svc = Executors.newCachedThreadPool();
		svc.execute(new Runnable(){
			@Override
			public void run() {
				s.test1();
			}
		});
		svc.execute(new Runnable(){
			@Override
			public void run() {
				s.test2();
			}
		});
		svc.execute(new Runnable(){
			@Override
			public void run() {
				s.test3();
			}
		});
		svc.shutdown();
	}
	
}

上面采用JDK1.5提供的Executor框架执行多个线程,和使用new Thread方式类似。更多关于Executor的内容详见xxx。

运行日志


小结
由此可见,对象锁


1.2 Lock

JDK1.5之引入的Lock实现了类似的功能,一个锁在同一个时间点也只能被一个线程获取,所有试图访问由该锁保护的共享资源都必须要获取锁。但是和synchronized不同的是,新的锁机制还提供了共享锁和排他锁,共享锁可以同时被多个线程获取。新的锁机制提供了更加灵活的方式以满足不同场景的需求。
1.2.1 ReentrantLock

代码示例:本例仅仅演示了1.1节的功能
Aaa.java


Aaa.java


小结


在单例模式中,为了避免同一个类被实例化多次,需要将该类的构造方法私有化,通过提供一个工厂方法给客户端代码调用以返回该对象实例。
代码示例:简单的singleton模式。


小结
代码示例:优化的singleton模式。


小结

代码示例:简单的singleton模式。


小结

1.2.2 ReentrantReadWriteLock
2 同步
同步是指是多个线程为了达到共同的目标而进行的互相协作。典型的例子就是生产者和消费者模式,该模式是指,存在一个数据队列,多个线程中有一部分是向该队列添加数据,而另一部分线程是从队列中获取数据并处理,处理完毕后将数据从队列中移除。这两部分线程就需要协同工作。比如,为了不让队列无限制增长,通常为队列设置一个容量,当队列达到最大容量时,生产者线程就必须阻塞在那里,停止往队列中添加数据,等待消费者线程将数据从队列中移除。类似地,当队列为空时,消费者线程就必须阻塞,停止移除数据,等待生产者线程往队列里添加数据。为了达到阻塞-通知的效果,Java中可以使用多种方式实现。
2.1 wait和notify方法
在JDK1.5之前,可以通过调用作为锁的对象wait和notify以及notifyAll方法来实现线程间的协作。一个占有监视器 的线程(称为线程A)可以通过调用wait方法暂时将对对象监视器的所有权让出,进入等待队列,这样另外一个线程(称为线程B)就可以就获取该对象监视器并执行监视区域的代码,当线程B执行了notify或notifyAll方法后,并释放了对象监视器后,处在等待集合中的线程A将会被唤醒重新获取监视器并可以继续执行wait下面的语句。
一个线程之所以调用wait方法将自己挂起进入等待集合,通常是因为所需要的数据状态不符合自己的预期,比如在生产者-消费者模式中,数据缓冲队列为空时消费者需要等待,是希望等待生产者往里添加数据后再继续执行。但是由于执行notify方法的线程在调用完notify后仍旧持有对象的监视器,并仍可能修改缓冲队列里数据,抑或者,有第三个线程在等待线程被唤醒但还没执行下面语句的空档期,将数据改变,因此,当一个等待线程被唤醒后,首先要做的是重新检查一下数据的状态,看是否真的满足自己的需求。如果不满足则需要继续等待。
另外还有一点需要注意的是,在线程中调用对象的wait方法目的是该线程释放对该对象监视器的占有权,这意味着该必须已经拥有对象监视器的所有权,否则何来释放?因此,在没有所有权的情况下调用wait,notify以及notifyAll都会抛出IllegalMonitorStateException异常。
代码示例:   
    为了避免过多的线程导致日志庞大难以分析,本例子使用一个生产者和两个消费者,并且缓冲队列的大小设置为1。由于一个生产者的速度较慢,两个消费者会等待。当生产者重新往队列里加入东西后会通知所有等待的线程——这里是指两个等待的生产者。
代码清单1: Main.java


Main.java


Main.java


Main.java


小结:

2.2 lock/condition
在JDK1.5之后,通过平台提供的lock和condition同样可以实现生产者消费者模式。在传统的wait方法被调用后,调用线程进入等待集合,要想重新被唤醒,只能由别的线程调用同一个监视器对象的notify/notifyAll方法,换句话说,一个对象监视器只有一个等待集合。使用Contidion的await和signal/signalAll方法可以达到同样的效果。
代码示例:   
    为了避免过多的线程导致日志庞大难以分析,本例子使用一个生产者和两个消费者,并且缓冲队列的大小设置为1。由于一个生产者的速度较慢,两个消费者会等待。当生产者重新往队列里加入东西后会通知所有等待的线程——这里是指两个等待的生产者。
代码清单1: Main.java


Main.java


Main.java


Main.java


小结:


2.3 lock/conditions
更进一步,通过多个condition可以更灵活地控制哪些线程才需要真正被唤醒。
代码示例:为了避免过多的线程导致日志庞大难以分析,本例子使用一个生产者和两个消费者,并且缓冲队列的大小设置为1。由于一个生产者的速度较慢,两个消费者会等待。当生产者重新往队列里加入东西后会通知所有等待的线程——这里是指两个等待的生产者。
代码清单1: Main.java


Main.java


Main.java


Main.java


小结:
2.4 BlockingQueue
生产者-消费者模式的本质就是生产者们和消费者们根据中间缓存队列的状态来决定自己是否需要阻塞等待,待自己被唤醒后完成了自己的工作后需要唤醒对方。那么如果缓存任务的中间队列在空的时候当消费者试图去取的时候将其阻塞直到非空,同样当队列满的时候生产者试图添加的时候也将其阻塞直到非满,那生产者和消费者就无需直接通知对方。幸运的是,JDK1.5提供的阻塞队列目的就是为了满足此类需求。因此很容易通过阻塞队列实现生产者-消费者模式。
代码示例:为了避免过多的线程导致日志庞大难以分析,本例子使用一个生产者和两个消费者,并且缓冲队列的大小设置为1。由于一个生产者的速度较慢,两个消费者会等待。当生产者重新往队列里加入东西后会通知所有等待的线程——这里是指两个等待的生产者。
代码清单1: Main.java


Main.java


Main.java


Main.java


小结:


3 总结
    本文从多线程的互斥和协作两个角度阐述了线程同步问题,并使用Java平台提供的各种工具实现了生产者-消费者模式。从本质上来说,生产者和消费者之间交互的媒介就是阻塞功能的缓冲队列,因此,一般来说使用BlockingQueue就可以满足需求。但是,从对象内置锁和对监视器的拥有和释放开始,可以更清楚地理解线程间交互时会遇到的常见问题及解决方案。
4 参考文献
[1]
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics