Dalke Scientific Software: More science. Less time. Products

Threads

Background and thoughts

Suppose you want some sort of coarse-grained parallel system with a set of compute nodes and a master node making requests of the compute nodes. For examples, the compute nodes do BLAST searches, or chemical similarity searches, or energy minimization. How do you implement it?

There are many ways. In the HPC (that's "High Performance Computing") world you could use MPI or PVM. Or Linda or TCGMSG (is that still around?) or any of a large number of alternates. Descendants of Linda include JavaSpaces and variants, I think. In the Python world I've heard good comments about using Pyro ("Python Remote Object"). CORBA used to be a hot topic and is still a solution. Try omniORB/omniORBpy if you want to try that route.

I'm partial to web-based services. Stick a web server on each machine. The master node makes a request to the web server and gets a response. Because the compute likely takes some time the response might be "check back later", and possibly with a URL to use for the check. Or because you control both sides of the connection, have the master node also implement a web server so the client nodes can send the results back through another web request. That is, the master node's request to the client includes a callback URL which the client uses to send the results back to the server.

In all of these solutions you have to worry about what happens when one of the machines goes down. If a client goes down do you redo the request on another machine? How do you detect that a machine goes down? With a per-node MTBF of 5 years then a cluster with 128 machines will have a fault every 15 days. RAID arrays have shown that it's possible to have good overall reliability even when less reliable components. Google and others are continuing decades of research in how to make distributed operating systems fault-tolerant. For most things I do I can ignore the problem, or make occasional "are you alive?" pings on the server.

Implementing this sort of system means the server should handle, or at least appear to handle, multiple things at the same time. For example, doing a BLAST job while also answering a status or a ping request, or queuing up new jobs.

Implementations

This is supposed to be an educational talk so I'll ask the question "how do you implement a multitasking server?" There are three general solutions.

Starting a new process

The easiest is to let the OS do everything for you. Got a BLAST job to run? Start it as a new process in the background. My usual solution looks like this: This approach uses the file system for communication. It's boring but it works.

Warning against threads

I said that to get it out of the way. You're likely reading this because you want to know about threads. Mostly you should stay away from threads. They are tricky to use. Many libraries aren't thread-safe. Even the CPython core isn't thread-safe so there's a a global interpreter lock (the "GIL") so only one thread has access to the core interpreter at a time. C extensions can release the lock by using special calls to release the GIL, and reacquire when returning to the Python core.

Threads generally don't scale well. (Meaning more than 50-100 threads.) Each thread has its own stack, which is about a MB or so. Threads are susceptible to subtle and hard-to-debug timing problems because one thread can change things out from underneath another thread. For example, suppose in the following

if key in some_dictionary:
  some_dictionary[key] += 1
else:
  some_dictionary[key] = 1
another thread got control just after the "if key in" test. If the key was there then the test returns True and the first path is taken. The other thread could remove the key from the dictionary causing the += 1 to fail.

This is solvable by locking parts of the data structure but then there's the overhead of locking and unlocking all the time, even when collisions are rare, and it's hard to test that you got it right.

If you're going to use threads then it's best to use a "shared none" approach, where the data structures between the threads are not shared. Failing that, the shared data should be read-only. Failing that, well, locks help, and use Queues to pass data ownership between threads. I'll talk more about this in a bit.

Reactors and collaborative multitasking

Another solution, more appropriate to I/O multitasking, is using a dispatcher which checks if any data came in from the network, GUI, or scheduler. If so, pass the data to a handler waiting for the event. When the event is done the handler returns control to the dispatcher, which processes the next event. This usually requires a big code rewrite to turn things inside out because the handler must explicitly say when it's giving up control, which is called "collaborative multitasking" as compared to threads which are "preemptive multitasking" because control can pass to another thread at any time.

Python comes with the asyncore and asynchat modules which implement this sort of approach, known as a reactor pattern. The "async" is short for "asynchronous" because there's a separation of control between receiving data and doing something with it. Allegra is a relatively new package still in development extending the core Python functionality. Twisted is the best known asynchronous application toolkit for Python. In general people find asynchronous libraries hard to write.

Yet another approach is Stackless Python. It uses a completely different way to handle threads which let you have even tens of thousands of simultaneous lightweight threads, but with limitations on what you can do with them. The limitations mostly affect C programs and not Python programs. Note though that Stackless does not change the fundamentals of the problem. If a function blocks (waiting for I/O or doing some sort of computation) then you'll still need to use a system thread or asynchronous I/O. Stackless only makes it easier.

I won't show any example of using these. Besides requiring a different view on how to program there isn't much code available designed for these system so you'll end up redoing a lot of existing work to get things running.

Threads

Here's a function I want to have run in its own thread.

import time

def compute(x):
    time.sleep(x)
    return x*x
In real code this might make a web request, start another program, connect to a database, etc.

The common way to start a new thread is through subclassing threading.Thread. It's a two-step process: construct the object then call its "start" method. The start method creates the new thread then then calls it's own "run" method. Each subclass must implement an appropriate "run". In the following I'll start 3 threads, each with it's own value to compute. Note that the main thread ends before the compute threads have finished. Unless the threads are marked as "daemons" Python won't exit until all threads have finished.

import time
import datetime
import threading

def compute(x):
    time.sleep(x)
    return x*x

# Include a timestamp in the log message
def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

# Each compute request gets its own thread
class ComputeThread(threading.Thread):
    def __init__(self, value):
        threading.Thread.__init__(self)
        self.value = value
    def run(self):
        result = compute(self.value)
        log("%s -> %s" % (self.value, result))

def main():
    log("starting main thread")
    ComputeThread(3).start()
    ComputeThread(2.5).start()
    ComputeThread(1).start()
    log("ending main thread")
    
if __name__ == "__main__":
    main()
Here's the output
00:36:17 starting main thread
00:36:17 ending main thread
00:36:18 1 -> 1
00:36:19 2.5 -> 6.25
00:36:20 3 -> 9
You can see that I'm a late night person.

Thread pool and Queues

There is some overhead to starting new threads, which adds up. Often there are resource limits to consider. For example, if the thread connects to a remote server then you may want to limit the program so it makes at most three simultaneous requests. The usual solution for this is a thread pool. These are threads waiting for a request. Some other thread sends the request to the thread pool. One of the threads in the pool gets it and executes it. When finished it waits for another request.

The key part to this is waiting for a request. This is done with a Queue.Queue object. Use a Queue to communicate across threads. I'll say that again: use a Queue to communicate across threads. Don't worry about locks and other low-level techniques. Queue does what you need.

Queues implement "put" and "get". These are thread-safe meaning that any thread can put and get without conflicts. By default the queue has unlimited length and a thread can put things in the queue and keep on going. If the queue is empty then doing a get in a thread will block that thread until the queue is not empty. If the queue contains an object then doing a get in a thread will retrieve that object. Only one thread will ever get that object.

The idea behind a Queue is there are producers and consumers. Producers "put" things into the queue and consumers "get" things from the queue. Consumers always wait until something is ready, and only one consumer gets an item from the queue.

There is a minor tricky point. How does the thread know when to finish? I said "[w]hen finished [the thread] waits for another request" but I also said "Python won't exit until all threads have finished." I could mark a thread as a daemon but then it would exist the moment all other non-daemon threads are finished, aborting any task in progress.

What I'll do is signal all of the compute threads to finish. I'll use a special request object. When a ComputeThread gets it then it knows to exit. I'll use None for that object but the details are up to you. I do have to send N termination objects, one for each compute thread.

Here's an example of a thread pool with 3 threads handling 5 requests

import time
import datetime
import threading
import Queue

def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def compute(x):
    time.sleep(x)
    return x*x

# Threads form a thread-pool waiting for compute requests in the request_queue
class ComputeThread(threading.Thread):
    def __init__(self, id, request_queue):
        threading.Thread.__init__(self, name="ComputeThread-%d" % (id,))
        self.request_queue = request_queue
    def run(self):
        while 1:
            # block waiting for something in the queue
            req = self.request_queue.get()
            if req is None:
                # Nothing more to process; quit
                break
            result = compute(req)
            log("%s %s -> %s" % (self.getName(), req, result))
            
def main():
    log("starting main thread")
    request_queue = Queue.Queue()
    # Initialize the thread pool with three compute threads
    N_compute_threads = 3
    for i in range(N_compute_threads):
        ComputeThread(i, request_queue).start()
    
    # Make 5 requests
    request_queue.put(4)
    request_queue.put(5)
    request_queue.put(3)
    request_queue.put(1.5)
    request_queue.put(2.2)
    
    # Send shutdown messages to all the threads in the pool
    for i in range(N_compute_threads):
        request_queue.put(None)
    log("Main thread finished.")

if __name__ == "__main__":
    main()
and when run just now it produced
01:06:09 starting main thread
01:06:09 Main thread finished.
01:06:12 ComputeThread-2 3 -> 9
01:06:13 ComputeThread-0 4 -> 16
01:06:14 ComputeThread-2 1.5 -> 2.25
01:06:14 ComputeThread-1 5 -> 25
01:06:15 ComputeThread-0 2.2 -> 4.84

Bidirectional communications

In the previous example code passed from the main thread to the compute threads but nothing came back to the main thread. This is rather useless in real code. Usually the calling thread wants to make use of the result. I can solve this with another Queue. When I make the request I'll send the value to compute and also send the Queue to use for the response. When the computation is done the ComputeThread sends the result back through the response queue.

This is something easier shown in code than elaborating more.

import time
import datetime
import threading
import Queue

def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)

def compute(x):
    time.sleep(x)
    return x*x

# Threads form a thread-pool waiting for compute requests in the request_queue
class ComputeThread(threading.Thread):
    def __init__(self, id, request_queue):
        threading.Thread.__init__(self, name="ComputeThread-%d" % (id,))
        self.request_queue = request_queue
    def run(self):
        while 1:
            # block waiting for something in the queue
            req = self.request_queue.get()
            if req is None:
                # Nothing more to process; quit
                break
            value, response_queue = req
            result = compute(value)
            log("%s %s -> %s" % (self.getName(), value, result))
            response_queue.put(result)

request_queue = Queue.Queue()
            
def threaded_sum(values):
    sum = 0.0
    response_queue = Queue.Queue()
    for value in values:
        request_queue.put((value, response_queue))
    # accumulate results; the response order will not be the same as the input!
    # The "_" is a convention meaning "I don't care about the actual variable name."
    for _ in values:
        sum += response_queue.get()
    return sum
    
def main():
    log("starting main thread")
    # Initialize the thread pool with three compute threads
    N_compute_threads = 3
    for i in range(N_compute_threads):
        ComputeThread(i, request_queue).start()
    
    # Make 5 requests
    result = threaded_sum( (4, 5, 3, 1.5, 2.2) )
    log("the sum is %f" % (result,))
        
    # Send shutdown messages to all the threads in the pool
    for i in range(N_compute_threads):
        request_queue.put(None)
    log("Main thread finished.")

if __name__ == "__main__":
    main()
and the output is
01:22:16 starting main thread
01:22:19 ComputeThread-2 3 -> 9
01:22:20 ComputeThread-0 4 -> 16
01:22:21 ComputeThread-2 1.5 -> 2.25
01:22:21 ComputeThread-1 5 -> 25
01:22:22 ComputeThread-0 2.2 -> 4.84
01:22:22 the sum is 57.090000
01:22:22 Main thread finished.
Sweet!

I jokingly say "sweet" but there were bugs when I wrote the code. An annoying thing about threads is they are hard to kill. Hitting control-C doesn't always kill a Python program with multiple threads. When that happens I then hit control-Z to suspend the task, then do a "kill %%" to send the soft kill signal, then "fg" to go back to Python, and another control-C to really kill it. I could also do "kill -9 %%" instead of the "kill %%" but that always seems so rude.

thread variables

Sometimes you need per-thread variables. For example, the C variable "errno" is a per-thread variable often implemented via #define macro tricks. The value of errno reflects the most recent error value from a C library function call in the current thread and not the error value from some other thread.

As another example, CherryPy is a thread-based web server library for Python. Each request is handled in by a differen thread. The request and response information are stored in thread variables accessed via "cherrypy.request" and "cherrypy.response". For example, to change the content-type header for the response a CherryPy handler might look like:

    @expose
    def spam(self, x):
        cherrypy.response.headers["Content-Type"] = "text/plain"
        return "You asked for %r" % (x,)

Making new thread-local variables is easy. Before describing how, I'll remind you that just like global variables (or per-modules) variables are generally the wrong solution, so are thread-local variables. You'll rarely need them, but there are times when they are useful. Which means coming up with a good example is hard.

Suppose the compute task has a bug which occurs when the request value is > 3 and you want to debug that case via log (or print) statements. There are a few ways to implement that. I decided to use a thread variable solution. I made the log function defer to a thread-local variable named "log" and added a couple of log() calls to the compute() fucntion. For the main thread the thread-local log prints the information to stdout.

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)
    
def log_nothing(message):
    pass

local = threading.local()
# the main thread always logs
local.log = log_to_stdout

def log(message):
    local.log(message)

def compute(x):
    log("want to compute %r" % (x,))
    time.sleep(x)
    result = x*x
    log("finished computing %r; it is %r" % (x, result))
    return result

while the ComputeThreads by default log using the log_nothing function
class ComputeThread(threading.Thread):
    ...
    def run(self):
        local.log = log_nothing
        while 1:
            ...

The caller can override the default using a new "debug" option which enables logging for the thread in the thread pool

    def run(self):
        local.log = log_nothing
        while 1:
            # block waiting for something in the queue
            req = self.request_queue.get()
            if req is None:
                # Nothing more to process; quit
                break
            value, response_queue, debug = req
            if debug:
                local.log = log_to_stdout
            else:
                local.log = log_nothing
            result = compute(value)
            log("%s %s -> %s" % (self.getName(), value, result))
            response_queue.put(result)

To enable debugging when the requst > 3 I pass a debug flag as the third field in the item put() into the request_queue. (At this point I start thinking I need a new class with attributes for value, response_queue, and debug rather than passing them in as a tuple.)

def threaded_sum(values):
    sum = 0.0
    response_queue = Queue.Queue()
    for value in values:
        debug = (value > 3)
        request_queue.put((value, response_queue, debug))
    # accumulate results; the order will not be the same as the input!
    # The "_" is a convention meaning "I don't care about the actual variable name."
    for _ in values:
        sum += response_queue.get()
    return sum

Here are all the changes in one place

import time
import datetime
import threading
import Queue

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)
    
def log_nothing(message):
    pass

local = threading.local()
# the main thread always logs
local.log = log_to_stdout

def log(message):
    local.log(message)

def compute(x):
    log("want to compute %r" % (x,))
    time.sleep(x)
    result = x*x
    log("finished computing %r; it is %r" % (x, result))
    return result

# Threads form a thread-pool waiting for compute requests in the request_queue
class ComputeThread(threading.Thread):
    def __init__(self, id, request_queue):
        threading.Thread.__init__(self, name="ComputeThread-%d" % (id,))
        self.request_queue = request_queue
    def run(self):
        local.log = log_nothing
        while 1:
            # block waiting for something in the queue
            req = self.request_queue.get()
            if req is None:
                # Nothing more to process; quit
                break
            value, response_queue, debug = req
            if debug:
                local.log = log_to_stdout
            else:
                local.log = log_nothing
            result = compute(value)
            log("%s %s -> %s" % (self.getName(), value, result))
            response_queue.put(result)

request_queue = Queue.Queue()
            
def threaded_sum(values):
    sum = 0.0
    response_queue = Queue.Queue()
    for value in values:
        debug = (value > 3)
        request_queue.put((value, response_queue, debug))
    # accumulate results; the order will not be the same as the input!
    # The "_" is a convention meaning "I don't care about the actual variable name."
    for _ in values:
        sum += response_queue.get()
    return sum

def main():
    log("starting main thread")
    # Initialize the thread pool with three compute threads
    N_compute_threads = 3
    for i in range(N_compute_threads):
        ComputeThread(i, request_queue).start()
    
    # Make 5 requests
    result = threaded_sum( (4, 5, 3, 1.5, 2.2) )
    log("the sum is %f" % (result,))
        
    # Send shutdown messages to all the threads in the pool
    for i in range(N_compute_threads):
        request_queue.put(None)
    log("Main thread finished.")

if __name__ == "__main__":
    main()
When I run it I get
01:58:28 starting main thread
01:58:28 want to compute 4
01:58:28 want to compute 5
01:58:32 finished computing 4; it is 16
01:58:32 ComputeThread-0 4 -> 16
01:58:33 finished computing 5; it is 25
01:58:33 ComputeThread-1 5 -> 25
01:58:34 the sum is 57.090000
01:58:34 Main thread finished.

Locks

Sometimes using a Queue has too much overhead. A Queue uses locks for syncronization and it's a way to communicate between threads. What if you don't need all that? For example, the print statement prints to stdout, which is a shared resources. It's possible that two threads could print to stdout at the same time and the two outputs be jumbled into one. If you print a single string this won't happen because of Python's GIL, but it could happen with comma-separated fields like this:

print "a=", a(), "b=", b()
because some other thread could get control of stdout between the time a() is called and the time it is printed.

My log_to_stdout function was written so the output was in one string. In that way I didn't have to worry about locking stdout.

def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print "%s %s" % (now, message)
Suppose that I wanted to write it like this
def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print now, message
The thread change could occur between printing the "now" and the "message".

I could uses a Queue and have a dedicated stdout writing thread, but like I said, that seems rather a lot of overhead. Instead I'll use a lock to guarantee that two "log_to_stdout" functions cannot mess each other up.

A lock is either locked or unlocked. If it's unlocked then calling its "acquire()" method puts it into a locked state. If it's locked then calling its "acquire()" method blocks the calling thread until the lock is released (and no one else grabs it before the current thread.) Calling "release()" releases the lock

stdout_lock = threading.Lock()
def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    stdout_lock.acquire()
    try:
        print now, message
    finally:
        stdout_lock.release()
Python 2.5 adds the "with" statement, which is designed for resource management tasks like this. The object's special __enter__ method is called at the start of the code block of the with statement and its __exit__ method is called when the code block ends or it raises an exception. Python 2.5's lock objects implement the with protocol. The __enter__ method acquires the lock and the __exit__ releases it. The result looks like this:
stdout_lock = threading.Lock()
def log_to_stdout(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    with stdout_lock:
        print now, message

Using single threaded libraries

Some libraries are not thread-safe. There are various solutions depending on what "not thread safe" means. If the library uses global (module) variables to store per-instance values then it's just badly written. Libraries like Tk and SQLite aren't thread-safe meaning that two different threads must not call a library function at the same time. The two solutions for this case are 1) use a lock around all library calls or 2) send the request to a special thread dedicated to that library.

Rather than use one of those complicated libraries I'll have a simple example of a non-thread safe library which counts the number of times it's been called

call_count = 0
def double(x):
  global call_count
  call_count += 1
  return x*2
I want to call it using something like
print "5 doubled is", call_in_thread(the_thread, double, 5)
Here's how to implement that using a Queue. In the following I'll start a few threads each doing something which calls the single-threaded doubler function.
import threading
import Queue
import time
import datetime
import functools

stdout_lock = threading.Lock()
def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    with stdout_lock:
        print now, message

class SingleThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = Queue.Queue()
    def run(self):
        # When the main thread exits
        while 1:
            req = self.queue.get()
            if req is None:
                break
            f, response_queue = req
            response_queue.put(f())
            
    def quit(self):
        self.queue.put(None)

single_thread = SingleThread()
single_thread.start()

# the magic occurs here.  This assumes that all threads have a "queue" attribute
# use to give it new requests, and requests are of the form (callable, response_queue)
# The callable runs in the specified thread.
def call_in_thread(T, f):
    response_queue = Queue.Queue()
    T.queue.put( (f, response_queue) )
    return response_queue.get()

call_count = 0
def double(x):
  global call_count
  call_count += 1
  return x*2

class ComputeThread(threading.Thread):
    def __init__(self, value):
        threading.Thread.__init__(self)
        self.value = value
    def run(self):
        # functools.partial is new in Python 2.5.  It's the same as
        #      lambda : double(self.value)
        response = call_in_thread(single_thread, functools.partial(double, self.value))
        assert response == self.value * 2
        log("success with %s" % self.value)
        
def main():
    for i in (5, 2, 3.4, 1):
        ComputeThread(i).start()
    
    time.sleep(2)
    print "Doubler was called", call_count, "times"
    single_thread.quit()
    
main()

Here's how to implement it using a lock. This code is obviously simpler.

import threading
import Queue
import time
import datetime
import functools

stdout_lock = threading.Lock()
def log(message):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    with stdout_lock:
        print now, message

single_thread = threading.Lock()

def call_in_thread(T, f):
    # T must be a lock object and f a callable; lock it and call the function
    # The callable runs in the current thread.
    with T:
        return f()

call_count = 0
def double(x):
  global call_count
  call_count += 1
  return x*2

class ComputeThread(threading.Thread):
    def __init__(self, value):
        threading.Thread.__init__(self)
        self.value = value
    def run(self):
        # functools.partial is new in Python 2.5.  It's the same as
        #      lambda : double(self.value)
        response = call_in_thread(single_thread, functools.partial(double, self.value))
        assert response == self.value * 2
        log("success with %s" % self.value)
        
def main():
    for i in (5, 2, 3.4, 1):
        ComputeThread(i).start()
    
    time.sleep(2)
    print "Doubler was called", call_count, "times"
    
main()



Copyright © 2001-2013 Andrew Dalke Scientific AB