目的
假设我们有一个共享内存区域,上面详细介绍了基本约束。可以保护互斥锁后面的共享数据,在这种情况下,没有两个线程可以同时访问数据。但是,此解决方案不是最理想的,因为读取器R1可能具有锁定,然后另一个读取器R2请求访问。R2在开始自己的读操作之前等到R1完成是愚蠢的。相反,R2应立即开始。这是Reader Writer Lock模式的动机。
说明
维基百科说
在计算机科学中,读写器(rw)或共享独占锁(也称为多读写器/单写器锁或多读写器锁或推送锁)是解决读写器问题的同步原语。rw锁允许对只读操作进行并发访问,而写操作则需要独占访问。这意味着多个线程可以并行读取数据,但写入或修改数据需要一个排它锁。当写入程序正在写入数据时,所有其他写入程序或读取程序都将被阻止,直到写入程序完成写入。常见的用法可能是控制对内存中数据结构的访问,这些数据结构不能自动更新,并且在更新完成之前无效(不应被其他线程读取)。
源代码
此示例使用两个互斥锁来演示多个读取器和写入器的并发访问。
类图
第1步: 创建Reader类,在获取读锁定时读取。
<b>public</b> <b>class</b> Reader implements Runnable { <b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(Reader.<b>class</b>); <b>private</b> Lock readLock; <b>private</b> String name; <b>private</b> <b>long</b> readingTime; <font><i>/** * Create new Reader * * @param name - Name of the thread owning the reader * @param readLock - Lock for this reader * @param readingTime - amount of time (in milliseconds) for this reader to engage reading */</i></font><font> <b>public</b> Reader(String name, Lock readLock, <b>long</b> readingTime) { <b>this</b>.name = name; <b>this</b>.readLock = readLock; <b>this</b>.readingTime = readingTime; } </font><font><i>/** * Create new Reader who reads for 250ms * * @param name - Name of the thread owning the reader * @param readLock - Lock for this reader */</i></font><font> <b>public</b> Reader(String name, Lock readLock) { <b>this</b>(name, readLock, 250L); } @Override <b>public</b> <b>void</b> run() { readLock.lock(); <b>try</b> { read(); } <b>catch</b> (InterruptedException e) { LOGGER.info(</font><font>"InterruptedException when reading"</font><font>, e); Thread.currentThread().interrupt(); } <b>finally</b> { readLock.unlock(); } } </font><font><i>/** * Simulate the read operation * */</i></font><font> <b>public</b> <b>void</b> read() throws InterruptedException { LOGGER.info(</font><font>"{} begin"</font><font>, name); Thread.sleep(readingTime); LOGGER.info(</font><font>"{} finish after reading {}ms"</font><font>, name, readingTime); } } </font>
第2步: Writer类,在获取写锁定时写入。
<b>public</b> <b>class</b> Writer implements Runnable { <b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(Writer.<b>class</b>); <b>private</b> Lock writeLock; <b>private</b> String name; <b>private</b> <b>long</b> writingTime; <font><i>/** * Create new Writer who writes for 250ms * * @param name - Name of the thread owning the writer * @param writeLock - Lock for this writer */</i></font><font> <b>public</b> Writer(String name, Lock writeLock) { <b>this</b>(name, writeLock, 250L); } </font><font><i>/** * Create new Writer * * @param name - Name of the thread owning the writer * @param writeLock - Lock for this writer * @param writingTime - amount of time (in milliseconds) for this reader to engage writing */</i></font><font> <b>public</b> Writer(String name, Lock writeLock, <b>long</b> writingTime) { <b>this</b>.name = name; <b>this</b>.writeLock = writeLock; <b>this</b>.writingTime = writingTime; } @Override <b>public</b> <b>void</b> run() { writeLock.lock(); <b>try</b> { write(); } <b>catch</b> (InterruptedException e) { LOGGER.info(</font><font>"InterruptedException when writing"</font><font>, e); Thread.currentThread().interrupt(); } <b>finally</b> { writeLock.unlock(); } } </font><font><i>/** * Simulate the write operation */</i></font><font> <b>public</b> <b>void</b> write() throws InterruptedException { LOGGER.info(</font><font>"{} begin"</font><font>, name); Thread.sleep(writingTime); LOGGER.info(</font><font>"{} finished after writing {}ms"</font><font>, name, writingTime); } } </font>
第3步: 现在是时候创建ReaderWriterLock类来控制读写器的访问了。
允许多个读取器同时保持锁定,但如果任何写入程序持有锁,则读取器等待。如果读取器持有锁,则编写器等待。这种锁是不公平的。
<b>public</b> <b>class</b> ReaderWriterLock implements ReadWriteLock { <b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLock.<b>class</b>); <b>private</b> Object readerMutex = <b>new</b> Object(); <b>private</b> <b>int</b> currentReaderCount; <font><i>/** * Global mutex is used to indicate that whether reader or writer * gets the lock in the moment. * <p> * 1. When it contains the reference of {@link #readerLock}, it means that the lock * is acquired by the reader, another * reader can also do the read operation concurrently. <br> * 2. When it contains the reference of reference of {@link #writerLock}, it means that * the lock is acquired by the * writer exclusively, no more reader or writer can get the lock. * <p> * This is the most important field in this class to control the access for reader/writer. */</i></font><font> <b>private</b> Set<Object> globalMutex = <b>new</b> HashSet<>(); <b>private</b> ReadLock readerLock = <b>new</b> ReadLock(); <b>private</b> WriteLock writerLock = <b>new</b> WriteLock(); @Override <b>public</b> Lock readLock() { <b>return</b> readerLock; } @Override <b>public</b> Lock writeLock() { <b>return</b> writerLock; } </font><font><i>/** * return true when globalMutex hold the reference of writerLock */</i></font><font> <b>private</b> <b>boolean</b> doesWriterOwnThisLock() { <b>return</b> globalMutex.contains(writerLock); } </font><font><i>/** * Nobody get the lock when globalMutex contains nothing * */</i></font><font> <b>private</b> <b>boolean</b> isLockFree() { <b>return</b> globalMutex.isEmpty(); } </font><font><i>/** * Reader Lock, can be access for more than one reader concurrently if no writer get the lock */</i></font><font> <b>private</b> <b>class</b> ReadLock implements Lock { @Override <b>public</b> <b>void</b> lock() { <b>synchronized</b> (readerMutex) { currentReaderCount++; <b>if</b> (currentReaderCount == 1) { acquireForReaders(); } } } </font><font><i>/** * Acquire the globalMutex lock on behalf of current and future concurrent readers. Make sure no writers currently * owns the lock. */</i></font><font> <b>private</b> <b>void</b> acquireForReaders() { </font><font><i>// Try to get the globalMutex lock for the first reader</i></font><font> <b>synchronized</b> (globalMutex) { </font><font><i>// If the no one get the lock or the lock is locked by reader, just set the reference</i></font><font> </font><font><i>// to the globalMutex to indicate that the lock is locked by Reader.</i></font><font> <b>while</b> (doesWriterOwnThisLock()) { <b>try</b> { globalMutex.wait(); } <b>catch</b> (InterruptedException e) { LOGGER.info(</font><font>"InterruptedException while waiting for globalMutex in acquireForReaders"</font><font>, e); Thread.currentThread().interrupt(); } } globalMutex.add(<b>this</b>); } } @Override <b>public</b> <b>void</b> unlock() { <b>synchronized</b> (readerMutex) { currentReaderCount--; </font><font><i>// Release the lock only when it is the last reader, it is ensure that the lock is released</i></font><font> </font><font><i>// when all reader is completely.</i></font><font> <b>if</b> (currentReaderCount == 0) { <b>synchronized</b> (globalMutex) { </font><font><i>// Notify the waiter, mostly the writer</i></font><font> globalMutex.remove(<b>this</b>); globalMutex.notifyAll(); } } } } @Override <b>public</b> <b>void</b> lockInterruptibly() throws InterruptedException { <b>throw</b> <b>new</b> UnsupportedOperationException(); } @Override <b>public</b> <b>boolean</b> tryLock() { <b>throw</b> <b>new</b> UnsupportedOperationException(); } @Override <b>public</b> <b>boolean</b> tryLock(<b>long</b> time, TimeUnit unit) throws InterruptedException { <b>throw</b> <b>new</b> UnsupportedOperationException(); } @Override <b>public</b> Condition newCondition() { <b>throw</b> <b>new</b> UnsupportedOperationException(); } } </font><font><i>/** * Writer Lock, can only be accessed by one writer concurrently */</i></font><font> <b>private</b> <b>class</b> WriteLock implements Lock { @Override <b>public</b> <b>void</b> lock() { <b>synchronized</b> (globalMutex) { </font><font><i>// Wait until the lock is free.</i></font><font> <b>while</b> (!isLockFree()) { <b>try</b> { globalMutex.wait(); } <b>catch</b> (InterruptedException e) { Thread.currentThread().interrupt(); } } </font><font><i>// When the lock is free, acquire it by placing an entry in globalMutex</i></font><font> globalMutex.add(<b>this</b>); } } @Override <b>public</b> <b>void</b> unlock() { <b>synchronized</b> (globalMutex) { globalMutex.remove(<b>this</b>); </font><font><i>// Notify the waiter, other writer or reader</i></font><font> globalMutex.notifyAll(); } } @Override <b>public</b> <b>void</b> lockInterruptibly() throws InterruptedException { <b>throw</b> <b>new</b> UnsupportedOperationException(); } @Override <b>public</b> <b>boolean</b> tryLock() { <b>throw</b> <b>new</b> UnsupportedOperationException(); } @Override <b>public</b> <b>boolean</b> tryLock(<b>long</b> time, TimeUnit unit) throws InterruptedException { <b>throw</b> <b>new</b> UnsupportedOperationException(); } @Override <b>public</b> Condition newCondition() { <b>throw</b> <b>new</b> UnsupportedOperationException(); } } } </font>
第4步: 让我们测试一下这个设计模式。
<b>public</b> <b>class</b> ReaderWriterLockDemo { <b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(App.<b>class</b>); <font><i>/** * Program entry point * * @param args command line args */</i></font><font> <b>public</b> <b>static</b> <b>void</b> main(String[] args) { ExecutorService executeService = Executors.newFixedThreadPool(10); ReaderWriterLock lock = <b>new</b> ReaderWriterLock(); </font><font><i>// Start writers</i></font><font> IntStream.range(0, 5) .forEach(i -> executeService.submit(<b>new</b> Writer(</font><font>"Writer "</font><font> + i, lock.writeLock(), ThreadLocalRandom.current().nextLong(5000)))); LOGGER.info(</font><font>"Writers added..."</font><font>); </font><font><i>// Start readers</i></font><font> IntStream.range(0, 5) .forEach(i -> executeService.submit(<b>new</b> Reader(</font><font>"Reader "</font><font> + i, lock.readLock(), ThreadLocalRandom.current().nextLong(10)))); LOGGER.info(</font><font>"Readers added..."</font><font>); <b>try</b> { Thread.sleep(5000L); } <b>catch</b> (InterruptedException e) { LOGGER.error(</font><font>"Error sleeping before adding more readers"</font><font>, e); Thread.currentThread().interrupt(); } </font><font><i>// Start readers</i></font><font> IntStream.range(6, 10) .forEach(i -> executeService.submit(<b>new</b> Reader(</font><font>"Reader "</font><font> + i, lock.readLock(), ThreadLocalRandom.current().nextLong(10)))); LOGGER.info(</font><font>"More readers added..."</font><font>); </font><font><i>// write operations are exclusive.</i></font><font> executeService.shutdown(); <b>try</b> { executeService.awaitTermination(5, TimeUnit.SECONDS); } <b>catch</b> (InterruptedException e) { LOGGER.error(</font><font>"Error waiting for ExecutorService shutdown"</font><font>, e); Thread.currentThread().interrupt(); } } } </font>
适用性
应用程序需要为多个线程增加资源同步性能,特别是有混合读/写操作。