Lightweight Tasks using Generators and Yield-From
This is an essay about using generators, together with the new yield from construct available in Python 3.3, to implement lightweight cooperatively-scheduled threads. I will use the term task to refer to such threads, to distinguish them from OS-supported threads and processes.
Here I will develop a very simple scheduler for such tasks, to
illustrate the principles involved. I'll be introducing small snippets
of code, some of which are core parts of the scheduler and some of
which are usage examples. To help distinguish between them, I'll use a
colour code: orange for library code, blue for application examples, and green for example output.
Core of the Scheduler
Each task will be a generator. Whenever a task wants to suspend itself, it will yield. We won't make any use of values sent or received by yields; they will simply be suspension points.
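Before building the scheduler, here is a minimal standalone sketch (not part of the scheduler itself) of the two mechanisms we'll rely on: a bare yield as a suspension point, and yield from to delegate a generator's suspension points to a sub-generator. The names task and subtask are illustrative only.

```python
def subtask():
    yield   # a suspension point inside the sub-generator
    yield

def task():
    yield                   # a suspension point of the task itself
    yield from subtask()    # delegate: the caller sees the subtask's yields too

g = task()
steps = 0
for _ in g:     # each iteration resumes the task until its next yield
    steps += 1
print(steps)    # the task suspends 3 times in total: 1 + 2 delegated
```

The point to notice is that whoever drives the outer generator (our scheduler, eventually) sees every suspension point, no matter how deeply nested the yield from chain is.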
We'll start with a global variable to hold the currently running task.
current = None
We'll also want a queue of tasks that are waiting to run.
ready_list = []
The first thing we'll want is a way of getting a task into the scheduling system.
def schedule(g):
    ready_list.append(g)
The core loop of the scheduler will repeatedly take the task at the head of the queue and run it until it yields.
def run():
    global current
    while ready_list:
        g = ready_list[0]
        current = g
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            expire_timeslice(g)
If the task is still at the head of the ready list after it has
yielded, we move it to the end, so that the ready tasks run in
round-robin fashion.
def expire_timeslice(g):
    if ready_list and ready_list[0] is g:
        del ready_list[0]
        ready_list.append(g)
When the task finishes, we use the following function to remove it from the scheduling system.
def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)
We've got enough so far to try a simple test.
def person(name, count):
    for i in range(count):
        print(name, "running")
        yield
schedule(person("John", 2))
schedule(person("Michael", 3))
schedule(person("Terry", 4))
run()
Running this, we get
John running
Michael running
Terry running
John running
Michael running
Terry running
Michael running
Terry running
Terry running
Waiting for Resources
Things get more interesting when our tasks do something non-trivial.
Let's turn our people into dining philosophers. For this we'll need a
way to represent forks (the eating kind, not the unix kind) and a way
for a task to wait for one to become available.
But before launching into that, let's add two more functions to our scheduler that will come in useful.
def block(queue):
    queue.append(current)
    unschedule(current)
This removes the currently running task from the ready list and adds it to a list that you specify.
def unblock(queue):
    if queue:
        g = queue.pop(0)
        schedule(g)
This removes the task at the head of the specified list, if any, and adds it to the ready list.
Now we can start implementing an eating utensil.
class Utensil:
    def __init__(self, id):
        self.id = id
        self.available = True
        self.queue = []
The utensil has a flag indicating whether it's available, and a queue
of tasks waiting to use it. To acquire a utensil, we first check to see
whether it is available. If not, we block the current task on the
queue, and then yield. When we get to run again, it's our turn, so we
mark the utensil as being in use.
    def acquire(self):
        if not self.available:
            block(self.queue)
            yield
        self.available = False
To release the utensil, we mark it as available and then unblock the task at the head of the queue, if any.
    def release(self):
        self.available = True
        unblock(self.queue)
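To see acquire and release in action before the philosophers arrive, here is a self-contained sketch that repeats the scheduler pieces developed so far and has two tasks contend for a single utensil. The task function user, the events list, and the fork variable are illustrative names, not part of the scheduler.

```python
current = None
ready_list = []

def schedule(g):
    ready_list.append(g)

def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def expire_timeslice(g):
    if ready_list and ready_list[0] is g:
        del ready_list[0]
        ready_list.append(g)

def run():
    global current
    while ready_list:
        g = ready_list[0]
        current = g
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            expire_timeslice(g)

def block(queue):
    queue.append(current)
    unschedule(current)

def unblock(queue):
    if queue:
        schedule(queue.pop(0))

class Utensil:
    def __init__(self, id):
        self.id = id
        self.available = True
        self.queue = []
    def acquire(self):
        if not self.available:
            block(self.queue)
            yield
        self.available = False
    def release(self):
        self.available = True
        unblock(self.queue)

events = []

def user(name, fork):
    yield from fork.acquire()
    events.append(name + " got fork")
    yield                       # hold the fork across one suspension
    fork.release()
    events.append(name + " released fork")

fork = Utensil(0)
schedule(user("A", fork))
schedule(user("B", fork))
run()
print(events)
```

Running this shows B blocking until A releases: A gets and releases the fork, then B gets and releases it. The yield from in user is what lets acquire suspend the task on its caller's behalf when the utensil is busy.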
Next we need a life cycle for a philosopher.
def philosopher(name, lifetime, think_time, eat_time, left_fork, right_fork):
    for i in range(lifetime):
        for j in range(think_time):
            print(name, "thinking")
            yield
        print(name, "waiting for fork", left_fork.id)
        yield from left_fork.acquire()
        print(name, "acquired fork", left_fork.id)
        print(name, "waiting for fork", right_fork.id)
        yield from right_fork.acquire()
        print(name, "acquired fork", right_fork.id)
        for j in range(eat_time):
            # They're Python philosophers, so they eat spam rather than spaghetti
            print(name, "eating spam")
            yield
        print(name, "releasing forks", left_fork.id, "and", right_fork.id)
        left_fork.release()
        right_fork.release()
    print(name, "leaving the table")
Now we can set up a scenario.
forks = [Utensil(i) for i in range(3)]
schedule(philosopher("Plato", 7, 2, 3, forks[0], forks[1]))
schedule(philosopher("Socrates", 8, 3, 1, forks[1], forks[2]))
schedule(philosopher("Euclid", 5, 1, 4, forks[2], forks[0]))
run()
Running this produces
Plato thinking
Socrates thinking
Euclid thinking
Plato thinking
Socrates thinking
Euclid waiting for fork 2
Euclid acquired fork 2
Euclid waiting for fork 0
Euclid acquired fork 0
Euclid eating spam
...etc...
Waiting for External Events
So far our task system has been completely self-absorbed and unable to
deal with the outside world. Let's arrange things so that, if there are
no tasks ready to run, the scheduler will wait for some file to become
readable or writable using select(). It's easiest to do this by writing
a new main loop that builds on the previous one.
def run2():
    while 1:
        run()
        if not wait_for_event():
            return
We will need a data structure to hold tasks waiting for files. Each
file needs two queues associated with it, for tasks waiting to read and
write respectively.
class FdQueues:
    def __init__(self):
        self.readq = []
        self.writeq = []
We will keep a mapping from file objects to their associated FdQueue instances.
fd_queues = {}
The following function retrieves the queues for a given fd, creating new ones if they don't already exist.
def get_fd_queues(fd):
    q = fd_queues.get(fd)
    if not q:
        q = FdQueues()
        fd_queues[fd] = q
    return q
Now we can write a new pair of scheduling primitives to block on a file.
def block_for_reading(fd):
    block(get_fd_queues(fd).readq)

def block_for_writing(fd):
    block(get_fd_queues(fd).writeq)
It's expected that the task calling these will immediately yield
afterwards. We could incorporate the yield into these functions, but
we'll be building higher level functions on top of these shortly, and
it will be more convenient to do the yield there.
We'll also want a way of removing a file from the fd_queues when we've
finished with it, so we'll add a function to close it and clean up.
def close_fd(fd):
    if fd in fd_queues:
        del fd_queues[fd]
    fd.close()
Now we can write wait_for_event(). It's a bit longwinded, but fairly
straightforward. We build lists of file objects having nonempty read or
write queues, pass them to select(), and for each one that's ready, we
unblock the task at the head of the relevant queue. If there are no
tasks waiting on any files, we return False to tell the scheduler
there's no more work to do.
def wait_for_event():
    from select import select
    read_fds = []
    write_fds = []
    for fd, q in fd_queues.items():
        if q.readq:
            read_fds.append(fd)
        if q.writeq:
            write_fds.append(fd)
    if not (read_fds or write_fds):
        return False
    read_fds, write_fds, _ = select(read_fds, write_fds, [])
    for fd in read_fds:
        unblock(fd_queues[fd].readq)
    for fd in write_fds:
        unblock(fd_queues[fd].writeq)
    return True
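The interactive stdin test below is hard to reproduce mechanically, so here is a deterministic self-contained sanity check of the same machinery. It repeats the scheduler pieces developed so far and blocks a task on the read end of an os.pipe that already has data in it, so select() reports it ready immediately; the reader task and results list are illustrative names, not part of the scheduler.

```python
import os
from select import select

current = None
ready_list = []
fd_queues = {}

def schedule(g):
    ready_list.append(g)

def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def block(queue):
    queue.append(current)
    unschedule(current)

def unblock(queue):
    if queue:
        schedule(queue.pop(0))

def run():
    global current
    while ready_list:
        g = ready_list[0]
        current = g
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            if ready_list and ready_list[0] is g:
                del ready_list[0]
                ready_list.append(g)

class FdQueues:
    def __init__(self):
        self.readq = []
        self.writeq = []

def get_fd_queues(fd):
    return fd_queues.setdefault(fd, FdQueues())

def block_for_reading(fd):
    block(get_fd_queues(fd).readq)

def wait_for_event():
    read_fds = [fd for fd, q in fd_queues.items() if q.readq]
    write_fds = [fd for fd, q in fd_queues.items() if q.writeq]
    if not (read_fds or write_fds):
        return False
    r, w, _ = select(read_fds, write_fds, [])
    for fd in r:
        unblock(fd_queues[fd].readq)
    for fd in w:
        unblock(fd_queues[fd].writeq)
    return True

def run2():
    while 1:
        run()
        if not wait_for_event():
            return

results = []
r_fd, w_fd = os.pipe()
rf = os.fdopen(r_fd, "rb", buffering=0)

def reader():
    block_for_reading(rf)       # task parks itself on the pipe's read queue
    yield                       # ...and suspends until select() unblocks it
    results.append(rf.read(4))

os.write(w_fd, b"ping")         # pre-fill the pipe so it is readable
schedule(reader())
run2()
os.close(w_fd)
rf.close()
print(results)
```

The reader suspends with the ready list empty, run() returns, wait_for_event() finds the pipe readable and reschedules the reader, and run2() then lets it consume the data and finish.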
At this point we can try a quick test to see if everything works so far.
from sys import stdin

def loop():
    while 1:
        print("Waiting for input")
        block_for_reading(stdin)
        yield
        print("Input is ready")
        line = stdin.readline()
        print("Input was:", repr(line))
        if not line:
            break

schedule(loop())
run2()
Sample session:
Waiting for input
asdf
Input is ready
Input was: 'asdf\n'
Waiting for input
qwer
Input is ready
Input was: 'qwer\n'
Waiting for input
Input is ready
Input was: ''
It's not a very convincing test yet, though, since there's only one
task, so let's play around with some sockets and build a multitasked
server.
A Spam Server
We're going to implement the following protocol. The client sends the
word "SPAM" followed by a number, and the server replies with "100 SPAM
FOLLOWS" and the corresponding number of repetitions of the phrase
"spam glorious spam". If the requested number is not greater than zero
or the request is malformed, the server replies "400 WE ONLY SERVE
SPAM".
We could do with some higher-level functions for blocking operations on
sockets, so let's write a few. First, accepting a connection from a
listening socket.
def sock_accept(sock):
    block_for_reading(sock)
    yield
    return sock.accept()
Now reading a line of text from a socket. We keep reading until the
data ends with a newline or EOF is reached. (We're assuming that the
client will wait for a reply before sending another line, so we don't
have to worry about reading too much.) We also close the socket on EOF,
since we won't be reading from it again after that.
def sock_readline(sock):
    # Sockets deal in bytes, so accumulate a bytes buffer and
    # decode it to text once the line is complete.
    buf = b""
    while buf[-1:] != b"\n":
        block_for_reading(sock)
        yield
        data = sock.recv(1024)
        if not data:
            break
        buf += data
    if not buf:
        close_fd(sock)
    return buf.decode()
Writing data to a socket. We loop until all the data has been written.
We don't use sendall(), because it might block, and we don't want to
hold up other tasks.
def sock_write(sock, data):
    # send() expects bytes, so encode the text first
    data = data.encode()
    while data:
        block_for_writing(sock)
        yield
        n = sock.send(data)
        data = data[n:]
Now we're ready to write the main loop of the server. It will set up a
listening socket, then repeatedly accept connections and spawn a task
to handle each one.
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

port = 4200

def listener():
    lsock = socket(AF_INET, SOCK_STREAM)
    lsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    lsock.bind(("", port))
    lsock.listen(5)
    while 1:
        csock, addr = yield from sock_accept(lsock)
        print("Listener: Accepted connection from", addr)
        schedule(handler(csock))
The handler function handles the interaction with one client session.
def handler(sock):
    while 1:
        line = yield from sock_readline(sock)
        if not line:
            break
        try:
            n = parse_request(line)
            yield from sock_write(sock, "100 SPAM FOLLOWS\n")
            for i in range(n):
                yield from sock_write(sock, "spam glorious spam\n")
        except BadRequest:
            yield from sock_write(sock, "400 WE ONLY SERVE SPAM\n")
The handler uses the following function to parse the request and check it for validity.
class BadRequest(Exception):
    pass

def parse_request(line):
    tokens = line.split()
    if len(tokens) != 2 or tokens[0] != "SPAM":
        raise BadRequest
    try:
        n = int(tokens[1])
    except ValueError:
        raise BadRequest
    if n < 1:
        raise BadRequest
    return n
All we need to do now is spawn the main loop and run the scheduler.
schedule(listener())
run2()
Here's a sample client session:
% telnet localhost 4200
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
SPAM 3
100 SPAM FOLLOWS
spam glorious spam
spam glorious spam
spam glorious spam
EGGS
400 WE ONLY SERVE SPAM
^]
telnet> Connection closed.
%