Yield-From Example 2: A Scheduler for Generator-Based Threads
=============================================================

Just for fun, let's write a thread scheduler. Each thread will be a generator. Whenever a thread wants to suspend itself, it will do a 'yield'. We won't make any use of values sent or received by yields in this example.

We'll start with a global variable to hold the currently running thread.

    current = None

We'll also want a queue of threads that are waiting to run.

    ready_list = []

The first thing we'll want is a way of getting a thread into the scheduling system.

    def schedule(g):
        ready_list.append(g)

The core loop of the scheduler will repeatedly take the thread at the head of the queue and run it until it yields.

    def run():
        global current
        while ready_list:
            g = ready_list[0]
            current = g
            try:
                g.next()
            except StopIteration:
                unschedule(g)
            else:
                expire_timeslice(g)

If the thread is still at the head of the ready list after it has yielded, we move it to the end, so that the ready threads will run in round-robin fashion.

    def expire_timeslice(g):
        if ready_list and ready_list[0] is g:
            del ready_list[0]
            ready_list.append(g)

When the thread finishes, we use the following function to remove it from the scheduling system.

    def unschedule(g):
        if g in ready_list:
            ready_list.remove(g)

We've got enough so far to try a simple test.

    def person(name, count):
        for i in xrange(count):
            print name, "running"
            yield

    schedule(person("John", 2))
    schedule(person("Michael", 3))
    schedule(person("Terry", 4))
    run()

We can run this with present-day Python (2.x), and we get

    John running
    Michael running
    Terry running
    John running
    Michael running
    Terry running
    Michael running
    Terry running
    Terry running


Waiting for Resources
---------------------

Things get more interesting when our threads do something non-trivial. Let's turn our people into dining philosophers.
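The rest of this section relies on yield from, which was only a proposal when this was written but has since become part of the language (Python 3.3 onward). Running these examples on a modern interpreter takes a few mechanical changes: next(g) in place of g.next(), print() as a function, and range in place of xrange. As a rough sketch of that translation, here is the scheduler and the person test from the previous section in Python 3. The log list is a hypothetical addition, not part of the original, so that the interleaving can be checked by a program rather than by eye.

```python
# Python 3 sketch of the round-robin scheduler above.
ready_list = []   # threads (generators) waiting to run
current = None    # the thread currently running
log = []          # hypothetical addition: collects output for checking

def schedule(g):
    ready_list.append(g)

def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def expire_timeslice(g):
    # Move a thread that is still at the head of the queue to the back.
    if ready_list and ready_list[0] is g:
        del ready_list[0]
        ready_list.append(g)

def run():
    global current
    while ready_list:
        g = ready_list[0]
        current = g
        try:
            next(g)              # g.next() in Python 2
        except StopIteration:
            unschedule(g)
        else:
            expire_timeslice(g)

def person(name, count):
    for i in range(count):       # xrange in Python 2
        print(name, "running")
        log.append(name + " running")
        yield

schedule(person("John", 2))
schedule(person("Michael", 3))
schedule(person("Terry", 4))
run()
```

Running it prints the same nine lines as above, in the same order: each person gets one turn per round until their count runs out. The philosophers need only a little more machinery on top of this loop.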
To model the philosophers, we'll need a way to represent forks (the eating kind, not the Unix kind) and a way for a thread to wait for one to become available. But before launching into that, let's add two more functions to our scheduler that will come in useful.

    def block(queue):
        queue.append(current)
        unschedule(current)

This removes the currently running thread from the ready list and adds it to a list that you specify.

    def unblock(queue):
        if queue:
            g = queue.pop(0)
            schedule(g)

This removes the thread at the head of the specified list, if any, and adds it to the ready list.

Now we can start implementing an eating utensil.

    class Utensil:

        def __init__(self, id):
            self.id = id
            self.available = True
            self.queue = []

The utensil has a flag indicating whether it's available, and a queue of threads waiting to use it.

To acquire a utensil, we first check to see whether it is available. If not, we block the current thread on the queue, and then yield. When we get to run again, we check once more, since another thread may have grabbed the utensil between its release and our being resumed; if that has happened, we go back to waiting. Once the utensil is available, we mark it as being in use.

        def acquire(self):
            while not self.available:
                block(self.queue)
                yield
            self.available = False

To release the utensil, we mark it as available and then unblock the thread at the head of the queue, if any.

        def release(self):
            self.available = True
            unblock(self.queue)

Next we need a life cycle for a philosopher.
    def philosopher(name, lifetime, think_time, eat_time, left_fork, right_fork):
        for i in xrange(lifetime):
            for j in xrange(think_time):
                print name, "thinking"
                yield
            print name, "waiting for fork", left_fork.id
            yield from left_fork.acquire()
            print name, "acquired fork", left_fork.id
            print name, "waiting for fork", right_fork.id
            yield from right_fork.acquire()
            print name, "acquired fork", right_fork.id
            for j in xrange(eat_time):
                # They're Python philosophers, so they eat spam rather than spaghetti
                print name, "eating spam"
                yield
            print name, "releasing forks", left_fork.id, "and", right_fork.id
            left_fork.release()
            right_fork.release()
        print name, "leaving the table"

Now we can set up a scenario.

    forks = [Utensil(i) for i in xrange(3)]
    schedule(philosopher("Plato", 7, 2, 3, forks[0], forks[1]))
    schedule(philosopher("Socrates", 8, 3, 1, forks[1], forks[2]))
    schedule(philosopher("Euclid", 5, 1, 4, forks[2], forks[0]))
    run()

We can't run this in current Python, because of the yield-froms. However, we can test it by substituting for-loops such as

    for _ in left_fork.acquire():
        yield

After doing this, the output is

    Plato thinking
    Socrates thinking
    Euclid thinking
    Plato thinking
    Socrates thinking
    Euclid waiting for fork 2
    Euclid acquired fork 2
    Euclid waiting for fork 0
    Euclid acquired fork 0
    Euclid eating spam
    ...etc...


Waiting for External Events
---------------------------

So far our thread system has been completely self-absorbed and unable to deal with the outside world. Let's arrange things so that, if there are no threads ready to run, the scheduler will wait for some file to become readable or writable using select(). It's easiest to do this by writing a new main loop that builds on the previous one.

    def run2():
        while 1:
            run()
            if not wait_for_event():
                return

We will need a data structure to hold threads waiting for files. Each file needs two queues associated with it, for threads waiting to read and write respectively.
    class FdQueues:

        def __init__(self):
            self.readq = []
            self.writeq = []

We will keep a mapping from file objects to their associated FdQueues instances.

    fd_queues = {}

The following function retrieves the queues for a given fd, creating new ones if they don't already exist.

    def get_fd_queues(fd):
        q = fd_queues.get(fd)
        if q is None:
            q = FdQueues()
            fd_queues[fd] = q
        return q

Now we can write a new pair of scheduling primitives to block on a file.

    def block_for_reading(fd):
        block(get_fd_queues(fd).readq)

    def block_for_writing(fd):
        block(get_fd_queues(fd).writeq)

It's expected that the thread calling these will immediately yield afterwards. We could incorporate the yield into these functions, but we'll be building higher-level functions on top of these shortly, and it will be more convenient to do the yield there.

We'll also want a way of removing a file from fd_queues when we've finished with it, so we'll add a function to close it and clean up.

    def close_fd(fd):
        if fd in fd_queues:
            del fd_queues[fd]
        fd.close()

Now we can write wait_for_event(). It's a bit long-winded, but fairly straightforward. We build lists of file objects having nonempty read or write queues, pass them to select(), and for each one that's ready, we unblock the thread at the head of the relevant queue. If there are no threads waiting on any files, we return False to tell the scheduler there's no more work to do.

    def wait_for_event():
        from select import select
        read_fds = []
        write_fds = []
        for fd, q in fd_queues.iteritems():
            if q.readq:
                read_fds.append(fd)
            if q.writeq:
                write_fds.append(fd)
        if not (read_fds or write_fds):
            return False
        read_fds, write_fds, _ = select(read_fds, write_fds, [])
        for fd in read_fds:
            unblock(fd_queues[fd].readq)
        for fd in write_fds:
            unblock(fd_queues[fd].writeq)
        return True

At this point we can try a quick test to see if everything works so far.
    from sys import stdin

    def loop():
        while 1:
            print "Waiting for input"
            block_for_reading(stdin)
            yield
            print "Input is ready"
            line = stdin.readline()
            print "Input was:", repr(line)
            if not line:
                break

    schedule(loop())
    run2()

Sample session:

    Waiting for input
    asdf
    Input is ready
    Input was: 'asdf\n'
    Waiting for input
    qwer
    Input is ready
    Input was: 'qwer\n'
    Waiting for input
    Input is ready
    Input was: ''

It's not a very convincing test yet, though, since there's only one thread, so let's play around with some sockets and build a multithreaded server.


A Spam Server
-------------

We're going to implement the following protocol. The client sends the word "SPAM" followed by a number, and the server replies with "100 SPAM FOLLOWS" and the corresponding number of repetitions of the phrase "spam glorious spam". If the requested number is not greater than zero or the request is malformed, the server replies "400 WE ONLY SERVE SPAM".

We could do with some higher-level functions for blocking operations on sockets, so let's write a few. First, accepting a connection from a listening socket. (Returning a value from a generator like this is part of the yield-from proposal; the returned value becomes the value of the yield-from expression in the caller.)

    def sock_accept(sock):
        block_for_reading(sock)
        yield
        return sock.accept()

Now reading a line of text from a socket. We keep reading until the data ends with a newline or EOF is reached. (We're assuming that the client will wait for a reply before sending another line, so we don't have to worry about reading too much.) We also close the socket when a read hits EOF with no data, since we won't be reading from it again after that.

    def sock_readline(sock):
        buf = ""
        while buf[-1:] != "\n":
            block_for_reading(sock)
            yield
            data = sock.recv(1024)
            if not data:
                break
            buf += data
        if not buf:
            close_fd(sock)
        return buf

Writing data to a socket. We loop until all the data has been written. We don't use sendall(), because it might block, and we don't want to hold up other threads.

    def sock_write(sock, data):
        while data:
            block_for_writing(sock)
            yield
            n = sock.send(data)
            data = data[n:]

Now we're ready to write the main loop of the server.
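Before writing it, the helpers can be exercised without a network or a listening port. What follows is a minimal sketch in Python 3, where both yield from and returning a value from a generator are available, assuming a socket.socketpair() connection stands in for a real client. It repeats the scheduler and file-waiting code from above, with socket data as bytes and close_fd() and run2() inlined to keep it short; mini_handler() and the lines list are hypothetical test scaffolding, not the server's real handler.

```python
# Python 3 sketch: the scheduler and socket helpers above, over a socketpair.
import socket
from select import select

ready_list = []   # threads (generators) ready to run
current = None    # the thread currently running
fd_queues = {}    # socket -> FdQueues

def schedule(g):
    ready_list.append(g)
def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def run():
    global current
    while ready_list:
        g = current = ready_list[0]
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            if ready_list and ready_list[0] is g:  # expire_timeslice()
                ready_list.append(ready_list.pop(0))

def block(queue):
    queue.append(current)
    unschedule(current)
def unblock(queue):
    if queue:
        schedule(queue.pop(0))

class FdQueues:
    def __init__(self):
        self.readq = []
        self.writeq = []

def get_fd_queues(fd):
    if fd not in fd_queues:
        fd_queues[fd] = FdQueues()
    return fd_queues[fd]

def block_for_reading(fd):
    block(get_fd_queues(fd).readq)
def block_for_writing(fd):
    block(get_fd_queues(fd).writeq)

def wait_for_event():
    read_fds = [fd for fd, q in fd_queues.items() if q.readq]
    write_fds = [fd for fd, q in fd_queues.items() if q.writeq]
    if not (read_fds or write_fds):
        return False  # nobody is waiting on any file
    read_fds, write_fds, _ = select(read_fds, write_fds, [])
    for fd in read_fds:
        unblock(fd_queues[fd].readq)
    for fd in write_fds:
        unblock(fd_queues[fd].writeq)
    return True

def sock_readline(sock):
    buf = b""
    while buf[-1:] != b"\n":
        block_for_reading(sock)
        yield
        data = sock.recv(1024)
        if not data:
            break
        buf += data
    if not buf:  # EOF with nothing read: close_fd(sock), inlined
        fd_queues.pop(sock, None)
        sock.close()
    return buf

def sock_write(sock, data):
    while data:
        block_for_writing(sock)
        yield
        n = sock.send(data)
        data = data[n:]

lines = []  # hypothetical: request lines seen by mini_handler()

def mini_handler(sock):  # hypothetical cut-down handler: one fixed reply per line
    while True:
        line = yield from sock_readline(sock)
        if not line:
            break  # sock_readline() has already closed the socket
        lines.append(line)
        yield from sock_write(sock, b"100 SPAM FOLLOWS\n")

client, server = socket.socketpair()
client.sendall(b"SPAM 3\n")
client.shutdown(socket.SHUT_WR)  # the "client" sends one request, then EOF
schedule(mini_handler(server))
while True:  # run2(), inlined
    run()
    if not wait_for_event():
        break
```

The handler reads the one request line, sends its reply, then sees EOF; sock_readline() closes the socket, no thread is left waiting on any file, and the loop exits. With the helpers behaving as expected, we can turn to the server's main loop.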
It will set up a listening socket, then repeatedly accept connections and spawn a thread to handle each one.

    from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

    port = 4200

    def listener():
        lsock = socket(AF_INET, SOCK_STREAM)
        lsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        lsock.bind(("", port))
        lsock.listen(5)
        while 1:
            csock, addr = yield from sock_accept(lsock)
            print "Listener: Accepted connection from", addr
            schedule(handler(csock))

The handler function handles the interaction with one client session.

    def handler(sock):
        while 1:
            line = yield from sock_readline(sock)
            if not line:
                break
            try:
                n = parse_request(line)
                yield from sock_write(sock, "100 SPAM FOLLOWS\n")
                for i in xrange(n):
                    yield from sock_write(sock, "spam glorious spam\n")
            except BadRequest:
                yield from sock_write(sock, "400 WE ONLY SERVE SPAM\n")

The handler uses the following function to parse the request and check it for validity.

    class BadRequest(Exception):
        pass

    def parse_request(line):
        tokens = line.split()
        if len(tokens) != 2 or tokens[0] != "SPAM":
            raise BadRequest
        try:
            n = int(tokens[1])
        except ValueError:
            raise BadRequest
        if n < 1:
            raise BadRequest
        return n

All we need to do now is spawn the main loop and run the scheduler.

    schedule(listener())
    run2()

At this point, I got fed up with expanding yield-from statements by hand, and wrote a program to do it for me. It's called yfpp.py and you can find it in the attached zip file. Here's a sample client session:

    % telnet localhost 4200
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    SPAM 3
    100 SPAM FOLLOWS
    spam glorious spam
    spam glorious spam
    spam glorious spam
    EGGS
    400 WE ONLY SERVE SPAM
    ^]
    telnet> Connection closed.
    %


Conclusions
-----------

The yield-from statement makes it possible to write thread code using generators almost the same way as you would write ordinary code. Whether it's any easier or clearer than using things like 'yield Call(g(x))' and 'yield Return(x)' is debatable.
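To make the comparison concrete, here is a minimal Python 3 sketch of that alternative style. Call, Return and trampoline() are hypothetical names chosen for illustration, and this trampoline runs a single thread, treating a plain yield as a no-op where a real scheduler would switch threads. A thread yields Call(gen) to run a subroutine and Return(value) to hand a result back, and the trampoline keeps the call stack explicitly. The same computation written with yield from follows, run by a hypothetical drive() helper.

```python
# Hypothetical Call/Return style: the trampoline keeps the call stack itself.
class Call:
    """Yielded by a thread to run gen as a subroutine."""
    def __init__(self, gen):
        self.gen = gen

class Return:
    """Yielded by a subroutine to pass value back to its caller."""
    def __init__(self, value):
        self.value = value

def trampoline(thread):
    """Run one thread to completion and return its final result."""
    stack = [thread]
    value = None  # what to send into the generator on top of the stack
    while stack:
        try:
            result = stack[-1].send(value)
        except StopIteration as e:
            stack.pop()            # fell off the end: pass e.value (usually None) up
            value = e.value
            continue
        if isinstance(result, Call):
            stack.append(result.gen)
            value = None           # a fresh generator must be started with None
        elif isinstance(result, Return):
            stack.pop().close()
            value = result.value
        else:
            value = None           # plain yield: a real scheduler would switch threads here

    return value

def add_one(x):
    yield                          # suspension point
    yield Return(x + 1)

def main():
    a = yield Call(add_one(1))
    b = yield Call(add_one(a))
    yield Return(a + b)

# The same logic with yield from: no Call/Return objects, no explicit stack.
def add_one_yf(x):
    yield
    return x + 1

def main_yf():
    a = yield from add_one_yf(1)
    b = yield from add_one_yf(a)
    return a + b

def drive(gen):
    """Resume gen until it finishes and return its result."""
    while True:
        try:
            next(gen)
        except StopIteration as e:
            return e.value
```

Both versions compute 5. Either style gets the job done; the difference is that in the first, the trampoline has to maintain the stack and route return values itself.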
However, I think this example does show that the implementation of a generator-based scheduler can be very clean and simple when yield-from is available, and, if it is suitably optimised, probably more efficient as well.

All the code presented here is included in a runnable form in the attached file Threads.zip.

-- Greg