一、概述
Semaphore(信号量)是一个控制访问多个共享资源的计数器。
当一个线程想要访问某个共享资源,首先,它必须获得 semaphore。如果 semaphore的内部计数器的值大于0,那么 semaphore减少计数器的值并允许访问共享的资源。计数器的值大于0表示,有可以自由使用的资源,所以线程可以访问并使用它们。
另一种情况,如果 semaphore的计数器的值等于0,那么 semaphore让线程进入休眠状态一直到计数器大于0。计数器的值等于0表示全部的共享资源都正被线程们使用,所以此线程想要访问就必须等到某个资源成为自由的。
当线程使用完共享资源时,他必须放出 semaphore为了让其他线程可以访问共享资源。这个操作会增加 semaphore 的内部计数器的值。
示例说明:
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,单纯从字面上很难理解 Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
二、主要方法
void acquire():从信号量获取一个许可,获取一个许可(如果提供了一个)并立即返回,将可用的许可数减 1。在提供一个许可前一直将线程阻塞,否则线程被中断。
void release():释放一个许可,将其返回给信号量,将可用的许可数增加 1。如果任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。然后针对线程安排目的启用(或再启用)该线程。
int availablePermits():返回此信号量中当前可用的许可数。
boolean hasQueuedThreads():查询是否有线程正在等待获取。
三、实现
eg1: semaphores是用来保护访问一个共享资源的,或者说一个代码片段每次只能被一个线程执行。
import java.util.concurrent.Semaphore;public class PrintQueue { private Semaphore semaphore; public PrintQueue(){ semaphore = new Semaphore(1); } //模拟打印文档 public void printJob (Object document){ try { semaphore.acquire(); //实现能随机等待一段时间的模拟打印文档的行。 long duration = 2000; System.out.println("PrintQueue: Printing a Job:" + Thread.currentThread().getName()); Thread.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } finally{ semaphore.release(); } }}
public class Task implements Runnable{ private PrintQueue printQueue; public Task(PrintQueue printQueue){ this.printQueue = printQueue; } public void run() { System.out.println("Going to print a job:" + Thread. currentThread().getName()); printQueue.printJob(new Object()); System.out.println("The document has been printed:" + Thread.currentThread().getName()); } public static void main(String[] args) { PrintQueue pq = new PrintQueue(); Thread[] threads = new Thread[10]; for(int i=0;i
说明:示例的重点是PrintQueue类的构造方法和初始化Semaphore对象。你传递值1作为此构造方法的参数,那么你就创建了一个binary semaphore。初始值为1,就保护了访问一个共享资源,在例子中是print queue。
当你开始10个threads,当你开始10个threads时,那么第一个获得semaphore的得到critical section的访问权。剩下的线程都会被semaphore阻塞直到那个获得semaphore的线程释放它。当这情况发生,semaphore在等待的线程中选择一个并给予它访问critical section的访问权。全部的任务都会打印文档,只是一个接一个的执行。
eg2:semaphores也可以用来保护多个资源的副本,也就是说当你有一个代码片段每次可以被多个线程执行。
模拟6辆车去泊车,而车位有2个的场景. 当车位满时,出来一辆车,才能有一辆车进入停车.
import java.util.concurrent.Semaphore;/** * 示例模拟6辆车去泊车,而车位有2个的场景. 当车位满时,出来一辆车,才能有一辆车进入停车. */public class ParkTask implements Runnable{ private int carNo; private Semaphore semaphore; public ParkTask(int carNo, Semaphore semaphore){ this.carNo = carNo; this.semaphore = semaphore; } public void run() { try { semaphore.acquire(); //停车操作 parking(); Thread.sleep(2000); semaphore.release(); //离开 leaving(); } catch (InterruptedException e) { e.printStackTrace(); } } // 停车 public void parking(){ System.out.println(String.format("%d号车泊车", carNo)); } // 离开 public void leaving(){ System.out.println(String.format("%d号车离开", carNo)); }}
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class ParkingCars { private static final int CarCount = 6; private static final int SemaphoreCount = 2; public static void main(String[] args) { Semaphore semaphore = new Semaphore(SemaphoreCount, true); ExecutorService service = Executors.newCachedThreadPool(); for(int carNo=1; carNo<=CarCount; carNo++){ service.execute(new ParkTask(carNo, semaphore)); } try { Thread.sleep(3*1000); } catch (InterruptedException e) { e.printStackTrace(); } service.shutdown(); System.out.println(semaphore.availablePermits() + " 个停车位可以用!"); } }
//console结果:2号车泊车1号车泊车2号车离开4号车泊车3号车泊车1号车离开0 个停车位可以用!3号车离开6号车泊车5号车泊车4号车离开6号车离开5号车离开
说明:ParkTask 就是线程,停车就是 在执行,离开表示 线程完成,停车中的休眠就表示 堵塞。当信号量都被占用时,其他的线程 只能处于 等待状态,等待被占用的 线程释放 被占用的 信号量。
四、应用场景
Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。