- Threads
- Process
- Using threads
- Daemon threads
- Life cycle and threads states
- Sleeping, joining and interrupting threads
- Race conditions
- Synchronization
- Monitors and structured locking
- The volatile keyword
- Thread local
- Advanced concurrency APIs
- Unstructured locks
- Executor service
- Thread pool types
- Callables and Future
- Semaphores
- Fork Join framework
- A thread is a single sequential flow of control within a program.
- Single sequential flow of control
- Sequence of programmed instructions that can be managed independently
- Allows the program to split into simultaneously running tasks
- Binary instructions loaded into memory
- Gets access to resources like memory
- (its own stack, heap, registers)
- Resource is protected from other processes
- Does its thing
- Can be started and stopped without affecting others
- Cannot talk to each other (unless mechanisms are coded into them)
- Default execution mode of a process is concurrent
- Has multiple cores
- A process needs some instructions to run in parallel (not sequential)
- Threads
- Unit of execution within a process
- Does the work of a process
- Usually has a shared objective
- Has shared resources like memory, heap storage
- All instructions executed sequentially
- Process can "spawn" threads that could run in parallel
- A single process (JVM)
- Consists of various threads
- Application thread - responsible for running the main method
- Other threads for runtime concerns - like garbage collection
- Created using a language API
- Something that can be run
- Has a run method
- Identify the code you want run in a separate thread
- Put it into a Runnable
- Create a new Thread from that Runnable
- Start the thread
public class MyRunnable implements Runnable {
    public void run() {
        System.out.println("I am running!");
    }
}

Runnable r = new MyRunnable();
Thread t = new Thread(r);
t.start();
- JVM calls the underlying OS threading APIs
- When the run method returns
- An exception is thrown
- inline class
Runnable r = new Runnable() {
    public void run() {
        System.out.println("Running");
    }
};
- lambdas
Runnable r = () -> System.out.println("Running");
new Thread(() -> System.out.println("Running")).start();
class MyThread extends Thread {
    public void run() {
        System.out.println("Running");
    }
}
new MyThread().start();
- Runnable is a functional interface
- Lambdas can be used instead of Runnable instances
- Thread can be subclassed to create a Runnable
- Recommend creating a new Runnable instead of subclassing Thread
// Create the Scanner once; a new Scanner per iteration can lose buffered input
Scanner sc = new Scanner(System.in);
while (true) {
    System.out.println("\n I can tell you the nth prime number. Enter n: ");
    int n = sc.nextInt();
    if (n == 0) break;
    Runnable r = new Runnable() {
        @Override
        public void run() {
            int number = PrimeNumberUtil.calculatePrime(n);
            System.out.println("\n Result:");
            System.out.println("\n Value of " + n + "th prime: " + number);
        }
    };
    // Each calculation runs in its own thread, so the prompt stays responsive
    Thread t = new Thread(r);
    t.start();
}
- Usual answer:
- When the main method thread ends
- When you spawn threads:
- When the last thread ends
- User threads
- Daemon threads
- The main method
- Your new Thread(runnable).start()
- When all the user threads have exited
Daemon threads
Thread t = new Thread(r);
t.setDaemon(true);
t.start();
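The daemon rule above can be sketched as a small program. This is a minimal illustration, not a prescribed pattern: the class name `DaemonDemo` and the method `startHeartbeat` are made up for the example. The background thread loops forever, yet the JVM can still exit once `main` returns, because only user threads keep it alive.

```java
class DaemonDemo {
    // Hypothetical helper: starts a background thread that never finishes on its own.
    static Thread startHeartbeat() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(100); // pretend to do periodic background work
                }
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread end.
            }
        });
        t.setDaemon(true); // must be set before start()
        t.start();
        return t;
    }

    public static void main(String[] args) {
        Thread heartbeat = startHeartbeat();
        System.out.println("Is daemon: " + heartbeat.isDaemon());
        // main returns here; the JVM exits although the daemon thread is still looping.
    }
}
```

Calling `setDaemon` after `start()` throws `IllegalThreadStateException`, which is why the flag is set first.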
- Just created
- Running
- Blocked
- Terminated
- New
- Runnable
- Blocked
- Waiting
- Timed Waiting
- Terminated
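The state names in the second list match the constants of `Thread.State`, which `Thread.getState()` returns. A minimal sketch, assuming a task that finishes immediately (the class and method names are illustrative only):

```java
class ThreadStateDemo {
    // Returns the state seen before start() and the state seen after join().
    static Thread.State[] observeStates() {
        Thread t = new Thread(() -> {
            // Nothing to do: run() returns immediately.
        });
        Thread.State before = t.getState(); // thread object exists but was not started
        t.start();
        try {
            t.join(); // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State after = t.getState();
        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) {
        Thread.State[] states = observeStates();
        System.out.println("Before start: " + states[0]);
        System.out.println("After join:   " + states[1]);
    }
}
```

The intermediate states (Runnable, Blocked, Waiting, Timed Waiting) depend on scheduling, so the sketch only observes the two ends of the life cycle.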
- Pausing or waiting for a thread
public static void sleep(long millis) throws InterruptedException
- Below is a status reporter, which should be a daemon thread.
Runnable statusReporter = () -> {
    try {
        while (true) {
            Thread.sleep(5000);
            printThreads(threads);
        }
    } catch (InterruptedException e) {
        System.out.println("Status report thread interrupted. Ending status updates");
    }
};
- Use case: Print a message after the last thread ends
- vs sleep (fixed time)
- "Joins" a certain thread with the currently running thread
myThread.start();
// ...
myThread.join();
// this executes after myThread has ended
private static void waitForThreads(List<Thread> threads) throws InterruptedException {
    for (Thread thread : threads) {
        thread.join();
    }
}
- Interrupt the thread
- Have it voluntarily clean up and shut down
Thread t = new Thread(r);
t.start();
// ...
t.interrupt();
- Built in blocking operations handle interrupts
try {
    Thread.sleep(5000);
} catch (InterruptedException e) {
    System.out.println("Interrupted!");
}
while (true) {
    // Run task
    // ...
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
}
reporterThread.interrupt();
- "Soft" interrupt
- Very different from hardware interrupts
- Depends on what the implementation does
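Because an interrupt is only a request, the target thread has to cooperate. The sketch below is one possible arrangement under that assumption: the worker polls its interrupt flag and returns voluntarily. `InterruptDemo` and `interruptAndWait` are illustrative names, and the two-second join timeout is an arbitrary choice.

```java
class InterruptDemo {
    // Starts a worker that polls its interrupt flag, interrupts it,
    // and reports whether it shut down within the timeout.
    static boolean interruptAndWait() {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield(); // stand-in for one unit of real work
            }
            // Voluntary clean-up would go here before run() returns.
        });
        worker.start();
        worker.interrupt(); // only sets a flag; the worker decides how to react
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + interruptAndWait());
    }
}
```

A worker that never checks the flag and never calls a blocking operation would simply keep running, which is what "depends on what the implementation does" means in practice.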
- Operating system process
- Responsible for scheduling a thread to an available processor core
- Unschedules a running thread temporarily as needed
- Tries to be fair
- Honors priorities
- Non deterministic
- Parallelism
- Concurrency
- Can be done with multicore CPUs
- Can be done with single core CPUs
- By definition, needs multiple cores
- Threads don't share data? No problem
- Threads share constant (unchanging) data? No problem
- Threads read and write to the same data? Problem
- Check-then-act
- Read-modify-write
- Make sure only one thread can "pick up" a data element
- Make sure two threads don't simultaneously access a critical data element
- Data, not code
- JVM Feature
public void increment() {
    isolateThis {
        this.counter++;
    }
}
- Only one guest can use any hotel room at a time - Wrong
- Given a hotel room, only one guest can use that room at a time - Correct
- Programmer marks a data element as a lock
- Programmer marks a piece of code to be accessible by the lock holder
- JVM creates a virtual "lock" from the data element
- Thread tries to "acquire" a lock
- If it acquires it, it can execute the synchronized code
- Thread finishes executing block and "releases" lock
- All other threads that need to execute the block have to wait
- You declare to specify the hotel room and the key
// Some code
synchronized (objectRef) {
    // Code to be executed one thread at a time
}
- Thread tries to get access to the monitor lock
- If thread gets it, it executes the block
- Releases the monitor lock when exiting the block
- Any other thread needs to wait (can't get monitor lock)
- Object-based isolation
public void methodA() {
    synchronized (obj1) {
        // synchronized code here
    }
}
public void methodB() {
    synchronized (obj1) {
        // some other synchronized code here
    }
}
public synchronized void increment() {
}
- Lock is associated with the object whose method is being called
- Lock is implicitly applied on "this".
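A minimal sketch of a synchronized method guarding a read-modify-write, assuming two threads that share one counter object (the class and method names are illustrative). Both methods lock on `this`, so increments on the same instance cannot interleave.

```java
class SynchronizedCounterDemo {
    private int counter = 0;

    // Locks the monitor of "this": only one thread at a time can run this per instance.
    public synchronized void increment() {
        counter++;
    }

    // Also synchronized, so the reader is guaranteed to see the latest value.
    public synchronized int get() {
        return counter;
    }

    // Runs two threads against one shared counter and returns the final value.
    static int runTwoThreads(int incrementsPerThread) {
        SynchronizedCounterDemo shared = new SynchronizedCounterDemo();
        Runnable task = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                shared.increment();
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shared.get();
    }

    public static void main(String[] args) {
        System.out.println("Final count: " + runTwoThreads(100_000));
    }
}
```

Without `synchronized` on `increment`, some updates could be lost, because `counter++` is a read, an add and a write that two threads can interleave.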
- Mutual exclusion
- Visibility
- Mutex
- Value is read from memory before block execution
- Value is written to memory after execution completes
- Block structure using synchronized
- Acquiring and releasing locks are implicit
- Example: Exception causing control to exit: lock auto-released
- No
- Performance
- Careful application needed
- Choose the right object for the lock
- synchronize the bare minimum code necessary
- Remember: It has to be the same lock!
public void methodA() {
    synchronized (obj1) {
        // ...
        methodB();
    }
}
public void methodB() {
    synchronized (obj2) {
        // ...
    }
}
- Non-concurrent (serial) code
- Liveness
- State of general activity and motion
- Requires a system to make progress
- Not "stuck"
- Something good will eventually occur
- Starts
- Executes
- Completes successfully or errors out
- Hangs
- The infinite loop
- Deadlock
- Livelock
- Starvation
- Multiple threads are waiting for other threads
- The dependency is circular
- The "No, you hang up first" problem
- Dependency is circular*
synchronized (objRef1) {
    synchronized (objRef2) {
        //
    }
}
synchronized (objRef2) {
    synchronized (objRef1) {
        //
    }
}
- Circular invocation of synchronized methods
- s(a)->s(b), s(b)->s(c), s(c)->s(a)
- Two threads invoking join on each other
- Potentially threads waiting forever
- A "smarter" deadlock
- Try to get lock 1
- Try to get lock 2
- If lock 2 not acquired in x ms, release lock 1
- Try again after sometime
- Two people in each other's way in a corridor
- Potential deadlock
- Steps taken to mitigate deadlock causes perpetual "corrective" action
- Not completely "dead".
- ..but all activity is just to get the lock
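The "get lock 1, try lock 2, give up after x ms, retry" steps can be sketched with `ReentrantLock.tryLock(long, TimeUnit)`. This is only an illustration: the 50 ms timeout, the attempt limit, and the randomized pause are assumptions added here. The random pause is one common way to keep two threads from retrying in lockstep, which is the livelock described above.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockBackoffDemo {
    // Runs the action while holding both locks; gives up after maxAttempts.
    static boolean runWithBothLocks(Lock first, Lock second, Runnable action, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                first.lock();
                try {
                    // Wait a bounded time for the second lock instead of blocking forever.
                    if (second.tryLock(50, TimeUnit.MILLISECONDS)) {
                        try {
                            action.run();
                            return true;
                        } finally {
                            second.unlock();
                        }
                    }
                } finally {
                    first.unlock(); // release lock 1 so the other thread can progress
                }
                // Random pause so competing threads do not retry at the same moment.
                Thread.sleep(ThreadLocalRandom.current().nextInt(1, 20));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Lock lock1 = new ReentrantLock();
        Lock lock2 = new ReentrantLock();
        boolean done = runWithBothLocks(lock1, lock2,
                () -> System.out.println("Holding both locks"), 5);
        System.out.println("Completed: " + done);
    }
}
```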
- Thread is ready to run but is never given a chance
- Low priority thread not scheduled by the scheduler
- Deadlock
- Livelock
- Starvation
- No Java / JVM feature to avoid these
- Careful use of locks
- Example: Avoid using more than one lock
- Fixes the problem at the boundaries
- Something else affects when the synchronized block ends
- Synchronous access might not be a problem
- You want to just fix visibility
public class Foo {
    private volatile int value;
}
- Thread 1 writes variable (to memory)
- Thread 2 reads variable (from memory)
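public class Foo {
    private volatile int value;
    private boolean hasValueUpdated;

    // Method name chosen for illustration; the original shows only the two assignments
    public void update() {
        // The plain write comes first; the volatile write that follows publishes both
        hasValueUpdated = true;
        value = 20;
    }
}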
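A typical case where only visibility matters is a stop flag that one thread writes and another reads. The sketch below assumes that setup; `VolatileFlagDemo`, `running`, and `stopsWorker` are illustrative names.

```java
class VolatileFlagDemo {
    // volatile: a write by one thread is visible to later reads by other threads.
    private volatile boolean running = true;

    void stop() {
        running = false;
    }

    // Starts a worker that spins on the flag, clears the flag,
    // and reports whether the worker noticed and ended.
    static boolean stopsWorker() {
        VolatileFlagDemo demo = new VolatileFlagDemo();
        Thread worker = new Thread(() -> {
            while (demo.running) {
                Thread.yield(); // stand-in for real work
            }
        });
        worker.start();
        demo.stop();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + stopsWorker());
    }
}
```

No mutual exclusion is needed here because there is a single writer and the write does not depend on the previous value.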
Runnable r = () -> processUserData(userId);
//...
public void processUserData(int userId) {
    doSomeStuff();
    doOtherStuff();
    moreStuff();
}
- Need to access the user ID in multiple places in the thread
- Pass around the variable everywhere
- Use a class member variable
- Use a thread local variable
- Scope is per-thread
- "Global" in the context of thread instance
- Each thread just sees its own thread local variable
ThreadLocal<Integer> threadLocalUserId = new ThreadLocal<>();
threadLocalUserId.set(1234);
Integer userId = threadLocalUserId.get();
- Generic class
- Almost like a wrapper
- Each thread sets and gets a different value
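To make the per-thread scope concrete, here is a small sketch with two threads sharing one `ThreadLocal`. The class name, the map keys `"a"` and `"b"`, and the second ID `5678` are made up for the illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadLocalDemo {
    // One ThreadLocal object, but every thread has its own slot inside it.
    private static final ThreadLocal<Integer> USER_ID = new ThreadLocal<>();

    // Each thread sets its own ID and records what it reads back.
    static Map<String, Integer> runTwoThreads() {
        Map<String, Integer> seen = new ConcurrentHashMap<>();
        Thread a = new Thread(() -> {
            USER_ID.set(1234);
            seen.put("a", USER_ID.get());
        });
        Thread b = new Thread(() -> {
            USER_ID.set(5678);
            seen.put("b", USER_ID.get());
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Values seen per thread: " + runTwoThreads());
        // The main thread never called set(), so its own slot is still empty.
        System.out.println("Main thread sees: " + USER_ID.get());
    }
}
```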
- Structured locks
- Acquire and release handled for you
- Nesting possible
synchronized (obj1) {
    // Access obj1
    synchronized (obj2) {
        // Access obj1 and obj2
    }
}
for (int i = 0; i < arrsize - 2; i++) {
    process(arr[i], arr[i + 1]);
}
for (int i = 0; i < arrsize - 2; i++) {
    synchronized (arr[i]) {
        synchronized (arr[i + 1]) {
            process(arr[i], arr[i + 1]);
        }
    }
}
private Lock l = new ReentrantLock();

public void run() {
    l.lock();
    try {
        this.increment();
        System.out.println(Thread.currentThread().getName() + " increments: " + this.getValue());
        this.decrement();
        System.out.println(Thread.currentThread().getName() + " decrements: " + this.getValue());
    } finally {
        // Unlike synchronized, the release is explicit, so do it in finally
        l.unlock();
    }
}
lock()
lockInterruptibly()
newCondition()
tryLock()
tryLock(long time,TimeUnit unit)
unlock()
- High level API for executing tasks
- Thread creation is resource intensive
- Manages runnables (or "tasks") for you
- Provides extra abilities (like thread pool)
- Enables results
- "One way" task
ExecutorService executorService = Executors.newFixedThreadPool(3);
// Create the Scanner once; a new Scanner per iteration can lose buffered input
Scanner sc = new Scanner(System.in);
while (true) {
    System.out.println("\n I can tell you the nth prime number. Enter n: ");
    int n = sc.nextInt();
    if (n == 0) break;
    Runnable r = new Runnable() {
        @Override
        public void run() {
            int number = PrimeNumberUtil.calculatePrime(n);
            System.out.println("\n Result:");
            System.out.println("\n Value of " + n + "th prime: " + number);
        }
    };
    // The pool picks a free worker thread; no new Thread per task
    executorService.execute(r);
}
- (Figure: thread pool executor, thread_pool_executor.png)
- Fixed thread pool executor
- Single threaded executor
- Cached thread pool executor
- Scheduled thread pool executor
- Work stealing thread pool executor
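Each pool type in the list has a factory method on `java.util.concurrent.Executors`. The sketch below creates one of each and runs a trivial task on it. The pool sizes (3 and 2) and the string labels are arbitrary choices for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadPoolTypesDemo {
    // Creates one pool of each type, runs a trivial task on it, then shuts it down.
    static Map<String, String> runOnEachPool() {
        Map<String, ExecutorService> pools = new LinkedHashMap<>();
        pools.put("fixed", Executors.newFixedThreadPool(3));         // always 3 threads
        pools.put("single", Executors.newSingleThreadExecutor());    // tasks run one by one
        pools.put("cached", Executors.newCachedThreadPool());        // grows and shrinks on demand
        pools.put("scheduled", Executors.newScheduledThreadPool(2)); // supports delayed/periodic tasks
        pools.put("workStealing", Executors.newWorkStealingPool());  // sized from available processors

        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, ExecutorService> entry : pools.entrySet()) {
            String label = entry.getKey();
            ExecutorService pool = entry.getValue();
            Callable<String> task = () -> label; // the task just reports which pool ran it
            try {
                results.put(label, pool.submit(task).get());
            } catch (Exception e) {
                results.put(label, "failed: " + e);
            } finally {
                pool.shutdown(); // otherwise non-daemon pool threads keep the JVM alive
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runOnEachPool());
    }
}
```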
- Runnable has a "one-way" task model: it takes no input and returns nothing.
- For Runnable, the method signature is always
public void run()
Runnable r = new Runnable() {
    @Override
    public void run() {
        System.out.println("Printed from Runnable");
    }
};
Callable<String> c = new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println("Printed from Callable");
        return "Value from Callable";
    }
};
- A Callable can return a value.
- A Callable cannot be passed to a Thread.
- A Callable can be passed to an executor service.
executorService.submit(c);
// executorService.submit(r); // submit() also accepts a Runnable
// String s = executorService.submit(c); does not compile: submit() returns right away,
// before the result exists, so it hands back a Future instead of the value
Future<String> ret = executorService.submit(c);
// You can do other things in the meantime
// get() blocks until the result is available
String retStr = ret.get();
List<Future<Integer>> futures = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
// Create the Scanner once; a new Scanner per iteration can lose buffered input
Scanner sc = new Scanner(System.in);
while (true) {
    System.out.println("\n I can tell you the nth prime number. Enter n: ");
    int n = sc.nextInt();
    if (n == 0) break;
    Callable<Integer> c = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return PrimeNumberUtil.calculatePrime(n);
        }
    };
    Future<Integer> primeNumberFuture = executorService.submit(c);
    futures.add(primeNumberFuture);
}
Iterator<Future<Integer>> iterator = futures.iterator();
while (iterator.hasNext()) {
    Future<Integer> f = iterator.next();
    if (f.isDone()) { // isDone() does not block, unlike get()
        System.out.println(f.get());
        iterator.remove();
    }
}
cancel(boolean mayInterruptIfRunning)
get()
get(long timeout, TimeUnit unit)
isCancelled()
isDone()
invokeAll(Collection<? extends Callable<T>> tasks)
invokeAny(Collection<? extends Callable<T>> tasks)
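A short sketch of the two bulk methods, assuming three tasks that each square a number (the class and method names are illustrative). `invokeAll` waits for every task and returns one `Future` per task, in the same order as the input. `invokeAny` returns the result of one task that completed successfully.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Builds three tasks that return 1*1, 2*2 and 3*3.
    static List<Callable<Integer>> squares() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            tasks.add(() -> n * n);
        }
        return tasks;
    }

    // invokeAll: blocks until all tasks are done, then the results are summed.
    static int sumOfSquares() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(squares())) {
                sum += f.get(); // already complete, so this does not wait
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // invokeAny: returns the result of one successfully completed task.
    static int anySquare() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return pool.invokeAny(squares());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum of squares: " + sumOfSquares());
        System.out.println("Any square: " + anySquare());
    }
}
```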
CompletableFuture.supplyAsync(() -> "hello")
    .thenAccept((String s) -> System.out.println(s));
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Create the Scanner once; a new Scanner per iteration can lose buffered input
Scanner sc = new Scanner(System.in);
while (true) {
    System.out.println("\n I can tell you the nth prime number. Enter n: ");
    int n = sc.nextInt();
    if (n == 0) break;
    CompletableFuture.supplyAsync(() -> PrimeNumberUtil.calculatePrime(n), executorService)
        .thenAccept(System.out::println);
}
- If we don't specify an executor service, supplyAsync uses the common thread pool (the fork/join common pool).
- Like passes and permits
- Only one thread at a time
- Limited number of threads at a time
- Used for managing limited resources in a concurrent environment
Semaphore semaphore = new Semaphore(3);
// Create the Scanner once; a new Scanner per iteration can lose buffered input
Scanner sc = new Scanner(System.in);
while (true) {
    System.out.println("\n I can tell you the nth prime number. Enter n: ");
    int n = sc.nextInt();
    if (n == 0) break;
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                System.out.println("Interrupted while waiting");
                return; // no permit was acquired, so there is nothing to release
            }
            try {
                System.out.println("Now calculating for " + n);
                int number = PrimeNumberUtil.calculatePrime(n);
                System.out.println("\n Result:");
                System.out.println("\n Value of " + n + "th prime: " + number);
            } finally {
                semaphore.release();
            }
        }
    };
    Thread t = new Thread(r);
    t.start();
}
Semaphore semaphore = new Semaphore(3, true); // second argument: fairness (boolean)
- A semaphore does not guarantee that there won't be any concurrency issues: it only limits how many threads run the guarded code at once.
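The point about remaining concurrency issues can be seen in a small sketch: up to `permits` threads are inside the guarded section at the same time, so any state they share still has to be thread-safe. Here that shared state is kept in `AtomicInteger` counters, which is an assumption of this example rather than something the notes prescribe. The class name and the 20 ms sleep are also illustrative.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitDemo {
    // Starts threadCount threads and returns the highest number that were
    // inside the guarded section at the same time.
    static int maxConcurrent(int permits, int threadCount) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();  // shared by the permit holders
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit acquired, nothing to release
                }
                try {
                    int now = active.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulated work while holding a permit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("Max threads inside at once: " + maxConcurrent(3, 10));
    }
}
```

Replacing the atomic counters with plain `int` fields would reintroduce a race, even though the semaphore is still in place.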