Lightweight Tasks using Generators and Yield-From

This is an essay about using generators, together with the new yield from construct available in Python 3.3, to implement lightweight cooperatively-scheduled threads. I will use the term task to refer to such threads, to distinguish them from OS-supported threads and processes.

Here I will develop a very simple scheduler for such tasks, to illustrate the principles involved. I'll be introducing small snippets of code, some of which are core parts of the scheduler and some of which are usage examples. To help distinguish between them, I'll use a colour code: orange for library code, blue for application examples, and green for example output.

Core of the Scheduler

Each task will be a generator. Whenever a task wants to suspend itself, it will yield. We won't make any use of values sent or received by yields; they will simply be suspension points.
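
To make the idea of a bare yield as a suspension point concrete, here is a small sketch of a generator being resumed by hand. The names countdown and log are made up for this illustration and are not part of the scheduler.

```python
log = []

def countdown(n):
    # Each bare yield suspends the task; no value is passed in either direction.
    while n > 0:
        log.append(n)
        yield
        n -= 1

task = countdown(2)
next(task)          # runs up to the first yield
next(task)          # resumes after the first yield, runs up to the second
try:
    next(task)      # the body finishes, so StopIteration is raised
except StopIteration:
    log.append("done")
```

The scheduler developed below does essentially this, except that it decides which generator to resume next.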

We'll start with a global variable to hold the currently running task.
  current = None
We'll also want a queue of tasks that are waiting to run.
  ready_list = []
The first thing we'll want is a way of getting a task into the scheduling system.
  def schedule(g):
    ready_list.append(g)
The core loop of the scheduler will repeatedly take the task at the head of the queue and run it until it yields.
  def run():
    global current
    while ready_list:
      g = ready_list[0]
      current = g
      try:
        next(g)
      except StopIteration:
        unschedule(g)
      else:
        expire_timeslice(g)
If the task is still at the head of the ready list after it has yielded, we move it to the end, so that the ready tasks run in round-robin fashion.
  def expire_timeslice(g):
    if ready_list and ready_list[0] is g:
      del ready_list[0]
      ready_list.append(g)
When the task finishes, we use the following function to remove it from the scheduling system.
  def unschedule(g):
    if g in ready_list:
      ready_list.remove(g)
We've got enough so far to try a simple test.
  def person(name, count):
    for i in range(count):
      print(name, "running")
      yield

  schedule(person("John", 2))
  schedule(person("Michael", 3))
  schedule(person("Terry", 4))

  run()
Running this, we get
  John running
  Michael running
  Terry running
  John running
  Michael running
  Terry running
  Michael running
  Terry running
  Terry running
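
For comparison, the same round-robin behaviour can be sketched in a self-contained form using collections.deque. This is only an illustration, not the scheduler from this essay: it has no support for blocking, and the names run_round_robin, worker and trace are assumptions of the sketch.

```python
from collections import deque

def run_round_robin(tasks):
    """Run generator tasks in turn until every one of them has finished."""
    ready = deque(tasks)
    while ready:
        g = ready.popleft()
        try:
            next(g)
        except StopIteration:
            continue          # finished: do not put it back
        ready.append(g)       # yielded: goes to the back of the queue

trace = []

def worker(name, count):
    for i in range(count):
        trace.append(name)    # record who ran, instead of printing
        yield

run_round_robin([worker("John", 2), worker("Michael", 3), worker("Terry", 4)])
```

The recorded order matches the output above: three full rounds while John is still alive, then Michael and Terry alternating, then Terry alone.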

Waiting for Resources

Things get more interesting when our tasks do something non-trivial.

Let's turn our people into dining philosophers. For this we'll need a way to represent forks (the eating kind, not the Unix kind) and a way for a task to wait for one to become available.

But before launching into that, let's add two more functions to our scheduler that will come in useful.
  def block(queue):
    queue.append(current)
    unschedule(current)
This removes the currently running task from the ready list and adds it to a list that you specify.
  def unblock(queue):
    if queue:
      g = queue.pop(0)
      schedule(g)
This removes the task at the head of the specified list, if any, and adds it to the ready list.
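
Here is a sketch of the two functions in action. It repeats the scheduler core from above (written with next(g)) so that it runs on its own; the waiter and waker tasks, the mailbox queue and the events list are hypothetical names used only for this demonstration.

```python
current = None
ready_list = []

def schedule(g):
    ready_list.append(g)

def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def expire_timeslice(g):
    if ready_list and ready_list[0] is g:
        del ready_list[0]
        ready_list.append(g)

def run():
    global current
    while ready_list:
        g = ready_list[0]
        current = g
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            expire_timeslice(g)

def block(queue):
    queue.append(current)
    unschedule(current)

def unblock(queue):
    if queue:
        schedule(queue.pop(0))

events = []
mailbox = []   # the list that the waiter blocks on

def waiter():
    events.append("waiter blocks")
    block(mailbox)
    yield                      # suspended here until someone unblocks us
    events.append("waiter woken")

def waker():
    events.append("waker runs")
    yield
    unblock(mailbox)           # puts the waiter back on the ready list
    events.append("waker unblocked waiter")

schedule(waiter())
schedule(waker())
run()
```

Note that the waiter does not resume at the moment unblock() is called. It is merely made ready again, and runs when its turn comes round.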

Now we can start implementing an eating utensil.
  class Utensil:

    def __init__(self, id):
      self.id = id
      self.available = True
      self.queue = []
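The utensil has a flag indicating whether it's available, and a queue of tasks waiting to use it. To acquire a utensil, we first check to see whether it is available. If not, we block the current task on the queue, and then yield. When we get to run again, we check once more, because another task may have picked up the utensil between its release and our turn to run. Once it is free, we mark the utensil as being in use.
    def acquire(self):
      # Re-check after every wakeup: being unblocked does not guarantee
      # that the utensil is still free by the time we actually run.
      while not self.available:
        block(self.queue)
        yield
      self.available = False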
To release the utensil, we mark it as available and then unblock the task at the head of the queue, if any.
    def release(self):
      self.available = True
      unblock(self.queue)
Next we need a life cycle for a philosopher.
  def philosopher(name, lifetime, think_time, eat_time, left_fork, right_fork):
    for i in range(lifetime):
      for j in range(think_time):
        print(name, "thinking")
        yield
      print(name, "waiting for fork", left_fork.id)
      yield from left_fork.acquire()
      print(name, "acquired fork", left_fork.id)
      print(name, "waiting for fork", right_fork.id)
      yield from right_fork.acquire()
      print(name, "acquired fork", right_fork.id)
      for j in range(eat_time):
        # They're Python philosophers, so they eat spam rather than spaghetti
        print(name, "eating spam")
        yield
      print(name, "releasing forks", left_fork.id, "and", right_fork.id)
      left_fork.release()
      right_fork.release()
    print(name, "leaving the table")
Now we can set up a scenario.
  forks = [Utensil(i) for i in range(3)]
  schedule(philosopher("Plato", 7, 2, 3, forks[0], forks[1]))
  schedule(philosopher("Socrates", 8, 3, 1, forks[1], forks[2]))
  schedule(philosopher("Euclid", 5, 1, 4, forks[2], forks[0]))
  run()
Running this produces
  Plato thinking
  Socrates thinking
  Euclid thinking
  Plato thinking
  Socrates thinking
  Euclid waiting for fork 2
  Euclid acquired fork 2
  Euclid waiting for fork 0
  Euclid acquired fork 0
  Euclid eating spam
  ...etc...

Waiting for External Events

So far our task system has been completely self-absorbed and unable to deal with the outside world. Let's arrange things so that, if there are no tasks ready to run, the scheduler will wait for some file to become readable or writable using select(). It's easiest to do this by writing a new main loop that builds on the previous one.
  def run2():
    while 1:
      run()
      if not wait_for_event():
        return
We will need a data structure to hold tasks waiting for files. Each file needs two queues associated with it, for tasks waiting to read and write respectively.
  class FdQueues:

    def __init__(self):
      self.readq = []
      self.writeq = []
We will keep a mapping from file objects to their associated FdQueues instances.
  fd_queues = {}
The following function retrieves the queues for a given fd, creating new ones if they don't already exist.
  def get_fd_queues(fd):
    q = fd_queues.get(fd)
    if not q:
      q = FdQueues()
      fd_queues[fd] = q
    return q
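
As an aside, the same create-on-first-use behaviour could be had from collections.defaultdict. This is just an alternative sketch, not what the rest of the essay uses; the string "fake-fd" stands in for a real file object.

```python
from collections import defaultdict

class FdQueues:

    def __init__(self):
        self.readq = []
        self.writeq = []

# Looking up a missing key calls FdQueues() and stores the result.
fd_queues = defaultdict(FdQueues)

q1 = fd_queues["fake-fd"]      # created on first access
q1.readq.append("task A")
q2 = fd_queues["fake-fd"]      # same object the second time
```

One thing to watch with this variant is that merely looking up a key creates an entry, so code that only wants to test for membership should use the in operator rather than indexing.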
Now we can write a new pair of scheduling primitives to block on a file.
  def block_for_reading(fd):
    block(get_fd_queues(fd).readq)

  def block_for_writing(fd):
    block(get_fd_queues(fd).writeq)
It's expected that the task calling these will immediately yield afterwards. We could incorporate the yield into these functions, but we'll be building higher level functions on top of these shortly, and it will be more convenient to do the yield there.

We'll also want a way of removing a file from the fd_queues when we've finished with it, so we'll add a function to close it and clean up.
  def close_fd(fd):
    if fd in fd_queues:
      del fd_queues[fd]
    fd.close()
Now we can write wait_for_event(). It's a bit longwinded, but fairly straightforward. We build lists of file objects having nonempty read or write queues, pass them to select(), and for each one that's ready, we unblock the task at the head of the relevant queue. If there are no tasks waiting on any files, we return False to tell the scheduler there's no more work to do.
  def wait_for_event():
    from select import select
    read_fds = []
    write_fds = []
    for fd, q in fd_queues.items():
      if q.readq:
        read_fds.append(fd)
      if q.writeq:
        write_fds.append(fd)
    if not (read_fds or write_fds):
      return False
    read_fds, write_fds, _ = select(read_fds, write_fds, [])
    for fd in read_fds:
      unblock(fd_queues[fd].readq)
    for fd in write_fds:
      unblock(fd_queues[fd].writeq)
    return True
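
If select() is unfamiliar, the following sketch shows the behaviour that wait_for_event() relies on, using a pair of connected sockets from socket.socketpair() in place of real client connections. The variable names are made up, and the sketch assumes a platform where socketpair() is available.

```python
import socket
from select import select

a, b = socket.socketpair()   # two connected sockets

# Nothing has been sent yet, so b is not readable.
# A timeout of 0 makes select() poll instead of waiting.
readable_before, _, _ = select([b], [], [], 0)

a.send(b"ping")

# Now b has data waiting, so select() reports it as readable.
readable_after, _, _ = select([b], [], [], 1.0)
data = b.recv(1024)

a.close()
b.close()
```

wait_for_event() passes no timeout, so it waits until at least one of the files is ready.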
At this point we can try a quick test to see if everything works so far.
  from sys import stdin

  def loop():
    while 1:
      print("Waiting for input")
      block_for_reading(stdin)
      yield
      print("Input is ready")
      line = stdin.readline()
      print("Input was:", repr(line))
      if not line:
        break

  schedule(loop())
  run2()
Sample session:
  Waiting for input
  asdf
  Input is ready
  Input was: 'asdf\n'
  Waiting for input
  qwer
  Input is ready
  Input was: 'qwer\n'
  Waiting for input
  Input is ready
  Input was: ''
It's not a very convincing test yet, though, since there's only one task, so let's play around with some sockets and build a multitasked server.

A Spam Server

We're going to implement the following protocol. The client sends the word "SPAM" followed by a number, and the server replies with "100 SPAM FOLLOWS" and the corresponding number of repetitions of the phrase "spam glorious spam". If the requested number is not greater than zero or the request is malformed, the server replies "400 WE ONLY SERVE SPAM".

We could do with some higher-level functions for blocking operations on sockets, so let's write a few. First, accepting a connection from a listening socket.
  def sock_accept(sock):
    block_for_reading(sock)
    yield
    return sock.accept()
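
This is the first place where a return value passes through yield from, so here is a small sketch of what happens, with no sockets involved. The names fetch_answer, task, results and steps are made up for the illustration.

```python
def fetch_answer():
    yield            # these suspension points are seen by whoever drives task()
    yield
    return 42        # becomes the value of the yield from expression

results = []

def task():
    value = yield from fetch_answer()
    results.append(value)

steps = 0
for _ in task():     # one iteration for each yield that reaches the driver
    steps += 1
```

The driver sees two suspensions, both coming from inside fetch_answer(), and the return value is delivered to task() without the driver ever being involved.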
Now reading a line of text from a socket. We keep reading until the data ends with a newline or EOF is reached. (We're assuming that the client will wait for a reply before sending another line, so we don't have to worry about reading too much.) We also close the socket on EOF, since we won't be reading from it again after that.
  def sock_readline(sock):
    buf = ""
    while buf[-1:] != "\n":
      block_for_reading(sock)
      yield
      data = sock.recv(1024)
      if not data:
        break
      buf += data.decode("ascii", "replace")  # recv() returns bytes in Python 3
    if not buf:
      close_fd(sock)
    return buf
Writing data to a socket. We loop until all the data has been written. We don't use sendall(), because it might block, and we don't want to hold up other tasks.
  def sock_write(sock, data):
    data = data.encode("ascii")  # send() requires bytes in Python 3
    while data:
      block_for_writing(sock)
      yield
      n = sock.send(data)
      data = data[n:]
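
The reason for the loop is that send() may accept only part of the data. The following sketch shows the slicing logic on its own, using a hypothetical TrickleSocket class that stands in for a socket and never accepts more than 4 bytes at a time. The blocking and yielding are left out, so write_all is not a replacement for sock_write().

```python
class TrickleSocket:
    """Stand-in for a socket whose send() accepts at most 4 bytes per call."""

    def __init__(self):
        self.received = b""
        self.calls = 0

    def send(self, data):
        chunk = data[:4]
        self.received += chunk
        self.calls += 1
        return len(chunk)      # number of bytes actually accepted

def write_all(sock, data):
    while data:
        n = sock.send(data)
        data = data[n:]        # drop what was sent, retry with the remainder

sock = TrickleSocket()
write_all(sock, b"spam glorious spam\n")
```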
Now we're ready to write the main loop of the server. It will set up a listening socket, then repeatedly accept connections and spawn a task to handle each one.
  from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

  port = 4200

  def listener():
    lsock = socket(AF_INET, SOCK_STREAM)
    lsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    lsock.bind(("", port))
    lsock.listen(5)
    while 1:
      csock, addr = yield from sock_accept(lsock)
      print("Listener: Accepted connection from", addr)
      schedule(handler(csock))
The handler function handles the interaction with one client session.
  def handler(sock):
    while 1:
      line = yield from sock_readline(sock)
      if not line:
        break
      try:
        n = parse_request(line)
        yield from sock_write(sock, "100 SPAM FOLLOWS\n")
        for i in range(n):
          yield from sock_write(sock, "spam glorious spam\n")
      except BadRequest:
        yield from sock_write(sock, "400 WE ONLY SERVE SPAM\n")
The handler uses the following function to parse the request and check it for validity.
  class BadRequest(Exception):
    pass

  def parse_request(line):
    tokens = line.split()
    if len(tokens) != 2 or tokens[0] != "SPAM":
      raise BadRequest
    try:
      n = int(tokens[1])
    except ValueError:
      raise BadRequest
    if n < 1:
      raise BadRequest
    return n
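
To see how these rules play out, here is a sketch that runs a few request lines through a copy of parse_request(). The classify helper and the sample lines are made up for the illustration; classify simply reports which status line the handler would send.

```python
class BadRequest(Exception):
    pass

def parse_request(line):
    tokens = line.split()
    if len(tokens) != 2 or tokens[0] != "SPAM":
        raise BadRequest
    try:
        n = int(tokens[1])
    except ValueError:
        raise BadRequest
    if n < 1:
        raise BadRequest
    return n

def classify(line):
    """Return the status line the server would send for this request."""
    try:
        parse_request(line)
    except BadRequest:
        return "400 WE ONLY SERVE SPAM"
    return "100 SPAM FOLLOWS"

samples = ["SPAM 3\n", "EGGS\n", "SPAM 0\n", "SPAM three\n", "spam 3\n"]
replies = {line: classify(line) for line in samples}
```

Only the first sample is accepted. The others fail because of a wrong keyword, a count below one, a count that is not a number, and a keyword in the wrong case.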
All we need to do now is spawn the main loop and run the scheduler.
  schedule(listener())
  run2()
Here's a sample client session:
  % telnet localhost 4200
  Trying 127.0.0.1...
  Connected to localhost.
  Escape character is '^]'.
  SPAM 3
  100 SPAM FOLLOWS
  spam glorious spam
  spam glorious spam
  spam glorious spam
  EGGS
  400 WE ONLY SERVE SPAM
  ^]
  telnet> Connection closed.
  %