Concurrency - 快速指南
Java Concurrency - Overview
Java是一种multi-threaded programming language ,这意味着我们可以使用Java开发多线程程序。 多线程程序包含两个或多个可以并发运行的部分,每个部分可以同时处理不同的任务,特别是在计算机有多个CPU时,可以充分利用可用资源。
根据定义,多任务处理是指多个进程共享公共处理资源(如CPU)。 多线程将多任务处理的概念扩展到可以将单个应用程序中的特定操作细分为单个线程的应用程序中。 每个线程可以并行运行。 OS不仅在不同的应用程序之间划分处理时间,而且在应用程序中的每个线程之间划分处理时间。
多线程使您能够以多种活动可以在同一程序中同时进行的方式进行编写。
线程的生命周期
线程在其生命周期中经历各个阶段。 例如,一个线程诞生,启动,运行,然后死亡。 下图显示了线程的完整生命周期。
以下是生命周期的各个阶段 -
New - 一个新线程在新状态下开始其生命周期。 它一直处于这种状态,直到程序启动线程。 它也被称为born thread 。
Runnable - 启动新生成的线程后,线程变为可运行。 处于此状态的线程被视为正在执行其任务。
Waiting - 有时,当线程等待另一个线程执行任务时,线程转换到等待状态。 只有当另一个线程通知等待线程继续执行时,线程才会转换回可运行状态。
Timed Waiting - 可运行的线程可以在指定的时间间隔内进入定时等待状态。 当该时间间隔到期或者等待的事件发生时,处于此状态的线程转换回可运行状态。
Terminated (Dead) - 可运行线程在完成任务或以其他方式终止时进入终止状态。
线程优先级
每个Java线程都有一个优先级,可帮助操作系统确定线程的调度顺序。
Java线程优先级在MIN_PRIORITY(常量为1)和MAX_PRIORITY(常量为10)之间。 默认情况下,每个线程都被赋予优先级NORM_PRIORITY(常量为5)。
具有较高优先级的线程对程序更重要,应在较低优先级的线程之前分配处理器时间。 但是,线程优先级不能保证线程执行的顺序,并且非常依赖于平台。
通过实现Runnable接口创建一个线程
如果您的类要作为线程执行,那么您可以通过实现Runnable接口来实现此目的。 您需要遵循三个基本步骤 -
Step 1
作为第一步,您需要实现Runnable接口提供的run()方法。 此方法为线程提供了一个入口点,您将把完整的业务逻辑放在此方法中。 以下是run()方法的简单语法 -
public void run( )
Step 2
作为第二步,您将使用以下构造函数实例化Thread对象 -
Thread(Runnable threadObj, String threadName);
其中, threadObj是实现Runnable接口的类的实例, threadName是为新线程指定的名称。
Step 3
一旦创建了Thread对象,就可以通过调用start()方法来启动它,该方法执行对run()方法的调用。 以下是start()方法的简单语法 -
void start();
Example
这是一个创建新线程并开始运行它的示例 -
class RunnableDemo implements Runnable {
private Thread t;
private String threadName;
RunnableDemo(String name) {
threadName = name;
System.out.println("Creating " + threadName );
}
public void run() {
System.out.println("Running " + threadName );
try {
for(int i = 4; i > 0; i--) {
System.out.println("Thread: " + threadName + ", " + i);
// Let the thread sleep for a while.
Thread.sleep(50);
}
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
}
public class TestThread {
public static void main(String args[]) {
RunnableDemo R1 = new RunnableDemo("Thread-1");
R1.start();
RunnableDemo R2 = new RunnableDemo("Thread-2");
R2.start();
}
}
这将产生以下结果 -
Output
Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
通过扩展线程类来创建线程
创建线程的第二种方法是使用以下两个简单步骤创建一个扩展Thread类的新类。 此方法在处理使用Thread类中的可用方法创建的多个线程时提供了更大的灵活性。
Step 1
您将需要覆盖Thread类中可用的run( )方法。 此方法为线程提供了一个入口点,您将把完整的业务逻辑放在此方法中。 以下是run()方法的简单语法 -
public void run( )
Step 2
一旦创建了Thread对象,就可以通过调用start()方法来启动它,该方法执行对run()方法的调用。 以下是start()方法的简单语法 -
void start( );
Example
这是前面的程序重写以扩展线程 -
class ThreadDemo extends Thread {
private Thread t;
private String threadName;
ThreadDemo(String name) {
threadName = name;
System.out.println("Creating " + threadName );
}
public void run() {
System.out.println("Running " + threadName );
try {
for(int i = 4; i > 0; i--) {
System.out.println("Thread: " + threadName + ", " + i);
// Let the thread sleep for a while.
Thread.sleep(50);
}
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
}
public class TestThread {
public static void main(String args[]) {
ThreadDemo T1 = new ThreadDemo("Thread-1");
T1.start();
ThreadDemo T2 = new ThreadDemo("Thread-2");
T2.start();
}
}
这将产生以下结果 -
Output
Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
Java Concurrency - Environment Setup
在本章中,我们将讨论为Java建立适宜环境的不同方面。
本地环境设置 (Local Environment Setup)
如果您仍然愿意为Java编程语言设置环境,那么本节将指导您如何在计算机上下载和设置Java。 以下是设置环境的步骤。
Java SE可从链接Download Java免费获得。 您可以根据您的操作系统下载版本。
按照说明下载Java并运行.exe以在您的计算机上安装Java。 在计算机上安装Java后,需要设置环境变量以指向正确的安装目录 -
设置Windows的路径
假设您已在c:\Program Files\java\jdk目录中安装了Java -
右键单击“我的电脑”,然后选择“属性”。
单击“高级”选项卡下的“环境变量”按钮。
现在,更改'Path'变量,使其也包含Java可执行文件的路径。 例如,如果路径当前设置为“C:\WINDOWS\SYSTEM32”,则将路径更改为“C:\WINDOWS\SYSTEM32; c:\Program Files\java\jdk\bin”。
设置Linux,UNIX,Solaris,FreeBSD的路径
应将环境变量PATH设置为指向已安装Java二进制文件的位置。 如果您在执行此操作时遇到问题,请参阅您的shell文档。
例如,如果你使用bash作为shell,那么你可以将以下行添加到'.bashrc的末尾:export PATH =/path/to/java:$ PATH'
流行的Java编辑器 (Popular Java Editors)
要编写Java程序,您需要一个文本编辑器。 市场上还有更复杂的IDE。 但就目前而言,您可以考虑以下其中一项 -
Notepad - 在Windows机器上,您可以使用任何简单的文本编辑器,如记事本(本教程推荐),TextPad。
Netbeans - 一个开源且免费的Java IDE,可从https://netbeans.org/index.html下载。
Eclipse - 由eclipse开源社区开发的Java IDE,可以从https://www.eclipse.org/下载。
Java Concurrency - Major Operations
Core Java提供对多线程程序的完全控制。 您可以开发一个多线程程序,可以根据您的要求完全暂停,恢复或停止。 您可以在线程对象上使用各种静态方法来控制它们的行为。 下表列出了这些方法 -
Sr.No. | 方法和描述 |
---|---|
1 | public void suspend() 此方法将线程置于挂起状态,并可以使用resume()方法恢复。 |
2 | public void stop() 此方法完全停止线程。 |
3 | public void resume() 此方法恢复使用suspend()方法挂起的线程。 |
4 | public void wait() 导致当前线程等待,直到另一个线程调用notify()。 |
5 | public void notify() 唤醒正在此对象监视器上等待的单个线程。 |
请注意,最新版本的Java已弃用suspend(),resume()和stop()方法,因此您需要使用可用的替代方法。
例子 (Example)
class RunnableDemo implements Runnable {
public Thread t;
private String threadName;
boolean suspended = false;
RunnableDemo(String name) {
threadName = name;
System.out.println("Creating " + threadName );
}
public void run() {
System.out.println("Running " + threadName );
try {
for(int i = 10; i > 0; i--) {
System.out.println("Thread: " + threadName + ", " + i);
// Let the thread sleep for a while.
Thread.sleep(300);
synchronized(this) {
while(suspended) {
wait();
}
}
}
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
void suspend() {
suspended = true;
}
synchronized void resume() {
suspended = false;
notify();
}
}
public class TestThread {
public static void main(String args[]) {
RunnableDemo R1 = new RunnableDemo("Thread-1");
R1.start();
RunnableDemo R2 = new RunnableDemo("Thread-2");
R2.start();
try {
Thread.sleep(1000);
R1.suspend();
System.out.println("Suspending First Thread");
Thread.sleep(1000);
R1.resume();
System.out.println("Resuming First Thread");
R2.suspend();
System.out.println("Suspending thread Two");
Thread.sleep(1000);
R2.resume();
System.out.println("Resuming thread Two");
} catch (InterruptedException e) {
System.out.println("Main thread Interrupted");
} try {
System.out.println("Waiting for threads to finish.");
R1.t.join();
R2.t.join();
} catch (InterruptedException e) {
System.out.println("Main thread Interrupted");
}
System.out.println("Main thread exiting.");
}
}
上述程序产生以下输出 -
输出 (Output)
Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 10
Running Thread-2
Thread: Thread-2, 10
Thread: Thread-1, 9
Thread: Thread-2, 9
Thread: Thread-1, 8
Thread: Thread-2, 8
Thread: Thread-1, 7
Thread: Thread-2, 7
Suspending First Thread
Thread: Thread-2, 6
Thread: Thread-2, 5
Thread: Thread-2, 4
Resuming First Thread
Suspending thread Two
Thread: Thread-1, 6
Thread: Thread-1, 5
Thread: Thread-1, 4
Thread: Thread-1, 3
Resuming thread Two
Thread: Thread-2, 3
Waiting for threads to finish.
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
Main thread exiting.
Interthread Communication
如果您了解进程间通信,那么您将很容易理解跨线程通信。 当您开发一个两个或多个线程交换某些信息的应用程序时,Interthread通信很重要。
有三个简单的方法和一个小技巧,使线程通信成为可能。 所有这三种方法都列在下面 -
Sr.No. | 方法和描述 |
---|---|
1 | public void wait() 导致当前线程等待,直到另一个线程调用notify()。 |
2 | public void notify() 唤醒正在此对象监视器上等待的单个线程。 |
3 | public void notifyAll() 唤醒在同一对象上调用wait()的所有线程。 |
这些方法已作为Object中的final方法实现,因此它们可用于所有类。 只能在synchronized上下文中调用所有三种方法。
例子 (Example)
此示例显示了两个线程如何使用wait()和notify()方法进行通信。 您可以使用相同的概念创建复杂的系统。
class Chat {
boolean flag = false;
public synchronized void Question(String msg) {
if (flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(msg);
flag = true;
notify();
}
public synchronized void Answer(String msg) {
if (!flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(msg);
flag = false;
notify();
}
}
class T1 implements Runnable {
Chat m;
String[] s1 = { "Hi", "How are you ?", "I am also doing fine!" };
public T1(Chat m1) {
this.m = m1;
new Thread(this, "Question").start();
}
public void run() {
for (int i = 0; i < s1.length; i++) {
m.Question(s1[i]);
}
}
}
class T2 implements Runnable {
Chat m;
String[] s2 = { "Hi", "I am good, what about you?", "Great!" };
public T2(Chat m2) {
this.m = m2;
new Thread(this, "Answer").start();
}
public void run() {
for (int i = 0; i < s2.length; i++) {
m.Answer(s2[i]);
}
}
}
public class TestThread {
public static void main(String[] args) {
Chat m = new Chat();
new T1(m);
new T2(m);
}
}
当上述程序被编译并执行时,它会产生以下结果 -
输出 (Output)
Hi
Hi
How are you ?
I am good, what about you?
I am also doing fine!
Great!
以上示例已经采取,然后从[https://stackoverflow.com/questions/2170520/inter-thread-communication-in-java]进行修改
Java Concurrency - Synchronization
具有同步的多线程示例
下面是按顺序打印计数器值的相同示例,每次运行它时,都会产生相同的结果。
例子 (Example)
class PrintDemo {
public void printCount() {
try {
for(int i = 5; i > 0; i--) {
System.out.println("Counter --- " + i );
}
} catch (Exception e) {
System.out.println("Thread interrupted.");
}
}
}
class ThreadDemo extends Thread {
private Thread t;
private String threadName;
PrintDemo PD;
ThreadDemo(String name, PrintDemo pd) {
threadName = name;
PD = pd;
}
public void run() {
synchronized(PD) {
PD.printCount();
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
}
public class TestThread {
public static void main(String args[]) {
PrintDemo PD = new PrintDemo();
ThreadDemo T1 = new ThreadDemo("Thread - 1 ", PD);
ThreadDemo T2 = new ThreadDemo("Thread - 2 ", PD);
T1.start();
T2.start();
// wait for threads to end
try {
T1.join();
T2.join();
} catch (Exception e) {
System.out.println("Interrupted");
}
}
}
每次运行此程序时都会产生相同的结果 -
输出 (Output)
Starting Thread - 1
Starting Thread - 2
Counter --- 5
Counter --- 4
Counter --- 3
Counter --- 2
Counter --- 1
Thread Thread - 1 exiting.
Counter --- 5
Counter --- 4
Counter --- 3
Counter --- 2
Counter --- 1
Thread Thread - 2 exiting.
Java Concurrency - Deadlock
死锁描述了两个或多个线程永远被阻塞,等待彼此的情况。 当多个线程需要相同的锁但以不同的顺序获取它们时,会发生死锁。 Java多线程程序可能会遇到死锁条件,因为synchronized关键字会导致执行线程在等待与指定对象关联的锁定或监视器时阻塞。 这是一个例子。
例子 (Example)
public class TestThread {
public static Object Lock1 = new Object();
public static Object Lock2 = new Object();
public static void main(String args[]) {
ThreadDemo1 T1 = new ThreadDemo1();
ThreadDemo2 T2 = new ThreadDemo2();
T1.start();
T2.start();
}
private static class ThreadDemo1 extends Thread {
public void run() {
synchronized (Lock1) {
System.out.println("Thread 1: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 1: Waiting for lock 2...");
synchronized (Lock2) {
System.out.println("Thread 1: Holding lock 1 & 2...");
}
}
}
}
private static class ThreadDemo2 extends Thread {
public void run() {
synchronized (Lock2) {
System.out.println("Thread 2: Holding lock 2...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 2: Waiting for lock 1...");
synchronized (Lock1) {
System.out.println("Thread 2: Holding lock 1 & 2...");
}
}
}
}
}
编译并执行上述程序时,会发现死锁情况,以下是程序产生的输出 -
输出 (Output)
Thread 1: Holding lock 1...
Thread 2: Holding lock 2...
Thread 1: Waiting for lock 2...
Thread 2: Waiting for lock 1...
上面的程序将永远挂起,因为没有任何线程可以继续并等待彼此释放锁定,因此您可以通过按CTRL + C退出程序。
死锁解决方案示例
让我们改变锁定和运行相同程序的顺序,看看两个线程是否仍然相互等待 -
例子 (Example)
public class TestThread {
public static Object Lock1 = new Object();
public static Object Lock2 = new Object();
public static void main(String args[]) {
ThreadDemo1 T1 = new ThreadDemo1();
ThreadDemo2 T2 = new ThreadDemo2();
T1.start();
T2.start();
}
private static class ThreadDemo1 extends Thread {
public void run() {
synchronized (Lock1) {
System.out.println("Thread 1: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 1: Waiting for lock 2...");
synchronized (Lock2) {
System.out.println("Thread 1: Holding lock 1 & 2...");
}
}
}
}
private static class ThreadDemo2 extends Thread {
public void run() {
synchronized (Lock1) {
System.out.println("Thread 2: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 2: Waiting for lock 2...");
synchronized (Lock2) {
System.out.println("Thread 2: Holding lock 1 & 2...");
}
}
}
}
}
因此,只需更改锁的顺序就可以防止程序进入死锁情况并完成以下结果 -
输出 (Output)
Thread 1: Holding lock 1...
Thread 1: Waiting for lock 2...
Thread 1: Holding lock 1 & 2...
Thread 2: Holding lock 1...
Thread 2: Waiting for lock 2...
Thread 2: Holding lock 1 & 2...
上面的例子只是简单地概念,然而,这是一个复杂的概念,你应该在开发应用程序来处理死锁情况之前深入研究它。
Java Concurrency - ThreadLocal Class
ThreadLocal类用于创建线程局部变量,这些变量只能由同一线程读取和写入。 例如,如果两个线程正在访问引用相同threadLocal变量的代码,则每个线程都不会看到由其他线程完成的对threadLocal变量的任何修改。
ThreadLocal方法
以下是ThreadLocal类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public T get() 返回当前线程的此线程局部变量副本中的值。 |
2 | protected T initialValue() 返回此线程局部变量的当前线程的“初始值”。 |
3 | public void remove() 删除此线程局部变量的当前线程值。 |
4 | public void set(T value) 将此线程局部变量的当前线程副本设置为指定值。 |
例子 (Example)
以下TestThread程序演示了ThreadLocal类的一些方法。 这里我们使用了两个计数器变量,一个是普通变量,另一个是ThreadLocal。
class RunnableDemo implements Runnable {
int counter;
ThreadLocal<Integer> threadLocalCounter = new ThreadLocal<Integer>();
public void run() {
counter++;
if(threadLocalCounter.get() != null) {
threadLocalCounter.set(threadLocalCounter.get().intValue() + 1);
} else {
threadLocalCounter.set(0);
}
System.out.println("Counter: " + counter);
System.out.println("threadLocalCounter: " + threadLocalCounter.get());
}
}
public class TestThread {
public static void main(String args[]) {
RunnableDemo commonInstance = new RunnableDemo();
Thread t1 = new Thread(commonInstance);
Thread t2 = new Thread(commonInstance);
Thread t3 = new Thread(commonInstance);
Thread t4 = new Thread(commonInstance);
t1.start();
t2.start();
t3.start();
t4.start();
// wait for threads to end
try {
t1.join();
t2.join();
t3.join();
t4.join();
} catch (Exception e) {
System.out.println("Interrupted");
}
}
}
这将产生以下结果。
输出 (Output)
Counter: 1
threadLocalCounter: 0
Counter: 2
threadLocalCounter: 0
Counter: 3
threadLocalCounter: 0
Counter: 4
threadLocalCounter: 0
您可以看到每个线程增加了counter的值,但threadLocalCounter对于每个线程保持为0。
ThreadLocalRandom Class
java.util.concurrent.ThreadLocalRandom是从jdk 1.7开始引入的实用程序类,当需要多个线程或ForkJoinTasks生成随机数时非常有用。 与Math.random()方法相比,它可以提高性能并减少争用。
ThreadLocalRandom方法
以下是ThreadLocalRandom类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public static ThreadLocalRandom current() 返回当前线程的ThreadLocalRandom。 |
2 | protected int next(int bits) 生成下一个伪随机数。 |
3 | public double nextDouble(double n) 返回0(包括)和指定值(不包括)之间的伪随机,均匀分布的double值。 |
4 | public double nextDouble(double least, double bound) 返回给定的最小值(包括)和bound(不包括)之间的伪随机,均匀分布的值。 |
5 | public int nextInt(int least, int bound) 返回给定的最小值(包括)和bound(不包括)之间的伪随机,均匀分布的值。 |
6 | public long nextLong(long n) 返回0(包括)和指定值(不包括)之间的伪随机均匀分布值。 |
7 | public long nextLong(long least, long bound) 返回给定的最小值(包括)和bound(不包括)之间的伪随机,均匀分布的值。 |
8 | public void setSeed(long seed) 抛出UnsupportedOperationException。 |
例子 (Example)
以下TestThread程序演示了Lock接口的一些这些方法。 在这里,我们使用lock()来获取锁定,并使用unlock()来释放锁定。
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadLocalRandom;
public class TestThread {
public static void main(final String[] arguments) {
System.out.println("Random Integer: " + new Random().nextInt());
System.out.println("Seeded Random Integer: " + new Random(15).nextInt());
System.out.println(
"Thread Local Random Integer: " + ThreadLocalRandom.current().nextInt());
final ThreadLocalRandom random = ThreadLocalRandom.current();
random.setSeed(15); //exception will come as seeding is not allowed in ThreadLocalRandom.
System.out.println("Seeded Thread Local Random Integer: " + random.nextInt());
}
}
这将产生以下结果。
输出 (Output)
Random Integer: 1566889198
Seeded Random Integer: -1159716814
Thread Local Random Integer: 358693993
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.concurrent.ThreadLocalRandom.setSeed(Unknown Source)
at TestThread.main(TestThread.java:21)
这里我们使用ThreadLocalRandom和Random类来获取随机数。
Java Concurrency - Lock Interface
java.util.concurrent.locks.Lock接口用作类似于synchronized块的线程同步机制。 新的锁定机制比同步块更灵活,提供更多选项。 Lock和synchronized块之间的主要区别如下 -
Guarantee of sequence - 同步块不提供任何顺序保证,等待线程将被授予访问权限。 锁接口处理它。
No timeout - 如果未授予锁定,则同步块No timeout选项。 Lock接口提供了这样的选项。
Single method - 同步块必须完全包含在单个方法中,而锁定接口的方法lock()和unlock()可以在不同的方法中调用。
锁定方法
以下是Lock类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public void lock() 获得锁。 |
2 | public void lockInterruptibly() 除非当前线程被中断,否则获取锁定。 |
3 | public Condition newCondition() 返回绑定到此Lock实例的新Condition实例。 |
4 | public boolean tryLock() 只有在调用时它是空闲的才能获取锁。 |
5 | public boolean tryLock() 只有在调用时它是空闲的才能获取锁。 |
6 | public boolean tryLock(long time, TimeUnit unit) 如果在给定的等待时间内空闲并且当前线程未被中断,则获取锁。 |
7 | public void unlock() 释放锁定。 |
例子 (Example)
以下TestThread程序演示了Lock接口的一些这些方法。 在这里,我们使用lock()来获取锁定,并使用unlock()来释放锁定。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class PrintDemo {
private final Lock queueLock = new ReentrantLock();
public void print() {
queueLock.lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration/1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.printf(
"%s printed the document successfully.\n", Thread.currentThread().getName());
queueLock.unlock();
}
}
}
class ThreadDemo extends Thread {
PrintDemo printDemo;
ThreadDemo(String name, PrintDemo printDemo) {
super(name);
this.printDemo = printDemo;
}
@Override
public void run() {
System.out.printf(
"%s starts printing a document\n", Thread.currentThread().getName());
printDemo.print();
}
}
public class TestThread {
public static void main(String args[]) {
PrintDemo PD = new PrintDemo();
ThreadDemo t1 = new ThreadDemo("Thread - 1 ", PD);
ThreadDemo t2 = new ThreadDemo("Thread - 2 ", PD);
ThreadDemo t3 = new ThreadDemo("Thread - 3 ", PD);
ThreadDemo t4 = new ThreadDemo("Thread - 4 ", PD);
t1.start();
t2.start();
t3.start();
t4.start();
}
}
这将产生以下结果。
输出 (Output)
Thread - 1 starts printing a document
Thread - 4 starts printing a document
Thread - 3 starts printing a document
Thread - 2 starts printing a document
Thread - 1 Time Taken 4 seconds.
Thread - 1 printed the document successfully.
Thread - 4 Time Taken 3 seconds.
Thread - 4 printed the document successfully.
Thread - 3 Time Taken 5 seconds.
Thread - 3 printed the document successfully.
Thread - 2 Time Taken 4 seconds.
Thread - 2 printed the document successfully.
我们在这里使用ReentrantLock类作为Lock接口的实现。 ReentrantLock类允许线程锁定方法,即使它已经锁定了其他方法。
Java Concurrency - ReadWriteLock Interface
java.util.concurrent.locks.ReadWriteLock接口允许多个线程一次读取,但一次只能写入一个线程。
Read Lock - 如果没有线程锁定ReadWriteLock进行写入,则多个线程可以访问读锁定。
Write Lock - 如果没有线程在读或写,则一个线程可以访问写锁定。
锁定方法
以下是Lock类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public Lock readLock() 返回用于读取的锁。 |
2 | public Lock writeLock() 返回用于写入的锁。 |
例子 (Example)
以下TestThread程序演示了ReadWriteLock接口的这些方法。 这里我们使用readlock()获取read-lock和writeLock()来获取写锁。import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestThread {
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private static String message = "a";
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new WriterA());
t1.setName("Writer A");
Thread t2 = new Thread(new WriterB());
t2.setName("Writer B");
Thread t3 = new Thread(new Reader());
t3.setName("Reader");
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
}
static class Reader implements Runnable {
public void run() {
if(lock.isWriteLocked()) {
System.out.println("Write Lock Present.");
}
lock.readLock().lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration/1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() +": "+ message );
lock.readLock().unlock();
}
}
}
static class WriterA implements Runnable {
public void run() {
lock.writeLock().lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration/1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
message = message.concat("a");
lock.writeLock().unlock();
}
}
}
static class WriterB implements Runnable {
public void run() {
lock.writeLock().lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration/1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
message = message.concat("b");
lock.writeLock().unlock();
}
}
}
}
这将产生以下结果。
输出 (Output)
Writer A Time Taken 6 seconds.
Write Lock Present.
Writer B Time Taken 2 seconds.
Reader Time Taken 0 seconds.
Reader: aab
Java Concurrency - Condition Interface
java.util.concurrent.locks.Condition接口提供了一个线程暂停执行的能力,直到给定的条件为真。 Condition对象必然绑定到Lock并使用newCondition()方法获取。
条件方法
以下是Condition类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public void await() 导致当前线程等待,直到发出信号或中断为止。 |
2 | public boolean await(long time, TimeUnit unit) 导致当前线程等待,直到发出信号或中断,或者指定的等待时间过去。 |
3 | public long awaitNanos(long nanosTimeout) 导致当前线程等待,直到发出信号或中断,或者指定的等待时间过去。 |
4 | public long awaitUninterruptibly() 导致当前线程等待直到发出信号。 |
5 | public long awaitUntil() 导致当前线程等待,直到发出信号或中断,或者指定的截止时间过去。 |
6 | public void signal() 唤醒一个等待线程。 |
7 | public void signalAll() 唤醒所有等待的线程。 |
例子 (Example)
以下TestThread程序演示了Condition接口的这些方法。 这里我们使用signal()来通知和await()来挂起线程。import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestThread {
public static void main(String[] args) throws InterruptedException {
ItemQueue itemQueue = new ItemQueue(10);
//Create a producer and a consumer.
Thread producer = new Producer(itemQueue);
Thread consumer = new Consumer(itemQueue);
//Start both threads.
producer.start();
consumer.start();
//Wait for both threads to terminate.
producer.join();
consumer.join();
}
static class ItemQueue {
private Object[] items = null;
private int current = 0;
private int placeIndex = 0;
private int removeIndex = 0;
private final Lock lock;
private final Condition isEmpty;
private final Condition isFull;
public ItemQueue(int capacity) {
this.items = new Object[capacity];
lock = new ReentrantLock();
isEmpty = lock.newCondition();
isFull = lock.newCondition();
}
public void add(Object item) throws InterruptedException {
lock.lock();
while(current >= items.length)
isFull.await();
items[placeIndex] = item;
placeIndex = (placeIndex + 1) % items.length;
++current;
//Notify the consumer that there is data available.
isEmpty.signal();
lock.unlock();
}
public Object remove() throws InterruptedException {
Object item = null;
lock.lock();
while(current <= 0) {
isEmpty.await();
}
item = items[removeIndex];
removeIndex = (removeIndex + 1) % items.length;
--current;
//Notify the producer that there is space available.
isFull.signal();
lock.unlock();
return item;
}
public boolean isEmpty() {
return (items.length == 0);
}
}
static class Producer extends Thread {
private final ItemQueue queue;
public Producer(ItemQueue queue) {
this.queue = queue;
}
@Override
public void run() {
String[] numbers =
{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"};
try {
for(String number: numbers) {
System.out.println("[Producer]: " + number);
}
queue.add(null);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
static class Consumer extends Thread {
private final ItemQueue queue;
public Consumer(ItemQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
do {
Object number = queue.remove();
System.out.println("[Consumer]: " + number);
if(number == null) {
return;
}
} while(!queue.isEmpty());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
[Producer]: 1
[Producer]: 2
[Producer]: 3
[Producer]: 4
[Producer]: 5
[Producer]: 6
[Producer]: 7
[Producer]: 8
[Producer]: 9
[Producer]: 10
[Producer]: 11
[Producer]: 12
[Consumer]: null
Java Concurrency - AtomicInteger Class
java.util.concurrent.atomic.AtomicInteger类提供对底层int值的操作,可以原子方式读取和写入,还包含高级原子操作。 AtomicInteger支持底层int变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
AtomicInteger方法
以下是AtomicInteger类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public int addAndGet(int delta) 以原子方式将给定值添加到当前值。 |
2 | public boolean compareAndSet(int expect, int update) 如果当前值与预期值相同,则以原子方式将值设置为给定的更新值。 |
3 | public int decrementAndGet() 原子地减去当前值的一个。 |
4 | public double doubleValue() 以double形式返回指定数字的值。 |
5 | public float floatValue() 以float形式返回指定数字的值。 |
6 | public int get() 获取当前值。 |
7 | public int getAndAdd(int delta) 原子地将给定值添加到当前值。 |
8 | public int getAndDecrement() 原子地减去当前值的一个。 |
9 | public int getAndIncrement() 原子地将当前值增加1。 |
10 | public int getAndSet(int newValue) 原子设置为给定值并返回旧值。 |
11 | public int incrementAndGet() 原子地将当前值增加1。 |
12 | public int intValue() 以int形式返回指定数字的值。 |
13 | public void lazySet(int newValue) 最终设置为给定值。 |
14 | public long longValue() 以long形式返回指定数字的值。 |
15 | public void set(int newValue) 设置为给定值。 |
16 | public String toString() 返回当前值的String表示形式。 |
17 | public boolean weakCompareAndSet(int expect, int update) 如果当前值与预期值相同,则以原子方式将值设置为给定的更新值。 |
例子 (Example)
以下TestThread程序在基于线程的环境中显示计数器的不安全实现。
public class TestThread {
static class Counter {
private int c = 0;
public void increment() {
c++;
}
public int value() {
return c;
}
}
public static void main(final String[] arguments) throws InterruptedException {
final Counter counter = new Counter();
//1000 threads
for(int i = 0; i < 1000 ; i++) {
new Thread(new Runnable() {
public void run() {
counter.increment();
}
}).start();
}
Thread.sleep(6000);
System.out.println("Final number (should be 1000): " + counter.value());
}
}
根据计算机的速度和线程交错,这可能会产生以下结果。
输出 (Output)
Final number (should be 1000): 1000
例子 (Example)
以下TestThread程序在基于线程的环境中使用AtomicInteger显示计数器的安全实现。import java.util.concurrent.atomic.AtomicInteger;
public class TestThread {
static class Counter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.getAndIncrement();
}
public int value() {
return c.get();
}
}
public static void main(final String[] arguments) throws InterruptedException {
final Counter counter = new Counter();
//1000 threads
for(int i = 0; i < 1000 ; i++) {
new Thread(new Runnable() {
public void run() {
counter.increment();
}
}).start();
}
Thread.sleep(6000);
System.out.println("Final number (should be 1000): " + counter.value());
}
}
这将产生以下结果。
输出 (Output)
Final number (should be 1000): 1000
Java Concurrency - AtomicLong Class
java.util.concurrent.atomic.AtomicLong类提供对底层long值的操作,可以原子方式读取和写入,还包含高级原子操作。 AtomicLong支持底层长变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
AtomicLong方法
以下是AtomicLong类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public long addAndGet(long delta) 以原子方式将给定值添加到当前值。 |
2 | public boolean compareAndSet(long expect, long update) 如果当前值与预期值相同,则以原子方式将值设置为给定的更新值。 |
3 | public long decrementAndGet() 原子地减去当前值的一个。 |
4 | public double doubleValue() 以double形式返回指定数字的值。 |
5 | public float floatValue() 以float形式返回指定数字的值。 |
6 | public long get() 获取当前值。 |
7 | public long getAndAdd(long delta) 原子地将给定值添加到当前值。 |
8 | public long getAndDecrement() 原子地减去当前值的一个。 |
9 | public long getAndIncrement() 原子地将当前值增加1。 |
10 | public long getAndSet(long newValue) 原子设置为给定值并返回旧值。 |
11 | public long incrementAndGet() 原子地将当前值增加1。 |
12 | public int intValue() 以int形式返回指定数字的值。 |
13 | public void lazySet(long newValue) 最终设置为给定值。 |
14 | public long longValue() 以long形式返回指定数字的值。 |
15 | public void set(long newValue) 设置为给定值。 |
16 | public String toString() 返回当前值的String表示形式。 |
17 | public boolean weakCompareAndSet(long expect, long update) 如果当前值与预期值相同,则以原子方式将值设置为给定的更新值。 |
例子 (Example)
以下TestThread程序在基于线程的环境中使用AtomicLong显示计数器的安全实现。
import java.util.concurrent.atomic.AtomicLong;
public class TestThread {
static class Counter {
private AtomicLong c = new AtomicLong(0);
public void increment() {
c.getAndIncrement();
}
public long value() {
return c.get();
}
}
public static void main(final String[] arguments) throws InterruptedException {
final Counter counter = new Counter();
//1000 threads
for(int i = 0; i < 1000 ; i++) {
new Thread(new Runnable() {
public void run() {
counter.increment();
}
}).start();
}
Thread.sleep(6000);
System.out.println("Final number (should be 1000): " + counter.value());
}
}
这将产生以下结果。
输出 (Output)
Final number (should be 1000): 1000
Java Concurrency - AtomicBoolean Class
java.util.concurrent.atomic.AtomicBoolean类提供对底层布尔值的操作,可以原子方式读取和写入,还包含高级原子操作。 AtomicBoolean支持对底层布尔变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
AtomicBoolean方法
以下是AtomicBoolean类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public boolean compareAndSet(boolean expect, boolean update) 如果当前值==期望值,则以原子方式将值设置为给定的更新值。 |
2 | public boolean get() 返回当前值。 |
3 | public boolean getAndSet(boolean newValue) 原子设置为给定值并返回先前的值。 |
4 | public void lazySet(boolean newValue) 最终设置为给定值。 |
5 | public void set(boolean newValue) 无条件地设置为给定值。 |
6 | public String toString() 返回当前值的String表示形式。 |
7 | public boolean weakCompareAndSet(boolean expect, boolean update) 如果当前值==期望值,则以原子方式将值设置为给定的更新值。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中AtomicBoolean变量的用法。
import java.util.concurrent.atomic.AtomicBoolean;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
new Thread("Thread 1") {
public void run() {
while(true) {
System.out.println(Thread.currentThread().getName()
+" Waiting for Thread 2 to set Atomic variable to true. Current value is "
+ atomicBoolean.get());
if(atomicBoolean.compareAndSet(true, false)) {
System.out.println("Done!");
break;
}
}
};
}.start();
new Thread("Thread 2") {
public void run() {
System.out.println(Thread.currentThread().getName() +
", Atomic Variable: " +atomicBoolean.get());
System.out.println(Thread.currentThread().getName() +
" is setting the variable to true ");
atomicBoolean.set(true);
System.out.println(Thread.currentThread().getName() +
", Atomic Variable: " +atomicBoolean.get());
};
}.start();
}
}
这将产生以下结果。
输出 (Output)
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 2, Atomic Variable: false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 2 is setting the variable to true
Thread 2, Atomic Variable: true
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Done!
Java Concurrency - AtomicReference Class
java.util.concurrent.atomic.AtomicReference类提供对底层对象引用的操作,可以原子方式读取和写入,还包含高级原子操作。 AtomicReference支持对底层对象引用变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
原子参考方法
以下是AtomicReference类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public boolean compareAndSet(V expect, V update) 如果当前值==期望值,则以原子方式将值设置为给定的更新值。 |
2 | public boolean get() 返回当前值。 |
3 | public boolean getAndSet(V newValue) 原子设置为给定值并返回先前的值。 |
4 | public void lazySet(V newValue) 最终设置为给定值。 |
5 | public void set(V newValue) 无条件地设置为给定值。 |
6 | public String toString() 返回当前值的String表示形式。 |
7 | public boolean weakCompareAndSet(V expect, V update) 如果当前值==期望值,则以原子方式将值设置为给定的更新值。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中AtomicReference变量的用法。
import java.util.concurrent.atomic.AtomicReference;
public class TestThread {
private static String message = "hello";
private static AtomicReference<String> atomicReference;
public static void main(final String[] arguments) throws InterruptedException {
atomicReference = new AtomicReference<String>(message);
new Thread("Thread 1") {
public void run() {
atomicReference.compareAndSet(message, "Thread 1");
message = message.concat("-Thread 1!");
};
}.start();
System.out.println("Message is: " + message);
System.out.println("Atomic Reference of Message is: " + atomicReference.get());
}
}
这将产生以下结果。
输出 (Output)
Message is: hello
Atomic Reference of Message is: Thread 1
Java Concurrency - AtomicIntegerArray Class
java.util.concurrent.atomic.AtomicIntegerArray类提供对底层int数组的操作,可以原子方式读取和写入,还包含高级原子操作。 AtomicIntegerArray支持对底层int数组变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
AtomicIntegerArray方法
以下是AtomicIntegerArray类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public int addAndGet(int i, int delta) 以原子方式将给定值添加到索引i处的元素。 |
2 | public boolean compareAndSet(int i, int expect, int update) 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。 |
3 | public int decrementAndGet(int i) 原子地将索引i处的元素减1。 |
4 | public int get(int i) 获取位置i的当前值。 |
5 | public int getAndAdd(int i, int delta) 以原子方式将给定值添加到索引i处的元素。 |
6 | public int getAndDecrement(int i) 原子地将索引i处的元素减1。 |
7 | public int getAndIncrement(int i) 原子地将索引i处的元素增加1。 |
8 | public int getAndSet(int i, int newValue) 以原子方式将位置i处的元素设置为给定值并返回旧值。 |
9 | public int incrementAndGet(int i) 原子地将索引i处的元素增加1。 |
10 | public void lazySet(int i, int newValue) 最终将位置i的元素设置为给定值。 |
11 | public int length() 返回数组的长度。 |
12 | public void set(int i, int newValue) 将位置i处的元素设置为给定值。 |
13 | public String toString() 返回数组当前值的String表示形式。 |
14 | public boolean weakCompareAndSet(int i, int expect, int update) 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中AtomicIntegerArray变量的用法。
import java.util.concurrent.atomic.AtomicIntegerArray;
public class TestThread {
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, 1);
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Values: ");
for (int i = 0; i<atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicIntegerArray.length(); i++) {
int add = atomicIntegerArray.incrementAndGet(i);
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: 3");
}
}
}
}
}
这将产生以下结果。
输出 (Output)
Thread 10, index 0, value: 2
Thread 10, index 1, value: 2
Thread 10, index 2, value: 2
Thread 11, index 0, value: 3
Thread 10, index 3, value: 2
Thread 11, index 1, value: 3
Thread 11, index 2, value: 3
Thread 10, index 4, value: 2
Thread 11, index 3, value: 3
Thread 10, index 5, value: 2
Thread 10, index 6, value: 2
Thread 11, index 4, value: 3
Thread 10, index 7, value: 2
Thread 11, index 5, value: 3
Thread 10, index 8, value: 2
Thread 11, index 6, value: 3
Thread 10, index 9, value: 2
Thread 11, index 7, value: 3
Thread 11, index 8, value: 3
Thread 11, index 9, value: 3
Values:
3 3 3 3 3 3 3 3 3 3
Java Concurrency - AtomicLongArray Class
java.util.concurrent.atomic.AtomicLongArray类提供可以原子方式读取和写入的底层long数组的操作,还包含高级原子操作。 AtomicLongArray支持底层长数组变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
AtomicLongArray方法
以下是AtomicLongArray类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public long addAndGet(int i, long delta) 以原子方式将给定值添加到索引i处的元素。 |
2 | public boolean compareAndSet(int i, long expect, long update) 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。 |
3 | public long decrementAndGet(int i) 原子地将索引i处的元素减1。 |
4 | public long get(int i) 获取位置i的当前值。 |
5 | public long getAndAdd(int i, long delta) 以原子方式将给定值添加到索引i处的元素。 |
6 | public long getAndDecrement(int i) 原子地将索引i处的元素减1。 |
7 | public long getAndIncrement(int i) 原子地将索引i处的元素增加1。 |
8 | public long getAndSet(int i, long newValue) 以原子方式将位置i处的元素设置为给定值并返回旧值。 |
9 | public long incrementAndGet(int i) 原子地将索引i处的元素增加1。 |
10 | public void lazySet(int i, long newValue) 最终将位置i的元素设置为给定值。 |
11 | public int length() 返回数组的长度。 |
12 | public void set(int i, long newValue) 将位置i处的元素设置为给定值。 |
13 | public String toString() 返回数组当前值的String表示形式。 |
14 | public boolean weakCompareAndSet(int i, long expect, long update) 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中AtomicIntegerArray变量的用法。
import java.util.concurrent.atomic.AtomicLongArray;
public class TestThread {
private static AtomicLongArray atomicLongArray = new AtomicLongArray(10);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicLongArray.length(); i++) {
atomicLongArray.set(i, 1);
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Values: ");
for (int i = 0; i<atomicLongArray.length(); i++) {
System.out.print(atomicLongArray.get(i) + " ");
}
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicLongArray.length(); i++) {
long add = atomicLongArray.incrementAndGet(i);
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicLongArray.length(); i++) {
boolean swapped = atomicLongArray.compareAndSet(i, 2, 3);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: 3");
}
}
}
}
}
这将产生以下结果。
输出 (Output)
Thread 9, index 0, value: 2
Thread 10, index 0, value: 3
Thread 9, index 1, value: 2
Thread 9, index 2, value: 2
Thread 9, index 3, value: 2
Thread 9, index 4, value: 2
Thread 10, index 1, value: 3
Thread 9, index 5, value: 2
Thread 10, index 2, value: 3
Thread 9, index 6, value: 2
Thread 10, index 3, value: 3
Thread 9, index 7, value: 2
Thread 10, index 4, value: 3
Thread 9, index 8, value: 2
Thread 9, index 9, value: 2
Thread 10, index 5, value: 3
Thread 10, index 6, value: 3
Thread 10, index 7, value: 3
Thread 10, index 8, value: 3
Thread 10, index 9, value: 3
Values:
3 3 3 3 3 3 3 3 3 3
AtomicReferenceArray Class
java.util.concurrent.atomic.AtomicReferenceArray类提供可以原子方式读取和写入的底层引用数组的操作,还包含高级原子操作。 AtomicReferenceArray支持对底层引用数组变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 原子compareAndSet方法也具有这些内存一致性功能。
AtomicReferenceArray方法
以下是AtomicReferenceArray类中可用的重要方法列表。
Sr.No. | 方法和描述 |
---|---|
1 | public boolean compareAndSet(int i, E expect, E update) 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。 |
2 | public E get(int i) 获取位置i的当前值。 |
3 | public E getAndSet(int i, E newValue) 以原子方式将位置i处的元素设置为给定值并返回旧值。 |
4 | public void lazySet(int i, E newValue) 最终将位置i的元素设置为给定值。 |
5 | public int length() 返回数组的长度。 |
6 | public void set(int i, E newValue) 将位置i处的元素设置为给定值。 |
7 | public String toString() 返回数组当前值的String表示形式。 |
8 | public boolean weakCompareAndSet(int i, E expect, E update) 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中AtomicReferenceArray变量的用法。
import java.util.concurrent.atomic.AtomicReferenceArray;
public class TestThread {
private static String[] source = new String[10];
private static AtomicReferenceArray<String> atomicReferenceArray
= new AtomicReferenceArray<String>(source);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicReferenceArray.length(); i++) {
atomicReferenceArray.set(i, "item-2");
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicReferenceArray.length(); i++) {
String add = atomicReferenceArray.getAndSet(i,"item-"+ (i+1));
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicReferenceArray.length(); i++) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ atomicReferenceArray.get(i));
boolean swapped = atomicReferenceArray.compareAndSet(i, "item-2", "updated-item-2");
System.out.println("Item swapped: " + swapped);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", updated-item-2");
}
}
}
}
}
这将产生以下结果。
输出 (Output)
Thread 9, index 0, value: item-2
Thread 10, index 0, value: item-1
Item swapped: false
Thread 10, index 1, value: item-2
Item swapped: true
Thread 9, index 1, value: updated-item-2
Thread 10, index 1, updated-item-2
Thread 10, index 2, value: item-3
Item swapped: false
Thread 10, index 3, value: item-2
Item swapped: true
Thread 10, index 3, updated-item-2
Thread 10, index 4, value: item-2
Item swapped: true
Thread 10, index 4, updated-item-2
Thread 10, index 5, value: item-2
Item swapped: true
Thread 10, index 5, updated-item-2
Thread 10, index 6, value: item-2
Thread 9, index 2, value: item-2
Item swapped: true
Thread 9, index 3, value: updated-item-2
Thread 10, index 6, updated-item-2
Thread 10, index 7, value: item-2
Thread 9, index 4, value: updated-item-2
Item swapped: true
Thread 9, index 5, value: updated-item-2
Thread 10, index 7, updated-item-2
Thread 9, index 6, value: updated-item-2
Thread 10, index 8, value: item-2
Thread 9, index 7, value: updated-item-2
Item swapped: true
Thread 9, index 8, value: updated-item-2
Thread 10, index 8, updated-item-2
Thread 9, index 9, value: item-2
Thread 10, index 9, value: item-10
Item swapped: false
Java Concurrency - Executor Interface
java.util.concurrent.Executor接口是一个支持启动新任务的简单接口。
ExecutorService方法
Sr.No. | 方法和描述 |
---|---|
1 | void execute(Runnable command) 在将来的某个时间执行给定的命令。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中Executor接口的使用。
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Task());
ThreadPoolExecutor pool = (ThreadPoolExecutor)executor;
pool.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Running Task!
Task Completed
ExecutorService Interface
java.util.concurrent.ExecutorService接口是Executor接口的子接口,并添加了管理生命周期的功能,包括单个任务和执行程序本身。
ExecutorService方法
Sr.No. | 方法和描述 |
---|---|
1 | boolean awaitTermination(long timeout, TimeUnit unit) 阻止所有任务在关闭请求之后完成执行,或发生超时,或者当前线程被中断,以先发生者为准。 |
2 | 《T》 List《Future《T》》 invokeAll(Collection《? extends Callable《T》》 tasks) 执行给定的任务,返回完成所有状态和结果的Futures列表。 |
3 | 《T》 List《Future《T》》 invokeAll(Collection《? extends Callable《T》》 tasks, long timeout, TimeUnit unit) 执行给定的任务,返回一个Futures列表,在完成或超时到期时保持其状态和结果,以先发生者为准。 |
4 | 《T》 T invokeAny(Collection《? extends Callable《T》》 tasks) 执行给定的任务,返回已成功完成的任务的结果(即,不抛出异常),如果有的话。 |
5 | 《T》 T invokeAny(Collection《? extends Callable《T》》 tasks, long timeout, TimeUnit unit) |
6 | boolean isShutdown() 如果此执行程序已关闭,则返回true。 |
7 | boolean isTerminated() 如果关闭后所有任务都已完成,则返回true。 |
8 | void shutdown() 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 |
9 | List《Runnable》 shutdownNow() 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。 |
10 | 《T》 Future《T》 submit(Callable《T》 task) 提交值返回任务以执行并返回表示任务的挂起结果的Future。 |
11 | Future《?》 submit(Runnable task) 提交Runnable任务以执行并返回表示该任务的Future。 |
12 | 《T》 Future《T》 submit(Runnable task, T result) 提交Runnable任务以执行并返回表示该任务的Future。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中ExecutorService接口的用法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.submit(new Task());
System.out.println("Shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.err.println("tasks interrupted");
} finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 20);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:302)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
at TestThread$Task.run(TestThread.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
ScheduledExecutorService Interface
java.util.concurrent.ScheduledExecutorService接口是ExecutorService接口的子接口,支持将来和/或定期执行任务。
ScheduledExecutorService方法
Sr.No. | 方法和描述 |
---|---|
1 | 《V》 ScheduledFuture《V》 schedule(Callable《V》 callable, long delay, TimeUnit unit) 创建并执行在给定延迟后变为启用的ScheduledFuture。 |
2 | ScheduledFuture《?》 schedule(Runnable command, long delay, TimeUnit unit) 创建并执行在给定延迟后启用的一次性操作。 |
3 | ScheduledFuture《?》 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 创建并执行一个周期性操作,该操作在给定的初始延迟后首先启用,随后在给定的时间段内启用; 即执行将在initialDelay之后开始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此类推。 |
4 | ScheduledFuture《?》 scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 创建并执行一个周期性动作,该动作在给定的初始延迟之后首先被启用,并且随后在一次执行的终止和下一次执行的开始之间给定延迟。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中ScheduledExecutorService接口的用法。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
这将产生以下结果。
输出 (Output)
beep
beep
beep
beep
newFixedThreadPool Method
可以通过调用Executors类的静态newFixedThreadPool()方法获得固定的线程池。
语法 (Syntax)
ExecutorService fixedPool = Executors.newFixedThreadPool(2);
哪里
最多2个线程将处于活动状态以处理任务。
如果提交的线程超过2个,则它们将保留在队列中,直到线程可用。
如果线程在执行器执行关闭期间因故障而终止,则创建一个新线程以取代它。
任何线程都存在,直到池关闭。
例子 (Example)
以下TestThread程序显示了基于线程的环境中newFixedThreadPool方法的用法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Cast the object to its class type
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
//Stats before tasks execution
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.submit(new Task());
executor.submit(new Task());
//Stats after tasks execution
System.out.println("Core threads: " + pool.getCorePoolSize());
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task! Thread Name: " +
Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed! Thread Name: " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Largest executions: 0
Maximum allowed threads: 2
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 2
Largest executions: 2
Maximum allowed threads: 2
Current threads in pool: 2
Currently executing threads: 1
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-1
Running Task! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-1
newCachedThreadPool Method
可以通过调用Executors类的静态newCachedThreadPool()方法来获取缓存的线程池。
语法 (Syntax)
ExecutorService executor = Executors.newCachedThreadPool();
哪里
newCachedThreadPool方法创建具有可扩展线程池的执行程序。
这样的执行程序适用于启动许多短期任务的应用程序。
例子 (Example)
以下TestThread程序显示了基于线程的环境中newCachedThreadPool方法的用法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
// Cast the object to its class type
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
//Stats before tasks execution
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.submit(new Task());
executor.submit(new Task());
//Stats after tasks execution
System.out.println("Core threads: " + pool.getCorePoolSize());
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task! Thread Name: " +
Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed! Thread Name: " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Largest executions: 0
Maximum allowed threads: 2147483647
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 0
Largest executions: 2
Maximum allowed threads: 2147483647
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-1
Running Task! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-1
newScheduledThreadPool Method
可以通过调用Executors类的静态newScheduledThreadPool()方法来获取调度的线程池。
语法 (Syntax)
ExecutorService executor = Executors.newScheduledThreadPool(1);
例子 (Example)
以下TestThread程序显示了在基于线程的环境中使用newScheduledThreadPool方法。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
这将产生以下结果。
输出 (Output)
beep
beep
beep
beep
newSingleThreadExecutor Method
可以通过调用Executors类的静态newSingleThreadExecutor()方法来获取单个线程池。
语法 (Syntax)
ExecutorService executor = Executors.newSingleThreadExecutor();
其中newSingleThreadExecutor方法创建一次执行单个任务的执行程序。
例子 (Example)
以下TestThread程序显示了基于线程的环境中newSingleThreadExecutor方法的用法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.submit(new Task());
System.out.println("Shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.err.println("tasks interrupted");
} finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 20);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:302)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
at TestThread$Task.run(TestThread.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
ThreadPoolExecutor Class
java.util.concurrent.ThreadPoolExecutor是一个ExecutorService,它使用可能的几个池化线程之一执行每个提交的任务,通常使用Executors工厂方法配置。 它还提供各种实用程序方法来检查当前线程统计信息并控制它们。
ThreadPoolExecutor方法
Sr.No. | 方法和描述 |
---|---|
1 | protected void afterExecute(Runnable r, Throwable t) 完成给定Runnable的执行后调用的方法。 |
2 | void allowCoreThreadTimeOut(boolean value) 设置管理核心线程是否可以超时并在保持活动时间内没有任务到达时终止的策略,在新任务到达时根据需要进行替换。 |
3 | boolean allowsCoreThreadTimeOut() 如果此池允许核心线程超时并在keepAlive时间内没有任务到达时终止,则返回true,在新任务到达时根据需要进行替换。 |
4 | boolean awaitTermination(long timeout, TimeUnit unit) 阻止所有任务在关闭请求之后完成执行,或发生超时,或者当前线程被中断,以先发生者为准。 |
5 | protected void beforeExecute(Thread t, Runnable r) 在给定线程中执行给定Runnable之前调用的方法。 |
6 | void execute(Runnable command) 将来某个时候执行给定的任务。 |
7 | protected void finalize() 当不再引用此执行程序且没有线程时,调用shutdown。 |
8 | int getActiveCount() 返回正在执行任务的大致线程数。 |
9 | long getCompletedTaskCount() 返回已完成执行的大致任务总数。 |
10 | int getCorePoolSize() 返回核心线程数。 |
11 | long getKeepAliveTime(TimeUnit unit) 返回线程保持活动时间,该时间是超过核心池大小的线程在终止之前保持空闲的时间量。 |
12 | int getLargestPoolSize() 返回同时存在于池中的最大线程数。 |
13 | int getMaximumPoolSize() 返回允许的最大线程数。 |
14 | int getPoolSize() 返回池中当前的线程数。 |
15 | BlockingQueue getQueue() 返回此执行程序使用的任务队列。 |
15 | RejectedExecutionHandler getRejectedExecutionHandler() 返回不可执行任务的当前处理程序。 |
16 | long getTaskCount() 返回已安排执行的大致任务总数。 |
17 | ThreadFactory getThreadFactory() 返回用于创建新线程的线程工厂。 |
18 | boolean isShutdown() 如果此执行程序已关闭,则返回true。 |
19 | boolean isTerminated() 如果关闭后所有任务都已完成,则返回true。 |
20 | boolean isTerminating() 如果此执行程序在shutdown()或shutdownNow()之后终止但尚未完全终止,则返回true。 |
21 | int prestartAllCoreThreads() 启动所有核心线程,导致它们无所事事地等待工作。 |
22 | boolean prestartCoreThread() 启动一个核心线程,导致它无所事事地等待工作。 |
23 | void purge() 尝试从工作队列中删除已取消的所有Future任务。 |
24 | boolean remove(Runnable task) 如果执行程序的内部队列存在,则从执行程序的内部队列中删除此任务,从而导致它在尚未启动时不会运行。 |
25 | void setCorePoolSize(int corePoolSize) 设置核心线程数。 |
26 | void setKeepAliveTime(long time, TimeUnit unit) 设置线程在终止之前可以保持空闲的时间限制。 |
27 | void setMaximumPoolSize(int maximumPoolSize) 设置允许的最大线程数。 |
28 | void setRejectedExecutionHandler(RejectedExecutionHandler handler) 为不可执行的任务设置新的处理程序。 |
29 | void setThreadFactory(ThreadFactory threadFactory) 设置用于创建新线程的线程工厂。 |
30 | void shutdown() 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 |
31 | List《Runnable》 shutdownNow() 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。 |
32 | protected void terminated() Executor终止时调用的方法。 |
33 | String toString() 返回标识此池及其状态的字符串,包括运行状态和估计的工作和任务计数的指示。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中ThreadPoolExecutor接口的用法。
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
//Stats before tasks execution
System.out.println("Largest executions: "
+ executor.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ executor.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ executor.getPoolSize());
System.out.println("Currently executing threads: "
+ executor.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ executor.getTaskCount());
executor.submit(new Task());
executor.submit(new Task());
//Stats after tasks execution
System.out.println("Core threads: " + executor.getCorePoolSize());
System.out.println("Largest executions: "
+ executor.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ executor.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ executor.getPoolSize());
System.out.println("Currently executing threads: "
+ executor.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ executor.getTaskCount());
executor.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task! Thread Name: " +
Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed! Thread Name: " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Largest executions: 0
Maximum allowed threads: 2147483647
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 0
Largest executions: 2
Maximum allowed threads: 2147483647
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-2
Running Task! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-2
ScheduledThreadPoolExecutor Class
java.util.concurrent.ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,并且可以另外调度命令以在给定延迟之后运行,或者定期执行。
ScheduledThreadPoolExecutor方法
Sr.No. | 方法和描述 |
---|---|
1 | protected 《V》 RunnableScheduledFuture《V》 decorateTask(Callable《V》 callable, RunnableScheduledFuture《V》 task) 修改或替换用于执行可调用的任务。 |
2 | protected 《V》 RunnableScheduledFuture《V》 decorateTask(Runnable runnable, RunnableScheduledFuture《V》 task) 修改或替换用于执行runnable的任务。 |
3 | void execute(Runnable command) 执行所需延迟为零的命令。 |
4 | boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() 获取有关是否继续执行现有定期任务的策略,即使此执行程序已关闭也是如此。 |
5 | boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() 获取有关是否执行现有延迟任务的策略,即使此执行程序已关闭也是如此。 |
6 | BlockingQueue《Runnable》 getQueue() 返回此执行程序使用的任务队列。 |
7 | boolean getRemoveOnCancelPolicy() 获取有关是否应在取消时立即从工作队列中删除已取消任务的策略。 |
8 | 《V》 ScheduledFuture《V》 schedule(Callable《V》 callable, long delay, TimeUnit unit) 创建并执行在给定延迟后变为启用的ScheduledFuture。 |
9 | ScheduledFuture《?》 schedule(Runnable command, long delay, TimeUnit unit) 创建并执行在给定延迟后启用的一次性操作。 |
10 | ScheduledFuture《?》 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 创建并执行一个周期性操作,该操作在给定的初始延迟后首先启用,随后在给定的时间段内启用; 即执行将在initialDelay之后开始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此类推。 |
11 | ScheduledFuture《?》 scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 创建并执行一个周期性动作,该动作在给定的初始延迟之后首先被启用,并且随后在一次执行的终止和下一次执行的开始之间给定延迟。 |
12 | void setContinueExistingPeriodicTasksAfterShutdownPolicy (boolean value) 设置是否继续执行现有周期性任务的策略,即使此执行程序已关闭。 |
13 | void setExecuteExistingDelayedTasksAfterShutdownPolicy (boolean value) 设置是否执行现有延迟任务的策略,即使此执行程序已关闭也是如此。 |
14 | void setRemoveOnCancelPolicy(boolean value) 设置关于是否应在取消时立即从工作队列中删除已取消任务的策略。 |
15 | void shutdown() 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 |
16 | List《Runnable》 shutdownNow() 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。 |
17 | 《T》 Future《T》 submit(Callable《T》 task) 提交值返回任务以执行并返回表示任务的挂起结果的Future。 |
18 | Future《?》 submit(Runnable task) 提交Runnable任务以执行并返回表示该任务的Future。 |
19 | 《T》 Future《T》 submit(Runnable task, T result) 提交Runnable任务以执行并返回表示该任务的Future。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中ScheduledThreadPoolExecutor接口的用法。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler =
(ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
这将产生以下结果。
输出 (Output)
beep
beep
beep
beep
Java Concurrency - Futures and Callables
java.util.concurrent.Callable对象可以返回由线程完成的计算结果,而runnable接口只能运行该线程。 Callable对象返回Future对象,该对象提供监视线程正在执行的任务进度的方法。 Future对象可用于检查Callable的状态,然后在线程完成后从Callable中检索结果。 它还提供超时功能。
语法 (Syntax)
//submit the callable using ThreadExecutor
//and get the result as a Future object
Future<Long> result10 = executor.submit(new FactorialService(10));
//get the result using get method of the Future object
//get method waits till the thread execution and then return the result of the execution.
Long factorial10 = result10.get();
例子 (Example)
以下TestThread程序显示了基于线程的环境中Futures和Callables的用法。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
System.out.println("Factorial Service called for 10!");
Future<Long> result10 = executor.submit(new FactorialService(10));
System.out.println("Factorial Service called for 20!");
Future<Long> result20 = executor.submit(new FactorialService(20));
Long factorial10 = result10.get();
System.out.println("10! = " + factorial10);
Long factorial20 = result20.get();
System.out.println("20! = " + factorial20);
executor.shutdown();
}
static class FactorialService implements Callable<Long> {
private int number;
public FactorialService(int number) {
this.number = number;
}
@Override
public Long call() throws Exception {
return factorial();
}
private Long factorial() throws InterruptedException {
long result = 1;
while (number != 0) {
result = number * result;
number--;
Thread.sleep(100);
}
return result;
}
}
}
这将产生以下结果。
输出 (Output)
Factorial Service called for 10!
Factorial Service called for 20!
10! = 3628800
20! = 2432902008176640000
Java Concurrency - Fork-Join framework
fork-join框架允许在几个worker上中断某个任务,然后等待结果将它们组合起来。 它在很大程度上利用了多处理器机器的容量。 以下是fork-join框架中使用的核心概念和对象。
Fork
Fork是一个过程,在这个过程中,任务将自身分成较小且独立的子任务,这些子任务可以同时执行。
语法 (Syntax)
Sum left = new Sum(array, low, mid);
left.fork();
这里Sum是RecursiveTask的子类,left.fork()将任务转换为子任务。
Join
Join是一个任务在子任务完成执行后加入子任务的所有结果的过程,否则它会一直等待。
语法 (Syntax)
left.join();
这里左边是Sum类的一个对象。
ForkJoinPool
它是一个特殊的线程池,设计用于fork-and-join任务拆分。
语法 (Syntax)
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
这是一个新的ForkJoinPool,具有4个并行级别的CPU。
RecursiveAction
RecursiveAction表示不返回任何值的任务。
语法 (Syntax)
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
RecursiveTask
RecursiveTask表示返回值的任务。
语法 (Syntax)
class Sum extends RecursiveTask<Long> {
@Override
protected Long compute() { return null; }
}
例子 (Example)
以下TestThread程序显示了基于线程的环境中Fork-Join框架的用法。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i = 0; i < numbers.length; i++) {
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i = low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low)/2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
这将产生以下结果。
输出 (Output)
32
499500
Java Concurrency - BlockingQueue Interface
java.util.concurrent.BlockingQueue接口是Queue接口的子接口,并且还支持诸如在检索元素之前等待队列变为非空的操作,并在存储元素之前等待队列中的空间可用。
BlockingQueue方法
Sr.No. | 方法和描述 |
---|---|
1 | boolean add(E e) 如果可以在不违反容量限制的情况下立即执行此操作,则将指定的元素插入此队列,成功时返回true,如果当前没有可用空间则抛出IllegalStateException。 |
2 | boolean contains(Object o) 如果此队列包含指定的元素,则返回true。 |
3 | int drainTo(Collection《? super E》 c) 从此队列中删除所有可用元素,并将它们添加到给定集合中。 |
4 | int drainTo(Collection《? super E》 c, int maxElements) 从该队列中删除最多给定数量的可用元素,并将它们添加到给定集合中。 |
5 | boolean offer(E e) 如果可以在不违反容量限制的情况下立即执行此操作,则将指定的元素插入此队列,成功时返回true,如果当前没有可用空间则返回false。 |
6 | boolean offer(E e, long timeout, TimeUnit unit) 将指定的元素插入此队列,如果需要空间可用,则等待指定的等待时间。 |
7 | E poll(long timeout, TimeUnit unit) 检索并删除此队列的头部,如果元素可用,则等待指定的等待时间。 |
8 | void put(E e) 将指定的元素插入此队列,等待空间变为可用。 |
9 | int remainingCapacity() 返回理想情况下(在没有内存或资源约束的情况下)此队列可以无阻塞地接受的其他元素的数量,如果没有内部限制,则返回Integer.MAX_VALUE。 |
10 | boolean remove(Object o) 从此队列中删除指定元素的单个实例(如果存在)。 |
11 | E take() 检索并删除此队列的头部,必要时等待,直到元素可用。 |
例子 (Example)
以下TestThread程序显示了基于线程的环境中BlockingQueue接口的用法。
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
static class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
try {
int result = random.nextInt(100);
Thread.sleep(1000);
queue.put(result);
System.out.println("Added: " + result);
result = random.nextInt(100);
Thread.sleep(1000);
queue.put(result);
System.out.println("Added: " + result);
result = random.nextInt(100);
Thread.sleep(1000);
queue.put(result);
System.out.println("Added: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("Removed: " + queue.take());
System.out.println("Removed: " + queue.take());
System.out.println("Removed: " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
输出 (Output)
Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27
Java Concurrency - ConcurrentMap Interface
java.util.concurrent.ConcurrentMap接口是Map接口的子接口,支持底层映射变量的原子操作。 它具有get和set方法,类似于对volatile变量的读写操作。 也就是说,集合与同一变量上的任何后续获取具有先发生关系。 该接口可确保线程安全性和原子性保证。
ConcurrentMap方法
Sr.No. | 方法和描述 |
---|---|
1 | default V compute(K key, BiFunction《? super K,? super V,? extends V》 remappingFunction) 尝试计算指定键及其当前映射值的映射(如果没有当前映射,则为null)。 |
2 | default V computeIfAbsent(K key, Function《? super K,? extends V》 mappingFunction) 如果指定的键尚未与值关联(或映射为null),则尝试使用给定的映射函数计算其值,并将其输入此映射,除非为null。 |
3 | default V computeIfPresent(K key, BiFunction《? super K,? super V,? extends V》 remappingFunction) 如果指定键的值存在且为非null,则尝试在给定键及其当前映射值的情况下计算新映射。 |
4 | default void forEach(BiConsumer《? super K,? super V》 action) 对此映射中的每个条目执行给定操作,直到处理完所有条目或操作引发异常。 |
5 | default V getOrDefault(Object key, V defaultValue) 返回指定键映射到的值,如果此映射不包含键的映射,则返回defaultValue。 |
6 | default V merge(K key, V value, BiFunction《? super V,? super V,? extends V》 remappingFunction) 如果指定的键尚未与值关联或与null关联,则将其与给定的非空值关联。 |
7 | V putIfAbsent(K key, V value) 如果指定的键尚未与值关联,请将其与给定值相关联。 |
8 | boolean remove(Object key, Object value) 仅当前映射到给定值时才删除键的条目。 |
9 | V replace(K key, V value) 仅当前映射到某个值时才替换键的条目。 |
10 | boolean replace(K key, V oldValue, V newValue) 仅当前映射到给定值时才替换键的条目。 |
11 | default void replaceAll(BiFunction《? super K,? super V,? extends V》 function) 将每个条目的值替换为在该条目上调用给定函数的结果,直到所有条目都已处理或函数抛出异常。 |
例子 (Example)
以下TestThread程序显示了ConcurrentMap与HashMap的使用。
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TestThread {
public static void main(final String[] arguments) {
Map<String,String> map = new ConcurrentHashMap<String, String>();
map.put("1", "One");
map.put("2", "Two");
map.put("3", "Three");
map.put("5", "Five");
map.put("6", "Six");
System.out.println("Initial ConcurrentHashMap: " + map);
Iterator<String> iterator = map.keySet().iterator();
try {
while(iterator.hasNext()) {
String key = iterator.next();
if(key.equals("3")) {
map.put("4", "Four");
}
}
} catch(ConcurrentModificationException cme) {
cme.printStackTrace();
}
System.out.println("ConcurrentHashMap after modification: " + map);
map = new HashMap<String, String>();
map.put("1", "One");
map.put("2", "Two");
map.put("3", "Three");
map.put("5", "Five");
map.put("6", "Six");
System.out.println("Initial HashMap: " + map);
iterator = map.keySet().iterator();
try {
while(iterator.hasNext()) {
String key = iterator.next();
if(key.equals("3")) {
map.put("4", "Four");
}
}
System.out.println("HashMap after modification: " + map);
} catch(ConcurrentModificationException cme) {
cme.printStackTrace();
}
}
}
这将产生以下结果。
输出 (Output)
Initial ConcurrentHashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
ConcurrentHashMap after modification: {1 = One, 2 = Two, 3 = Three, 4 = Four, 5 = Five, 6 = Six}
Initial HashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(Unknown Source)
at java.util.HashMap$KeyIterator.next(Unknown Source)
at TestThread.main(TestThread.java:48)
ConcurrentNavigableMap Interface
java.util.concurrent.ConcurrentNavigableMap接口是ConcurrentMap接口的子接口,并且支持NavigableMap操作,并且递归地支持其可导航的子映射和近似匹配。
ConcurrentMap方法
Sr.No. | 方法和描述 |
---|---|
1 | NavigableSet《K》 descendingKeySet() 返回此映射中包含的键的反向顺序NavigableSet视图。 |
2 | ConcurrentNavigableMap《K,V》 descendingMap() 返回此映射中包含的映射的逆序视图。 |
3 | ConcurrentNavigableMap《K,V》 headMap(K toKey) 返回此映射部分的视图,其键严格小于toKey。 |
4 | ConcurrentNavigableMap《K,V》 headMap(K toKey, boolean inclusive) 返回此映射的部分视图,其键小于(或等于,如果包含为true)toKey。 |
5 | NavigableSet《K》 keySet() 返回此映射中包含的键的NavigableSet视图。 |
6 | NavigableSet《K》 navigableKeySet() 返回此映射中包含的键的NavigableSet视图。 |
7 | ConcurrentNavigableMap《K,V》 subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) 返回此映射部分的视图,其键的范围从fromKey到toKey。 |
8 | ConcurrentNavigableMap《K,V》 subMap(K fromKey, K toKey) 返回此映射部分的视图,其键的范围从fromKey(包含)到toKey(不包括)。 |
9 | ConcurrentNavigableMap《K,V》 tailMap(K fromKey) 返回此映射的部分视图,其键大于或等于fromKey。 |
10 | ConcurrentNavigableMap《K,V》 tailMap(K fromKey, boolean inclusive) 返回此映射部分的视图,其键大于(或等于,如果inclusive为true)fromKey。 |
例子 (Example)
以下TestThread程序显示了ConcurrentNavigableMap的用法。
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class TestThread {
public static void main(final String[] arguments) {
ConcurrentNavigableMap<String,String> map =
new ConcurrentSkipListMap<String, String>();
map.put("1", "One");
map.put("2", "Two");
map.put("3", "Three");
map.put("5", "Five");
map.put("6", "Six");
System.out.println("Initial ConcurrentHashMap: "+map);
System.out.println("HeadMap(\"2\") of ConcurrentHashMap: "+map.headMap("2"));
System.out.println("TailMap(\"2\") of ConcurrentHashMap: "+map.tailMap("2"));
System.out.println(
"SubMap(\"2\", \"4\") of ConcurrentHashMap: "+map.subMap("2","4"));
}
}
这将产生以下结果。
输出 (Output)
Initial ConcurrentHashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
HeadMap("2") of ConcurrentHashMap: {1 = One}
TailMap("2") of ConcurrentHashMap: {2 = Two, 3 = Three, 5 = Five, 6 = Six}
SubMap("2", "4") of ConcurrentHashMap: {2 = Two, 3 = Three}