Where have I been for the last month anyhow?
Hello readers. I am back from a very long writing vacation. I am back to my roots and I am here to talk about a topic I have been working on for a bit in Racket, which I want to call the Distributed Task system.
I have been working on a project, something that may come to be used here over the course of the next week, but for me to work on it, I wanted to try something different.
When we describe programs, we typically think of them as linear assemblies of code, such that one piece of code gets executed after another in a completely linear fashion. It took decades for us to move away from this, and move into distributed computing, the concept of not having to do that.
There are many different concepts for distributed programming:
Every day we use something that involves distributed programming - if you load a web page, you most likely loaded it from a cluster of machines, and were assigned one specific machine to get the file from. If you use a web browser, your web browser uses multiple CPU cores on your computer to render certain web components. Distributed programming is a little invisible.
More importantly with distributed programming, there are two types of programs:
You could say the first category is like drawing two parallel lines and watching them go forward autonomously. That would be considered parallel computing, where the two or more programs do not care about the others necessarily. That's an ideal style for programs that need to run autonomously, like systems running other programs.
The second category is called asynchronous programming, where results can be loaded in at a given time and must notify the receiver. There are many terms here, like Promises, Futures, Events, Mutexes, and so on and so forth. An example of this type of programming would be a 3D rendering program that executes code on multiple cores of a CPU, and when one thread is done rendering a pixel, it waits for a new job to come in so it can keep doing more work.
The second method is more interesting because it changes how programs can be made - instead of thinking in terms of linear executed fashion, we can now think of code in asynchronous terms. What lines of code can be lifted and spread across threads to make more sense and improve your program speed?
I will admit, I am a novice at asynchronous programming. I write code and I still can't think of places where an asynchronous style would help me. I decided that I wanted to change that, so I am taking my first real stab.
The first part of understanding asynchronous code is understanding how it actually works. Racket is a virtual machine language where code gets compiled into bytecode and ran through the Racket VM. Racket provides a way of defining asynchronous threads of execution quite simply with the
thread is a way of creating code that will run asynchronously to the main thread of the Racket VM. The code in the given
thread will not block the main thread from running, and provides a means of doing distributed computing. The user API for threads is quite simple too.
What I mean by blocking is that, when Racket encounters a code that signifies it has to wait, it will do what's called a "block", and the thread will not continue until the block is lifted. There are ways of writing custom blocks that unblock when certain parameters are met, but Racket does a great job of encapsulating it into a few easy blocks. For the time being, we're only going to use the few functions listed here for threading purposes.
thread- initializes a thread
thread-wait- waits for a thread
thread-send- sends a message to a thread
thread-receive- waits for a message from anywhere
The below snippet should give you a basic idea of how threading works.
#lang racket/base (define t1 (thread (lambda () (displayln "I'm a thread print!")))) (displayln "Waiting for t1 to finish (blocking)") (thread-wait t1) (define t2 (thread (lambda () (displayln "Waiting for a msg") (define msg (thread-receive)) (printf "Got ~a\n" msg)))) (displayln "Sending a message to t2") (thread-send t2 500) ; should see "Got 500" ; now block execution until t2 is completed ; necessary so the program doesn't exit early (thread-wait t2)
thread-send are the key pieces to creating and interacting with threads, while
thread-receive are blockers. The point of the blockers is to make sure they line up with other threads in a program and don't jump ahead of other threads too quickly.
Using threads, it is trivial to create a list of threads, or even fixed arrays like
vector, and have threads stored that way. In fact it's probably the simplest way of storing threads reasonably. Since
vector is not a pure functional data type, it involves a strong mutation call to
vector-set! to mutate.
(define my-threads (make-vector 5 #f)) (define indices (range 5)) (for ([x indices]) (vector-set! my-threads x (thread (lambda () (displayln "Hello!"))))) ; now wait for them all to print (for ([x indices]) (thread-wait (vector-ref my-threads x)))
This is kind of where some cracks will start to show. Namely because it does not do what we think it should - the
for loop pauses every cycle for each thread, and blocks the main thread until each individual thread is done!
This is still better than where we were before. If we had to run any arbitrary number of functions in parallel, the
for loop acts as a synchronizing blocker to wait for other threads to catch up. The total time for these functions to execute is the time it takes for the longest function to execute, so we aren't really in trouble right now.
The question is now then, can we re-use or create new threads when one ends? That is an issue, because it's not as simple as running a
for loop over the threads and waiting for them to finish. We might get stuck on a long-running thread, preventing us from checking on the shorter-running ones.
What instead we should think about is how to make threads communicate with others, and this is usually done with Message Passing. Message passing is a style of coding where threads do their work, and send messages to one another to notify them of changes in a system. When one thread ends, it should notify another that it needs a new job to do, or be terminated by the manager of the system.
To have threads communicate between themselves, you can create thread factories to pass a reference to a parent thread, or supervisor thread, however you want to think of it. The newly-made thread will then remember who it's parent is, and can send messages to it.
(define (thread-maker n parent) (lambda () (sleep 5) (thread-send parent 'done))) (define parent-t (thread (lambda () (define (loop) (define msg (thread-receive)) (printf "Got ~a\n" msg) (loop) (loop))))) (thread-maker 1 parent-t) (thread-maker 2 parent-t)
This program makes two threads that simply sleep for 5 seconds then message the parent. In order for a thread to send a message to another, it somehow must have the reference to that thread. This can be done by passing it through a function argument to be captured by the scope, or by simply having the definition lexically in the same scope.
By having threads send some messages to a parent, the parent can monitor the progress of each thread. If you had a list of functions to execute, you could then tell a task what to do next afterwards.
(define tasks (list (lambda () ...) ...)) (define workers (make-vector 5 #f)) (define indices (range 5)) (define (make-worker n parent) (thread (lambda () (define (loop) (thread-send parent n) (define task (thread-receive)) (task) (loop)) (loop)))) (define supervisor (thread (lambda () (define (supervisor-loop listof-tasks) (unless (empty? listof-tasks) (define asker (thread-receive)) (thread-send (vector-ref workers asker) (car listof-tasks)) (supervisor-loop (cdr listof-tasks)))) (parent-loop tasks)))) (for ([x indices]) (vector-set! workers x (make-worker x supervisor))) (thread-wait supervisor)
Now that looks pretty darn good, and doesn't use
thread-wait for individual threads. Now we can dispatch work to worker threads and let them act autonomously without interfering with the main thread. Pretty cool, right?
The downside to this code is that the main thread will exit when the last task is sent off to a thread. We don't wait for all the threads to finish their work, we just look to see if our work list is empty or not. We need a better system to manage this.
Let's move on, and start trying to use threads to solve real problems.
We have seen how to run code independently of the main thread, but so far, it isn't exactly smart. Threads can do code, but right now, what if code relies on another thread to complete before it can start?
As I can best describe it, let's describe Panera Bread, a place I have been to far too often now.
Panera Bread has a goal of cooking recipes to create food product, and then selling them to the customer in exchange for monetary value. And at Panera Bread, we value our customer and pride ourselves on the quality of our work.
(Notice the strong use of me saying and then? That's because one operation relies on another in order to fulfill it's goal.)
We can say that one task is dependent on another, so how can we find a way to represent this in code? Well, it's tricky, but it can be done.
The first step is defining what it is to be a task. A task is a set of code, with a label, and possibly depends on another person or event to finish before it can be started.
(define Task (id fn depends-on))
A simple three-variable struct would do well here. But we don't want that
depends-on part to always connect to something, so we'll use a function to help construct them with an optional labeled argument.
(define (Task-make id fn #:depends-on [dp #f]) (Task id fn dp))
Now we can represent a list of tasks for Panera Bread nice and proper. These are the tasks that Panera Bread workers are expected to fulfill in some capacity.
(define tasks (list (Task-make 'prepare_food (lambda () (make-food))) (Task-make 'clean_store (lambda () (clean-store))) (Task-make 'open_store (lambda () (open-store)) #:depends-on 'clean_store) (Task-make 'sell_food (lambda () (sell-food)) #:depends-on 'prepare_food))))
Now we have four tasks ready to go. Great!
At this point you're probably wondering "if you can offload things into threads, why not simply make all the threads?". To that I will say: you really should not rely on the notion of infinite threads.
Picture a thread as if it were a Panera Bread worker. You could open up a new thread each time a task is encountered, but if the task cannot be completed because it's waiting on someone else to finish their work, then you're basically paying a worker to stand around waiting for something to do. If you have 1000 tasks, you need 1000 workers, and that would be a lot of paid slacking-off.
Instead, we must think of it in limited resources. Throwing more threads at a problem doesn't make it go faster. In fact, the more threads you create, you can possibly make your program slower, as all the threads try to get as much CPU time as they can, making the computer go crazy with all the contextual switching it will have to do.
It's better to think of it in smaller sizes and go up based on how well you do with a smaller bundle. Just like a real business, you increase your workers if you think you need more throughput. You don't start out with a ton of workers, then start firing them off to tweak your performance and budget.
Now, back to the bread making. We just finished describing our tasks, now we need a way of sorting the independent tasks from the dependent ones. You can do this with a filter of sorts by simply pre-processing the Tasks and checking if they have a dependency or not, then putting them into their own structures.
;; List<Task> -> (List<Task>, Hash<Symbol, Task>) (define (pre-process tasks) (define (inner tl free-tasks nonfree-tasks) (if (empty? tl) (values free-tasks nonfree-tasks) (let ([T (car tl)]) (let ([dp (Task-depends-on T)]) (inner (cdr tl) (if (eqv? #f dp) (cons T free-tasks) free-tasks) (if (eqv? #f dp) nonfree-tasks (if (hash-has-key? nonfree-tasks dp) (hash-update nonfree-tasks dp (lambda (tasks) (cons T task))) (hash-set nonfree-tasks dp (list T))))))))) (inner tasks '() (make-immutable-hash '())))
If that seems like a lot to take in, well, it's because it is. We map through and collect results of each task and recognize if it is a dependent task or not. We can then put the dependents into a hash which we can instantly access the minute the task expires.
We do have to change our worker system a little bit, but that's okay. We need our worker to tell us when they need a task, and then when the task is completed (and more specifically, which task was completed).
(define (make-worker n parent) (thread (λ () (define (loop) (thread-send parent (list 'awaiting n 0)) (define T (thread-receive)) (when (Task? T) (with-handlers ([exn? (λ (e) (displayln e) (exit))]) ((Task-fn T)) (thread-send parent (list 'done n (Task-id T))))) (loop)) (loop))))
Don't mind the
with-handlers part, that is just how we handle fatal errors for now. Error bubbling in Racket with threads is semi-confusing, and maybe it should be delegated to the parent instead, but we're not quite there yet. After all, it's hard work managing a Panera Bread facility.
Instead of sending over a number, we send three pieces of data to notify our parent that we are either bored and need work, or finished a specific job. You can't send over the result of
thread-send, which I learned the hard way, so we send it over as a compact list and deal with the unpacking later.
So we have our list of tasks, our list of tasks that have intermediate requirements, and a bunch of workers we can start assigning. How do we get started selling bread? Let's make a manager, finally! One who will boss the workers around to 100% efficiency.
(define-values (task-list dependents-hash) (pre-process tasks)) (define supervisor (thread (λ () (define (superloop Q H C) (when (or (not (empty? Q)) (< 0 C)) (define-values (msg thread# taskid) (apply values (thread-receive))) (printf "Message: ~a ~a ~a\n" msg thread# taskid) (case msg ((awaiting) (if (not (empty? Q)) (begin (thread-send (vector-ref workers thread#) (cdr Q)) (superloop (cdr Q) H (add1 C))) (superloop Q H C))) ((done) (if (hash-has-key? H taskid) (superloop (append Q (hash-ref H taskid)) (hash-update H taskid '())) (sub1 C)) (superloop Q H (sub1 C))) ((fail) (error "ERROR: system failure, check messages") (superloop Q H (sub1 C))) (else (error "ERROR: incorrect message type"))))) (superloop task-list dependents-hash 0)))) (for ([x indices]) (vector-set! workers x (make-worker x supervisor))) ; now wait for the supervisor to finish running all tasks (thread-wait supervisor)
Now if you made it all the way here, congrats! This took me probably a good day or so to fully flesh out, but it's actually a working system and does what I need.
The notable elements of this code is that I had to incorporate a task counter, because the minute a task left the active queue and went to the next cycle, it would stop, because there were no more tasks in the list, meaning it's job would be done. An active counter of running tasks helps the supervisor understand that there are jobs running, and it should keep track of the amount started and subtract when jobs are done.
One other thing: why is supervisor in it's own thread? Because I find it easier, that's all. You can lift all the work and put it into the main thread, but then you have to capture a reference to the main thread by using the
current-thread parameter. That
current-thread parameter can indeed get the main thread reference, but I think it's easier to simply create a new thread and use
thread-wait. That way there's no confusion about threading and it's all front and center.
I like the idea of writing programs as if they were thrown into the mix to be picked up by digital workers to then be completed. It's a nice thought, and a very different perspective on how to write programs.
The inspiration of this came from my remembering how the artificial intelligence of the game F.E.A.R. (2005) was made. F.E.A.R. (I'm going to call it "Fear" from now on okay?) is a combat first person shooter that when it came out, blew a lot of people's minds for how reactive the enemy A.I. actually is. The enemy soldiers you fight will talk to each other and try to work together to defeat you, often times by flanking, throwing grenades, and coming at you from different angles by spreading out.
The Fear creators used something called goal-oriented action programming, which is a type of AI program design using a graph data structure, representing game states as nodes and unit actions as connections between nodes. The game world is mapped out heavily with different positions and paths, and the enemies had their actions written so they could employ their moves to create new game states.
Game state requires memory space and it isn't infinite, so enemies cannot predict every action you do, but they can reasonably see a good few moves ahead and analyze what path which would help the best. Or, they can use a completely randomized path, and how would you know which one they took? That's the fun in all of it.
That being said, I cannot possibly expect to re-create the AI engine for an FPS game, but it gave me the inspiration to do this new way of coding I've never tried.
I have a program, and I can break it down into steps, and some of those steps do have intermediate requirements. My task system is a little basic, and there's no way of me describing a task with multiple requirements right now. That would require more work. But it works for me right now, and I can lay out a path of how I want things to work, and the task running system will simply do the tasks in order and try to make sure no task is ran unless it's requirement is met.
The project I am working on currently uses it, because I wanted to try something new. I have a ton of intermediary steps that need to be met before other code pieces can be executed, so it made sense to try this out. Over the course of the week, I will be employing this new project here, and I will talk about it more in-depth when it is released. For now I want this post to sit here to talk about my experience trying something completely different with Racket.
Until next time!