Recent blog updates

Shown here are only posts related to concurrency. You can view all posts here.

How Reader Mutexes Can Deadlock

Sample Deadlock

Translucent areas depict waiting for something; incomplete lock statements have dashed borders. Note that it doesn't matter in which order the top two acquisitions are made.

Can a cryptic entanglement of your mutex locks lead to a deadlock? It sure can. Deadlocking is the second thing your parents tell you about mutexes: if one thread acquires A, then acquires B before releasing A, and the other does the same in the reverse order, the threads may deadlock. And deadlock they will if each thread completes its first acquisition before either attempts its second. Here's the code of two threads; the sidenote depicts the problematic execution:
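The post's own code isn't reproduced here; below is a minimal sketch of the kind of code meant, using java.util.concurrent locks (the class and lock names are my assumptions, not the author's):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class AbBaDeadlock {
    static final Lock A = new ReentrantLock();
    static final Lock B = new ReentrantLock();

    // Thread X: acquires A, then B.
    static void threadX() {
        A.lock();
        try {
            B.lock();                 // waits forever if Y already holds B
            try { /* work */ } finally { B.unlock(); }
        } finally { A.unlock(); }
    }

    // Thread Y: acquires B, then A -- the reverse order.
    static void threadY() {
        B.lock();
        try {
            A.lock();                 // waits forever if X already holds A
            try { /* work */ } finally { A.unlock(); }
        } finally { B.unlock(); }
    }
}

If X takes A and Y takes B before either takes its second lock, both inner lock() calls wait on each other indefinitely.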

But what about reader/writer locks, also known as "shared/exclusive" locks? Let's recap what these are first. Sometimes, to achieve greater efficiency, a mutex implementation supports two flavors of locking: reader and writer (otherwise known as "shared" and "exclusive"). If several threads only want to read from a shared variable, there's no need for each of them to wait for the others. That's where you'd use a ReaderLock operation on the mutex guarding the variable. If a thread wants to write, it invokes WriterLock, which means "do not run any readers or writers while I'm holding the lock." Here's a wiki entry for reference, and here's the standard Java API.

Seemingly OK Reader Lock Execution

We no longer have a "must-before" relation between the B locks in the two threads, so they don't deadlock. This looks OK, but it actually is not!

So imagine that both threads X and Y happen to use one of the locks as a reader lock. It seemingly should prevent deadlocking: if, say, B is a reader lock, then the execution specified above will make progress: B.ReaderLock() in thread X will not block waiting for thread Y to release it... right? Here's the code for clarity:
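Again, the original snippet isn't shown here; a sketch of the same two threads with B turned into a read-write lock (only its read side used), in java.util.concurrent terms with names of my choosing:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SeeminglyOkReaderLock {
    static final Lock A = new ReentrantLock();
    static final ReentrantReadWriteLock B = new ReentrantReadWriteLock();

    static void threadX() {
        A.lock();
        try {
            B.readLock().lock();      // shared: does not exclude Y's read lock
            try { /* work */ } finally { B.readLock().unlock(); }
        } finally { A.unlock(); }
    }

    static void threadY() {
        B.readLock().lock();
        try {
            A.lock();                 // may wait for X to release A
            try { /* work */ } finally { A.unlock(); }
        } finally { B.readLock().unlock(); }
    }
}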

Turns out, reader locks can deadlock. You just need to make a reader lock wait for another reader lock's release; how?

Many mutual exclusion implementations make acquiring threads "form a line" of some sort to ensure fairness: no thread should wait forever for a lock. Then a thread that tries to acquire a lock--either shared or exclusive--waits until all threads that called L.WrLock() earlier exit their critical sections. Fairness is especially important when you have reader and writer locks: if you allowed any reader lock to proceed while there is another reader holding the lock, your writers could "starve" waiting for quiescence among readers, which may never happen on a highly contended lock.

So, to make a reader lock wait on another reader lock, we need a writer lock between them.

Deadlocking Reader Lock Execution

Here's how three threads can interleave such that you have a deadlock between reader mutex locks. The "blocked by" relationship between these reader locks transitively hops over a writer lock in some other thread Z.

Assume that, in the execution described earlier, before thread X attempts to acquire the reader lock on B, thread Z chips in and invokes B.WrLock(), and only then does X call B.RdLock(). Because of the fairness concerns discussed above, X's B.RdLock() starts to wait for Z to acquire and then release B. Z's B.WrLock() waits for Y to release its B.RdLock(). Y waits for X to release A. No thread makes progress, and here's the deadlock. Here's sample code of all three threads:
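Again a sketch rather than the author's code, in java.util.concurrent terms; the fair mode of ReentrantReadWriteLock is used so that X's read request queues behind Z's earlier write request:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReaderLockDeadlock {
    static final Lock A = new ReentrantLock();
    static final ReentrantReadWriteLock B = new ReentrantReadWriteLock(true); // fair

    static void threadX() {
        A.lock();                     // (1) X holds A
        B.readLock().lock();          // (4) queued behind Z's pending write lock
        B.readLock().unlock();
        A.unlock();
    }

    static void threadY() {
        B.readLock().lock();          // (2) Y holds B as a reader
        A.lock();                     // (5) waits for X to release A
        A.unlock();
        B.readLock().unlock();
    }

    static void threadZ() {
        B.writeLock().lock();         // (3) waits for Y to release its read lock
        B.writeLock().unlock();
    }
}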

Note that you will have at least one writer lock for B somewhere (because if all acquisitions were reader locks, there would be no point in having the lock at all). Therefore, the only way to prevent this kind of deadlock is to not distinguish reader and writer locks when reasoning about progress guarantees.

This kind of deadlock needs at least three threads in order to bite you, but don't dismiss it outright! If the Internet taught us anything, it's that a million monkeys in front of typewriters will not eventually recreate the whole body of Shakespeare's work, but they would at least trigger all possible race conditions in our programs, no matter how contrived the corresponding executions seem.



Multithreaded Consensus Versus Practice

Many programmers complain that most of what they've learned in their universities is completely useless in their daily work. Anecdotal evidence, a survey among some MIPT alumni, suggests that the median fraction of courses actually useful to a former student is about 5%. That's like taking one class per week instead of a whole week of studying, so couldn't you simply follow those wise brats who indeed spent the whole week smoking weed and boozing?

I'm sure that you simply should look closer. In this post, I tell how a portion of academic knowledge appeared in my work out of the blue.

The Challenge

Last week, we were trying to improve the concurrency of a multithreaded messaging system. When multiple threads wait for incoming messages with distinct labels, one and only one of them has to do the work of parsing the incoming message and directing it to the thread that actually waits for it. Choosing a designated parser thread was not possible, so the waiting threads had to somehow assign the work to one and only one of them.

This is not a hard problem, but we hit a minor block when we first approached it. So I suggested stepping back and taking a look at the theory.

That Sounds Familiar

I recalled that making several threads designate one and only one of them as a "worker" is called the "consensus problem" (Wikipedia provides a slightly different but equivalent definition). I also recalled, from my reading of TAoMP ("The Art of Multiprocessor Programming" by Herlihy and Shavit, link), that it was used to demonstrate limitations of various synchronization primitives, and most of them were incapable of solving it. That's right: in the world of formalized computing, some software problems can be proved computationally infeasible. And by "infeasible" I don't mean making a decent AI for first-person shooters, which is definitely infeasible; I mean something more like Turing machines and the halting problem. So maybe "mutex" is one of the tools that can't solve Consensus?

Who Can and Can Not Solve Consensus

A concurrent object is wait-free if there exists an upper limit N on the time/instruction count it takes every call to complete, and the same limit remains in effect as the number of threads grows indefinitely. Very few concurrent algorithms satisfy this property, and those that do are often considered impractical.

Chapter 5 of the book is devoted to assessing relative strength of synchronization primitive operations, and it uses their capability to provide a wait-free solution to consensus problem as a measure of their strength. It later turns out, in chapter 6, that Consensus problem is universal enough to construct a wrapper that turns any single-threaded object into a thread-safe one. So, which synchronization primitives are universal?

Consensus vs. Registers

Assume you only have "simple" memory cells or registers at your disposal, which can turn either way when multiple threads write to them simultaneously. Given these, you can solve Consensus in a wait-free manner for no more than one thread. (In other words, they are completely useless.)

The complete proof is sketched in the book. It's constructed as follows. Assume Consensus can be solved for two threads. At one point, the memory state must be such that the next operation over one of these registers in some thread will determine the outcome. Then all the cases of these operations are enumerated, and viable executions, which usually end up with one thread “running solo,” are constructed. They lead to a situation where the algorithm should return different results based on the same thread-local data, which can't happen.

In particular, this can't happen if you're trying to solve Consensus for two or more threads with simple registers. It is thus scientifically proven that one needs more sophisticated synchronization primitives than just reads and writes to one memory cell. Which could these be?

Consensus vs. Multi-Register Operations

Round Robin Table

That's a sample round robin table with some competition results. Threads fill a similar data structure concurrently when they solve Consensus with multi-cell atomic writes.

If you can atomically write to N*(N+1)/2 memory cells rather than just one, you can solve Consensus for N threads. The impossibility of solving it for more than N follows the same scheme as outlined in the previous section, but the sample solution is very interesting.

Imagine that threads compete in a round-robin tournament; well, not really compete, but simply fill the table with its results. Each game is either a win or a loss, and the score is the number of wins against threads that played at least one game.

When a thread participates in the Consensus, it atomically writes "I lose" in the N-1 cells corresponding to its results; it also writes something in the "against itself" cell to distinguish itself from those who did not participate yet. If all threads do that, then, whenever somebody examines the table and computes the score of each thread, the thread with the highest score will never change. The highest-scoring thread will always be the same one even if the reads are not atomic, and the results change as one is reading the table.

Consensus vs. Complex Data Structures

Section 5.4 of the book demonstrates a proof that synchronized FIFO queues can't solve Consensus for more than two threads. It also claims that the same is true for “many similar data types, such as sets, stacks, double-ended queues, and priority queues,” which I don't have any reason to not believe.

Consensus vs. Atomic Integer Operations

By “atomic integer operations” I mean such functions as swap(&x, v) (set x to v and return the previous value), compareAndSet(&x, e, u) (set x to u if x == e; do not change it otherwise; return whether the value was equal to e), and others. Though infrequently used, they are somewhat known to systems programmers.

Turns out, these operations are not created equal. For instance, swap can't solve Consensus for more than two threads, while compareAndSet can solve it for infinitely many threads with a very simple algorithm:
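The post's own code is not reproduced; here is a sketch of the standard compareAndSet-based consensus protocol, written with Java's AtomicReference (class and method names are my assumptions):

import java.util.concurrent.atomic.AtomicReference;

class CasConsensus<T> {
    // null means "no value decided yet".
    private final AtomicReference<T> decided = new AtomicReference<>(null);

    // Each thread proposes its own value; every thread gets the same answer:
    // the value of whichever thread's compareAndSet won the race.
    T decide(T myProposal) {
        decided.compareAndSet(null, myProposal);  // succeeds for exactly one thread
        return decided.get();                     // everyone reads the winner's value
    }
}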

Mutex Solution Paradox

The book begins with a demonstration of how one can implement mutexes using only simple registers. I formulated one such implementation as a puzzle in my previous post. If we could successfully use mutexes to solve Consensus, we could replace them with their register implementations, which is impossible. Does it mean that we can't solve consensus with mutexes for more than one thread? Well, what about this simple algorithm?
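The algorithm itself isn't shown in this excerpt; presumably it is something along these lines (a sketch, not the author's code): the first thread to enter the critical section records its proposal, and everyone returns whatever is recorded.

class MutexConsensus<T> {
    private final Object lock = new Object(); // any mutual exclusion lock will do
    private T decided = null;                 // null means "nothing decided yet"

    T decide(T myProposal) {
        synchronized (lock) {                 // mutual exclusion around test-and-decide
            if (decided == null) {
                decided = myProposal;         // the first thread to get the lock wins
            }
            return decided;                   // everyone else returns the winner's value
        }
    }
}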

More than this, we can implement the all-powerful compareAndSet with a mutex by wrapping the comparison and the assignment into a lock/unlock pair. Yet the book claims it's not possible. How come?
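For illustration, such a mutex-backed compareAndSet might look like this (again a sketch under the same assumptions, not code from the book or the post):

class LockedInteger {
    private final Object lock = new Object();
    private int value;

    LockedInteger(int initial) { value = initial; }

    // Same contract as an atomic compareAndSet, but guarded by a mutex.
    boolean compareAndSet(int expected, int update) {
        synchronized (lock) {
            if (value == expected) {
                value = update;
                return true;
            }
            return false;
        }
    }
}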

Different Flavors of “Solve”

A concurrent object is starvation-free if every call to its methods eventually completes regardless of how threads are scheduled. For instance, an algorithm that hangs if only one thread is (unfairly) scheduled while the others sleep indefinitely for no reason is not starvation-free per se, but may become such if the OS promises to be fair and never do this. Starvation-freedom is weaker than wait-freedom because it doesn't require a global bound on the call length.

The mutex-based consensus algorithm is only correct if the mutex has a strong enough progress condition, for instance, if it's starvation-free (wait-freedom is not required here; see the last exercise of Chapter 5 in the book). Mutexes implemented with simple registers are not starvation-free, hence they don't completely "solve" Consensus. The impossibility results given above only provide proofs for wait-free algorithms; whether the resulting algorithm is wait-free, however, depends on the actual implementation of the mutex.

It indeed is a corollary of the results discussed above that you can't make a starvation-free lock with simple registers only. Threads in such locks usually "spin" waiting for the memory to enter a state that signals this particular thread (and only it) to enter the critical section. If, at this point, some other thread "runs solo," you are stuck forever. If you apply the register "weakness" proof scheme to a register-based mutex, this infinite loop eliminates the contradiction that the same thread-local data lead to different results: they lead to endless spinning instead. Notice how that's not the case for the compareAndSet() solution to Consensus presented above.

So the resolution to the paradox described above is that the mutex-based program is a solution only if the underlying mutex is starvation-free--in addition to simply being a correct mutual exclusion algorithm. To implement a starvation-free mutex, you need to have compareAndSet or similarly strong primitive operations, but they may be concealed within the lock or the OS implementation, and don't have to appear in your program explicitly.

Consensus vs. Humans

Consensus and Humans

Here's how the choices of individual subjects changed over time during the experiment on how humans solve Consensus. Read more here.

Another interesting example of a computational system that can't solve the consensus problem is a social network of human beings. A group of researchers conducted an experiment where several people were given material incentives to reach a specific consensus (an equivalent of threads proposing their own values), and were set out to reach one. They could not communicate in any way except by announcing to their neighbors the current value each of them agrees to, i.e. they didn't have a designated leader or a common strategy.

The experiments demonstrated that people can't reach consensus in these settings. An interesting finding, though, was that the subjects quickly created a "bipartisan" system with only two colors, and started to juggle them around. Sometimes, new colors appeared as an attempt to unite under a different color, but these attempts quickly died off. Doesn't that remind you of anything?

Unsurprising Conclusion

So, it turns out, the theory didn't help us much with the original task. It helped us understand that thread nomination is a separate, studied problem that can be solved given the right tools. However, our initial choice of tool, the mutex, was correct, and the knowledge of theory couldn't improve it. I'm glad it made me read "The Art of Multiprocessor Programming" more thoroughly than I otherwise would have, and I do not consider it a waste of time. Multicore computing is getting more ubiquitous, and knowing the theory won't hurt.



A Mutex Puzzle

I wrote earlier that I know very few decent programming puzzles. To be more precise, I know one, The Prisoners Riddle. So I was reading that The Art of Multiprocessor Programming book (I already made a post influenced by what I read there, and I'll soon make another one--stay tuned). Some very interesting things the book tells about can be formulated as puzzles. Here's one.

Two threads need a mutex object with Lock and Unlock methods, but they only have three usual shared boolean variables. How to implement such a mutex?

Every thread can call a GetThreadId() function, which returns 0 for one thread and 1 for the other, and the boolean variables are automatically initialized to false before the threads execute.

If you solved it, congratulations; you have just reinvented this wheel.

Since I know the solution, I can't really understand if it's a good riddle or not. It's definitely too hard for a job interview, but still, it can be a good one. So I need your comments. What do you think? Could you solve it?



Consistency Models Explained Briefly

Wikipedia does a ridiculously bad job at explaining different consistency conditions of multithreaded/distributed systems. Just take a look at the articles, and see how inconsistent the consistency model descriptions are. I'll try to get them together.

What is a Consistency Model?

When you're describing how your distributed system behaves, you usually specify something named a "consistency model". This "model" describes how distant the behavior of your system in a multi-threaded or multi-machine environment is from the "ideal" behavior.

Assume that you're implementing a data structure (a set, for instance; say, you can add/remove elements, and check for presence). The specification (model) of its behavior usually implies that the operations are performed sequentially. For instance, a set indicates the presence of an element if and only if it was added before the presence check, and not removed after that. The terms "before" and "after" imply that there is an order in the operations over that structure. While it is straightforward to know the order of operations in a single thread, it gets unclear if there are several threads or machines:

  • Calls may overlap. If two threads call methods on the same structure at approximately the same time, the calls may overlap because they don't happen instantaneously in the real world. In what order the calls took effect is not clear, however, because thread scheduling or the network connection introduces arbitrary delays in processing.
  • Calls may not take effect immediately. When a method call returns, it is sometimes not clear when the operation actually "takes effect." By diluting the notion of "taking effect" one may achieve an increase in speed. For example, the elements added to a stack may not be fetched in exactly the same order, but the operations could be faster. The rationale behind this is that the "order" of events that happen in different parts of your cluster is not that well-defined and meaningful in the first place. I talked about why this is natural in my other post, "Relativity of Simultaneity in Distributed Computing."

Object-Oriented Model and Executions

A consistency model usually assumes an object-oriented approach to describing a system. Assume that we have something (say, a set) whose internal state can be observed and altered only via a well-defined set of methods (say, add(elem), remove(elem), and bool in(elem)). The OO approach assumes that the internal state is concealed, that it has no meaning to discuss it per se, and that we do not define consistency models in terms of the object's state. Instead, we restrict the values these methods may return based on when other methods were called in different threads or on different machines.

In practice, method calls don't happen instantly, and our models totally acknowledge this. The set of method call times, the time it takes each to complete, their arguments, and their return values is named an execution. Executions are usually depicted as intervals plotted on lines. The parallel lines represent how time simultaneously passes in all threads. The intervals are the method calls, the beginning corresponding to the time the method was called and the end to when the method returned. The call arguments and the return values, as well as the method and object name, are usually written near the method call. Here's a sample execution of a set object:
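The original figure isn't reproduced here; purely as an illustration (my own example, not the one from the post), such an execution might be drawn like this, with time flowing to the right:

Thread 1:  |-- s.add(5) -> true --|                    |-- s.in(7) -> true --|
Thread 2:            |-- s.add(7) -> true --|

The two add() calls overlap; the in(7) call overlaps with neither, and its returning true is legal because add(7) has already returned by then.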

We will omit the actual call values further because consistency models do not operate in terms of actual arguments and values, but rather try to "map" concurrent executions to sequential ones. In our further diagrams we will also assume that all methods are called against the same object.

Since a consistency model imposes restrictions on executions, a given execution may or may not match the model. A data structure adheres to a consistency model iff every execution over its methods we may observe adheres to it. That's what's hidden behind such phrases as "sequentially consistent stack".

No Consistency

The most important consistency "model" is, of course, no consistency. Nothing is guaranteed. The methods called on the object may observe anything. We may add two different elements to an empty set, and observe that the size of the set is three, and that none of these elements is what we added.

All other consistency models are opposite to the No Consistency in the following sense. They all assume that the method calls may be re-arranged in a single-threaded sequence that is correct with respect to the object's specification. Let's call it arranged correctly in the rest of the post. For example, a set of pets that adheres to any consistency model described below can't tell us that it contains a cow, if only dogs were added to it.

"Multithreading" Consistency Models

What I discovered is that consistency models are divided into two categories: those that lean toward multithreading (shared-memory multiprocessor systems), and those that lean toward "multi-machine" (distributed) systems. I find the former category more formal and mathematical; these models can be used to describe behavior precisely enough to allow math-level, rock-solid reasoning about system properties. Some "distributed" consistency models are less precise in their definitions, but are widely mentioned in documentation and descriptions, and do allow users to make informed decisions.

Let's start with the "multithreading" ones.

Strong Consistency aka Linearizability

An execution is strongly consistent (linearizable) if the method calls can be correctly arranged retaining the mutual order of calls that do not overlap in time, regardless of what thread calls them. (See "No Consistency" section for definition of "correctly arranged").

In other words, if two calls overlap, then they may "take effect" in any order, but if a call returns, then you may be sure it already worked.

A more anecdotal definition is "before the method returns, it completely takes effect, consistently with other threads." This is also usually called "thread safety" unless a more precise consistency model is specified. This is quite a stringent restriction, especially for distributed systems, where threads run on different machines.

Two linearizable executions combined also form a linearizable execution. This is an important property, since it allows us to combine linearizable components to get strongly consistent systems. This is not true for other models, for instance....

Sequential Consistency

An execution is sequentially consistent if the method calls can be correctly arranged retaining the mutual order of method calls in each thread.

The calls in different threads can be re-arranged however you wish, regardless of when they start and when they end. For example, if thread P's enq(x) on a shared queue completes before thread Q's enq(y) even starts, a later deq() by P may still return y: ordering Q's call before both of P's calls respects each thread's program order, so the execution is sequentially consistent even though it is not linearizable.

An example of a sequentially consistent system is a naive notion of RAM in a multi-core processor. When different threads read from memory, they actually read from a cache or a register, which is synchronized with the common RAM at writes. While values read and written on different cores may not be immediately synchronized (cores may diverge in their notion of what value a cell contains), the values read and written on the same core are always consistent with each other. This is naive, though: even this guarantee is relaxed by modern compilers, and accesses to different variables within one thread can be reordered (as far as I recall, ARM platforms are guilty of this).

You might notice that the definition of sequential consistency does not directly prevent methods from returning values "from the future" (i.e. reading what will be written later in a different thread). Yet I claim that RAM is sequentially consistent; how's that? The thing is that each execution should be sequentially consistent. If the read and the write do not overlap, and the write follows the read, we may just stop the program between them. Since each execution should be consistent, the value read should have been written before in some thread. Therefore, this "future prediction" is not possible in consistent data structures (not only those that are "sequentially" consistent) even though the definition doesn't directly prevent it.

However, if we combine two sequentially consistent executions, the result won't necessarily be sequentially consistent. But there is another model, weaker than linearizability, that makes such composition possible.

Quiescent Consistency

An execution is quiescently consistent if the method calls can be correctly arranged retaining the mutual order of calls separated by quiescence, a period of time where no method is being called in any thread.

This consistency condition is not directly comparable to sequential consistency (neither implies the other), and it is still not as strong as what we usually expect from a multithreaded system. See my other post for a more in-depth example of a quiescently consistent stack. The reason to use this model is to attain a higher degree of concurrency and better response time in return for weakening consistency a bit.

Quiescent consistency is not as esoteric as you might think, even if you have barely heard about it. Here's an easy way to imagine a quiescently consistent system: assume that all writes are cached, and the cache is flushed to persistent storage when there are no ongoing writes (or after it reaches a certain size). There are special data structures that are quiescently consistent.

Quiescently consistent components can be combined to form quiescently consistent components again. Note also that strong consistency implies quiescent consistency, hence you can combine these two kinds of systems, and the result will be quiescently consistent.

"Distributed" Consistency Models

Consistency is not Fault Tolerance

It may be tempting to mix up the concepts of "consistency" and "fault tolerance". The former is how an object reacts to method calls depending on the calling thread (or, in other words, how data are seen by multiple observers). Fault (or partition) tolerance describes how a system behaves when it becomes partitioned, i.e. messages are lost, and/or parts of the system get disconnected from the network, or form several networks. These concepts are orthogonal to each other: an object can have one of them, both of them, or neither.

More likely, it will be just one of them. There is a scientific result known as the CAP theorem which states that a system can't guarantee all of the following at the same time:

  • Linearizability
  • Availability (whether RPC calls receive replies)
  • Fault Tolerance

In this post, however, we will focus on consistency rather than the other properties of distributed systems. Note that multithreaded systems, due to the close proximity of their components and the less versatile medium that connects them, do not have to worry about fault tolerance... at least, at the current state of the art.

The following models are usually seen in the context of distributed systems, i.e. those that reside on multiple machines, communication between which is slow enough to matter. To bring object-oriented abstraction to distributed systems, programmers invented the "Remote Procedure Call" (RPC) pattern. Method calls (object identifiers, method name, and arguments) are encoded into messages that are then sent across the network to a specified destination, which may be part of the encoding too. Method calls that do not expect a reply (say, unreliable "fire-and-forget" writes) may be treated as instantaneous.

Eventual Consistency

A system is eventually consistent if the method calls can be correctly arranged retaining the mutual order of calls separated by a sufficiently long period of time.

The eventual consistency definition—as well as its understanding—is specifically vague on how much time it would take, and whether this time depends on the call history. We can't really tell how soon an eventually consistent set will respond that an element X exists in it after it was added. But "eventually" it will.

This looks like a quite useless consistency model, since it doesn't actually restrict anything mathematically. Why is it used then? Exactly because of its lack of restriction. Users usually expect some consistency from a system, and if there's actually not much of it, something should be said about this. Eventual consistency is actually a limitation of liability; it's used in documentation to convey that the system can't guarantee that different threads become in sync immediately. However, the system is engineered in such a way that they eventually do. Here's an example (and here's more on eventual vs. strong consistency).

Some authors, according to Wikipedia, require that no method calls be invoked during the "long period of time" for the system to get in sync. This actually sounds like a stronger version of quiescent consistency, where a qualifying quiescence should be long enough.

Strict Consistency

An execution is strictly consistent if the method calls sorted by completion time are correctly ordered.

This is as simple and powerful as it is impossible to attain in most practical systems that have more than one thread. First, it orders calls according to "global" time, which is a purely mathematical abstraction unachievable in real life, both according to physics and to the theory of distributed computing. Second, any implementation I might imagine would essentially disallow multithreading per se.

This is my rendition of the more "official" definition (1, 2), which defines a strictly consistent system as one where any read operation over a certain item returns the value supplied to the latest write operation according to a global time counter. I don't like this definition because it needs to distinguish between "reads" and "writes", and introduces the concept of a "data item"--i.e. it breaks the object-oriented approach. Still, I'm sure that if the system really consists of "data items," the definition at the beginning of the section would work as well.

Serializability

An execution is serializable if its methods in each thread are grouped in contiguous, non-overlapping transactions, and the methods can be correctly ordered in such a way that transactions are still contiguous.

In other words, the outcome of a sequence of method calls can be described as a serial execution of transactions, where methods of different transactions are not mingled, even though the transactions actually overlap in time across different threads. The useful consequence here is that calls within overlapping transactions can be reordered, even if the calls themselves don't overlap. However, "serializability" alone is not enough to describe a system: while it limits reordering of methods across transactions, it is unclear to what extent the transactions themselves can be reordered. That is a job for a different consistency model, and serializability is usually used to "remind" about this important property of transaction-based systems.

Practice

Some systems provide several levels of consistency for their operations (like the AppEngine's High-Reliability Datastore linked above). You should understand what tradeoffs each model makes in order to reach the decision best suited for your specific application.

Most systems you encounter will be either not consistent at all ("thread-unsafe") or linearizable (this is what is usually understood by "thread-safe"). If the consistency model differs, it will be noted explicitly.

Strictly speaking, most systems that claim linearizability will not be consistent due to bugs, but these nasty little creatures aren't suitable for careful scientific discourse anyway.

Now that you know what consistency models are out there (and that they actually exist), you can make informed decisions based on what's specified in the documentation. If you're a system developer, you should note that you are not bound to choose between linearizability and "thread-unsafety" when designing a library. The models presented above may be viewed as patterns that enumerate which behaviors in a multithreaded context are considered universally useful. Just consider carefully what your users need, and do not forget to specify which model your system follows. It will also help to link to something that explains well what lies behind the words "somethingly consistent"--like this post, for instance.



Counting Visits Efficiently with "Wide Caching"

One of the features of the web forum engine, a clone of which I develop in Ruby on Rails, is that it displays some site usage statistics.

There are two kinds of statistics it displays. First, it tracks and shows how many times a particular message was read (i.e. clicked), making sure a user's clicks on his or her own messages are not counted. Second, it shows the generic site usage activity, unique visitors, and page views in the top-right corner.

In this post I'll tell how I solved the problem of calculating the page view and visitor statistics. Impatient developers may jump directly to the solution section, while others are welcome to read about the path I walked from the most naïve activity tracking to the Wide Cache.

Naïve solution

The naïve solution was to store all site accesses in a table in the Application MySQL database. Each line in this table would represent one access to a site, keeping a user's hostname, and the access time. At each access, a corresponding line is inserted into the table. Then, if we are to display the information gathered, we run two simple db queries to get the pageview information (the time is the current time minus 10 minutes):

SELECT COUNT(distinct host) FROM `activities`;
SELECT COUNT(*) FROM `activities` WHERE (created_at > '2012-04-01 18:33:25');

The writes to and reads from the activities table happened at each request. This was achieved via a before_filter in the root controller, which is usually named ApplicationController, as hinted in this StackOverflow question. The filter spawned another process that handled all the activity business, and didn't prevent the main process from serving pages in a timely manner.

As you might expect, the performance measurements were disastrous: the application could serve only four requests per second, while with all the activity stuff turned off the throughput was about 40 rps.

Storing the table in memcached

The reason why the naive solution was not working as expected was lack of throughput. At first, I thought that since we could defer the database writes and perform them after the request was served, we wouldn't have any problems. However, this only addressed one of the performance metrics, response time, while what the solution lacked was throughput.

I also didn't want to introduce other technologies specifically for activity tracking. What I had at my disposal was memcached. Other components of my application used memcached to cache database reads and whole pages. We could, I thought, cache database writes with it as well.

The Rails storage with a memcached backend supports two operations. The first is to write something under a string key, perhaps with a limited lifetime; the second is to read from a specified storage entry if anything has been written to it before, and if its lifetime is not over yet. That's basically all.

We could try to use memcached to store the SQL table itself. Indeed, that table may be viewed as an array of rows, so we could just read the table, append a row, and write the result back. However, for the sake of performance, memcached doesn't support any locking, so we can't just store the table described in the naive approach in a single cache bucket. Two threads may read the array, then both append the next row, and both write, one row being discarded. This is a classical race condition. And my aim was to serve 30-40 requests per second, which means that the race conditions this approach invites were inevitable.

Besides, even if we could use locking (for instance, via Linux named locks (flocks)), it could even perform worse than the database, because it wouldn't be able to handle enough requests sequentially. We need a certain degree of concurrency here.

Wide caching

To mitigate the race conditions that happen during database writes, I used a technique I named "wide caching". I certainly reinvented the wheel here, but for the sake of clarity, I'll use this name in the rest of the post.

The race condition from the previous section could be mitigated by distributing the big "table" into smaller, independent tables stored in different memcached chunks. Writes to each of the chunks would not be protected by a mutex, so a race condition would be possible. We could try to mitigate it by using a round-robin chunk allocation, i.e. a new thread writes to the chunk next to the one allocated to the previous thread.

This solution, however, could be improved.

First, round-robin allocation requires a central, synchronized authority to return proper chunks. This brings the same level of technological complexity as a single bucket with a lock.

Second, round-robin doesn't guarantee the lack of race conditions either. A thread may stall long enough for the round-robin to make a whole "round", and to consider the chunk currently in process again. To avoid this, the allocator should be even more complex.

Third, let's realize that we may trade precision for speed. The result will not be absolutely accurate. People look at the site usage statistics out of curiosity, and the figures may not be precise. We'll try to make them fast first.

This all suggests a simple idea: get rid of the allocator! Just use a random bucket in each thread.

This will not prevent us from race conditions either, but it will make them predictable. If the allocation is completely random, we can carry out experiments, and extend their results in time being sure that they will be reproducible.

Decreasing effect of race conditions

The previous section presented reasons and measures that address the number of potential accesses to the same cache bucket within a period of time. What also matters is how long each cache access is. The shorter it is, the more writes to a hash bucket per second we are going to handle correctly.

Memcache request structure

Rails provides a generic interface to various caches. The interface allows storing serialized Ruby structures in the cache. This picture shows the steps that happen during an update of a table stored in a hash bucket, the scale roughly representing the time each of the steps takes.

A typical hash bucket access consists of these steps:

  1. reading raw data from cache;
  2. deserializing, i.e. converting from a character format to the Ruby format (the opposite of serializing);
  3. appending a row to the deserialized table;
  4. serializing to the character format (the opposite of deserializing);
  5. writing raw data to the cache.

We can see that steps 1, 2, 4, and 5 depend on the amount of records in the array we store in the hash bucket. The more information we record, the more time they take, and when the data are large enough, the time becomes proportional to the size. And if we just store all the activity data in cache, the size of the arrays being (de)serialized only grows in time!

How could we decrease the size of buckets? The wide cache already decreases the expected size by a factor of width, could we do better?

It turns out we can. Activity data is time-based. The older the data are, the less interesting they become. In our use case, we were only interested in the data for the latest 10 minutes. Of course, a routine cleanup of the tables that drops all records with old enough timestamps is not sufficient: the activity data for 10 minutes are large enough (with 50 rps, and 50 being the cache width, the mean bucket size would be 600).

Time reminds us about another idea. Why do we load and then write back the old data in the table? They don't change anyway! Since we're using a generic cache solution and can't hook into its read/write algorithm, we can't do it on a per-record basis (i.e. not load the table at all, and just append the record being inserted). What can we do instead?

We may utilize another highly accessible resource, whose usage is, however, totally discouraged when building a reliable distributed algorithm. We have the clock, which is easy and fast to access. The clock provides us with a natural discriminator: every, say, three seconds, abandon the old hash bucket, and start a new one that will be responsible for storing the requests for the next three seconds. Given the current time, we can always find the "layer" of the cache we should randomly select the bucket from.

Reading the data back

The discussion above was devoted to how to record the activity data. However, the data should be displayed in a timely manner. How timely? Let's consider that if the data are updated at least once per 30 seconds, it's good enough.

Then, every 30 seconds we should reconcile the activity data written into memcached, and "compress" it into an easily readable format. Since I already had the MySQL implementation, I piggybacked on it, and merely inserted the activity information into the SQL table, so the reading procedures did not have to change at all!

To get the statistics, we'll have to iterate all hash buckets the writing algorithm could fill, because gathering the ids of the buckets filled will require additional synchronization (additional writes), and, since the majority of them will be filled under high load, we'd better just collect them all. Theoretically, we should collect all the buckets that might have been filled during the last 10 minutes, the period we show the activity for. However, if we run the collecting algorithm each 30 seconds, we can only collect the data for the latest 30 seconds as well, since all the previous data should have already been collected.

We'll have another race condition here. A worker may get the current time, generate a hash bucket id X, and then stall for several seconds. During that stall, the collecting worker commits all the data to the MySQL table, including the piece stored in X. The first worker wakes up and writes the visit information to X, from which it will never be read, so the request is lost.

To mitigate this race condition, we may collect and commit the data only from the layers that are deep enough. This won't help to avoid it completely, but will decrease its probability to the degree at which we can ignore it.

The final Wide Caching solution

If we assemble all of the above, we'll get the final solution that looks like this:

When a page request arrives, it asks for the current site usage statistics as one of the steps during the page printing. The statistic data are requested from the `activity` table in MySQL database, and are cached for a medium amount of time (30 seconds), because more frequent updates do not improve user experience.

After the page has been served, the request notifies the wide cache about the access to the site. First, it determines the "row" of the cache by rounding the current time in seconds since the Epoch down to a number divisible by three. Then, it determines the "column" by choosing a random number from 1 to 15. These numbers are concatenated; together they form a unique identifier of a memcached entry. The website then reads the table from that entry, appends a row, and updates the same entry.
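To make the key scheme concrete, here is a language-neutral sketch (written in Java for brevity, although the actual implementation is Ruby; the key prefix and method names are my own assumptions, and the constants 3 and 15 come from the description above):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class WideCacheKeys {
    static final int LAYER_SECONDS = 3;   // a new "row" of buckets every 3 seconds
    static final int WIDTH = 15;          // 15 random "columns" per row

    // Key of the bucket a request appends its visit record to.
    static String bucketKey(long nowEpochSeconds) {
        long layer = nowEpochSeconds - (nowEpochSeconds % LAYER_SECONDS);
        int column = ThreadLocalRandom.current().nextInt(1, WIDTH + 1);
        return "activity_" + layer + "_" + column;
    }

    // Keys the collector scans: every column of every layer in a time window.
    static List<String> keysToCollect(long fromEpochSeconds, long toEpochSeconds) {
        List<String> keys = new ArrayList<>();
        long start = fromEpochSeconds - (fromEpochSeconds % LAYER_SECONDS);
        for (long layer = start; layer <= toEpochSeconds; layer += LAYER_SECONDS) {
            for (int column = 1; column <= WIDTH; column++) {
                keys.add("activity_" + layer + "_" + column);
            }
        }
        return keys;
    }
}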

Dumping the collected accesses to the DB is performed like this. After the notification, the request also checks whether there is a live memcached entry with a special name and a lifetime of 30 seconds. If there is no such entry, it means that the information in the DB is obsolete, so the algorithm starts committing the collected information into the MySQL DB.

While the information is being committed, there may appear other requests that also check the lifetime of the memcached entry and start the commit process. This is why the order in which the memcached entries are read is randomized, so that the number of cases where the same information is committed twice is minimized.

Here's the code. It really looks much shorter than this post that describes it.

The latest version of the source code may be found here.

Experiments

I mentioned that the setup contains race conditions that will lead to losing some requests. With a cache width of 15, and a bucket height of 3 seconds, a payload of 35 requests per second made the tracker lose 350 out of 6800 requests. This is approximately 5% of the total number of requests, which is acceptable. Because the bucket selection is randomized, we may conclude that 5%--given these figures for requests per second, cache width, and the equipment--will be the average visit loss factor.

Spawning

In previous sections, I claimed that spawning threads with the spawn Rails gem and writing to the database in the spawned processes/threads is slow. Indeed, spawn reinitializes the connection in the spawned thread, and this is already a considerable burden on the server if several dozen threads are spawned each second. Here are the experimental details (see bug #67 where I first posted this info):

Method                        Reqs. per sec.
No activity                   40.2
With activity; no spawning    27.4
With activity and spawning    13.0
With wide cache               36.7

This shows that (a) you should not use spawning for short requests, and (b) wide cache is really fast enough.

Background periodic job or per-request?

In the algorithm described above, activity data collection is started when a request arrives and the application finds out that the currently cached activity stats are out of date. We were careful to make this case as painless as possible, and it has other implications that are not related to race conditions; experiments show that the loss of throughput is acceptable as long as we don't try to spawn a thread for each such collection.

Surprisingly, this method starts performing badly when the site activity is low rather than high. On a low-activity site, we can't rely on requests coming every second. Rather, they may arrive even less frequently than the activity cache lifetime. So, to support the low-activity case, we have to collect the information for caches that might have been filled in the latest 10 minutes (the maximum period for which visit information is still relevant), not 30 seconds (the lowest possible time since the previous data collection). Otherwise, if users arrive less frequently than every 30 seconds, the engine would always show zero activity, which would make users visit the site even less.

This wouldn't be important, unless I used the HAML template engine, which--and this is a known, sad bug--doesn't flush the page until the request is complete. Even putting the code into an after_filter doesn't help. Experiments demonstrate that activity data reconciliation may take up to 1 second, so some requests will be served with an excess 1-second delay, and when the rate of requests is low, these will constitute a considerable fraction of them. And I surely don't want my users to experience frequent delays during their infrequent visits.

Spawn instead of after_filter? We have already seen that spawn-ing will make our server choke at the other extreme, when the load is high.

Luckily, we have a solution that suits both cases equally well. It is to periodically invoke the activity data collection procedure, without tying it to requests. However, the periodic invocation of a task is not straightforward in Rails. I'll get back to it in another post.

Future work

The current implementation of the activity tracker is parametrized with cache attributes (cache lifetime, and width). However, this Wide Cache could be parametrized further, with the procedures that are executed, namely:

  1. Cache bucket updating
  2. Commit procedure
  3. Reading what we previously committed

I think that I'm going to need the same kind of cache--one that doesn't always touch the database--for the first kind of activity, the visits of individual posts. This parametrization will help me to keep the code DRY, and to re-use the Wide Cache.

This will require refactoring of visits and hits to a single function that calls a lambda function passed in the constructor.

Other approaches

Parse apache/nginx/rails logs.

Indeed, the topmost web serving layer already collects the activity data: it carefully logs each visit, with the URL being visited, a user's IP address and user-agent. Instead of spending time on activity in a comparatively slow Rails application, we could use the logs of a "fast" web server.

I have seen production systems that display activity data based on parsing nginx logs, and it may be integrated into Rails in such a way that it doesn't look like a kludge. Perhaps, free log parsers are already available on github... but this solution just doesn't look cool enough to me.

Do not track activity

Does anyone care about the visits at all? Maybe we just shouldn't track them?

First, from my observation, everybody tries to stick a visit tracker into their website. Higher activity also means that you're visiting something valuable. A Russian clone of the Facebook social network even used to display a fake registered-user counter on its homepage. Such fraud could only be justified by a huge benefit from displaying it, which means that the statistics are considered valuable by at least some very popular sites.

Second, in my case, there was an easier answer. I have to reimplement everything the engine I'm trying to clone contains, for politics-related reasons: unless I reimplement everything, and manage to keep the site fast at the same time, my approach to the development will be considered a failure.

Use a specialized activity tracking solution

Too late. I have already implemented my own. But if you want some activity data on your website, do not hesitate to use one. Doing it yourself is hard--see above.

Conclusion

In one of the discussions on the very forum I'm reimplementing, someone wondered why the view count on YouTube is not updated in real time for popular videos. Having gone through a series of trial-and-error experiments with visit tracking, I realize that the developers of YouTube surely had their reasons. My activity information is also delayed, while the traffic volume is still insignificant compared to even a single Lady Gaga video.

It seems that during the development of this tracking I have reinvented a lot of wheels. This is, I guess, what the most primitive database and file system caches look like. Their algorithms are, of course, more sophisticated, and are not built on top of generic cache and serialization solutions, using their own, custom ones instead.

But as any other re-invention of a wheel, it was fun, and I surely got a lot of understanding about higher-load web serving while trying to optimize this component of the forum engine.



Relativity of Simultaneity in Distributed Computing

About a year ago, I described an allusion to physical phenomena in computing in "Caching and Levers". This post is devoted to a more complex theory, namely the Special Theory of Relativity (STR), and mostly to one of its implications, the "relativity of simultaneity".

Relativity of Simultaneity

Special Theory of Relativity may be approached as a set of equations for coordinate translation that take their effect as the speeds of objects approach the speed of light, c.

STR is based on two postulates, one of which states that light is always propagated in empty space with a constant velocity c regardless of the speed of the emitting body. This may be used to introduce a tricky paradox.

Train car experiment

As seen by a passenger of the train. The events happen simultaneously.

As seen by a spectator on the station. The "back" event happens earlier than the "front" event.

(pictures from Wikipedia)

Assume that a train car is passing you while you're standing on a station platform. The train car has a lamp installed exactly at its middle. When the train passes you, the lamp flashes, and then you watch the light reach the front and the back of the car. It's interesting that those who sit in the car will notice the light hitting the two walls simultaneously, while those who stand on the platform will notice the back wall lit earlier than the front one. This is implied by the postulate described above: as seen by a spectator on the platform, the light propagates with equal speed in all directions, so it will hit the rear wall before the front one, as the former moves toward the light source, while the latter moves away from it. See Wikipedia for more info.

This paradox, known as the relativity of simultaneity, may be formulated more generically: whether several events are simultaneous depends on the location one watches them from.

But what does it have to do with the computing? Before I answer this, I'd like to share what I've learned from a CACM magazine.

Quiescent consistency

In the multicore age, the classical data structures we all know about are about to change. The reason for that is the increasing demand for computation speed and volume that careful "sequential" CPU chip engineering is no longer capable of providing.

This challenge makes us rethink our approach to algorithm and data structure design. If we want data structures to be efficient, we may no longer expect them to behave like the good old ones from the sequential age.

In distributed programming, there is an approach to describing data structure behavior known as quiescent consistency. There are a number of consistency conditions: sequential consistency, linearizability, and others. These conditions describe how an object behaves when several threads call its methods.

A data structure possesses quiescent consistency if it is consistent between its states of quiescence, i.e. when there are no methods currently in progress. As soon as a quiescently consistent structure has no operations pending (i.e. reaches quiescence), we may be sure that the executions of methods before this state and after this state are never interleaved.

Imagine a quiescently consistent stack. An implementation of it is described in this CACM paper "Data Structures in the Multicore Age", the one where I first encountered the quiescent consistency concept. Assume the following sequence of events happen to the q.c. stack:

  1. Three pushes x,y,z
  2. Quiescence (the pushes are processed)
  3. Three more pushes a,b,c
  4. Quiescence (the pushes are processed)
  5. Three pops
  6. Quiescence (the pops are processed)
  7. Three more pops
  8. Quiescence

Note that q.c. doesn't mean that a data structure guarantees nothing except for this specific consistency. A consistency condition only maps data structure behavior in a concurrent setting to behavior in a single-threaded environment, i.e. it only narrows down the multitude of different sequences of method calls that may be taken to explain a specific set of multithreaded events. All the properties a data structure exhibits in this sequential processing should still apply.

Quiescent consistency guarantees that the first three pops return x, y, and z (in an arbitrary order), and the next three pops return a, b, and c, somehow intermixed as well.

Note that, unlike linearizability, quiescent consistency doesn't impose any ordering on the results of pops if there was no quiescence between the initial pushes. For instance, if the processing of the first and the third pushes does not overlap, linearizability ensures the pops will respect this order, while q.c. (in case the second push overlaps with both of them) doesn't.

Having noted that, q.c. looks like a very weak and useless property. However, it still implies correctness, which is enough in many circumstances. Imagine that you need a pool of unique numbers. It is enough to use a q.c. counter; the numbers it returns will be unique, which fulfills our initial purpose.

Do we really need stronger consistency?

However, the reasons why we may be satisfied with weaker consistency conditions are not constrained to specific examples like this. Let's try to prove the opposite. Assume we're implementing a stack that is accessed from a number of threads. Quiescent consistency may not be enough, because if a push of A precedes the push of B, then the pops should be ordered in a specific way, and quiescent consistency may not guarantee that.

But wait... We say, "If one push precedes the other," but do we really know what "precedes" means? If two threads in our distributed computational system invoke the push call independently, how can we be sure that one of them precedes the other? And is there any sense in defining a measure for that?

Let's recall the paradox we described at the beginning. In one reference frame, one event precedes the other, and in a different reference frame it's vice versa. So whether one push happens before the other depends on the place we look from, and is not--and may not be--determined by a central, ubiquitous authority. That's a law of the universe, and we can't change this in computing.

Surprisingly, it makes quiescent consistency be a fairly appropriate constraint for a distributed data structure. If there's no strict sense of precedence between events that happen in different places of our network then why can't we use this to develop a more efficient data structure? The correct answer is, "indeed, why not?", and such data structures are well-defined nowadays.

OSI model

The OSI model is an abstraction that aims to make the design of distributed computation systems easier. The chain of events that happens during data transmission is split into several separate layers: each layer has its own protocol, which is independent of the actual implementation of the underlying layers. You may learn more at Wikipedia.

STR vs computing: the difference

We have successfully used a metaphor from physics in distributed computing, but there is an inherent limitation to applying the concept further. In physics, if we have a car of a specific length (as measured by its riders), we may install the light source in such a way that the events at the two sides of the car happen predictably simultaneously in the reference frame of the spectator at the station.

In computing, we can't do that. The OSI model prevents us from predicting how fast the events happen by masking out the capabilities of the system at the lower layers. In physical reality, we know that as soon as the light "hits" the back and front covers of the car, the event "happens" (or, more precisely, a piece of reflected light is launched towards the spectator). In computing, however, we don't even know how long it takes for a signal to reach another node. Moreover, the OSI model makes this impossible to predict: all the layers guarantee correctness, but none of them provides any timing guarantees.

Conclusion

The absence of timing in the OSI model suggests that there may be no sense in saying, "this structure may not be used, as it processes requests in a different order than they were issued in". The "order they were issued in" is inherently unpredictable in our computing models.

This inherent unpredictability of the timings of processes and events, as described by the OSI model, is why we really can't apply the special theory of relativity to computing any further. However, we may still notice that simultaneity in the world around us, as confirmed by physical experimentation, is relative. And nature still works well!

And if nature accepts this, why can't we learn from it and allow our distributed systems to be less predictable, trading that predictability for speed?..



How to Use Open3 and Avoid Dead Locks

Linux pipes and open3

Three articles on how open3 is implemented in Linux, and how it is properly used

Open3 is a common name for a function that spawns a process and substitutes its standard input, output and error streams with the pipes connected to the original process. This way you may interactively feed the child with data, and read the processed data back.

The problem with open3 is that its power makes it more dangerous, and its usage more error-prone. As the famous Spider-Man quote goes, "with great power comes great responsibility", and Linux inter-process communication is no exception. So here comes effective usage advice number one.

Avoid open3 unless necessary

For the purpose of clarity, throughout the post, standard input, output, and error file handlers are called "standard filehandlers"; the process that spawns, and the process that is spawned are the "parent", and the "child" respectively.

The first advice about a safe open3 usage is... to avoid its usage. Languages that are popular in Linux world usually come with a handful of tools for process spawning, such as:

Shell redirection instead of pipes

Instead of using open3 in your program, you may try to connect process inputs and outputs with Bash redirection utilities, invoking the shell via standard functions to spawn processes.

For instance, you may write system("find | gzip >archive.gz") in Perl to spawn a process that archives recursive directory listing into a file, instead of passing the data through the parent process. You may use a usual redirection into just a text file as well, if that's what you really need.

However, I wouldn't use intermediate files if they are only there to dump information temporarily for immediate processing by something else (say, system("find >listing.tmp") followed by system("gzip listing.tmp")).

I consider it bad style, not to mention that it costs extra time for disk access and synchronization that may be totally unnecessary. The arguments to the commands passed this way will also undergo interpretation by the shell, so you'll need to escape them, a painful and error-prone process.

If you need the intermediate result saved in files for debugging purposes, you still may dump it in debug mode only, saving cycles when your project is compiled for production usage.

  • system — spawn the process, inheriting parent's standard file handlers;
  • fork+exec — ditto, but possibly with additional customizations you learned about in the previous post (such as tuning the environment or playing with file handlers);
  • popen—open only one pipe to the process, either for reading or writing (note that popen also suffers from some of the typical misuses of pipes).
  • shell redirection utilities—Bash has a number of ways to control and redirect the input and output of a process. A section in one of my older posts is devoted to pipes in Bash. What could also be helpful is the ability to use the shell interpreter when spawning a process with such functions as system in Perl, as in the find | gzip example (see side note). You may use a usual redirection into a file as well, if that's what you really need.

If one of these tools fulfills your needs, then go for it! You can replace it with open3 at any time anyway—unlike houses, software is relatively easy to rebuild.

Where open3 should be used

However, if you find the alternatives listed above inefficient or not powerful enough, you may opt for using open3. I'll try to enumerate the cases where I would certainly advise using open3.

What is select?

The select function (man page) is a common name for a function that waits on file handlers. Assume you develop a task scheduler that connects to several machines via sockets; it should wait for the first socket that has a result in it, to schedule the next task onto that specific machine (since the others are obviously busy). How do you accomplish that?

Of course, there is a faster and easier-to-use alternative than trying a non-blocking read on each socket once every several milliseconds. It is the select() function (or one of its companions, such as poll() or epoll) that performs a blocking wait on all of them. This function returns the list of file handlers that have data readily available in them—as soon as there is at least one such descriptor!

You may find plenty of manuals on how to use select; later in this post you'll see an example.

  1. several consumers of one source—if you are given a source of data that should be redirected to several consumers (such as data that should be both printed onto the terminal and saved into an archived log file for history-keeping reasons), you should connect a pipe to its output and error streams, and when you read some amount of data from there (for instance, a line), send it to all the consumers you have. In some cases you can do this with Bash (not with sh), but the code with open3 should look much clearer.
  2. aggregation of several providers—if you maintain several data sources that should all be aggregated into a common place (a text file, for instance) that has concurrency issues, you might benefit from open3. You may spawn the processes and then select() (see sidenote) on their output streams, thus making the writes to the aggregator safe.
  3. interactive data exchange with a continuously running process—you can't avoid open3 if you spawn a process that responds to your input with text on its stdout. A lot of auxiliary programs that are launched thousands of times have the interface of reading from stdin and writing to stdout (such as SAT solvers), and you most likely can't afford to write or read anything from the disk.

One example of applying these patterns (more specifically, a combination of the first and the second) is transparent logging—it's a blatant crime to use anything but open3 for this. By transparent logging I mean combining logs from the child processes into a common logging sink in the parent. Usually it's done automatically: the system call just routes the child's standard output to that of the parent. However, assume you spawn several concurrent processes; unless you prefix each line with an identifier of the child it came from, you'll quickly get lost in the log.

This may be solved by opening these processes with open3 and attaching prefixes to their output lines before printing them. Note also that this way you may control the severity of logs: for instance, you might want to treat the standard error and output streams of the children differently, and that cannot be achieved with a simple popen call.

Synchronous vs. asynchronous

In this post we will mostly study issues with a synchronous processing of data exchange with the child process. Being synchronous means that nothing else but the interaction with the process is performed while the child is alive. In other words, there is only one thread in the parent.

Some notes on asynchronous processing will be given at the end of this post. Still, I consider the synchronous experience extremely valuable, as it helps to shape the vision of what a good asynchronous interaction via open3 should look like.

Issues with open3

So far I've been saying that open3 is not straightforward to use, but what causes the complexity? Why could there be the "Dead Locks" referenced in the title of this post? I had to learn this the hard way, tackling cumbersome bugs in our project, and here's what it was about.

Following the pattern "aggregation of several providers", I spawned the process that wrote to both stdout and stderr with the intent to combine them both in the archived log file. The code (simplified) looked like this:
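
(A hypothetical Ruby sketch of the same shape; the command and file names are made up, and the Ruby 1.9+ block form of Open3.popen3 is used.)

  require 'open3'

  # Deadlock-prone: drain stdout completely, and only then look at stderr.
  Open3.popen3("some_noisy_command") do |stdin, out, err, wait_thr|
    stdin.close
    File.open("combined.log", "w") do |log|
      out.each_line { |line| log.puts line }            # all of stdout first...
      err.each_line { |line| log.puts "ERR: #{line}" }  # ...then all of stderr
    end
  end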

I expected to get a file that has all the lines from the stdout of the child, and prefixed lines from the stderr of the child afterwards. However, sometimes the application just deadlocked!

A quick strace demonstrated that the parent hung up on read from child's stdout, and the child hung up... on write to its stderr! How could that be?

Remember that in the first post about pipes I listed limited capacity as one of the properties of pipes. I stressed it on purpose, because it plays its role right here, when you try to use open3. The stderr pipe connecting the child to the parent was full, and the child blocked trying to write one more error message into it. The parent, meanwhile, was still reading from the output pipe, because the child was still running and hadn't closed it! That's the open3 Dead Lock.

Of course, this could happen with any pair of pipes here. Assume the process just echoes every character it receives on its input to its output (real programs will be doing useful transformations, of course, but for clarity we may assume it's the identity). We want to feed it twice as many characters as the pipe's capacity.
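
You can observe this with a process that merely copies its input to its output, such as cat. A sketch like the one below hangs once the pipes (and cat's own buffer) fill up, because nobody drains the output while we are still writing:

  require 'open3'

  Open3.popen3("cat") do |stdin, out, err, wait_thr|
    stdin.write("x" * 1_000_000)  # never returns: cat blocks writing its output,
    stdin.close                   # stops reading, and our write can't finish
    puts out.read.bytesize
  end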

You might think that this won't strike you unless you're dealing with extremely long inputs and outputs. Sorry to disappoint you, but, quoting the man 7 pipe:

In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 65536 bytes.

It's not much, though in certain cases it's big enough to let badly written programs appear to work. On a larger scale, we definitely need a generic, limit-agnostic solution.

How to prevent the dead lock

It's relatively simple to devise a generic rule for mitigating the effect of such a limitation. To avoid deadlocks with open3, you should clear each of the output pipes (and fill the input pipe) as soon as possible, and never introduce a dependency between clearing one pipe and waiting on another. So we need to watch closely all the file handlers (up to three) we plan to read data from or write data to—and it's important to wait for all of them at once! If you read the sidenote above, you already know that we could use select for this.

Using select to react to input promptly

So, a potential way to fix the program above is to write something like this:
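
(Again a hypothetical sketch, with the same placeholder names as before.)

  require 'open3'

  # Better, but still not deadlock-free: select tells us which pipe has
  # *some* data, yet readline then blocks until a whole line arrives.
  Open3.popen3("some_noisy_command") do |stdin, out, err, wait_thr|
    stdin.close
    File.open("combined.log", "w") do |log|
      open_pipes = [out, err]
      until open_pipes.empty?
        ready, = IO.select(open_pipes)
        ready.each do |io|
          begin
            line = io.readline                           # can block on a partial line!
            log.puts(io == err ? "ERR: #{line}" : line)
          rescue EOFError
            open_pipes.delete(io)
          end
        end
      end
    end
  end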

This program, however, only outlines the approach to tackling deadlocks with open3. It is still prone to deadlocks, though less so than the original one. Assume that the child has started to print a line to its stdout, but hasn't finished it, because it got a debugging interrupt. Without finishing the line, it spits 100 Kb of debugging information to stderr, intending to continue the normal output on stdout afterwards. However, the parent is still blocked in readline() on the out pipe, waiting for the line termination character to appear there, and the child gets blocked at the write to stderr, because the err pipe is full and no one is going to remove data from it. Deadlock again. (You may play with this deadlock by open3-ing this sample program).

The issue here is that we still do not "remove data from pipes as soon as possible". For that, we need nonblocking reads at a lower level than the readline()- and scanf-like functions.

Using nonblocking reads and your own buffers

The problem with nonblocking low-level reads, as Capt. Obvious notes, is that they are low-level. We can't read more or less structured data with them. Assume that we want to read a number (the number of seconds to wait before launching the starship, for instance) from the child's stdout. If the debugging interrupt described above is triggered right in the middle of printing 1000, our nonblocking read will read 10 (before turning to stderr) and act accordingly, launching the multi-billion-dollar ship prematurely. From the child's viewpoint, doing so is totally legitimate: it printed 1000 and the debugging information to different output channels (stdout and stderr), and if the reader confuses the two, that's the reader's problem.

Do not use strings as buffers (like I do here)

In the samples below we use "strings" as buffers. However, strings in modern scripting languages (including Ruby, which is used here) consist of variable-length multi-byte characters (see Joel's post on Unicode), not of one-byte symbols. On the other hand, in some less modern and "less scripting" languages, strings cannot contain zero bytes, as a zero byte would be treated as the end of the string.

Therefore, "strings" are going to misbehave if chosen as a buffer for an abstract byte stream. I used them for simplicity, and for the sake of demonstration of open3 usage; in real programs, however, you should not use them.

Therefore, we need to handle these situations accordingly, adding another level of indirection between the child and the parent. We will store the data we read from the pipes in intermediate storage; we could call it a buffer. In fact, we are going to re-implement buffered reading.

In the next example we will implement a linewise open3, a subroutine that invokes user-defined callbacks upon reading a complete line from stdout or stderr. You could extend it further, introducing scanf-like behavior (instead of the regexps you'll see). However, we have two more issues to discuss before getting to the code.

Reactive interaction with both input and output data

The select-with-callbacks approach works well for reading output with the input pipe closed altogether. What should we do if we want to write something to the child's input pipe based on what is being read from its output?

To talk interactively with the child, you'll most likely need asynchronous processing. However, there is one pattern which allows exchanging data through both the input and the output in the synchronous mode. We already agreed that we will invoke user-defined callbacks after reading lines from stdout and stderr. However, we haven't used the return value of these callbacks in any way! An idea arises immediately:

If a user-defined callback for stdout or stderr lines returns a non-empty string, we feed this string to the stdin of the child as soon as possible. To get more control over the child, we treat a nil return value from a callback as a sign to close the standard input. We will also make our wrapper take a string as input and feed it at the very beginning, as that is what could trigger the output in the first place.

This may still not sound powerful enough, but you have plenty of asynchronous options left, such as interrupting pselect with a specific signal, or setting up a separate thread that feeds data into the input pipe. We will omit these options in this blog post.

Watching for the child termination

As we're implementing a synchronous open3 wrapper, the natural assumption would be that at return from the wrapper the child should be dead and reaped. This way we can also return its exit status to the parent.

As we discussed in the previous post, process termination does not depend on the status of the pipes, so we should watch for it independently.

What we actually want is a version of select that watches for filehandlers and for the child's termination. If you know such a version of select (which I don't), make a comment, please. For now, let's search for another solution.

Such functionality could be simulated with a special wrapper thread that only waits for the child (with wait) and sends a signal upon its termination. This signal would interrupt select, and we would handle that.

In our project I implemented a simpler solution. It is based on the assumption that the longer the child has been running, the less likely it is to terminate during the next second. So we can use periodic wakeups to check the process status (implemented as a non-zero timeout to select), and increase the wait period as time goes by. Having an upper bound for that period is a good idea as well.

Note that if a process has terminated, and the pipes are still open, we assume that the last nonblocking read will fetch us all the data we're interested in, and we may close the pipes. This may not be true, but in such specific cases you'll need specific solutions anyway.

Linewise-open3 code

Let's sum up what we're up to. Here's a listing of an open3 wrapper that prints the supplied string into the stdin of the child, and then invokes one of two user-specified callbacks whenever a complete, \n-terminated line is read from stdout or stderr respectively. These callbacks may return more data to put into the stdin of the process. The execution terminates when the child terminates. The wrapper returns the return code of the process (everything else is assumed to be done by the callbacks).
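
(What follows is a hypothetical Ruby reconstruction of such a wrapper. The names linewise_open3, on_out and on_err are mine; the child is watched by polling the wait_thr that the modern Open3.popen3 provides, using the growing select timeout described above.)

  require 'open3'

  # Feeds `input` to the child's stdin, calls on_out/on_err for every complete
  # "\n"-terminated line read from stdout/stderr, writes any non-empty string a
  # callback returns back to stdin (nil closes stdin), and returns the child's
  # exit status.
  def linewise_open3(cmd, input, on_out, on_err)
    Open3.popen3(cmd) do |stdin, out, err, wait_thr|
      buf     = { out => "", err => "" }
      handler = { out => on_out, err => on_err }
      pending = input.dup
      timeout = 0.1

      react = lambda do |reply|               # interpret a callback's result
        if reply.nil?
          stdin.close unless stdin.closed?
        elsif !reply.empty?
          pending << reply
        end
      end

      loop do
        readers = buf.empty? ? nil : buf.keys
        writers = (!stdin.closed? && !pending.empty?) ? [stdin] : nil
        ready_r, ready_w = IO.select(readers, writers, nil, timeout)

        (ready_w || []).each do |io|          # fill the input pipe...
          begin
            written = io.write_nonblock(pending)
            pending.slice!(0, written)
          rescue Errno::EPIPE
            io.close
          end
        end

        (ready_r || []).each do |io|          # ...and drain the output pipes
          begin
            buf[io] << io.read_nonblock(4096)
          rescue EOFError                     # flush any unterminated tail
            react.call(handler[io].call(buf[io])) unless buf[io].empty?
            buf.delete(io)
            next
          rescue IO::WaitReadable
            next
          end
          while (line = buf[io].slice!(/\A.*\n/))
            react.call(handler[io].call(line))
          end
        end

        # Watch for termination independently of the pipes, backing off the
        # polling interval the longer the child keeps running.
        if wait_thr.join(0)
          buf.each do |io, b|                 # one last nonblocking drain
            b << (io.read_nonblock(65_536) rescue "")
            react.call(handler[io].call(b)) unless b.empty?
          end
          return wait_thr.value.exitstatus
        end
        timeout = [timeout * 2, 5.0].min
      end
    end
  end

Callbacks that always return nil give plain linewise reading of both streams; returning a string from a callback turns the wrapper into a simple interactive driver.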

I tested this program on random echo. You may also view the complete listing for open3 usage, and test it either with echo or with sshfs installed on your system.

Note also that in our project we have developed an open3 wrapper for Perl as well; you can view it here. It is less capable (no interactivity), but it's live and working.

Notes on asynchronous open3 usage

In some cases you may trade complexity for efficiency. The problem with the dead lock above was that we had a single thread designated to read from several pipes. Instead of introducing a buffering system, we may just spawn several threads and attach each of them to its own endpoint. The burden of waking up the thread that actually has something to process doesn't vanish; it just gets imposed on the OS scheduler, which should contain fewer bugs than our code, or on the language runtime's scheduler (if it employs green threads, as in Ruby 1.8).

This may sound like an easier solution, especially in the multicore era, where developers cheer at every opportunity to write a multithreaded program that makes high-end hardware stall less. To me, it's a bit of an overkill for simple tasks (if they can be expressed in terms of the interface discussed above). And in the context where we used open3 in our project, too many competing threads had already started to become a problem.

Perhaps, I'll study the asynchronous option in other posts.

Conclusion

This post concludes what I initially planned for the series of blog posts on how to work with pipes in Linux. Another topic emerged, the asynchronous interaction with an open3-driven process, but I don't know yet whether I will write about it.

We have observed what pipes are, how they are used for inter-process communication, what options we have for spawning processes with pipes attached to them, and how this may be achieved in modern Linux scripting languages. We mainly focused on open3, studying the details of its implementation and the use cases it is most efficient in. We studied its quirks and how to avoid the traps set up among the joints of pipes we have to deal with.

I learned all this by spending a couple of days messing with processes that magically deadlocked without any obvious reason, and with nontrivial multithreaded debugging. I hope these posts will help you when you work with pipes, so that you can avoid my mistakes.



How to Implement open3

Linux pipes and open3

Three articles on how open3 is implemented in Linux, and how it is properly used

Open3 (or popen3) is a common name for a library function that spawns a child process with its standard streams (input, output and error) all connected to newly created file handlers in the original process. You may encounter this function in such languages as Perl, Ruby or Python.

The standard Linux C library, unfortunately, does not contain such a function; only its less capable counterpart, popen, is a member of the C library. The popen function, which is more common, opens only one pipe, either to the input or to the output of the process being spawned. This means that if you program in C, you'll have to implement open3 on your own.

Another reason why you would have to re-implement open3, even if your standard (or non-standard) library provides it, is when you need more control over what happens during its call. For instance, Perl and Ruby 1.8 versions of open3 lack the ability to adjust the environment of the process being invoked (see my older rant). This functionality is why I re-implemented open3 in the project I work on.

The choice of Ruby was driven by the fact that I used it in the project I work on, and that it has a killer feature, exceptions (so that you don't have to check the return code of each library call invoked, as in C). Besides, it's a lot less verbose.

The subsequent sections of this article present a sample implementation of open3 in Ruby. However, it relies on certain system and C library calls, which I will also outline, so you'll be able to do the same in C or in any other language with access to such primitives.

Foundation for open3

The open3 implementation contains no magic. It relies on two powerful features of the Linux system.

Forking

Good old forking comes to help in this case. Process spawning in Linux is done via the fork call, which clones the current process, followed by the exec call, which replaces the newly spawned child with the command we wanted to invoke in the first place. It may sound unnecessarily complex, but in our case it has a feature that makes open3 as a library function possible: after fork, but prior to exec, we can run code in the context of the new process while retaining the knowledge we had in the old one. How does it help us?

First, we may alter the environment of the child being invoked, which was my original motivation for reimplementing open3. Many scripting languages have a global, process-wide ENV hash with read/write access, which is used as the environment in execve system calls. If we need to alter the environment of the child process in a thread-safe way, we may modify ENV between fork and exec, so that the change takes effect in the context of the child process without interfering with the parent.
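
In Ruby, that boils down to something like this sketch (the variable and command names are placeholders):

  pid = fork do
    ENV['SOME_VAR'] = 'per-child value'   # affects only this child's copy of ENV
    exec('some_command')                  # execve() receives the modified environment
  end
  Process.waitpid(pid)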

Second, some file handlers (internal OS "pointers" to open files, pipes and sockets) are preserved across exec! Namely, those that have the FD_CLOEXEC flag cleared. Whether the flag is set by default depends on how the descriptor was created (the standard 0, 1 and 2 descriptors have it cleared, while some language runtimes set it on the descriptors they open), and we can change it via the fcntl call.

So what we could do is open pipes in the parent process, and inculcate the knowledge about them into the child process before the target command is executed. How should we do the inculcation?

File handler renaming

The child process should spawn with the pipes instead of its STDIN, STDOUT, and STDERR. However, after forking, it already has some standard streams assigned by the OS or the language runtime. What should we do now? The answer is obvious: rename the relevant file handlers.

Closing and then opening a file IO object in a high-level language (such as C++) is not the same as "reopening" it, as the underlying OS file handler will not be retained, which is extremely important when altering standard streams.

Language runtime libraries usually have a "reopen" function (freopen in C, or IO#reopen in Ruby). The thing is that it substitutes the file behind an existing handle: the "language-level" file handle (a stream in C, or an IO object in Ruby or, say, C++) stays the same, and the OS file handler underneath it now refers to the new file.

Leaving the IO-object internals to the specific languages, the OS-level file handler swapping is performed via the C library function dup2. The usual dup function just duplicates a file handler; called as dup2(keep, replace), it will close(replace) and then duplicate keep onto the descriptor number replace. This renaming is relatively cheap, and the resulting descriptor has its FD_CLOEXEC flag cleared.

Forking and threading in POSIX is an interesting subject in itself. I once had a lot of fun reading this thread, and then the FAQ of the newsgroup.

Note that fcntl and dup2 are async-signal-safe functions, and therefore they can safely be called between fork and exec in a multithreaded process.

Simple implementation

Okay, let's assemble all the tools together. Opening the pipes is easy, as described before; then come the fork and the renaming of the file descriptors.

Instead of just assigning an environment variable, we may allow the user to specify the actions they want executed in the child's context, as a function pointer (or a block, in Ruby).
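
A bare-bones sketch in Ruby might look as follows (my_open3 is a made-up name, and error handling is omitted):

  def my_open3(*cmd)
    in_r,  in_w  = IO.pipe
    out_r, out_w = IO.pipe
    err_r, err_w = IO.pipe

    pid = fork do
      yield if block_given?       # user code runs in the child's context,
                                  # e.g. tweaking ENV as described above
      STDIN.reopen(in_r)          # dup2-style renaming onto descriptors
      STDOUT.reopen(out_w)        # 0, 1 and 2; these survive the exec
      STDERR.reopen(err_w)
      [in_r, in_w, out_r, out_w, err_r, err_w].each(&:close)
      exec(*cmd)
    end

    [in_r, out_w, err_w].each(&:close)   # the parent keeps only its ends
    return in_w, out_r, err_r, pid
  end

  # Usage sketch:
  # to_child, from_child, errors, pid = my_open3('sort') { ENV['LC_ALL'] = 'C' }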

Process termination control

From this simple implementation we have completely omitted an important means of controlling what's happening to the process: knowing when it terminates (or forcing it to).

The thing is that whether the process has terminated does not depend on the status of its standard pipes. First, closing the standard input doesn't necessarily make a process terminate (the way it does with the standard grep command); actually, very few programs react to it. So if we've got all the results we wanted from our forked creature, we should have a means to kill it before it gets too late...

Second, the file handlers we read from do not tell us whether the process is alive either. Remember that file handlers may survive the exec if FD_CLOEXEC is not set? The process we spawned might, in turn, have given birth to a daemon, which we don't want to mess with. The daemon could inherit the file handlers and just not use them. This actually happened with the SSHFS mounter: the process wouldn't close the output pipes.

Therefore, pipes won't help us control the spawned process. What could? Of course, its PID. fork returns the process identifier of the spawned child, and our process, as a parent, has full access to its status and its return code. So, along with the pipe file handlers, we may return the child's PID to the caller, for it to control the process in a more fine-grained way.

Anything else?

We've learned how to implement open3 itself. However, the bigger troubles are coming when we try to use it.

The thing is that open3 is a powerful tool, but with great power comes great responsibility. As outlined in the previous post, reads and writes to pipes may block at either endpoint, and this renders the usage of open3 error-prone. You should not use it for every process spawn. If a simpler system, fork, or popen would suffice, use them. But if they won't, you're welcome to read the next article on how to actually use open3 in an efficient and correct way.

Proceed to "How to Use Open3 and Avoid Dead Locks".



Pipes in Linux and in The Real World

Linux pipes and open3

Three articles on how open3 is implemented in Linux, and how it is properly used

The importance of inter-process communication

In Linux programming, the concept of a diversity of small and highly specialized tools collaborating to achieve the goal a programmer set for them has always been dominant. Take a look at shell scripting. Here's how one searches for the string FOOBAR in all *.txt files in their home dir: something along the lines of find ~ -name '*.txt' | xargs -L 100 grep FOOBAR.

Pipes

A lot of pipes stocked somewhere in Russia.

Pipes that transfer solid items

This photo is taken from here. These pipes are more widely known as pneumatic tube transport.

Pipes that transfer documents

This pneumatic tube seems to transfer medical documents and samples packed into capsules. I saw such a system in one of the banks I had an account at; there it was used to transfer documents between counters.

It's a good illustration of sequential payload delivery by pipe if the width of the pipe is the same as the width of the objects.

You may find more sophisticated examples in my article on xargs, which is a robust spawner of such commands. It is this robustness that allowed me, for example, to make a simple "web service" that sends information to the web, triggered by just adding lines to a text file--look how concise its code is!

The usage of such tools is not confined to making a desktop user comfortable at their workstation. These tools are actively used in production scripting as well. Having a lot of ready-to-use tools at hand provides a programmer with extra opportunities for building larger programs: instead of trying to find a library or to re-implement something, one may just look for a small tool for the specific purpose, one that interacts with the outer world via the command line and its input and output streams.

But the combination of tools requires efficient communication between them. Small tools can nevertheless exchange large amounts of data! Being the world's largest consumer of such communication primitives, Linux uses pipes.

What are "pipes"?

Before you read this section, I strongly encourage you to visit your bathroom or kitchen and closely examine the metal or plastic extended round items that have moving water inside (or, if your country isn't rich enough for you to have them, take a closer look at the things the oil your country sells flows through). These are pipes.

Pipes that have started a war!

A week ago I watched the "Fair Game" movie. Being a non-US national, I nevertheless consider the problems discussed there relevant. And one episode of the plot caught my attention. One of the "casus belli" for attacking Iraq was that it had purchased pipes that could be used to make nuclear weapons (in fact, they couldn't, and it was all political speculation). Above is a picture of such a pipe.

See how important pipes could be? Read the rest of the article to learn more!

Pipes are used to direct a flow of something from one place to another; it could be water, oil, other liquid, or even documents, or change (see side pictures). Pipes can even start a war! And Linux pipes transfer bytes: you can write to one endpoint, and read the data written from the second endpoint, which may end up in another process!

The three properties of a pipe

The first is that a pipe can transfer payload in only one direction. You can't use a single pipe to transfer water in both directions, so that it leaks out at one end and simultaneously takes water in at that same end to leak out of the other. For that, you need at least two pipes.

Second, a pipe has a limited capacity. When you close your valve in the kitchen, your pipe is full of water, and no matter how hard the pump station tries, there will never be more water inside the pipe than there is now. (Of course, the station may try harder and make your pipe leak, and water can be compressed under certain conditions, but that's beside the point.) When the station tries to pump more water, the new water is "blocked". This continues until the valve at the other end is opened and some water is removed from the pipe, making room for new water to come in from the other end.

The third property is that a pipe transfers the payload more or less sequentially. Even transfer of liquids, which can mix easily, is somewhat sequential: when you turn on your shower after a cold night, you literally feel how the cold water is removed from the pipe before the hot water starts to erupt.

The interesting thing is that Linux pipes are designed closely after "real world pipes", the only difference being that Linux pipes transfer information, bytes.

Pipes in Linux

The main attributes of Linux pipes (one-way transfer of data, limited capacity, and sequential delivery) are found in the real world too, as shown above. There is, however, one difference.

The pipes in the real world are usually found in "full" state, i.e. the pipe is full and waiting for items to be removed from it. In Linux programming, however, the "empty" pipes, where it is the consumer who waits for the input, are much more widespread.

To create a pipe, you just invoke the relevant system call via the standard C library. The standard pipe(2) call returns two file handlers: one for reading (fh_out) and another for writing (fh_in).

The use of these file handlers usually happens in a blocking mode (illustrated by the sketch after this list), such that:

  • a read from fh_out blocks until something is written to the other end of the pipe;
  • a write to fh_in returns when it has written everything into the pipe. If the pipe doesn't have the capacity to consume everything, the call blocks until the consumer reads some data from the other end, so that more data can be written.
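
Here's a small Ruby sketch of both blocking directions at work (the byte count is arbitrary, just larger than the pipe capacity):

  rd, wr = IO.pipe

  writer = Thread.new do
    wr.write("x" * 200_000)   # far more than the pipe capacity: blocks
    wr.close                  # until the reader starts draining the pipe
  end

  sleep 1                     # the writer is stuck mid-write by now
  data = rd.read              # drains the pipe, unblocking the writer;
  writer.join                 # read returns once the write end is closed
  puts data.bytesize          # => 200000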

Of course, you can use fcntl or ioctl to adjust the modes and make such calls nonblocking. Still, you can't bypass the basic restrictions: you obviously can't read what hasn't been written yet, and you can't store more data in a pipe than it's capable of keeping. If you want execution to continue while data is being written to an overflown pipe, you have to allocate a buffer and a concurrent thread that pushes data there.

You should never forget about these two blockages, as they will affect your everyday workflow (see, for example, this StackOverflow question about the less command). Yes, sometimes it's an obstacle you have to specifically overcome (in the third article about pipes in Linux I'll address it). But in most cases such two-blockage behavior is really what you want.

Pipes as a synchronization means

Let's return to the find ... | xargs -L 100 example shown at the beginning of the article. If the find command has already found a lot of files, there's no sense in it working further, damaging the response time (the frequency with which found matches are printed) of the system. With pipes, it will be seamlessly blocked by the write() to the pipe, and you don't even have to write any code to support that: your simple, usual printf() will return control only when the second party has done some work!

In other words, for two processes connected with a pipe as A | B, the design of Linux pipes makes this two-blockage system automatically "pause" the faster one to give the slower one a chance to accomplish its job!

So, basically, a pipe is a synchronization "primitive" that pauses the program connected to one of its ends at certain operations. It's not that "primitive", actually, as it may be implemented via a semaphore, but it's simple enough to be considered one. And, as with other synchronization mechanisms, you may deadlock when using even a single pipe, let alone multiple pipes.

Road to open3

So, let's return to using inter-process communication for more efficient programming in Linux. The classical way for a tool to work is to read from standard input, print to standard output, and put error messages to the standard error stream. How could you efficiently use such a tool from your program? The answer is: connect to it with pipes! Linux has all the capabilities to arrange the file handlers in such a way that the program won't even notice it's printing to a pipe instead of a console terminal.

We used named pipes to illustrate how Bash may be used for parallelization.

Of course, you may also do it explicitly, without impersonating a genuine console. Linux has the capability to create named pipes with the mknod call. These look like usual files, but are actually pipes. If the target program can read from a file instead of reading from standard input (or write to a file instead), you're lucky. However, this sometimes makes the target programs unnecessarily complex—and they're already complex enough; just take a look at the various implementations of echo, a program that is supposed to simply print its arguments. Second, this functionality is rarely provided for the standard error stream, and the error log is a very important piece of information for tackling bugs. Therefore, you will have to either use shell redirection, or just establish a direct connection from the child's streams to the parent's, which is the easier solution.

As we've already learned, you'll need three of them: one for each channel among input, output and error. This gave the name to the function as it usually appears in the standard libraries of various languages: open3. It takes a command line as input, and returns three file handlers corresponding to the said streams of the spawned program. Here's what it looks like in Ruby:
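
(A minimal modern-Ruby illustration; the tr command stands in for a real tool.)

  require 'open3'

  stdin, stdout, stderr = Open3.popen3("tr a-z A-Z")
  stdin.puts "hello, pipes"
  stdin.close                 # the child sees EOF and terminates
  puts stdout.read            # => HELLO, PIPES
  [stdout, stderr].each(&:close)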

However, an open3 implementation may sometimes not be powerful enough to meet all your desires (it happened to me during my work on cluster software for a project at work; see the rant in my earlier post), and you'll have to code a more sophisticated version. That's why it's important to know how open3 is implemented. It has several quirks, and in the next blog post I'll explain the intrinsics of an open3 implementation.

Proceed to "How to Implement open3"



Why execve()-like system() call is a must-have

How do you usually call an external program in Linux? Whichever language you use, you may fork() your process and call exec() afterwards; your runtime library surely supports these primitives. However, unless you're doing something crazy, you more likely use a system() function, which does the fork for you and automatically waits for the process to finish.

In Linux, runtime libraries for various languages usually support setting environment variables, so that all processes forked afterwards inherit the altered environment. In Ruby and Perl it's as easy as changing a global hash. Setting environment variables is a usual means of interprocess communication in Linux.

However, the only system call the Linux kernel offers on that matter is execve() (see its man page). It both sets the environment variables for the program called and executes it, replacing the current program (you should fork beforehand; that's a separate syscall). There's no syscall for system(); it's usually a library function. Anyway, higher-level libraries provide richer functionality: they surely provide more convenient primitives for the aforementioned setting of env variables, and for a mere system() call, so it seems the work of execve() could easily be substituted with a combination of these.

However, I recently discovered that it's not that obvious.

Why execve() is crucial

If you know what I usually write about, you have probably already guessed what the deal is. execve()—as a single library call—is especially useful in multithreaded programs!

Let's assume that you only use environment-setting and a mere system(). How would you call an external program then? Here's an example in Ruby (though the language doesn't actually matter):
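
(A hypothetical sketch; the variable and command names are made up.)

  ENV['IMPORTANT_VAR'] = 'value'   # global, process-wide state
  system('some_command')           # the child inherits the whole current environment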

Now assume that two threads are doing this. Yes, you've guessed what the problem is; here's a possible execution:

  1. The first thread sets environment variable to 'value'. Context switch happens.
  2. The second thread sets environment variable to 'another_value'.
  3. The second thread calls system(), and the process spawned inherits the correct 'another_value' environment variable value. Context is switched back.
  4. The first thread calls system(), and the spawned process inherits the same 'another_value' as the value of the env variable, because the other thread has changed the global environment setting. The programmer's intent, however, was for it to be called with 'value' instead.

So how could we overcome this? A naïve approach is to use a mutex to synchronize the different threads, like this:
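
(A hypothetical sketch; the names are placeholders.)

  ENV_LOCK = Mutex.new

  def spawn_with_env(var, value, cmd)
    ENV_LOCK.synchronize do
      ENV[var] = value
      system(cmd)       # the lock is held for the child's entire lifetime!
    end
  end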

But then the mutex won't be released until the thread that captured it finishes executing the whole external program. However, our intent was to invoke the programs safely, not to execute them in a mutually exclusive manner.

What it could look like

One solution is to use raw fork() and exec() calls, and set the environment only after forking:
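
(A hypothetical sketch; the helper name system_with_env is mine.)

  def system_with_env(env, cmd)
    pid = fork do
      env.each { |k, v| ENV[k] = v }   # only the child's copy of ENV changes
      exec(cmd)
    end
    Process.waitpid(pid)
    $?.success?
  end

  system_with_env({ 'IMPORTANT_VAR' => 'value' }, 'some_command')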

The problem with open3 invocation with an altered environment is what actually made this article appear. I had to study and modify an implementation of open3 in Ruby to make this work. See my blog post on how to implement open3 for details.

It's more verbose, but it should work. But what if your needs are more complex? What if you're not doing a mere system(), but want to make an open3() call instead? There are many ways a child process may be used, including opening it with one or more pipes, or opening a pipeline of several processes. You would have to re-implement these primitives just to alter the environment, and you'll be lucky if their implementation is readily available and reusable...

What it should look like

A good implementation of a set of safe library functions resides in the Ruby 1.9 runtime library. The system() call and each popen()-like call can accept a hash of environment variables that the invoked child process inherits. So the calls look like this:
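
(A sketch; the variable and command names are, again, made up.)

  require 'open3'

  system({ 'IMPORTANT_VAR' => 'value' }, 'some_command')

  Open3.popen3({ 'IMPORTANT_VAR' => 'value' }, 'some_command') do |i, o, e, t|
    # no global state was touched to pass the variable to the child
  end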

Unfortunately, Ruby 1.8 doesn't have this functionality, and you can't use a single call to both set the environment and spawn a child process. That pisses me off, actually, since not everyone has migrated yet, and I had to reimplement open3 in Ruby just to perform environment variable assignment in a thread-safe manner.

Conclusion

I know that I should be careful when I alter global variables in multithreaded programs. However, when I access basic primitives, I tend to treat them as something special, not just as ordinary data. This is not the safest approach. Use your forks with care, especially if you're operating several at the same time, and choose safe runtime libraries to work with.


