PLRelational: Query Optimization and Execution
This is our latest entry in a series of articles on PLRelational. For more background, check out these other articles in the series:
- Reactive Relational Programming with PLRelational
- Introduction to Relational Algebra using PLRelational
- PLRelational: Observing Change
- PLRelational: Storage Formats
- Let's Build with PLRelational, Part 1 (and Part 2)
We've been talking a lot about PLRelational lately and what you can do with it. Today I want to talk about some of its internals. In particular, I want to talk about how it optimizes and executes queries, which is one of the most interesting components of the framework.
Simple Query Execution
PLRelational's relations are divided into two kinds: source relations that contain data directly, and intermediate relations that produce data by operating on other relations. (The latter are implemented in the IntermediateRelation type.)
To get data out of a relation, PLRelational executes a query. For source relations, this is simple: ask the relation for its data, and that's it. Executing a query on intermediate relations gets more complicated.
Executing a query on intermediate relations is easy to implement if speed isn't a concern. Fetch the contents of the operands, perform the operation on those contents, and provide the result. For example, here's pseudocode for the intersection operation:
for row in operand[0] {
    if operand[1].contains(row) {
        provide(row)
    }
}
Difference is almost identical, just with the condition reversed:
for row in operand[0] {
    if !operand[1].contains(row) {
        provide(row)
    }
}
Union is slightly more complicated due to the need to avoid providing duplicate rows, but still straightforward:
for row in operand[0] {
    provide(row)
}
for row in operand[1] {
    if !operand[0].contains(row) {
        provide(row)
    }
}
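For concreteness, here's a runnable Swift sketch of the same naive strategy. The Row stand-in and the provide callback are simplifications for illustration; PLRelational's real Row type and operator implementations are more elaborate:

typealias Row = [String: String]  // simplified stand-in for PLRelational's Row type

func naiveIntersection(_ a: [Row], _ b: [Row], provide: (Row) -> Void) {
    for row in a where b.contains(where: { $0 == row }) {
        provide(row)
    }
}

func naiveDifference(_ a: [Row], _ b: [Row], provide: (Row) -> Void) {
    for row in a where !b.contains(where: { $0 == row }) {
        provide(row)
    }
}

func naiveUnion(_ a: [Row], _ b: [Row], provide: (Row) -> Void) {
    for row in a {
        provide(row)
    }
    // Only provide rows from b that weren't already provided from a.
    for row in b where !a.contains(where: { $0 == row }) {
        provide(row)
    }
}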
Early versions of PLRelational implemented operations in this way. It works fine, but can be really slow.
To see why, let's build up a small example. We'll start off with a list of pets:
let pets = MakeRelation(
    ["id", "name", "animal"],
    [1, "Mickey", "cat"],
    [2, "Fido", "dog"],
    [3, "Nudges", "cat"],
    [4, "Rover", "dog"])
Let's pull out the cats:
let cats = pets.select(Attribute("animal") *== "cat")
How is the select method implemented? A really simple implementation might look like this:
for row in operand[0] {
    if expression.valueWithRow(row).boolValue {
        provide(row)
    }
}
Of course, this requires iterating over all of the pets. This is fine when there are only four of them, but what if there are thousands?
We could improve this by adding some code to the underlying Relation type produced by MakeRelation. It could implement select to return a new Relation which knows how to efficiently perform the operation.
What if the situation is more complicated, though? For example, maybe pets is loaded from disk, and then we make in-memory modifications by adding and deleting rows. We can express that by creating Relations for the added and removed rows, then using a union and a difference to combine it all:
let added = MakeRelation(
    ["id", "name", "animal"],
    [5, "Pointy", "cat"])
let removed = MakeRelation(
    ["id", "name", "animal"],
    [3, "Nudges", "cat"])
let currentPets = pets.union(added).difference(removed)
Then we pull out the cats:
let currentCats = currentPets.select(Attribute("animal") *== "cat")
Unfortunately, we're back to iterating over all of the pets.
We should be able to do better. Filtering the output of a union produces the same results as filtering the inputs, then performing the union on the filtered results. The same is true for intersections and differences. If the system could take the filter and push it down to the storage relations, each one could efficiently perform the select and we'd avoid iterating over everything.
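To make the equivalence concrete, here's the same query written both ways using the relations from above. Both forms produce identical rows, but the second gives each source a chance to apply the filter itself:

let catExpr = Attribute("animal") *== "cat"

// Filter after combining: every row flows through the union and difference.
let catsAfter = pets.union(added).difference(removed).select(catExpr)

// Filter before combining: same result, but each source relation can now
// perform its select efficiently on its own.
let catsBefore = pets.select(catExpr)
    .union(added.select(catExpr))
    .difference(removed.select(catExpr))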
How do you make that happen? You could override select in unions and differences to do this, but then what if you derive multiple relations from a union? What about other operations? What about more complicated derived relations? What we really want is a separate system that can see the big picture and optimize it all.
Query Execution Structure
PLRelational implements such a system using three high-level components.
The query planner is responsible for translating a set of Relations into a more execution-oriented representation. Relation objects in memory are easy to work with in code, but a bit challenging to work with for query execution. The QueryPlanner class takes an array of Relations and output callbacks (which will be provided with the data from those Relations), and builds a Node for each Relation passed in and all of the Relations they depend on. Each Node contains things such as an operation describing what it does, an approximate count, and pointers to parent and child nodes.
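As a rough illustration of the shape of this representation, here's a hypothetical sketch of a planner node. The names and layout are invented for this article, not PLRelational's actual declarations:

typealias Row = [String: String]            // simplified stand-in row type
typealias SelectExpression = (Row) -> Bool  // simplified stand-in filter type

struct PlannerNode {
    enum Operation {
        case rowGenerator(() -> [Row])  // a source that produces data directly
        case union
        case intersection
        case difference
        case select(SelectExpression)
        case join
        // ...plus project, rename, and the other operators
    }

    var operation: Operation        // what this node does
    var approximateCount: Double?   // nil when no estimate is available
    var parentIndexes: [Int]        // nodes that consume this node's output
    var childIndexes: [Int]         // nodes this node reads data from
}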
Next, the query optimizer takes the nodes that the planner produced and performs optimizations on them.
After the optimizer is done, the nodes are sent to the query runner. It performs all the work of actually fetching data, computing results, and invoking the output callbacks. The runner is also able to perform optimizations which are better suited to being performed as the query runs rather than in advance.
Ahead-of-Time Optimizations
The distinction between optimizations performed in advance and optimizations performed while running the query mirrors the world of compilers, where you have ahead-of-time and just-in-time compilers. As I implemented this system, the best optimizations came from the query runner, and the query optimizer ended up with a pretty small role.
The current implementation only optimizes a couple of degenerate cases with unions. Unions with only one operand, which are used internally as placeholders in some situations, are eliminated. Nested union operations are compressed into a single operation, within limits. Finally, a garbage collection pass removes nodes that have been orphaned by these optimizations.
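As a minimal sketch of those two union optimizations, here's how they might look over a toy operation tree. PLRelational's real nodes carry far more state, and its version caps how large a combined union may grow; this just shows the shape of the transformation:

indirect enum Op {
    case union([Op])
    case source(String)
}

func flattened(_ op: Op) -> Op {
    guard case .union(let operands) = op else { return op }
    var result: [Op] = []
    for operand in operands {
        // Recursively flatten each operand, then splice nested unions
        // directly into this one.
        switch flattened(operand) {
        case .union(let nested):
            result.append(contentsOf: nested)
        case let other:
            result.append(other)
        }
    }
    // A union with a single operand is just that operand.
    return result.count == 1 ? result[0] : .union(result)
}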
Just-in-Time Optimizations
The query runner is responsible for some key optimizations which can dramatically speed up query execution. Most importantly, it tracks filters which can be applied to each node, and pushes those filters down to child nodes when possible. When a filter makes it all the way to a node which provides data directly, that node can use the filter to efficiently fetch only the needed data.
First, it finds all of the select operations in the graph and applies the select expressions as filters to the children. This is done before running the query, and so strictly speaking could be done as part of the query optimizer instead. However, the query runner tracks the nodes' filter states, which makes it a more suitable place to implement this step.
The query runner tracks two important pieces of information about each node's filter state: the current filter expression, and the number of parent nodes which have not yet provided a filter. As long as there are parents which have not provided a filter, the node must provide its data unfiltered, because those parents may need it all. When a parent node provides a filter, the filter expression is ORed with the current filter expression, and the number of parents which haven't provided a filter is decremented. Once that number reaches zero, the filter expression can be used, and it's propagated to the node's children.
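Here's a hypothetical sketch of that bookkeeping. The names are invented, but the logic follows the description above:

typealias Row = [String: String]
typealias SelectExpression = (Row) -> Bool

struct NodeFilterState {
    var filter: SelectExpression?   // accumulated filter, ORed together
    var parentsAwaitingFilter: Int  // parents that haven't provided one yet

    // Called when a parent pushes down a filter. Returns true once every
    // parent has weighed in, meaning the filter can now be applied and
    // propagated to this node's children.
    mutating func receive(_ newFilter: @escaping SelectExpression) -> Bool {
        if let existing = filter {
            // A row must be kept if any parent wants it, so OR the filters.
            filter = { row in existing(row) || newFilter(row) }
        } else {
            filter = newFilter
        }
        parentsAwaitingFilter -= 1
        return parentsAwaitingFilter == 0
    }
}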
Once select operations are applied, the query runner begins to run the query and process data. At this point, another big optimization kicks in for join operations.
A join works a lot like a select, where the expression is dynamically derived from the contents of the operands. For example, let's take the currentPets data above, and track the IDs of the selected animals in another Relation:
let selectedPetIDs = MakeRelation(
    ["id"],
    [5])
If we need the selected pet's name and animal type, we could do this with select(Attribute("id") *== 5), but then we'd have to manually update that expression whenever the selection changed. It's much easier to do a join:
let selectedPets = selectedPetIDs.join(currentPets)
The fact that selectedPetIDs contains a single row with id: 5 makes it act like a select on currentPets, returning only the rows where id is 5.
Equivalently, we could look at currentPets as providing a massive select expression based on all of its id values, which then applies to selectedPetIDs. This produces the exact same output, but is far more expensive to compute; that difference in cost will be crucial shortly!
When a join node receives all data for one of its operands, it will attempt to turn that data into a filter which it can push to the other operand. It does this by constructing an expression from each input row and ORing them all together. To avoid making a massive, inefficient filter that wouldn't actually speed anything up, it only does this if the number of rows is below a certain threshold, currently set at 100 rows.
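Here's a hedged sketch of that row-to-filter conversion, again with invented names and a simplified Row type:

typealias Row = [String: String]
typealias SelectExpression = (Row) -> Bool

let joinFilterRowLimit = 100  // the threshold described above

// Turn one join operand's rows into a filter for the other operand,
// or return nil if the filter would be too big to be worthwhile.
func filterFromRows(_ rows: [Row], joinAttributes: [String]) -> SelectExpression? {
    guard rows.count <= joinFilterRowLimit else { return nil }

    // One equality test per row, restricted to the attributes the two
    // operands share, all ORed together.
    return { candidate in
        rows.contains { row in
            joinAttributes.allSatisfy { attr in row[attr] == candidate[attr] }
        }
    }
}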
Ordering Operations
The order in which data is fetched is crucial. Joins where one operand is small can be wonderfully optimized if the small side is computed first, but will be woefully inefficient if the large side is computed first. Figuring out which side is the small side ahead of time can be really tough, since each operand can be an arbitrarily complex operation. Instead, the query runner attempts to make this happen as it fetches and computes data.
The query runner operates by choosing a node which can produce data (called an "initiator node") and asking it for some rows. It then propagates those rows through the graph until as much processing as possible has been done. It then goes back to the initiator and asks for more data, and this repeats until the initiator has provided all of its data. The query runner then chooses the next initiator, and repeats everything again until all initiators have been drained.
Initiator nodes are able to estimate the number of rows they contain which match a certain filter. It's important for this operation to be fast, which is why it's not an exact number. If the estimate is incorrect, the query will still produce correct results, just potentially slower. If the initiator can't come up with an estimate at all, it can return nil, in which case the query runner considers it to be bigger than anything else.
When the query runner chooses a new initiator node to pull data from, it picks the one with the smallest estimated size at the time. This usually results in the small sides of joins being filled first, which then allows the query runner to filter the other join operand. Estimated sizes change as filters are pushed down, so this can result in a chain reaction as one small initiator is used, filtering another initiator making it small, which then filters even more.
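A hypothetical sketch of that selection loop, assuming a simplified initiator interface:

typealias Row = [String: String]

protocol InitiatorNode {
    func estimatedRowCount() -> Int?  // nil means "no estimate; assume huge"
    func fetchSomeRows() -> [Row]?    // nil once the node is fully drained
}

func run(initiators: [InitiatorNode], propagate: ([Row]) -> Void) {
    var remaining = initiators
    while !remaining.isEmpty {
        // Pick whichever initiator currently looks smallest. Estimates are
        // re-evaluated each time, since pushed-down filters can shrink them
        // as the query progresses.
        let index = remaining.indices.min { a, b in
            (remaining[a].estimatedRowCount() ?? Int.max)
                < (remaining[b].estimatedRowCount() ?? Int.max)
        }!

        // Drain it, pushing each batch of rows as far through the graph
        // as possible before asking for more.
        while let rows = remaining[index].fetchSomeRows() {
            propagate(rows)
        }
        remaining.remove(at: index)
    }
}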
Subtree Copying
A child node must receive filters from all of its parents before it can apply any filtering to itself. If one parent hasn't provided a filter, the child must assume that parent needs all of its data. This is problematic when one parent provides a filter, but the other parent can't generate its own filter until it receives data derived from the first parent's output. Subtree copying fixes this seeming chicken-and-egg problem.
This is a lot easier to understand with an example. Let's consider a Relation which contains metadata for a document hierarchy. Each document gets an ID and a name. It also gets a parent ID, which expresses the nesting relationship. Documents at the top level have a null parent ID:
let documentMetadata = MakeRelation(
    ["id", "name", "parent_id"],
    [1, "A Story", .null],
    [2, "Cake Recipe", .null],
    [3, "A Story: Chapter 1", 1],
    [4, "A Story: Chapter 2", 1])
We'll track the notion of the currently selected document ID in another Relation:
let selectedDocumentID = MakeRelation(
    ["id"],
    [3])
We can derive a Relation containing the metadata of only the currently selected document by performing a join:
let selectedDocumentMetadata = selectedDocumentID.join(documentMetadata)
Now let's imagine that we want the metadata of the currently selected document's parent as well. We can obtain this by projecting selectedDocumentMetadata on the parent_id attribute (removing the ID and name data), renaming parent_id to id, and then joining the result with documentMetadata:
let selectedDocumentParentID = selectedDocumentMetadata
    .project("parent_id")
    .renameAttributes(["parent_id": "id"])
let selectedDocumentParentMetadata = selectedDocumentParentID
    .join(documentMetadata)
Let's take a more graphical look at this setup. PLRelational has code which can dump a Relation into a Graphviz dot file, which we can then convert to SVG for display on the web. Here's what selectedDocumentParentMetadata looks like:
Note that in this graph, selectedDocumentParentMetadata is at the top, and the arrows point from operations to their operands.
After going through the query planner, but not yet running the query, the query planner nodes look like this:
This graph is inverted from the last one. selectedDocumentParentMetadata is now at the bottom, and the arrows point in the direction of data flow.
In this graph, we can see the problem visually. selectedDocumentID has the smallest number of rows, so the query runner will fetch its data first. That data will go into the join, which will convert the input to a filter and push that filter to documentMetadata. However, because documentMetadata has two parents and only one filter has been applied, that's not enough to activate a filter, so it produces all of its rows. That means that potentially thousands of rows flow through this graph just to produce one output row, which is terribly inefficient.
Subtree copying fixes this problem. When certain criteria are met, a node and all of its parent nodes are copied when pushing a filter to that node. In this example, documentMetadata is duplicated, with one copy providing input to selectedDocumentMetadata and the other copy providing input to selectedDocumentParentMetadata. This is the state of things when the query runner completes, and it looks like this:
The documentMetadata node has been duplicated. One copy now points into selectedDocumentMetadata, and the other points into selectedDocumentParentMetadata. When selectedDocumentMetadata receives input from selectedDocumentID, it creates a filter and pushes it to its copy of documentMetadata.
Since that node only has one parent, the filter is immediately activated, which lets it efficiently produce a single row. The query runner pulls data from that node next, since it's now the smallest. That then allows selectedDocumentMetadata to compute the output of the join. That data then flows up the chain until it reaches selectedDocumentParentMetadata. That join is then able to compute a filter and push it to the other copy of documentMetadata. This allows both fetches from documentMetadata to be performed efficiently, and avoids the chicken-and-egg problem posed by the original structure.
I mentioned that certain criteria must be met for the query runner to copy a subtree. Those criteria are:
- The total number of nodes in the subtree must be no more than a certain limit, currently set at 100 nodes.
- The node at the top of the subtree must have at least one other parent that hasn't yet provided a filter.
- There must be at least one initiator node in the subtree which can efficiently produce less data when given a filter.
#1 is an arbitrary limit put in place to avoid doing the optimization if it's going to require a lot of work. Really complicated subtrees may cost more to duplicate than the time saved by the operation. It's hard to figure out exactly where the tradeoff is no longer worth it, but 100 is a workable limit for now.
#2 avoids pointless work. If the node isn't waiting on any other parents, then it can propagate the filter directly and copying won't help anything.
#3 likewise avoids pointless work. If none of the initiators in the subtree can efficiently produce filtered data, then there's no point in trying to ensure that they can be filtered. We'll pay the cost one way or another, and we might as well skip the cost of copying the subtree on top of that.
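Put together, the decision reduces to a simple predicate. Here's a hypothetical sketch, with the inputs standing in for facts the query runner already tracks:

let subtreeNodeLimit = 100  // the limit from criterion 1

func shouldCopySubtree(
    nodeCount: Int,                   // criterion 1: size of the subtree
    otherParentsAwaitingFilter: Int,  // criterion 2: parents still unfiltered
    hasFilterableInitiator: Bool      // criterion 3: someone can use the filter
) -> Bool {
    return nodeCount <= subtreeNodeLimit
        && otherParentsAwaitingFilter >= 1
        && hasFilterableInitiator
}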
Subtree copying can make a big difference. Testing with 1,000 rows in memory, the query runs in about 30 milliseconds on my computer with subtree copying disabled, and about 7 milliseconds with it enabled. On 100,000 rows, it takes 2.4 seconds to run without subtree copying, and only 18 milliseconds to run with it.
Conclusion
That's the basic tour of PLRelational's query infrastructure. One of the interesting aspects of building the framework around relational algebra is the ability to optimize, on the fly, the complicated data flow graphs constructed by the programmer, based on the actual data in use at the time. Our optimizer is still fairly basic and there's a lot of room for improvement and new optimizations, but what we currently have is already enough to provide improvements of multiple orders of magnitude on real-world data.