Archive for March, 2012

Spreading the Load

My recent work on High Speed Replication is not the only thing I’ve done to improve GlusterFS performance. In addition to that 2x improvement in synchronous/replicated write performance, here are some of the other changes in the pipeline.

Patch 3005 is a more reliable way to make sure we use a local copy of a file when there is one. The current “first to respond” method doesn’t always work, because the first to respond is usually the first to be asked, and we always query the servers in the same order regardless of where we are. Thus, if the bricks are on nodes A and B, everyone will query A first and most of the time everyone – including B – will decide to read from A. The new method should work a lot better in “peer to peer” deployments where every machine is both client and server.
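A minimal Python sketch of the local-first idea, assuming a client can spot a local copy by comparing each brick's hostname with its own. Patch 3005 may detect locality differently, and `choose_read_child` is a hypothetical name:

```python
import socket


def choose_read_child(replica_hosts, local_hostnames=None, default=0):
    """Return the index of the replica a client should read from.

    Prefers a replica hosted on this machine; otherwise returns `default`
    rather than whichever server happened to answer first.
    """
    if local_hostnames is None:
        local_hostnames = {socket.gethostname()}
    for index, host in enumerate(replica_hosts):
        if host in local_hostnames:
            return index
    return default


# Bricks on nodes A and B: node B now reads its own copy instead of A's.
print(choose_read_child(["nodeA", "nodeB"], local_hostnames={"nodeB"}))  # 1
print(choose_read_child(["nodeA", "nodeB"], local_hostnames={"nodeC"}))  # 0
```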

Patch 2926 handles the opposite case, where servers and clients are separate. The same convergence behavior mentioned above was causing strange inconsistencies in performance runs. Sometimes (if clients happened to divide themselves evenly across replicas) performance was really good; other times (if clients happened to converge on the same – usually first – replica) it was really bad. The new method uses hashing to ensure that both one client accessing multiple files and multiple clients accessing the same file will distribute that access across replicas. If hash-based random distribution is good enough for DHT, it’s good enough for AFR as well.
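One way to picture hash-based replica selection is the sketch below. The MD5-derived 32-bit hash and the `client/file` key format are assumptions for illustration, not necessarily what Patch 2926 uses, and `hashed_read_child` is a hypothetical name:

```python
import hashlib
from collections import Counter


def hash32(text: str) -> int:
    # Stand-in 32-bit hash (first four bytes of MD5), used only for illustration.
    return int.from_bytes(hashlib.md5(text.encode()).digest()[:4], "big")


def hashed_read_child(client_id: str, file_id: str, replica_count: int) -> int:
    """Choose a replica by hashing the (client, file) pair."""
    return hash32(f"{client_id}/{file_id}") % replica_count


# One client reading many files, and many clients reading one file,
# both spread across the two replicas instead of piling onto replica 0.
per_file = Counter(hashed_read_child("client1", f"file{i}", 2) for i in range(1000))
per_client = Counter(hashed_read_child(f"client{i}", "vm.img", 2) for i in range(1000))
print(per_file, per_client)
```

Because the choice depends on both the client and the file, neither a single busy client nor a single popular file ends up pinned to the first replica.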

Patch 3004 deals with distribution rather than replication, and it might be the most interesting of the three, but by itself doesn’t tell the whole story of what I’m up to. All the patch does is support a “layout” type (see my article about distribution) that the built-in rebalance infrastructure will leave alone. Thus, if you want to create a carefully hand-crafted layout for a directory so that all files go to bricks in a certain rack, you’ll be able to do that safely. More importantly, it provides the essential “hook” for experimenting with different rebalancing algorithms. I already have a Python script that can calculate a new layout based on an old one, with two twists.

  • It checks the brick sizes, and generates a new layout with the hash ranges weighted according to those sizes. This improves behavior when bricks are different sizes – as is often the case when the new batch of disks is 50% larger than the batch purchased a year ago.
  • It also tries to build a new layout that minimizes the data motion coming from the old one. The built-in algorithm is a bit stupid this way, tending to move (N-1)/N of the data when you add brick N. For large N, this means a pretty complete reshuffling of your data. The new algorithm seems to converge on moving only about 40% of the data if you want a “perfect” final result, or all the way down to below 10% if you’re willing to sacrifice a bit of “accuracy” to minimize the disruption associated with data migration.
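The size-weighting half of that script can be sketched in a few lines of Python. This is not the script itself: `weighted_layout` is a hypothetical helper that assumes one contiguous range per brick over the 32-bit hash space, and it ignores the data-motion minimization entirely:

```python
HASH_SPACE = 2 ** 32  # DHT hash values are 32-bit


def weighted_layout(brick_sizes):
    """Split [0, 2**32) into contiguous ranges proportional to brick size.

    Returns one (start, end_inclusive) pair per brick, with no holes or overlaps.
    """
    total = sum(brick_sizes)
    layout = []
    start = 0
    cumulative = 0
    for size in brick_sizes:
        cumulative += size
        # Each boundary comes from the running total, so rounding never leaves
        # a gap and the final boundary lands exactly on 2**32.
        end = HASH_SPACE * cumulative // total
        layout.append((start, end - 1))
        start = end
    return layout


layout = weighted_layout([2, 2, 3])  # third batch of disks is 50% larger
shares = [(end - start + 1) / HASH_SPACE for start, end in layout]
print([round(s, 3) for s in shares])  # [0.286, 0.286, 0.429]
```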

The one thing that was missing was a way to apply these changes and make them stick. Once we have that, what I envision is a three-step approach to adding bricks. Sometimes you might not migrate any data at all; you just adjust the layouts to prefer placement of new data on the new bricks (a very small tweak to the total-size-based weighting I’ve already implemented). If you want to be a bit more aggressive about rebalancing, you can do the low-data-movement kind of rebalancing which will get you pretty close to an optimal layout. Then, perhaps during planned downtime or predictable low-usage periods, you do the full rebalance to ensure fully optimal placement of data across your cluster.

Those aren’t the only patches I have in the works, but I think they constitute a coherent group with the common goal of distributing load across the cluster more evenly than before. That’s what “scale out” is supposed to mean, so I’m glad that we’ll finally be capturing more of the advantages associated with that approach.


GlusterFS Algorithms: Replication (future)

In my last post, I promised to talk a bit about some emergent properties of the current replication approach (AFR), and some directions for the future. The biggest issue is latency. If you will recall, there are five basic steps to doing a write (or other modifying operation): lock, increment changelog, write, decrement changelog, unlock. For many workloads, certain optimizations apply. “Changelog piggybacking” can skip the decrement/increment for batched writes. “Eager locking” can do likewise for the unlock/lock. Thus, for streaming writes you might see: lock, increment, very many writes, decrement, unlock. Pretty cool, huh?

Unfortunately, it doesn’t work so well for random synchronous writes, such as we’d see for virtual-machine-image workloads or applications using lightweight embedded databases. In a simple implementation, both changelog piggybacking and eager locking apply only when we can actually see that there’s a second request in progress (or at least in our queue) when the first completes. Sure, we could implement a Nagle-ish approach of waiting for some amount of time before we send the decrement/unlock, just in case another write comes along, but that can get complicated pretty quickly. How long should we wait? If we don’t wait long enough, we might be incurring extra context switches to handle the timeout and send the deferred cleanup messages. If we wait too long, then we might be blocking out other clients for too long (for the deferred unlock) or expanding the window during which a client disconnection would cause an actually-unnecessary repair on the file (for the deferred decrement). Wouldn’t it be better if we didn’t have to rely so much on optimizations to a fundamentally inefficient full-serialization approach?

That brings us to High Speed Replication or HSR. (Hey, if the old stuff is “advanced” then the new stuff can be “high speed” IMO.) The basic idea here is to reduce latency, in several steps. First, we eliminate the “pessimistic” locking and serialization in favor of a more “optimistic” model of detecting and resolving conflicts. I played around with some pretty complicated vector-clock schemes, but eventually settled on a method using simple versions of a file at each server. The version acts as a predicate: if the version the client presents (i.e. the last one it knows about) doesn’t match the one at the server, the write is rejected. If a write is rejected by any server, the client simply retries it at all of them. This doesn’t provide any fairness or forward-progress guarantees, obviously, but that kind of conflict isn’t the point. This isn’t supposed to be something that you could use to simulate shared disks, or for highly concurrent I/O as is seen in the MPI world (go use PVFS). AFR won’t perform well in those cases either. Its serialization is there mostly to ensure that all servers execute writes in the same order even if they receive them out of order, to prevent permanent inconsistency between the copies. The conflict resolution and retry in HSR serves the same need; if two requests hit two servers in opposite orders, at least one will get a version conflict and be retried, so the copies will converge.
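The version-predicate idea can be illustrated with a toy model in which two clients' writes reach two servers in opposite orders. The retry policy here (resend everywhere using the highest version any server reported) is an assumption made for the sketch, and `VersionedServer` and `retry_at_all` are hypothetical names, not HSR code:

```python
class VersionedServer:
    """Toy replica holding one file's contents and a single version number."""

    def __init__(self, name):
        self.name = name
        self.version = 0
        self.contents = None

    def write(self, expected_version, payload):
        # The version acts as a predicate: reject the write if the client's
        # idea of the current version is out of date.
        if expected_version != self.version:
            return False, self.version
        self.contents = payload
        self.version += 1
        return True, self.version


def retry_at_all(servers, results, payload, max_tries=5):
    """Resend `payload` to every server until all of them accept it.

    Assumed policy: retry with the highest version any server reported.
    Like the scheme described above, this gives no fairness guarantee.
    """
    for _ in range(max_tries):
        if all(ok for ok, _ in results.values()):
            return True
        version = max(v for _, v in results.values())
        results = {s.name: s.write(version, payload) for s in servers}
    return all(ok for ok, _ in results.values())


s1, s2 = VersionedServer("s1"), VersionedServer("s2")
# Clients X and Y both believe the version is 0, and their writes reach the
# two servers in opposite orders.
x_results = {"s1": s1.write(0, "X")}   # accepted, s1 -> version 1
y_results = {"s2": s2.write(0, "Y")}   # accepted, s2 -> version 1
y_results["s1"] = s1.write(0, "Y")     # rejected: s1 is already at version 1
x_results["s2"] = s2.write(0, "X")     # rejected: s2 is already at version 1

retry_at_all([s1, s2], x_results, "X")
retry_at_all([s1, s2], y_results, "Y")
print(s1.contents, s1.version, s2.contents, s2.version)  # Y 3 Y 3
```

Each client hits one conflict and retries, and both copies end up with the same contents and version, which is the convergence property the text relies on.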

What if a server or client dies in the middle? For that, we still have the changelogs. Yes, we still use them, and they still mean the exact same thing. This was a very deliberate decision, so that the existing self-heal mechanisms could still work (most of my fancy vector-clock schemes required completely separate repair procedures). However, while the bits on disk mean the same things, the way they’re updated is a bit different. We do the increment on the server side as part of the write, and we do the decrement asynchronously. Thus, one network round trip is eliminated and one is moved out of the latency path, but if we’re interrupted in the middle of a write then the existing self-heal will still take care of it. That means we have to do some things to make sure that the existing self-heal does in fact still work even if it’s run concurrently with our writes, but I don’t need to go into that level of detail. The important thing is that a non-contending user write now consists of only two parts: a write over the network before we return, and a changelog decrement that’s done after. The results are pretty satisfying.
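A toy model of the new update pattern, assuming the server raises its changelog counters for every replica while handling the write and the client queues the decrements instead of waiting for them. `Brick`, `hsr_write`, and `flush_decrements` are hypothetical names, and the real on-disk changelog handling is more involved:

```python
from collections import deque


class Brick:
    """Toy brick with a per-replica pending counter (the "changelog")."""

    def __init__(self, name, replica_names):
        self.name = name
        self.data = {}
        self.pending = {r: 0 for r in replica_names}

    def write_and_mark(self, offset, payload):
        # Assumed: the server bumps the changelog as part of handling the
        # write, so the client needs no separate increment round trip.
        for r in self.pending:
            self.pending[r] += 1
        self.data[offset] = payload

    def clear(self, replica):
        self.pending[replica] -= 1


deferred = deque()  # decrements waiting to be sent after the write returns


def hsr_write(bricks, offset, payload):
    for brick in bricks:          # the one round trip the caller waits for
        brick.write_and_mark(offset, payload)
    for brick in bricks:          # decrements are queued, not awaited
        for r in brick.pending:
            deferred.append((brick, r))


def flush_decrements():
    while deferred:
        brick, replica = deferred.popleft()
        brick.clear(replica)


names = ["b1", "b2"]
bricks = [Brick(n, names) for n in names]
hsr_write(bricks, 0, b"block")
print([b.pending for b in bricks])  # [{'b1': 1, 'b2': 1}, {'b1': 1, 'b2': 1}]
flush_decrements()
print([b.pending for b in bricks])  # [{'b1': 0, 'b2': 0}, {'b1': 0, 'b2': 0}]
```

If the client dies before `flush_decrements` runs, the counters stay non-zero, which is the state the existing self-heal already knows how to repair.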

These results are for 4KB random synchronous writes using two of Storm On Demand’s 12GB SSD servers and a single client. I chose those servers because they represent a realistic configuration where performance is bound by network latency (plain old GigE AFAICT) rather than storage; I can actually get even more dramatic numbers in more artificial configurations, but that wouldn’t be useful. The “afr default” line is AFR as it exists today. Clearly it is in fact network-bound, since these machines are capable of doing about 30K IOPS each. The “afr write-behind” line was supposed to measure the effect of eager locking, but after I’d run the tests and been surprised by the results I realized that the real difference came from enabling write-behind (which is required for eager locking). Personally I don’t consider that a valid configuration, because if the user asked for synchronous writes they shouldn’t be queued in memory for write-behind, but it’s informative just to see the effect of that change. The “afr fast-path” line represents AFR with all locking and changelog updates disabled. This is a totally unsafe and invalid configuration, but it establishes a “speed of light” for replicated writes. No matter how well we optimize, replicated writes won’t be faster than that number on this hardware (especially on this network). The real point of the graph is the “hsrepl” line – twice as fast as plain AFR (the only other valid configuration IMO) or 2/3 of the speed of light, depending on how you look at it.

That’s exactly the kind of improvement I was hoping for. Our performance on this most demanding kind of workload has been a stumbling block for many, especially those who want to use GlusterFS for hosting virtual-machine images, and this just might bring those workloads “into the fold” for us. 3000 IOPS wouldn’t be that impressive for a local SSD, but this isn’t local storage. This is distributed storage, so that data survives even when nodes fail, so local-storage numbers don’t apply. It’s also synchronous I/O, so “throw everything in a huge unordered batch and tell me about my million IOPS later” doesn’t apply either. Scale this up to twenty servers on 10GbE, with more clients to feed them, and you’re probably talking about at least 200K globally available crash-safe IOPS for much less than the only real alternatives – SAN or other scale-out NAS – would cost.

Before I wrap up, I’d like to reiterate how this does not displace AFR. HSR still passes every request other than simple writes through to AFR, which must be present, to be handled the same way as before. HSR also relies on the AFR self-heal infrastructure, including all past and future improvements there. Users whose performance is not limited by network latency, either at the low end because they’re bound by slow disks or at the high end because they have a fast network, probably won’t see much benefit from HSR. The same is likely true for users whose workloads are already well served by the recent improvements to AFR. In those cases, they might as well just stick with AFR to reduce the number of moving parts. HSR is simply about extending our replication strategy to cover new use cases, not to displace AFR in old ones where it has already served quite well.


GlusterFS Algorithms: Replication (present)

Replication is the most necessarily complex part of GlusterFS – even more than distribution, which would probably be the most common guess. It’s also one of the things that sets GlusterFS apart from most of its obvious competitors. Many of them simply require that you implement RAID (to protect against disk failures) and heartbeat/failover (to protect against node failures) externally. Thanks, guys. The worst thing about that approach is that it depends on shared storage at least between failover sets, and the whole idea of a scale-out filesystem is supposed to be that it can run on cheap commodity hardware. A few alternatives do implement their own replication, and kudos to them, but I’m not here to describe how other projects do things. In GlusterFS, the approach is based on using extended attributes to mark files as potentially “dirty” while they’re being modified, so that they can be recovered if the modification fails on one replica in the middle. I posted a description of these extended attributes a while back, so here I’ll focus more on the algorithms that use them. The conceptual process of writing to a file consists of the following steps.

  1. Take out a lock on the file. This prevents simultaneous updates from occurring at the replicas in different orders, which is hard to reconcile. Perhaps even more importantly, it helps to avoid conflicts between a write and “self-heal” (repair) on the same file. It’s easy to focus on conflicts between clients, but this kind of client/internal conflict is important too.
  2. Mark the “changelog” (extended attributes) on each copy of the file, to represent the pending write. This is not really a log in any meaningful sense of the word, but that’s one among many unique uses of standard terms in the code so I’ll stick with it. It’s really a set of “dirty” counters (see the above link for more detail) which are incremented at this point. The important thing to note here is that each replica contains a changelog for every other replica, so that failure of one replica does not wipe out both the write and the record of its existence.
  3. Do the actual write on all replicas.
  4. As each replica write completes, update (decrement) the changelog on the others. Note that this process might be repeated if the replica count is greater than two.
  5. When all writes and changelog decrements are complete, release the lock.
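The five conceptual steps can be modeled in Python as below. This is a simplified sketch, not AFR code: every step counts as one round trip, the completions in step 4 are handled together rather than one replica at a time, and a replica that is down simply misses all the steps. Each copy keeps an entry for itself as well as for the others, per the update in footnote [1]. `Replica` and `afr_write` are hypothetical names:

```python
class Replica:
    def __init__(self, name, all_names):
        self.name = name
        self.up = True
        self.locked = False
        self.data = {}
        # changelog[other]: pending-operation count this copy holds for `other`
        self.changelog = {n: 0 for n in all_names}


def afr_write(replicas, offset, payload):
    """Conceptual (unoptimized) AFR write; returns the network round trips used."""
    live = [r for r in replicas if r.up]
    round_trips = 0

    for r in live:                      # 1. lock
        r.locked = True
    round_trips += 1

    for r in live:                      # 2. mark pending for every replica
        for name in r.changelog:
            r.changelog[name] += 1
    round_trips += 1

    written = []
    for r in live:                      # 3. the actual write
        r.data[offset] = payload
        written.append(r.name)
    round_trips += 1

    for r in live:                      # 4. clear entries only for replicas that wrote
        for name in written:
            r.changelog[name] -= 1
    round_trips += 1

    for r in live:                      # 5. unlock
        r.locked = False
    round_trips += 1
    return round_trips


names = ["r1", "r2"]
r1, r2 = Replica("r1", names), Replica("r2", names)
print(afr_write([r1, r2], 0, b"a"))   # 5
r2.up = False
afr_write([r1, r2], 0, b"b")
print(r1.changelog)                    # {'r1': 0, 'r2': 1}
```

After the second write, `r1` still holds a raised count for `r2`, so the record of the missed write survives on the copy that did receive it.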

Note that this is just the conceptual view. In reality, many optimizations can be applied. For example:

  • Don’t clear the changelog on X for Y immediately if we’re still waiting to hear from Z, but wait until we can clear both together. Note that this means X’s changelog for Y might be out of date temporarily, leading to a different self-heal result than would be obtained if it were current.
  • Skip the changelog increment if another write is already in progress, and likewise skip the decrement when that prior write completes. This is called “changelog piggybacking” because we’re basically letting the second write “ride along” on the first write’s changelog increment.
  • Skip the unlock and subsequent lock operations when writes overlap. This is called “eager locking” and is essentially the same as changelog piggybacking except that it’s for lock operations instead of changelog operations. This is also not complete yet.
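Both optimizations come down to counting writes in flight. In this hypothetical `PiggybackedFile` model, the lock and increment are sent only by the first of a group of overlapping writes, and the decrement and unlock only by the last one to finish:

```python
class PiggybackedFile:
    """Toy model of changelog piggybacking plus eager locking on one file."""

    def __init__(self):
        self.in_flight = 0
        self.messages = []   # every network operation that is actually sent

    def start_write(self):
        if self.in_flight == 0:
            # Only the first of a group of overlapping writes pays for these.
            self.messages += ["lock", "increment"]
        self.in_flight += 1
        self.messages.append("write")

    def finish_write(self):
        self.in_flight -= 1
        if self.in_flight == 0:
            # Only the last overlapping write to complete pays for these.
            self.messages += ["decrement", "unlock"]


overlapping = PiggybackedFile()
for _ in range(3):
    overlapping.start_write()
for _ in range(3):
    overlapping.finish_write()

serial = PiggybackedFile()
for _ in range(3):
    serial.start_write()
    serial.finish_write()

print(len(overlapping.messages), len(serial.messages))  # 7 15
```

Three overlapping writes cost seven messages instead of fifteen, and the per-write overhead shrinks toward the single write as the batch grows. Strictly serial writes get no benefit at all, which is why random synchronous workloads are the hard case.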

With these optimizations, the number of network round trips per write can go from an execrable five (or more) down to one – just the write. Even better, with write-behind enabled, these kinds of access patterns become much more likely. Unfortunately, many workloads either can’t allow write-behind or don’t provide an access pattern that it can optimize, so these optimizations won’t kick in either, and IMO treating them as the default for measuring performance is tantamount to cheating on benchmarks.

All of this might seem complex, but the real complexity is in how we use these changelog values to do self-heal. Here, we encounter some more unique terminology based on which replicas have non-zero changelog entries for which others. If X has a non-zero entry for Y, we say that X “accuses” Y (of having incomplete operations), and this leads to the following “characters” for each replica.

  • IGNORANT means that the replica doesn’t even have a changelog – e.g. for a file that’s missing on one replica.
  • INNOCENT means that the replica doesn’t accuse anyone.
  • FOOL means that the replica accuses itself. In other words, it got as far as the changelog increment but not as far as the decrement, so we don’t actually know whether the write in between made it to disk.
  • WISE means that the replica doesn’t accuse itself, but does accuse someone else.

The algorithm for determining which way to self-heal is really complicated, so I’ll just hit some of the highlights. You might recall that a replica does not have a changelog entry for itself[1], so how does it accuse itself (i.e. become a fool)? The secret is that an implicit self-accusation is made in some conditions – I confess that even I don’t fully understand how the code is making this distinction. The key is that this separates the aggregation of state from decisions about state, allowing the decision logic to work the same way in a whole bunch of different conditions. Some of the most common or important cases are:

  • All nodes are innocent. I’m not even sure how we’d be self-healing in this case, but what we do is just pick one.
  • Only one wise node exists. This is what will happen if that node finished the write and others didn’t (or might not have). The single wise node is clearly the source, and its data is propagated to any others that are less wise.
  • Multiple wise nodes exist, but they don’t accuse each other. Just pick one.
  • Multiple wise nodes exist, and there are accusations between them. This is the infamous “split brain” which we cannot resolve automatically. I recommend using the quorum enforcement feature to avoid this.
  • None of the above apply. Pick the fool who accuses the others most strongly. The theory (I think) is that the node with the highest aggregate pending-operation count for the others is the one who has been up the most while others are down, so it’s most likely to have correct data. I could actually see an argument that this should be treated as split-brain instead, so I’d be interested in hearing others’ thoughts.
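A rough sketch of the classification and the highlighted cases follows. It works from a matrix where `pending[x][y]` is x's count for y (or `None` when x has no changelog), assumes explicit self-entries, and is much simpler than `afr_build_sources`. `character` and `pick_source` are hypothetical names, and returning "undecided" when no case applies is this sketch's own choice:

```python
def character(name, pending):
    """Classify one replica as IGNORANT, INNOCENT, FOOL, or WISE."""
    row = pending[name]
    if row is None:
        return "IGNORANT"
    if row.get(name, 0) > 0:
        return "FOOL"            # accuses itself
    if any(count > 0 for other, count in row.items() if other != name):
        return "WISE"            # accuses someone else, but not itself
    return "INNOCENT"


def pick_source(pending):
    """Return (decision, source) following the cases listed above."""
    chars = {n: character(n, pending) for n in pending}
    if all(c == "INNOCENT" for c in chars.values()):
        return "pick-any", sorted(chars)[0]
    wise = [n for n, c in chars.items() if c == "WISE"]
    if len(wise) == 1:
        return "single-wise", wise[0]
    if len(wise) > 1:
        if any(pending[a].get(b, 0) > 0 for a in wise for b in wise if a != b):
            return "split-brain", None
        return "pick-any", sorted(wise)[0]
    fools = [n for n, c in chars.items() if c == "FOOL"]
    if fools:
        def accusations(n):
            return sum(v for other, v in pending[n].items() if other != n)
        return "strongest-fool", max(fools, key=accusations)
    return "undecided", None


print(pick_source({"a": {"a": 0, "b": 1}, "b": {"a": 0, "b": 1}}))  # ('single-wise', 'a')
print(pick_source({"a": {"a": 0, "b": 1}, "b": {"a": 1, "b": 0}}))  # ('split-brain', None)
```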

There’s a lot more – feel free to explore the vast forest of calls into and out of afr_build_sources if you really want to appreciate how complicated this is – but this is getting long already and we still need to discuss how self-heal is actually triggered. Historically, this has evolved quite a bit over time. The first idea was that it would be done “behind the scenes” when a file is looked up, and users wouldn’t have to worry about it. Not too surprisingly, people who actually deployed GlusterFS were uncomfortable with having a potentially large and – more importantly – unknown number of “vulnerable” files after a failure. Thus, it became common for people to run a full scan across the entire volume (using “find | stat” or “ls -alR”) to force a quicker self-heal after a failure. Recently GlusterFS has started to do this automatically through its own self-heal daemon, and even more recently code was added to log which files need self-heal instead of requiring a full scan which can take days to weeks. (This is the same basic idea I had demonstrated back in November of 2010, but is implemented quite differently.) In GlusterFS 3.3 or 3.4, the result will be an automatic and (reasonably) efficient self-heal process, which might be one of the most significant improvements since the new cluster- and volume-management framework was added in 3.1.

I was going to write about some emergent properties of this approach, and some directions for the future, but this post has gotten quite long enough. I’ll save that for next time.

[1] Update March 13: Avati points out that bricks do have a changelog entry for themselves now, and I’ve verified this to be the case. Mystery solved.


GlusterFS Algorithms: Distribution

A lot of people seem to be curious about how GlusterFS works, not just in the sense of effects but in terms of internal algorithms etc. as well. Here’s an example from this morning. The documentation at this level really is kind of sparse, so I might as well start filling some of the gaps. Today I’ll talk about DHT, which is the real core of how GlusterFS aggregates capacity and performance across multiple servers. Its responsibility is to place each file on exactly one of its subvolumes – unlike either replication (which places copies on all of its subvolumes) or striping (which places pieces onto all of its subvolumes). It’s a routing function, not splitting or copying.

The basic method used in DHT is consistent hashing. Each subvolume (brick) is assigned a range within a 32-bit hash space, covering the entire range with no holes or overlaps. Then each file is also assigned a value in that same space, by hashing its name. Exactly one brick will have an assigned range including the file’s hash value, and so the file “should” be on that brick. However, there are many cases where that won’t be the case, such as when the set of bricks (and therefore the assignment of ranges) has changed since the file was created, or when a brick is nearly full. Much of the complexity in DHT involves these special cases, which we’ll discuss in a moment. First, though, it’s worth making a couple more observations about the basic algorithm.

  • The assignment of hash ranges to bricks is determined by extended attributes stored on directories (here’s a description of those data structures). This means the distribution is directory-specific. You could well distribute files differently – e.g. across different sets of bricks – in different directories if you know what you’re doing, but it’s quite unsafe. Firstly it’s unsafe because you’d really better know what you’re doing. Secondly it’s unsafe because there’s no management support for this, so the next time you do a rebalance (more about that later) it will happily stomp on your carefully hand-crafted xattrs. In the fairly near future, I hope to add a feature to recognize hand-set xattrs as such and leave them alone. In the more distant future, there might be management support for assigning bricks to various pools or classes of storage, and defining per-directory placement policies in terms of those.
  • Consistent hashing is usually thought of as hashing around a circle, but in GlusterFS it’s more linear. There’s no need to “wrap around” at zero, because there’s always a break (between one brick’s range and another’s) at zero.
  • If a brick is missing, there will be a hole in the hash space. Even worse, if hash ranges are reassigned while a brick is offline, some of the new ranges might overlap with the (now out of date) range stored on that brick, creating a bit of confusion about where files should be. GlusterFS tries really hard to avoid these problems, but it also checks aggressively to make sure nothing slips through. If you ever see messages in your logs about holes or overlaps, that means the checking code is doing its job.
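The lookup and the hole/overlap checks can be sketched against a list of `(brick, start, end)` ranges for one directory. The MD5-derived 32-bit hash is a stand-in (GlusterFS uses its own hash function), and `find_brick` and `check_layout` are hypothetical helpers. Note that the ranges simply run from 0 to 2**32 - 1 with no wrap-around:

```python
import hashlib

HASH_SPACE = 2 ** 32


def dht_hash(name: str) -> int:
    # Stand-in 32-bit hash (first four bytes of MD5), not GlusterFS's own.
    return int.from_bytes(hashlib.md5(name.encode()).digest()[:4], "big")


def find_brick(layout, name):
    """Return the brick whose inclusive range contains the name's hash."""
    h = dht_hash(name)
    for brick, start, end in layout:
        if start <= h <= end:
            return brick
    raise LookupError(f"hole in layout at hash {h:#010x}")


def check_layout(layout):
    """Report holes and overlaps, the anomalies DHT complains about in its logs."""
    problems = []
    expected = 0
    for brick, start, end in sorted(layout, key=lambda r: r[1]):
        if start > expected:
            problems.append(("hole", expected, start - 1))
        elif start < expected:
            problems.append(("overlap", start, min(end, expected - 1)))
        expected = max(expected, end + 1)
    if expected < HASH_SPACE:
        problems.append(("hole", expected, HASH_SPACE - 1))
    return problems


third = HASH_SPACE // 3
layout = [("b1", 0, third - 1), ("b2", third, 2 * third - 1), ("b3", 2 * third, HASH_SPACE - 1)]
print(check_layout(layout))               # []
print(find_brick(layout, "foo.txt"))      # one of b1, b2, b3
print(check_layout([layout[0], layout[2]]))  # one hole covering b2's range
```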

So, those are the basics. How about those special cases? It’s probably easiest to look at the “read” path first, where we’re trying to find a file that we expect to be there. Here’s the sequence of operations.

  1. Make sure we have the hash-range assignments (the “layout”) for each brick’s copy of the parent directory. This information is cached, so we’ll usually have it already.
  2. Hash the file name and look up the corresponding brick in the layout.
  3. Send a LOOKUP request to that brick, specifying the file path.
  4. If the LOOKUP comes back positive, we found the file and we’re done.
  5. Otherwise, re-send the LOOKUP to all bricks to see who really has the file.
  6. If nobody gives a positive reply, the file really isn’t there and we’re done again.
  7. Go back to the brick where the file “should” have been, and create a link file (described below) pointing to the real location.
  8. Return the LOOKUP result to the caller.

What’s a link file, then? Have you ever looked on one of your bricks and seen zero-length files with weird permissions (sticky bit set)? Those are link files. If you look closer, you’ll also see that they have trusted.dht.linkfile xattrs with brick names in them. That’s how we avoid the “broadcast” mentioned above. On subsequent lookups, if we find a link file we just follow it to the real brick. Considering that we only go through this lookup procedure once per file per client anyway (location information is cached), the cost of “guessing wrong” is therefore pretty minimal. I once implemented a scheme where we do an exponentially expanding search instead of an immediate broadcast, hoping to achieve a better balance of lookup latency vs. network traffic, but in the end it just didn’t seem to make a difference so the extra complexity wasn’t worth it. Now, let’s look at the file-creation path.

  1. Assume we’ve already done a lookup, so we already have the layout information cached and we know the file doesn’t already exist anywhere.
  2. Hash the file name and look up the corresponding brick in the layout.
  3. If that brick is full, choose another brick that isn’t full instead (it doesn’t really matter how).
  4. Send a CREATE request to the chosen brick for that file.
  5. If we “diverted” because of a full brick, go back and add a link file to the brick chosen by pure hashing. The next client will almost certainly need it.
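Both the read path and the create path, including link files, fit in a small single-directory model. Everything here is hypothetical: `Volume` keeps file entries in Python dictionaries instead of on disk, uses a stand-in MD5-based hash with equal ranges, and represents a link file as a `("link", brick)` entry rather than a zero-length file with a `trusted.dht.linkfile` xattr:

```python
import hashlib

HASH_SPACE = 2 ** 32


def dht_hash(name):
    # Stand-in 32-bit hash; GlusterFS uses its own hash function.
    return int.from_bytes(hashlib.md5(name.encode()).digest()[:4], "big")


class Brick:
    def __init__(self, name, full=False):
        self.name = name
        self.full = full
        self.entries = {}  # file name -> ("data", contents) or ("link", brick name)


class Volume:
    """Toy single-directory DHT volume with an even layout and link files."""

    def __init__(self, bricks):
        self.bricks = {b.name: b for b in bricks}
        self.order = [b.name for b in bricks]
        self.location_cache = {}

    def hashed_brick(self, name):
        # Equal hash ranges, one per brick, in order starting at zero.
        return self.order[dht_hash(name) * len(self.order) // HASH_SPACE]

    def lookup(self, name):
        if name in self.location_cache:
            return self.location_cache[name]
        guess = self.bricks[self.hashed_brick(name)]
        kind, value = guess.entries.get(name, (None, None))
        if kind == "data":
            location = guess.name
        elif kind == "link":
            location = value                 # follow the link file
        else:
            # Broadcast: ask every brick who really has the file.
            holders = [b.name for b in self.bricks.values()
                       if b.entries.get(name, (None,))[0] == "data"]
            if not holders:
                return None                  # the file really isn't there
            location = holders[0]
            guess.entries[name] = ("link", location)  # skip the broadcast next time
        self.location_cache[name] = location
        return location

    def create(self, name, contents):
        target = self.bricks[self.hashed_brick(name)]
        chosen = target
        if target.full:
            # Divert to any brick with space; which one hardly matters.
            chosen = next(b for b in self.bricks.values() if not b.full)
        chosen.entries[name] = ("data", contents)
        if chosen is not target:
            target.entries[name] = ("link", chosen.name)
        return chosen.name


vol = Volume([Brick("b1", full=True), Brick("b2", full=True), Brick("b3")])
print(vol.create("report.txt", "hello"))  # b3, the only brick with space
print(vol.lookup("report.txt"))           # b3
```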

This brings us to rebalancing, which is one of the key challenges – and therefore one of the most interesting research areas IMO – in this kind of system. The first thing to know about GlusterFS rebalancing is that it’s not automatic. If you add a new brick, even new files won’t be put on it until you do the “fix-layout” part of rebalance, and old files won’t be put on it until you do the “migrate-data” part. What do these do?

  • Fix-layout just walks the directory tree recalculating and modifying the trusted.glusterfs.dht xattrs to reflect the new list of bricks. It does this in a pretty simple way, assigning exactly one range of length MAXINT/nbricks to each brick in turn starting at zero.
  • Migrate-data is much more costly. For each file, it calculates where the file “should” be according to the new layout. Then, if the file is not already there, it moves the file by copying and renaming over the original. There’s some tricky code to make sure this is transparent and atomic and so forth, but that’s the algorithm.
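Here is a rough Python model of the two steps, assuming equal ranges over the full 32-bit space (standing in for MAXINT/nbricks) and a stand-in MD5-based hash. `fix_layout`, `owner`, and `migrate_plan` are hypothetical names, and the actual copy-and-rename is left out:

```python
import hashlib

HASH_SPACE = 2 ** 32


def dht_hash(name):
    # Stand-in 32-bit hash; GlusterFS uses its own hash function.
    return int.from_bytes(hashlib.md5(name.encode()).digest()[:4], "big")


def fix_layout(bricks):
    """Naive layout: one equal range per brick, in order, starting at zero."""
    n = len(bricks)
    return [(brick, i * HASH_SPACE // n, (i + 1) * HASH_SPACE // n - 1)
            for i, brick in enumerate(bricks)]


def owner(layout, name):
    h = dht_hash(name)
    return next(brick for brick, start, end in layout if start <= h <= end)


def migrate_plan(placement, new_layout):
    """Return (name, source, destination) for every file that must move."""
    moves = []
    for name, current in placement.items():
        target = owner(new_layout, name)
        if target != current:
            moves.append((name, current, target))
    return moves


files = [f"file{i}" for i in range(1000)]
old_layout = fix_layout(["b1", "b2", "b3"])
placement = {f: owner(old_layout, f) for f in files}
moves = migrate_plan(placement, fix_layout(["b1", "b2", "b3", "b4"]))
print(len(moves) / len(files))  # roughly 0.5, versus an ideal of 0.25
```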

In my personal opinion, there are problems (er, enhancement opportunities) in both of these areas. Let’s take these in reverse order. Migrate-data is slow. What it should do is run in parallel on all of the bricks, with each brick either “pushing” data that is currently local but needs to be elsewhere or “pulling” data that’s the other way around. What it does instead is run on one node, potentially moving files for which it is neither source nor destination. This is a big deal, because it causes rebalance to take days when it should take hours – or weeks when it should take days, on larger installations. The amount of I/O involved is also why you don’t necessarily want this to be an automatic process.
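The push half of that approach can be sketched by grouping a migration plan of `(name, source, destination)` tuples by source brick and draining each group concurrently. The `push` callback is a hypothetical placeholder for the actual copy, pulling is omitted, and `per_brick_work` and `run_parallel` are hypothetical names:

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def per_brick_work(moves):
    """Split a global plan into per-brick push lists, keyed by source brick."""
    work = defaultdict(list)
    for name, source, destination in moves:
        work[source].append((name, destination))
    return dict(work)


def run_parallel(work, push):
    """Let every source brick push its own files at the same time."""
    def drain(source, items):
        for name, destination in items:
            push(source, name, destination)
        return len(items)

    with ThreadPoolExecutor(max_workers=max(1, len(work))) as pool:
        futures = [pool.submit(drain, source, items) for source, items in work.items()]
        return sum(f.result() for f in futures)


moves = [("a.img", "b1", "b2"), ("b.img", "b1", "b3"), ("c.img", "b2", "b1")]
work = per_brick_work(moves)
print(work)  # {'b1': [('a.img', 'b2'), ('b.img', 'b3')], 'b2': [('c.img', 'b1')]}
pushed = []
print(run_parallel(work, lambda src, name, dst: pushed.append((name, src, dst))))  # 3
```

Each file crosses the network once, from the brick that holds it, instead of passing through a third node that is neither source nor destination.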

While the migrate-data issue is at the level of mechanics and implementation, the fix-layout issue is at more of a conceptual level. To put it simply, when we add a new brick we should reallocate approximately 1/new_brick_count of the hash values. Because our layout calculations are naive, we will usually reallocate much more than that – exacerbating the migrate-data problem because reallocated hash values correspond to moved files. Time for a picture.

The outer ring represents the state with just three bricks – hash value zero at the top, split into three equal ranges. The inner ring represents the state after adding a fourth brick. Any place where the inner and outer rings are different colors represents a range that has been reassigned from one brick to another – implying a migration of data likewise. If you look carefully, you’ll see that we’re moving half of the data when it should be only a quarter – 8% blue to orange, 17% orange to yellow, and 25% yellow to green. What could we do that’s better? Not much, if we stay within the limitation of a single brick claiming a single range, but there really doesn’t need to be such a limitation. Instead, we could borrow from Dynamo and assign multiple “virtual node IDs” for brick four, giving it a total of 25% drawn equally from bricks one through three. (If you look at this as a clock, that’s one hour each at three, seven, and eleven o’clock). Taking this approach too far can cause layout tables to get unreasonably large, so sometimes it makes more sense to forgo this optimization or even reverse it by combining/swapping ranges even though that will cause “unnecessary” data migration. Done sensibly, this can keep the table size under control without resorting to the scale-limiting “hash bucket” approach of some Dynamo derivatives. There are even more sophisticated ways to address this problem, but that’s a whole different post and this should be enough to convey the general problem space.
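The percentages above can be checked exactly with fractions of the hash space, along with the Dynamo-style alternative in which brick four takes one twelfth (“one hour”) from each existing brick. `even_layout` and `moved_fraction` are hypothetical helpers, and ranges here are half-open intervals over [0, 1):

```python
from fractions import Fraction


def even_layout(bricks):
    """One equal half-open range [start, end) per brick over [0, 1)."""
    n = len(bricks)
    return [(b, Fraction(i, n), Fraction(i + 1, n)) for i, b in enumerate(bricks)]


def moved_fraction(old, new):
    """Fraction of the hash space whose owning brick differs between layouts.

    Both layouts must cover [0, 1); a brick may own several ranges.
    """
    cuts = sorted({p for _, start, end in old + new for p in (start, end)})

    def owner_at(layout, point):
        return next(b for b, start, end in layout if start <= point < end)

    moved = Fraction(0)
    # Between consecutive cut points, ownership is constant in both layouts.
    for lo, hi in zip(cuts, cuts[1:]):
        if owner_at(old, lo) != owner_at(new, lo):
            moved += hi - lo
    return moved


three = even_layout(["b1", "b2", "b3"])
print(moved_fraction(three, even_layout(["b1", "b2", "b3", "b4"])))  # 1/2

h = Fraction(1, 12)   # one "hour" on the clock face
virtual = [("b1", 0 * h, 3 * h), ("b4", 3 * h, 4 * h),
           ("b2", 4 * h, 7 * h), ("b4", 7 * h, 8 * h),
           ("b3", 8 * h, 11 * h), ("b4", 11 * h, 12 * h)]
print(moved_fraction(three, virtual))  # 1/4
```

The naive layout's 1/2 is exactly the 1/12 + 1/6 + 1/4 from the picture, while the virtual-node layout moves only the quarter that brick four should end up owning.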

That’s how DHT works today, along with some thoughts about how it might evolve in the future. The next article will focus on replication.