In this lecture we discussed processes and threads, and introduced multithreaded programming.
Our quick library reference lists classes in the System.Threading namespace, which we used in our multithreaded programs.
This
program uses the Thread.Start()
method
to create a thread that runs in the background. It writes messages to
the console from both the foreground and background threads.
using static System.Console;
class Parallel {
static void background() {
for (int i = 0 ; i < 10 ; ++i) {
WriteLine("background");
Thread.Sleep(1000);
}
}
static void Main() {
Thread t = new(background);
t.Start();
for (int i = 0 ; i < 5 ; ++i) {
WriteLine("foreground");
Thread.Sleep(1000);
}
}
}This
program is similar to the previous one, but the foreground thread
uses the Join() method
to wait for the background thread to finish execution.
using static System.Console;
class Parallel2 {
static void loop(int count, string s) {
for (int i = 0 ; i < count ; ++i) {
WriteLine(s);
Thread.Sleep(1000);
}
}
static void background() {
loop(10, "background");
}
static void Main() {
Thread t = new(background);
t.Start();
loop(5, "foreground");
WriteLine("waiting...");
t.Join();
WriteLine("background thread has finished");
WriteLine("exiting now");
}
}This program uses trial division to count the number of prime numbers below 20,000,000. Below, we'll attempt to make it faster using multiple threads.
using static System.Console;
class Primes {
static bool isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i * i <= n ; ++i)
if (n % i == 0)
return false;
return true;
}
static void Main() {
int count = 0;
for (int i = 2 ; i < 20_000_000 ; ++i)
if (isPrime(i))
++count;
WriteLine($"primes = {count:N0}");
}
}This program counts the number of primes below 20,000,000 using multiple threads. The program receives the number of threads to create as a command-line argument. It allocates a fixed range of primes to each thread.
using static System.Console;
class Worker {
int start, end;
public int found = 0;
public Worker(int start, int end) {
this.start = start; this.end = end;
}
static bool isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i * i <= n ; ++i)
if (n % i == 0)
return false;
return true;
}
public void run() {
WriteLine($"running: {start} .. {end}");
for (int i = start ; i < end ; ++i)
if (isPrime(i))
++found;
WriteLine($"finished: {start} .. {end}");
}
}
class Primes {
static void Main(string[] args) {
int n = 20_000_000;
int numThreads = int.Parse(args[0]);
int perThread = n / numThreads;
Worker[] workers = new Worker[numThreads];
Thread[] threads = new Thread[numThreads];
for (int i = 0 ; i < numThreads ; ++i) {
int lo = perThread * i;
int hi = i == numThreads ? n : perThread * (i + 1);
workers[i] = new Worker(lo, hi);
threads[i] = new Thread(workers[i].run);
threads[i].Start();
}
int primes = 0;
for (int i = 0 ; i < numThreads ; ++i) {
threads[i].Join();
primes += workers[i].found;
}
WriteLine($"primes = {primes:N0}");
}
}The program above works, but is somewhat inefficient because some threads have more work to do than others. Even though it allocates the same size range to each thread, it takes longer on average to test whether larger numbers are prime, so the threads that have the higher numbers will finish last.
Let's
attempt to improve the program by dividing work between threads
dynamically. In the program below, there is an integer next
in
an object of class Counter
that
is shared by all threads. Each time that a thread needs a new number
to test, it reads this integer and increments it. When a thread finds
that a number is prime, it increments the shared counter found.
This program is incorrect, and will probably print a result that is far from the correct answer. The problem is that the actions which read and write the shared variables are not atomic. In other words, if more than one thread attempts to execute these actions at the same time, the result may be incorrect. One reason for that (among others) is that each of these actions actually consists of several machine instructions. For example, the line
int n = counter.next++
might compile to several machine instructions which perform the following individual tasks:
read
counter.next
into
a machine register
increment the register
write the register to counter.next
It's
possible that a thread might be preempted
by
the operating system in the middle of these instructions. (A thread
is preempted when its time
slice is
up and it is time for another thread to run.) Suppose that thread A
has just read counter.next,
and then is preempted. Then several other threads might run and might
each increment counter.next.
When thread A resumes running, it will write its own value to
counter.next,
but that value may now be out of date.
using static System.Console;
// Attempt to count primes, allocating integers dynamically.
// state shared by all threads
class Counter {
public int next = 2; // next integer to check
public int found = 0; // number of primes found
}
class Worker {
int id;
Counter counter;
public Worker(int id, Counter counter) {
this.id = id;
this.counter = counter;
}
static bool isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i * i <= n ; ++i)
if (n % i == 0)
return false;
return true;
}
public void run() {
WriteLine($"thread {id}: running");
while (true) {
int n = counter.next++; // NOT ATOMIC
if (n > 20_000_000)
break;
if (isPrime(n))
counter.found += 1; // NOT ATOMIC
}
WriteLine($"thread {id}: finished");
}
}
class Primes {
static void Main(string[] args) {
int numThreads = int.Parse(args[0]);
Counter counter = new Counter();
Worker[] workers = new Worker[numThreads];
Thread[] threads = new Thread[numThreads];
for (int i = 0 ; i < numThreads ; ++i) {
workers[i] = new Worker(i, counter);
threads[i] = new Thread(workers[i].run);
threads[i].Start();
}
for (int i = 0 ; i < numThreads ; ++i)
threads[i].Join();
WriteLine($"primes = {counter.found:N0}");
}
}We may repair the program above by using atomic
integer operations. These are available in the
System.Threading.Interlocked class in
the C# class library. They work by executing special machine
instructions which are guaranteed to be atomic even in a
multithreaded environment.
The updated program is below. Only a few lines are
different from the preceding program; they are indicated in bold.
I have not included the Primes
class, since it's exactly the same
as in the previous program.
using static System.Console;
// Count primes, using atomic operations
// state shared by all threads
class Counter {
public int next = 1; // next + 1 is the next integer to check
public int found = 0; // number of primes found
}
class Worker {
int id;
Counter counter;
public Worker(int id, Counter counter) {
this.id = id;
this.counter = counter;
}
static bool isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i * i <= n ; ++i)
if (n % i == 0)
return false;
return true;
}
public void run() {
WriteLine($"thread {id}: running");
while (true) {
int n = Interlocked.Increment(ref counter.next);
if (n > 20_000_000)
break;
if (isPrime(n))
Interlocked.Increment(ref counter.found);
}
WriteLine($"thread {id}: finished");
}
}Atomic integer operations are adequate for some multithreaded programs. But in many cases we want to share data structures between threads, and of course these structures are larger than a single integer. For example, suppose that in the previous programs we wanted to construct a list of all the prime numbers we find, rather than merely counting them. To do that, we'd need to have a shared list object that we can write to from all threads. And of course most data structure operations are not naturally atomic.
To
allow safe access to a data structure from multiple threads, we can
protect it using a lock,
otherwise known as a mutex
(for
mutual
exclusion).
C# includes a lock
statement
that acquires a lock on a given object before entering a block of
code. When the thread leaves the block of code, the lock is released.
Only one thread may hold a lock on a given object at any moment of
time. If thread A is holding the lock on an object and thread B
attempts to acquire the lock on the same object using a lock
statement,
thread B will block,
i.e. its operation will be suspended until the lock becomes available
because thread A has released it.
A block of code that only one thread may enter at a time is called a critical section.
Here's
a version of our prime-counting program that uses locks to protect
the shared variables next
and
found.
It's quite similar to the previous program, so the only lines that
are different are indicated in bold.
Once again, I haven't included the main Primes
class
since it's exactly the same as before.
using static System.Console;
// Count primes, with locking
// state shared by all threads
class Counter {
public int next = 2; // next integer to check
public int found = 0; // number of primes found
}
class Worker {
int id;
Counter counter;
public Worker(int id, Counter counter) {
this.id = id;
this.counter = counter;
}
static bool isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i * i <= n ; ++i)
if (n % i == 0)
return false;
return true;
}
public void run() {
WriteLine($"thread {id}: running");
while (true) {
int n;
lock (counter) {
n = counter.next++; // critical section
}
if (n > 20_000_000)
break;
if (isPrime(n))
lock (counter) {
counter.found += 1; // critical section
}
}
WriteLine($"thread {id}: finished");
}
}