Tuesday, June 28, 2016

How to package your ideas using the Winston star

I came across this advice serendipitously by reading a random Hacker News comment. The advice shows up in the last 10 minutes of an Artificial Intelligence lecture by Patrick Winston. Winston, a prominent professor at MIT, tells his class that he will share with them important advice that may make or break their careers. It is about how to package and present ideas.

His advice is simple. Follow this 5-tip star to package your ideas better. All the tips start with "S":

  • Symbol: use a symbol to make your idea visual and memorable
  • Slogan: find a catchy slogan for your idea
  • Surprise: highlight the unconventional/counterintuitive/interesting part of your idea
  • Salient: focus on the essential part of your idea; remember that less is more, and fewer ideas are better
  • Story: package your idea in a story; human brains are hardwired for stories

Let me try to apply Winston's star method to itself, to make this more concrete.

  • Symbol: the star is the symbol of Winston's method
  • Slogan: with these 5-star tips, you can 5x the impact of your good ideas
  • Surprise: you can achieve fame and impact by packaging your ideas better following these simple presentation tips
  • Salient: devising a good presentation is as important as having good ideas and doing good work
  • Story: these presentation tips are told by a top MIT prof to his undergraduate AI class as secrets to career success

After some googling I found another relevant video from Patrick Winston. This one is titled "How to Speak". (As always, watch at 1.5x speed.)

Related posts

How to present your work
Nobody wants to read your shit

Wednesday, June 22, 2016

Nobody wants to read your shit

Earlier I wrote about how much I liked "The War of Art" by Steven Pressfield. This newly released book by Pressfield picks up where "The War of Art" left off. While the former focused on the psychological aspects of the writing process, this book focuses on the structural/mechanical aspects of writing. The book was freely distributed as pdf and ebook for a limited time for promotion purposes. (It looks like the promotion ended.) I read it in one sitting and found it interesting. This book can benefit anyone who needs to communicate in writing. (Of course, if you are a novice writer, start with the Elements of Style.)

The book gives a very interesting account of what Steven learned from a 50+ year career performing all forms of writing, including ad writing, Hollywood script-writing, novels, and nonfiction. The first chapter lays down the most important lesson Steven has learned: "Nobody wants to read your shit", and the rest of the book talks about what you can do about it. In a nutshell, you must streamline your message (staying on theme), and make its expression fun (organizing around an interesting concept).

Steven lists these as the universal principles of storytelling:
  1. Every story must have a concept. It must put a unique and original spin, twist or framing device upon the material.
  2. Every story must be about something. It must have a theme.
  3. Every story must have a beginning (that grabs the listener), a middle (that escalates in tension, suspense, stakes, and excitement), and an end (that brings it all home with a bang). Act One, Act Two, Act Three.
  4. Every story must have a hero.
  5. Every story must have a villain.
  6. Every story must start with an Inciting Incident, embedded within which is the story's climax.
  7. Every story must escalate through Act Two in terms of energy, stakes, complication and significance/meaning as it progresses.
  8. Every story must build to a climax centered around a clash between the hero and the villain that pays off everything that came before and that pays it off on-theme.
He says that these rules for writing apply to nonfiction as well. That includes your whitepapers, blog posts, and theses. You should definitely have a theme and an interesting concept. The hero and villain can be abstract. They can be useful for building some tension when motivating your problem.

The book is an easy and fun read. It feels like Steven is having a heart-to-heart conversation with you and coaching you about how you can become a better writer. While there were many gems, I was particularly intrigued by this passage:
I will do between 10 and 15 drafts of every book I write. Most writers do.
This is a positive, not a negative.
If I screw up in Draft #1, I'll attack it again in Draft #2.
"You can't fix everything in one draft."
Thinking in multiple drafts takes the pressure off. 

My writing related posts:

Tuesday, June 7, 2016

Progress by impossibility proofs

I recently listened to this TED talk by Google X director Astro Teller. The talk is about the Google X moonshot projects, and the process they use for managing them.

Google X moonshot projects aim to address big, important, and challenging problems. Teller tells (yay, pun!) us that the process they use to manage a moonshot project is basically to try to find ways to kill the project. The team first tries to identify the bottleneck in a project by focusing on its most important/critical/risky part. Then they try to either solve that hardest part or show that it is unsolvable (in a feasible manner) and kill the project. Teller claims that, at Google X, they actually incentivize people to kill the project by awarding bonuses, and celebrate when a project gets proved impossible and killed. And if a project still survives, that is a successful moonshot project that has potential for transformative change.

Although this approach looks counterintuitive at first, it is actually a good way to pursue transformative impact and fail fast without wasting too much time and resources. This can be a very useful method for academic research as well.
  1. Find a use-inspired grand-challenge problem. (This requires creativity, domain expertise, and hard thinking.)
  2. Try to prove an impossibility result.
  3. If you prove the impossibility result, that is still nice and publishable.
  4. If you can't prove an impossibility result, figure out why, and try to turn that into a solution to an almost "impossibly difficult problem". Bingo!
"Once you eliminate the impossible, whatever remains, no matter how improbable, must be the truth."
-- Sir Arthur Conan Doyle channeling Sherlock Holmes

Thursday, June 2, 2016

TensorFlow: A system for large-scale machine learning

This paper was uploaded to arxiv.org on May 27, 2016, so it is the most recent description of Google TensorFlow. (Five months ago, I commented on an earlier TensorFlow whitepaper, if you want to check that first.) Below I summarize the main points of this paper by using several sentences/paragraphs from the paper with some paraphrasing. I end the post with my wild speculations about TensorFlow. (This speculation thing is getting strangely addictive for me.)

TensorFlow is built leveraging Google's experience with their first generation distributed machine learning system, DistBelief. The core idea of this paper is that TensorFlow's dataflow representation subsumes existing work on parameter server systems (including DistBelief), and offers a uniform programming model that allows users to harness large-scale heterogeneous systems, both for production tasks and for experimenting with new approaches.

TensorFlow versus Parameter Server systems

DistBelief was based on the parameter server architecture, and satisfied most of Google's scalable machine learning requirements. However, the paper argues that this architecture lacked extensibility, because adding a new optimization algorithm or experimenting with an unconventional model architecture would require users to modify the parameter server implementation. Not all users are comfortable with making those changes due to the complexity of the high-performance parameter server implementation. In contrast, TensorFlow provides a high-level uniform programming model that allows users to customize the code that runs in all parts of the system, and experiment with different optimization algorithms, consistency schemes, and parallelization strategies in userspace/unprivileged code.

TensorFlow is based on the dataflow architecture. Dataflow with mutable state enables TensorFlow to mimic the functionality of a parameter server, and even provide additional flexibility. Using TensorFlow, it becomes possible to execute arbitrary dataflow subgraphs on the machines that host the shared model parameters. We say more on this when we discuss the TensorFlow model and the structure of a typical training application below.

TensorFlow versus dataflow systems

The principal limitation of batch dataflow systems (including Spark) is that they require the input data to be immutable and all of the subcomputations to be deterministic, so that the system can re-execute subcomputations when machines in the cluster fail. This unfortunately makes updating a machine learning model a heavy operation. TensorFlow improves on this by supporting expressive control-flow and stateful constructs.

Naiad is designed for computing on sparse, discrete data, and does not support GPU acceleration. TensorFlow borrows aspects of timely dataflow iteration from Naiad in achieving dynamic control flow.

TensorFlow's programming model is close to Theano's dataflow representation, but Theano is for a single node and does not support distributed execution.

TensorFlow model

TensorFlow uses a unified dataflow graph to represent both the computation in an algorithm and the state on which the algorithm operates. Unlike traditional dataflow systems, in which graph vertices represent functional computation on immutable data, TensorFlow allows vertices to represent computations that own or update mutable state. By unifying the computation and state management in a single programming model, TensorFlow allows programmers to experiment with different parallelization schemes. For example, it is possible to offload computation onto the servers that hold the shared state to reduce the amount of network traffic.

In sum, TensorFlow innovates on these two aspects:

  • Individual vertices may have mutable state that can be shared between different executions of the graph.
  • The model supports multiple concurrent executions on overlapping subgraphs of the overall graph.
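To make the two points above concrete, here is a minimal plain-Python sketch (not TensorFlow code). The names Variable, train_subgraph, and checkpoint_subgraph are hypothetical stand-ins chosen for illustration: a stateful vertex keeps its buffer between executions, and two different subgraphs overlap on that same vertex.

```python
class Variable:
    """Stateful vertex: owns a mutable buffer that survives across executions."""

    def __init__(self, value):
        self.value = value


def train_subgraph(weight, batch):
    """One execution of a toy training subgraph."""
    gradient = sum(batch) / len(batch)  # pure computation on immutable input
    weight.value -= 0.5 * gradient      # stateful update of the shared vertex
    return weight.value


def checkpoint_subgraph(weight, store):
    """A second subgraph that overlaps with training on the same Variable."""
    store.append(weight.value)


weight = Variable(1.0)
checkpoints = []

train_subgraph(weight, [1.0, 1.0])        # state changes from 1.0 to 0.5
checkpoint_subgraph(weight, checkpoints)  # reads the state left by the first run
train_subgraph(weight, [0.5, 0.5])        # second run starts from the updated state
```

In a functional dataflow system the second run would have to receive the weight as an explicit input; here the state simply lives in the vertex.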

Figure 1 shows a typical training application, with multiple subgraphs that execute concurrently, and interact through shared variables and queues. Shared variables and queues are stateful operations that contain mutable state. (A Variable operation owns a mutable buffer that is used to store the shared parameters of a model as it is trained. A Variable has no inputs, and produces a reference handle.)

This figure provides a concrete explanation of how TensorFlow works. The core training subgraph depends on a set of model parameters, and input batches from a queue. Many concurrent steps of the training subgraph update the model based on different input batches, to implement data-parallel training. To fill the input queue, concurrent preprocessing steps transform individual input records (e.g., decoding images and applying random distortions), and a separate I/O subgraph reads records from a distributed file system. A checkpointing subgraph runs periodically for fault tolerance.
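The interaction described above can be imitated with Python threads. This is only a rough sketch under my own assumptions, not how TensorFlow implements it: preprocess and train are hypothetical names, a queue.Queue plays the input queue, and a dict guarded by a lock plays the shared model parameters.

```python
import queue
import threading

input_queue = queue.Queue(maxsize=8)  # bounded queue between the two stages
params = {"w": 0.0}                   # shared model state
params_lock = threading.Lock()


def preprocess(records):
    """Preprocessing step: transform raw records and enqueue them."""
    for record in records:
        input_queue.put(record * 2.0)  # stand-in for decoding and distortion


def train(num_steps):
    """Training steps: each one dequeues an input and updates the shared state."""
    for _ in range(num_steps):
        x = input_queue.get()
        with params_lock:
            params["w"] += 0.01 * x


records = [float(i) for i in range(20)]
producers = [threading.Thread(target=preprocess, args=(records[i::2],)) for i in range(2)]
trainers = [threading.Thread(target=train, args=(5,)) for _ in range(4)]

for thread in producers + trainers:
    thread.start()
for thread in producers + trainers:
    thread.join()
```

The four trainers consume exactly the 20 records the two producers enqueue, so every thread terminates. The order of updates is nondeterministic, which is the point of data-parallel training with shared mutable state.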

The API for executing a graph allows the client to specify the subgraph that should be executed. A subgraph is specified declaratively: the client selects zero or more edges to feed input tensors into the dataflow, and one or more edges to fetch output tensors from the dataflow; the run-time then prunes the graph to contain the necessary set of operations. Each invocation of the API is called a step, and TensorFlow supports multiple concurrent steps on the same graph, where stateful operations enable coordination between the steps. TensorFlow is optimized for executing large subgraphs repeatedly with low latency. Once the graph for a step has been pruned, placed, and partitioned, its subgraphs are cached in their respective devices.
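The pruning step can be sketched as a backward walk from the fetched outputs that stops at fed inputs. This is an illustrative plain-Python version with hypothetical names (prune, and a toy graph keyed by op name), not the TensorFlow runtime's algorithm. For simplicity it feeds and fetches ops rather than edges.

```python
def prune(graph, feeds, fetches):
    """Return the set of ops a step has to touch.

    graph maps each op name to the list of ops it takes input from.
    A fed op is kept as a placeholder, but nothing upstream of it is needed.
    """
    needed = set()
    stack = list(fetches)
    while stack:
        op = stack.pop()
        if op in needed:
            continue
        needed.add(op)
        if op in feeds:  # value supplied by the client: do not look upstream
            continue
        stack.extend(graph[op])
    return needed


graph = {
    "read_file": [],
    "decode": ["read_file"],
    "weights": [],
    "matmul": ["decode", "weights"],
    "loss": ["matmul"],
    "checkpoint": ["weights"],
}

full_step = prune(graph, feeds=set(), fetches=["loss"])
fed_step = prune(graph, feeds={"decode"}, fetches=["loss"])
```

Fetching loss never pulls in checkpoint, and feeding decode additionally drops read_file from the step.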

Distributed execution

TensorFlow's dataflow architecture simplifies distributed execution, because it makes communication between subcomputations explicit. Each operation resides on a particular device, such as a CPU or GPU in a particular task. A device is responsible for executing a kernel for each operation assigned to it. The TensorFlow runtime places operations on devices, subject to implicit or explicit device constraints in the graph. The user may specify partial device preferences such as “any device in a particular task”, or “a GPU in any Input task”, and the runtime will respect these constraints.

TensorFlow partitions the operations into per-device subgraphs. A per-device subgraph for device d contains all of the operations that were assigned to d, with additional Send and Recv operations that replace edges across device boundaries. Send transmits its single input to a specified device as soon as the tensor is available, using a rendezvous key to name the value. Recv has a single output, and blocks until the value for a specified rendezvous key is available locally, before producing that value. Send and Recv have specialized implementations for several device-type pairs. TensorFlow supports multiple protocols, including gRPC over TCP, and RDMA over Converged Ethernet.
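Here is a small plain-Python sketch of that partitioning step. The function name partition, the device names, and the rendezvous key format are all assumptions made for illustration; the real key format and data structures are not described in this post.

```python
def partition(edges, placement):
    """Split a placed graph into per-device subgraphs.

    edges is a list of (src_op, dst_op) pairs; placement maps op -> device.
    An edge that crosses devices is replaced by a Send on the source device
    and a Recv on the destination device, linked by a rendezvous key.
    """
    subgraphs = {dev: {"ops": set(), "edges": []} for dev in set(placement.values())}
    for op, dev in placement.items():
        subgraphs[dev]["ops"].add(op)

    for src, dst in edges:
        src_dev, dst_dev = placement[src], placement[dst]
        if src_dev == dst_dev:
            subgraphs[src_dev]["edges"].append((src, dst))
            continue
        key = f"{src_dev};{dst_dev};{src}"  # hypothetical rendezvous key format
        send, recv = f"Send[{key}]", f"Recv[{key}]"
        if send not in subgraphs[src_dev]["ops"]:  # one Send/Recv pair per tensor
            subgraphs[src_dev]["ops"].add(send)
            subgraphs[src_dev]["edges"].append((src, send))
            subgraphs[dst_dev]["ops"].add(recv)
        subgraphs[dst_dev]["edges"].append((recv, dst))
    return subgraphs


placement = {
    "weights": "ps:cpu0",
    "apply_grad": "ps:cpu0",
    "input": "worker:gpu0",
    "matmul": "worker:gpu0",
}
edges = [("weights", "matmul"), ("input", "matmul"), ("matmul", "apply_grad")]
subgraphs = partition(edges, placement)
```

After partitioning, no edge inside a subgraph crosses a device boundary. All cross-device traffic is visible as Send/Recv pairs.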

TensorFlow is implemented as an extensible, cross-platform library. Figure 5 illustrates the system architecture: a thin C API separates user-level code in various languages from the core library written in C++.

Current development on TensorFlow

On May 18th, it was revealed that Google built the Tensor Processing Unit (TPU) specifically for machine learning. The paper mentions that TPUs achieve an order of magnitude improvement in performance-per-watt compared to alternative state-of-the-art technology.

The paper mentions ongoing work on automatic optimization to determine default policies for performance improvement that work well for most users. While power-users can get their way by taking advantage of TensorFlow's flexibility, this automatic optimization feature would make TensorFlow more user-friendly, and can help TensorFlow get adopted more widely (which looks like what Google is pushing for). The paper also mentions that, on the system level, the Google Brain team is actively developing algorithms for automatic placement, kernel fusion, memory management, and scheduling.

My wild speculations about TensorFlow 

Especially with the addition of mutable state and coordination via queues, TensorFlow is equipped for providing incremental, on-the-fly machine learning. Machine learning applications built with TensorFlow can be long-running applications that keep making progress as new inputs arrive, and can adapt to new conditions/trends on the fly. Instead of one-shot huge batch machine learning, such an incremental but continuous machine learning system has obvious advantages in today's fast-paced environment. This is definitely good for Google's core search and information indexing business. I also speculate this is important for Android phones and self-driving cars.

Previously I had speculated that with the ease of partitioning of the dataflow graph and its heterogeneous device support, TensorFlow can span over and bridge smartphone and cloud backend machine learning. I still stand by that prediction.
TensorFlow enables cloud backend support for the private/device-level machine learning going on in your smartphone. It doesn't make sense for an entire power-hungry TensorFlow program to run on your wimpy smartphone. Your smartphone will be running only certain TensorFlow nodes and operations, while the rest of the TensorFlow graph will be running on the Google cloud backend. Such a setup is also great for preserving privacy of your phone while still enabling machine learned insights on your Android.
Since TensorFlow supports inference as well as training, it can use 100s of servers for fast training, and run trained models for inference in smartphones concurrently. Android voice assistant (or Google Now) is a good application for this. In any case, it is a good time to be working on smartphone machine learning.

This is a wilder speculation, but a long-running self-improving machine learning backend in the datacenter can also provide great support for self-driving cars. Every minute, new data and decisions from self-driving cars would flow from TensorFlow subgraphs running on the cars to the cloud backend TensorFlow program. Using this constant flux of data, the program can adapt to changing road conditions (snowy roads, poor visibility conditions) and new scenarios on the fly, and all self-driving cars would benefit from the new improvements to the models.

Though the paper mentions that reinforcement style learning is future work, for all we know Google might already have reinforcement learning implemented on TensorFlow. It also looks like the TensorFlow model is general enough to tackle some other distributed systems data processing applications, for example large-scale distributed monitoring at the datacenters. I wonder if there are already TensorFlow implementations for such distributed systems services.

In 2011, Steve Yegge ranted about the lack of platforms thinking in Google. It seems like Google is doing well in that department lately. TensorFlow constitutes an extensible and flexible distributed machine learning platform to leverage in several directions.

Sunday, May 15, 2016

Useful podcasts

I love listening to podcasts while commuting. I find that I learn a lot more by listening to my selected list of podcasts than by just listening to the radio. I am blissfully ignorant about elections, US politics, and celebrity gossip.

Here are the podcasts I have been listening to. I would appreciate it if you could let me know of your recommendations. Who said podcasting is dead?

(Here is one tip you may find useful. I listen to my podcasts at 1.5x speed. It is actually a more comfortable and natural way to listen to stuff than 1x. 1x is too slow: your brain starts losing the stream of conversation due to the slow pace, and starts wandering around. Give 1.5x a try.)

Dan Carlin's Hardcore History

Who knew history could be this interesting and captivating? Dan Carlin, a seasoned radio host and American political commentator, found his true calling in this podcast series about history. The episodes are long, 4-hour episodes. But they are so exciting and captivating. I found myself depressed and scared while listening to the World War 1 episodes. The Wrath of the Khans episodes about Genghis Khan were also very interesting.

The Tim Ferriss Show

I don't like Tim Ferriss's writing and his persona for the most part. But I am a big fan of his podcast. His guest selection and his interviewing skills are excellent. The topics are almost always interesting. And I learn a lot from his podcasts.
For example, I learned about Dan Carlin's and Mike Rowe's podcasts from Tim's episodes. Just check this list, it is quite impressive. 

Some of my favorite episodes were:
Interview master: Cal Fussman and the Power of Listening
How Seth Godin Manages His Life -- Rules, Principles, and Obsessions
Luis von Ahn on Learning Languages, Building Companies, and Changing the World
The Scariest Navy SEAL Imaginable…And What He Taught Me
Scott Adams: The Man Behind Dilbert
Chris Sacca on Being Different and Making Billions

TEDTalks (audio)

I skip about half or 2/3rds of the talks, and listen to the ones that sound interesting. It is convenient to be able to listen to TED talks on your commute.

The Way I Heard It with Mike Rowe

Mike Rowe talks for about 5 minutes about a captivating story. And the story always has an amazing twist or revelation at the end. Very interesting podcast.

OPTIMIZE with Brian Johnson

This podcast has 5-minute and 15-minute posts that review recent self-improvement books. It is nice to get the gist of the books summarized for you in a couple of minutes to keep up to date with books.

Curious Minds: Innovation, Inspiration, Improvement

This podcast has 20-minute interviews with the authors of recent self-improvement books. The format of the interviews is a little dry. Topics are hit and miss.

This American Life

This American Life by Ira Glass. I don't think this requires any introduction.

Wednesday, April 13, 2016

Paper review. GraphLab: A new Framework for Parallel Machine Learning

This GraphLab paper is from 2010. It is written elegantly and it does a good job of explaining the GraphLab abstraction and foundations. A later VLDB 2012 paper presents how to extend this basic GraphLab abstraction to a distributed GraphLab implementation.

It seems like GraphLab has since taken off big time. There are workshops and conferences about GraphLab. And GraphLab is now developed by the company Dato (formerly GraphLab Inc.). Dato Inc. raised $6.75M from Madrona and New Enterprise Associates in its A round, and $18.5M in its B round from Vulcan Capital and Opus Capital, as well as Madrona and New Enterprise Associates.


The motivation for GraphLab was to hit the sweet spot in the development of machine learning solutions. "(Then in 2010) Existing high-level parallel abstractions like MapReduce are insufficiently expressive, while low-level tools like MPI and Pthreads leave machine learning (ML) experts repeatedly solving the same design challenges."

GraphLab aimed to express asynchronous iterative algorithms with sparse computational dependencies while ensuring data consistency and achieving good parallel performance. This paper provides an efficient multicore parallel (but not distributed) shared-memory implementation of GraphLab. (The C++ reference implementation of GraphLab is at http://select.cs.cmu.edu/code)

Related work

MapReduce works well on embarrassingly data-parallel tasks, but is not efficient for graph processing and iterative algorithms. This line from the paper gave me a chuckle. "By coercing efficient sequential ML algorithms to satisfy the restrictions imposed by MapReduce, we often produce inefficient parallel algorithms that require many processors to be competitive with comparable sequential methods." (Frank's laptop also took on GraphLab.)

DAG abstraction represents parallel computation as a directed acyclic graph with data flowing along edges between vertices that correspond to computation/function. Dryad, Storm, Heron fit here. DAG does not naturally express iterative algorithms.

Dataflow abstraction in Naiad is a related work. This being a 2010 paper there is no comparison to Naiad in the paper. This is what Naiad (2013) has to say about GraphLab. "GraphLab and PowerGraph offer a different asynchronous programming model for graph computations, based on a shared memory abstraction. These asynchronous systems are not designed to execute dataflow graphs so the notion of completeness of an epoch or iteration is less important, but the lack of completeness notifications makes it hard to compose asynchronous computations. Although GraphLab and PowerGraph provide a global synchronization mechanism that can be used to write a program that performs one computation after another, they do not achieve task- or pipeline-parallelism between stages of a computation. Naiad allows programs to introduce coordination only where it is required, to support hybrid asynchronous and synchronous computation."

Petuum is also a closely related work. Petuum uses BSP, with SSP relaxation and nonuniform convergence. It looks like GraphLab allows more fine-grained scheduling and supports data-graph computations and dependency modeling better. On the other hand, Petuum was designed clean-slate to support machine learning algorithms better, and introduced the model-parallelism concept. GraphLab, as you will see in the summary below, is principally a graph processing framework, with good applicability for graph-based machine learning tasks. The Petuum paper says "Petuum ML implementations can run faster than other platforms (e.g. Spark, GraphLab), because Petuum can exploit model dependencies, uneven convergence and error tolerance". The paper provides performance comparison experiments with GraphLab.

GraphLab abstraction

The minimalism yet generality of the GraphLab framework reminds me of LISP. "The data graph G = (V, E) encodes both the problem specific sparse computational structure and directly modifiable program state. The user can associate arbitrary blocks of data (or parameters) with each vertex and directed edge in G." (Also the set scheduler that enables users to compose custom update schedules is clever and LISPy.)

Figure 3 illustrates the overall GraphLab framework. A GraphLab program is composed of the following parts:
1. A data graph which represents the data and computational dependencies.
2. Update functions which describe local computation
3. A Sync mechanism for aggregating global state
4. A data consistency model (i.e., Fully Consistent, Edge Consistent or Vertex Consistent), which determines the extent to which computation can overlap.
5. Scheduling primitives which express the order of computation and may depend dynamically on the data.

Data Model

The GraphLab data model consists of two parts: a directed data graph and a shared data table. The data graph G = (V, E) encodes both the problem specific sparse computational structure and directly modifiable program state. The user can associate arbitrary blocks of data (or parameters) with each vertex and directed edge in G. To support globally shared state, GraphLab provides a shared data table (SDT) which is an associative map, T: [Key] --> Value, between keys and arbitrary blocks of data. Here, Key can be a vertex, edge, result of a sync computation as explained below, or a global variable, say model parameter.

User defined computation

Computation in GraphLab can be performed either through an update function which defines the local computation, or through the sync mechanism which defines global aggregation. The Update Function is analogous to the Map in MapReduce, and sync mechanism is analogous to the Reduce operation. A GraphLab program may consist of multiple update functions and it is up to the scheduling model to determine which update functions are applied to which vertices and in which parallel order.
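To see how the pieces fit together, here is a toy plain-Python sketch. It is not the GraphLab C++ API. The PageRank-style update rule, the names update and sync, the fold/apply signature, and the fixed sweep that stands in for the scheduler are all assumptions I picked for illustration.

```python
# Data graph: per-vertex data plus directed edges that carry per-edge data.
vertex_data = {"a": 1.0, "b": 0.0, "c": 0.0}
edge_data = {("a", "b"): 0.5, ("a", "c"): 0.5, ("b", "c"): 1.0, ("c", "a"): 1.0}
shared_data_table = {"damping": 0.85}  # SDT: key -> arbitrary block of data


def in_edges(v):
    """Incoming edges of v as (source vertex, edge weight) pairs."""
    return [(src, w) for (src, dst), w in edge_data.items() if dst == v]


def update(v):
    """Update function: local computation on v and its neighborhood."""
    d = shared_data_table["damping"]
    incoming = sum(vertex_data[src] * w for src, w in in_edges(v))
    vertex_data[v] = (1 - d) / len(vertex_data) + d * incoming


def sync(key, fold, apply, initial):
    """Sync: aggregate over all vertices and publish the result in the SDT."""
    acc = initial
    for v in vertex_data:
        acc = fold(acc, vertex_data[v])
    shared_data_table[key] = apply(acc)


for _ in range(100):  # a fixed sweep over the vertices stands in for the scheduler
    for v in vertex_data:
        update(v)

sync("total_rank", fold=lambda acc, x: acc + x, apply=lambda acc: acc, initial=0.0)
```

The update function only touches data near its vertex, while sync is the only place where a global quantity is computed. That split is what the Map/Reduce analogy refers to.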


The GraphLab update schedule describes the order in which update functions are applied to vertices and is represented by a parallel data-structure called the scheduler. The scheduler abstractly represents a dynamic list of tasks (vertex-function pairs) which are to be executed by the GraphLab engine. This is a minimalist yet flexible and general model. Since writing a scheduler is hard, GraphLab provides a default BSP style scheduler or a round-robin scheduler.

Since many ML algorithms (e.g., Lasso, CoEM, Residual BP) require more control over the tasks that are created and the order in which they are executed, GraphLab also allows update functions to add and reorder tasks. For this, GraphLab provides 1) FIFO schedulers that only permit task creation but do not permit task reordering, and 2) prioritized schedulers that permit task reordering at the cost of increased overhead.
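The difference between the two scheduler families can be sketched in a few lines of sequential Python. The class and function names below are hypothetical and this is not GraphLab's parallel implementation. It only shows how the same update function produces different execution orders.

```python
import heapq
from collections import deque


class FifoScheduler:
    """Tasks run in creation order; new tasks can be added but not reordered."""

    def __init__(self):
        self._tasks = deque()

    def add_task(self, vertex, priority=0.0):
        self._tasks.append(vertex)  # priority is ignored

    def pop(self):
        return self._tasks.popleft() if self._tasks else None


class PriorityScheduler:
    """Highest-priority task runs first, at the cost of maintaining a heap."""

    def __init__(self):
        self._heap = []
        self._count = 0  # tie-breaker that keeps insertion order stable

    def add_task(self, vertex, priority=0.0):
        heapq.heappush(self._heap, (-priority, self._count, vertex))
        self._count += 1

    def pop(self):
        return heapq.heappop(self._heap)[2] if self._heap else None


def run(scheduler, update, initial_tasks):
    """Drain the scheduler, letting the update function create new tasks."""
    for vertex, priority in initial_tasks:
        scheduler.add_task(vertex, priority)
    order = []
    while (vertex := scheduler.pop()) is not None:
        order.append(vertex)
        update(vertex, scheduler)
    return order


def update(vertex, scheduler):
    if vertex == "c":  # toy rule: updating c makes d worth revisiting
        scheduler.add_task("d", priority=2.5)


tasks = [("a", 1.0), ("b", 2.0), ("c", 3.0)]
fifo_order = run(FifoScheduler(), update, tasks)
priority_order = run(PriorityScheduler(), update, tasks)
```

With the FIFO scheduler the newly created task d waits behind everything already queued. With the prioritized scheduler it jumps ahead of b and a.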


The paper  demonstrates the expressiveness of the GraphLab framework by designing and implementing parallel versions of belief propagation, Gibbs sampling, Co-EM, Lasso and Compressed Sensing.


GraphLab Wikipedia page

Paper Review. Petuum: A new platform for distributed machine learning on big data


Friday, April 8, 2016

Book review: The War Of Art by Steven Pressfield

I read this book recently and liked it a lot. The book is written by Steven Pressfield. He is also the writer of "The Legend of Bagger Vance" and "Gates of Fire" (arguably the best book about Spartans, and used as recommended reading in Army academies). Pressfield definitely knows and respects his craft.

This book is a call for all people, and creative people and writers in particular, to wake up and realize their calling. The book says: "Most of us have two lives. The life we live, and the unlived life within us." Yeah... About that... I know "self-help" books, and books that use "new-agey" language, rub many people the wrong way. I am pragmatic about that. The way I see it, if I can learn some good paradigms, tips, and strategies to become more productive and effective, I can look past some of the turn-offs.

The book is organized in 3 parts. The first part talks about resistance, the enemy of creating anything meaningful and worthwhile. This part gives an extended and spot on description and analysis of resistance. You have to know the enemy to have a chance to beat it.

The second part talks about turning professional to beat resistance. I liked this part best. This part has very positivist/pragmatist advice in contrast to the romantic/mysticist theme dominating Part 3, which is also somewhat evident in Part 1.

The third part talks about going beyond resistance, and living life in a higher realm. This part is mostly mysticist, though done tastefully, I think. (What do you expect? Pressfield is the author of The Legend of Bagger Vance after all.) There are still several useful spot-on observations. One is about territorial versus hierarchical orientation.

Below I am going to paste quotes from the book to give you a taste of it. I liked this book, and recommend it for anyone who wants to understand resistance and improve her craft.

Part 1: Resistance, Defining The Enemy

Any act that rejects immediate gratification in favor of long-term growth, health or integrity … will elicit Resistance.

Resistance has no strength of its own. Every ounce of juice it possesses comes from us. We feed it with power by our fear of it. Master that fear, and we conquer Resistance.

Procrastination is the most common manifestation of Resistance because it's the easiest to rationalize. We don't tell ourselves, "I'm never going to write my symphony." Instead we say, "I am going to write my symphony; I'm just going to start tomorrow."

There's a secret that real writers know that wannabe writers don't, and the secret is this: It's not the writing part that's hard. What's hard is sitting down to write. What keeps us from sitting down is Resistance.

Resistance is invisible, internal, insidious, fueled by fear, only opposes in one direction.

Like a magnetized needle floating on a surface of oil, Resistance will unfailingly point to true North–meaning that calling or action it most wants to stop us from doing. We can use this. We can use it as a compass. We can navigate by Resistance, letting it guide us to that calling or action that we must follow before all others. Rule of thumb: The more important a call or action is to our soul’s evolution, the more Resistance we will feel toward pursuing it.

Part 2: Turning Pro, Combating Resistance

The conventional interpretation is that the amateur pursues his calling out of love, while the pro does it for money. Not the way I see it. In my view, the amateur does not love the game enough. If he did, he would not pursue it as a sideline, distinct from his "real" vocation. The professional loves it so much he dedicates his life to it. He commits full-time. That's what I mean when I say turning pro. Resistance hates it when we turn pro.

Grandiose fantasies are a symptom of Resistance. They're the sign of an amateur. The professional has learned that success, like happiness, comes as a by-product of work. The professional concentrates on the work and allows rewards to come or not come, whatever they like.

Professional is patient, prepared, acts in face of fear, dedicated to mastering technique.

The professional dedicates himself to mastering technique not because he believes technique is a substitute for inspiration but because he wants to be in possession of the full arsenal of skills when inspiration does come. The professional is sly. He knows that by toiling beside the front door of technique, he leaves room for genius to enter by the back.

Part 3: Beyond resistance, the higher realm

We come into this world with a specific, personal destiny. We have a job to do, a calling to enact, a self to become. We are who we are from the cradle, and we're stuck with it. Our job in this lifetime is not to shape ourselves into some ideal we imagine we ought to be, but to find out who we already are and become it.

The most important thing about art is to work. Nothing else matters except sitting down every day and trying.

When we sit down day after day and keep grinding, something mysterious starts to happen. A process is set into motion by which, inevitably and infallibly, heaven comes to our aid. Unseen forces enlist in our cause; serendipity reinforces our purpose.

The territory provides sustenance. -- Runners know what a territory is. So do rock climbers and kayakers and yogis. Artists and entrepreneurs know what a territory is. The swimmer who towels off after finishing her laps feels a hell of a lot better than the tired, cranky person who dove into the pool 30 minutes earlier.

Territory can only be claimed by work. -- When Arnold Schwarzenegger hits the gym, he's on his own turf. But what made it his own are the hours and years of sweat he put in to claim it. The territory doesn't give, it gives back.

The artist must operate territorially. He must do his work for its own sake. To labor in the arts for any reason other than love is prostitution.

Of any activity you do, ask yourself: If I were the last person on earth, would I still do it? If you're all alone on the planet, a hierarchical orientation makes no sense. There's no one to impress. So, if you’d still pursue that activity, congratulations. You're doing it territorially.

The book ends with the following. I think this is a useful perspective/paradigm to cultivate:

If you were meant to cure cancer or write a symphony or crack cold fusion and you don't do it, you not only hurt yourself, even destroy yourself. You hurt your children. You hurt me. You hurt the planet.

Creative work is not a selfish act or a bid for attention on the part of the actor. It's a gift to the world and every being in it. Don't cheat us of your contribution. Give us what you’ve got.

Wednesday, April 6, 2016

Consensus in the Cloud: Paxos Systems Demystified

This is our most recent paper, still under submission. It is available as a technical report here. We felt we had to write this paper because we have seen misuses/abuses of Paxos-based coordination services. Glad this is off our chests.

Here is the short pitch for the paper. I hope you like it and find it helpful.

Coordination and consensus play an important role in datacenter and cloud computing. Examples are leader election, group membership, cluster management, service discovery, resource/access management, and consistent replication of the master nodes in services.

Paxos protocols and systems provide a fault-tolerant solution to the distributed consensus problem and have attracted significant attention, but they have also generated substantial confusion. Zab, Multi-Paxos, and Raft are examples of Paxos protocols. ZooKeeper, Chubby, and etcd are examples of Paxos systems. Paxos systems and Paxos protocols reside in different planes, but even that doesn't prevent these two concepts from being confused. Paxos protocols are useful as low-level components for server replication, whereas Paxos systems have often been shoehorned into that task. The proper use case for Paxos systems is in highly-available/durable metadata management, under the conditions that all metadata fit in main-memory and are not subject to frequent changes.

In order to elucidate the correct use of distributed coordination systems, we compare and contrast popular Paxos protocols and Paxos systems and present advantages and disadvantages for each. Zab and Raft protocols differ from Paxos as they divide execution into epochs: Each epoch begins with a new election, goes into the broadcast phase and ends with a leader failure. Similarly, Paxos systems also have nuances. Chubby uses the Multi-Paxos algorithm to achieve linearizability, while Zab lies at the heart of ZooKeeper and provides not only linearizability, but also FIFO order for client requests, enabling the developers to build complex coordination primitives with ease. The etcd system uses Raft as its consensus protocol, adopts a stateless design, and implements certain features very differently than ZooKeeper and Chubby.

We also categorize the coordination use-patterns in cloud into nine broad categories: server replication (SR), log replication (LR), synchronization service (SS), barrier orchestration (BO), service discovery (SD), group membership (GM), leader election (LE), metadata management (MM) and distributed queues (Q). Using these categories, we examine Google and Facebook infrastructures, as well as Apache top-level projects to investigate how they use Paxos protocols and systems.

Finally, we analyze tradeoffs in the distributed coordination domain and identify promising future directions for achieving more scalable distributed coordination systems.

See the paper for more information.

Related links

Paper summary: ZooKeeper: Wait-free coordination for Internet-scale systems

Monday, April 4, 2016

Paper Review. Petuum: A new platform for distributed machine learning on big data

First there was big data. Industry saw that big data was good. Industry made big data storage systems, NoSQL datastores, to store and access the big data. Industry saw they were good. Industry made big data processing systems, MapReduce, Spark, etc., to analyze and extract information and insights (CRM, business logistics, etc.) from big data. Industry saw they were good and popular, so machine learning libraries were added to these big data processing systems to provide support for machine learning algorithms and techniques.

And here is where this paper makes a case for a redesign for machine learning systems. The big data processing systems produced by the industry are general analytic systems, and are not specifically designed for machine learning from the start. Those are data analytics frameworks first, with some machine learning libraries as add-ons to tackle machine learning tasks. This paper considers the problem of a clean slate system design for a big data machine learning system: What if we designed and built a big data framework specifically for machine learning systems, what would it look like?

This paper is from CMU and appeared in KDD 2015.

Machine Learning (ML) objectives and features

Naturally the paper starts by first identifying the objective and features of ML systems. "ML defines an explicit objective function over data where the goal is to attain optimality of this function in the space defined by the model parameters and other intermediate variables."

Thus, the paper argues, ML algorithms have an iterative-convergent structure and  share these principles:

  • error tolerance: iterative-convergent algorithms are robust against errors and converge/heal towards the solution
  • dynamic structure dependency: iterative-convergent algorithms often have changing correlation strengths between model parameters during the course of execution
  • nonuniform convergence: model parameters converge at different speeds

The paper proposes Petuum, a new distributed ML framework, to leverage these principles.

Stale Synchronous Parallel (SSP) consistency

Petuum leverages error-tolerance by introducing SSP (Stale Synchronous Parallel) consistency. SSP reduces network synchronization costs among workers, while maintaining bounded staleness convergence guarantees.

While the BSP (Bulk Synchronous Parallel) approach, used in MapReduce, Giraph, etc., requires the workers to synchronize state and exchange messages after each round, SSP cuts some slack. The SSP consistency model guarantees that if a worker reads from the parameter server at iteration c, it receives all updates from all workers computed up to and including iteration c-s-1, where s is the staleness threshold. If there is a straggler more than s iterations behind, the reader will stop until the straggler catches up and sends its updates. How do you determine the slack, s? That requires ML expertise and experimentation.
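To make the staleness bound concrete, here is a minimal Python sketch of the check a parameter server could perform before serving a read. This is not Petuum's code: the function name `ssp_can_read` and the convention that each entry of `worker_clocks` is the last iteration whose updates that worker has already sent are my own assumptions for illustration.

```python
def ssp_can_read(reader_clock, worker_clocks, staleness):
    """Return True if a reader at iteration `reader_clock` may proceed.

    Hypothetical helper, not part of Petuum. `worker_clocks[i]` is assumed
    to be the last iteration whose updates worker i has already sent.
    SSP requires the reader to see every update up to and including
    iteration reader_clock - staleness - 1.
    """
    required = reader_clock - staleness - 1
    return min(worker_clocks) >= required


# With staleness s = 2, a reader at iteration 5 needs updates through iteration 2.
print(ssp_can_read(5, [4, 3, 2], staleness=2))  # slowest worker has sent iteration 2
print(ssp_can_read(5, [4, 3, 1], staleness=2))  # a straggler is too far behind
```

Setting `staleness=0` in this sketch degenerates to the BSP behavior: the reader waits until every worker has finished the previous iteration.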

Big data, meet Big model!

Another significant idea in the Petuum paper is the dichotomy between big data and big model. Yes, big data is big, on the order of terabytes or petabytes. And the paper observes, we now also have a big model problem: ML programs for industrial-scale big data problems use big models that are 100s of billions of parameters (Figure 1 gives a nice summary). And this big model needs special attention as well.

Here I will use snippets from the paper to give the proper definition of these concepts.

Essentially, model-parallelism provides ability to invoke dynamic schedules that reduce model parameter dependencies across workers, leading to faster convergence. So Petuum uses model-parallelism to leverage nonuniform convergence and dynamic structure dependency to improve the efficiency/performance of ML tasks.

Petuum design

Petuum consists of three main components: Scheduler, parameter server, and workers.

The scheduler is responsible for enabling model parallelism. The scheduler sends a subset of parameters to workers via a parameter exchange channel. The parameter server stores and updates model parameters, which can be accessed via a distributed shared memory API by both workers and the scheduler. Each worker is responsible for performing operations defined by a user on a partitioned data set and a parameter subset specified by the scheduler.

Here is the Petuum API:
schedule: specify the subset of model parameters to be updated in parallel.
push: specify how individual workers compute partial results on those parameters.
pull [optional]: specify how those partial results are aggregated to perform the full parameter update.
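To get a feel for how the three calls fit together, here is a toy single-process Python sketch. Only the names schedule, push, and pull come from the paper; the signatures, the round-robin schedule, the least-squares objective, and the tiny dataset are all my assumptions, and the real system runs the push step on distributed workers.

```python
def schedule(num_params, iteration, block_size):
    """Pick the parameter indices to update this iteration (round-robin blocks)."""
    start = (iteration * block_size) % num_params
    return [(start + k) % num_params for k in range(block_size)]


def push(shard, params, indices):
    """One worker's partial gradient of the squared error over its own data shard."""
    partial = {j: 0.0 for j in indices}
    for x, y in shard:
        err = sum(w * xi for w, xi in zip(params, x)) - y
        for j in indices:
            partial[j] += err * x[j]
    return partial


def pull(params, partials, indices, lr):
    """Aggregate the workers' partial results and apply the parameter update."""
    for j in indices:
        params[j] -= lr * sum(p[j] for p in partials)


def train(shards, num_params, iterations=400, block_size=1, lr=0.1):
    params = [0.0] * num_params
    for it in range(iterations):
        indices = schedule(num_params, it, block_size)
        # In a real deployment each push would run on a separate worker.
        partials = [push(shard, params, indices) for shard in shards]
        pull(params, partials, indices, lr)
    return params


# Toy data generated from y = 2*x0 - 1*x1, split across two "workers".
shards = [
    [((1.0, 0.0), 2.0), ((0.0, 1.0), -1.0)],
    [((1.0, 1.0), 1.0), ((2.0, 1.0), 3.0)],
]
weights = train(shards, num_params=2)
print(weights)  # close to [2.0, -1.0]
```

The interesting knob is `schedule`: a smarter schedule would pick weakly correlated parameters to update together, which is where the ML expertise comes in.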

To illustrate the use of Petuum API, the paper presents the code for a data-parallel Distance Metric Learning (DML) algorithm and for a model parallel Lasso algorithm.

It looks like writing an efficient/optimized schedule would need significant expertise. This will often require running the algorithm on a test dataset to see relations between model parameters, convergence speeds, etc.


The paper provides evaluation results on the performance of Petuum.

Petuum versus TensorFlow

How does Petuum compare with Google's TensorFlow? The TensorFlow framework can be used to express a wide variety of algorithms, including training and inference algorithms for deep neural network models. And yet, TensorFlow has a completely different design approach than Petuum. TensorFlow uses dataflow graphs. Implementing an ML algorithm/recipe on TensorFlow has a more distributed nature. An algorithm/recipe consists of many operations, and TensorFlow maps one or multiple operations in this algorithm to a node/worker. In Petuum the entire algorithm/recipe is mapped on a node/worker and efficiency is achieved via data-parallel and model-parallel partitioning.

There would be advantages/disadvantages for each approach, and it will be interesting to watch how this plays out in the coming years.

Related links

Short review of Google TensorFlow

Wednesday, March 30, 2016

Paper review: Building Consistent Transactions with Inconsistent Replication (SOSP'15)

This paper investigates an interesting and promising research problem: how can we implement distributed transactions more efficiently by performing a cross-layer design of the transaction protocol/layer with the underlying distributed storage protocol/layer? This idea is motivated by the observation that there is wasted/duplicated work done both at the transaction layer and at the storage layer. The transaction layer already provides ordering (i.e., linearizability), and, if you look at the distributed storage layer, it also provides strongly consistent replication which orders/linearizes updates often with Paxos. The paper argues that this is overlapping functionality, and leads to higher latency and lower throughput.

OK, at this point, we have to take a step back and realize that we cannot avoid all coordination between the two layers. The distributed storage layer needs to be consistent with the order of updates at the transaction layer, because it serves reads and you would like it to serve the most recent version, not an old version. OK, to fix this, what if we used a multiversion store and staged data for replication without coordination and consistency, and on top of this we committed transactions with 2PC? But who decides the version numbers? If it is the transaction layer, where does it learn the old version number? From the storage layer. And this cyclic dependency will create a problem when the storage/replication layer is inconsistent, because an inconsistent version number may be learned and proposed back. A way to fix this is to use timestamps for version numbers. But that would require precise clock synchronization so that the timestamps also match the transaction ordering. I guess this would be possible to do with atomic clocks, or maybe with PTP.

Actually, it turned out the paper also resorted to a multiversion store, but did not need very precise clock synchronization, and went with NTP synchronization. In fact, they could have gone without any clock synchronization. Clock synchronization is only used for improving the performance, not for satisfying safety. The paper lays out a principled cross-layer codesign of a linearizable transaction protocol on top of unordered/inconsistent replication. And the design is complicated, as we will see below. This shows that if we had very precise and dependable clock synchronization to rely on, we could simplify many problems in distributed systems, including this one.

Of course a co-design, or a cross-layer design, has an associated drawback: now these two layers are tangled, and you cannot swap storage layer protocols and transaction layer protocols and expect any combination to work without problems. When using a consistent replication layer (such as Paxos replication), you don't have to worry about swapping transaction layer protocols; use OCC, or 2PC, etc. But, when using an inconsistent replication (IR) layer, a special purpose transaction protocol that works on top of IR is needed. An arbitrary OCC or 2PC protocol will not work because building over IR imposes restrictions on the transaction protocol. The paper proposes TAPIR (Transactional Application Protocol for Inconsistent Replication) as the transaction layer protocol to accompany IR.

The results are nice as they show significant improvement. They demonstrate TAPIR in a transactional key-value store, TAPIR-KV, which supports linearizable transactions over a partitioned set of keys. The experiments found that TAPIR-KV had: (1) 50% lower commit latency and (2) more than 3x better throughput compared to systems using conventional transaction protocols, including an implementation of Spanner’s transaction protocol, and (3) comparable performance to MongoDB and Redis, widely-used eventual consistency systems.

What is not nice is the complexity this consistency relaxation at the replication layer imposes on the overall system design. IR is complex, and TAPIR involves even more complex ideas/techniques. It looks like this is inevitable complexity to compensate for the inconsistent/unordered replication at the storage layer.

Inconsistent Replication (IR) protocol for relaxing the consistency of replication

In the IR protocol, instead of ordering of operations, replicas agree on the operation results. What does this mean? The unordered operation set provides the following 3 guarantees.
P1. [Fault tolerance] At any time, every operation in the operation set is in the record of at least one replica in any quorum of f+1 non-failed replicas.
P2. [Visibility] For any two operations in the operation set, at least one is visible to the other.
P3. [Consensus results] At any time, the result returned by a successful consensus operation is in the record of at least one replica in any quorum. The only exception is if the consensus result has been explicitly modified by the application (i.e., transaction) layer protocol through Merge, after which the outcome of Merge will be recorded instead.

What is this Merge business, you ask.
"Some replicas may miss operations or need to reconcile their state if the consensus result chosen by the application (i.e., transaction) protocol does not match their result. To ensure that IR replicas eventually converge, they periodically synchronize. Similar to eventual consistency, IR relies on the application (i.e., transaction) protocol to reconcile inconsistent replicas. On synchronization, a single IR node first upcalls into the application protocol with Merge, which takes records from inconsistent replicas and merges them into a master record of successful operations and consensus results. Then, IR upcalls into the application (i.e., transaction) protocol with Sync at each replica. Sync takes the master record and reconciles application (i.e., transaction) protocol state to make the replica consistent with the chosen consensus results."

Requirements from IR clients

IR imposes some hard requirements on the transaction layer. These limit the transaction layer and complicate its design.

1. Invariant checks must be performed pairwise.
This is the most restrictive requirement. "IR's limitation is that it can only support system guarantees that depend on conflicts between pairs of operations. For example, IR can be used to replicate a lock server but not a sales app that only sells thing in stock. The lock server's mutual exclusion guarantee is a pair-wise invariant, but calculating the store's stock requires a full history of operations that only strong consistency can achieve."

2. Application (i.e., transaction) protocols must be able to change consensus operation results.

3. Application (i.e., transaction) protocols should not expect operations to execute in the same order.

I don't provide an explanation for the last two, because they are complicated, and I am not sure I understand the explanation provided in the paper. This boils down to the following: the transaction protocol needs to sort out the mess of inconsistent ordering at the replication/storage layer, essentially by retrying (with reordering) or aborting (due to conflict) transactions and getting it right in the next round. This retry with reordering resembles the Fast Paxos approach. Due to inconsistent ordering at the replicas, this is the price to pay.


TAPIR is designed to work on top of IR's weak guarantees to provide linearizable (strict-serializability) distributed transactions: (1) TAPIR does not have any leaders or centralized coordination, (2) TAPIR Reads always go to the closest replica, and (3) TAPIR Commit takes a single round-trip to the participants in the common case.

TAPIR uses clients as 2PC coordinators. A client Begins a transaction, then executes Reads and Writes during the transaction's execution period. During this period, the client can Abort the transaction. Once it finishes execution, the client Commits the transaction, after which it can no longer abort the transaction. The 2PC protocol will run to completion, committing or aborting the transaction based entirely on the decision of the participants.

TAPIR uses optimistic transaction ordering via NTP timestamps and OCC in order to concentrate all ordering decisions into a single set of validation checks at the proposed transaction timestamp. Under load, its abort rate can be as bad as OCC. The evaluation section shows a graph comparing TAPIR and OCC abort rates.

The TAPIR protocol is complicated. To the credit of the authors, they provide a TLA+ model and model checking of TAPIR to support its correctness.

Finally, let's consider this question: what happens when the TAPIR client fails? Since the client is the coordinator of the 2PC transaction, this is similar to coordinator failure in 2PC, and leads to blocking and a complicated recovery procedure which necessitates going to 3PC. In other words, TAPIR doesn't solve the 2PC blocking problem of transactions. A recovery procedure at the client level is needed if the client/coordinator blocks or dies. (RIFL, as we discuss below, considers almost the dual problem of IR/TAPIR and results in a cleaner recovery for the client level.)

RIFL comparison

RIFL appeared at SOSP'15, as did the TAPIR paper. RIFL takes a different approach to the problem. RIFL advocates building transactions over a linearizability layer at the replication, the advantage of which is cleaner recovery at the transaction layer. RIFL eliminates the complex recovery procedures of Sinfonia transactions when the initiator (transaction manager) dies.


The paper uses the Retwis Twitter clone and the YCSB benchmark for evaluation. Evaluation is done on Google Compute Engine. Note that peak throughput is lower than both Cassandra and Redis, but those are eventual consistency systems, while TAPIR-KV uses a distributed transaction protocol that ensures strict serializability. Unfortunately the consistency violations in the eventual consistency KVs are not provided. That would be interesting to see and compare. They may turn out to be not so bad, as the PBS paper and the Facebook consistency paper point out.

Related links

Link to the paper

SOSP conference presentation where Irene does a great job of presenting overview of the ideas/techniques involved

The code for TAPIR is available here 

Irene's blog post on lessons learned from TAPIR

Sunday, March 27, 2016

Paper review: Measuring and Understanding Consistency at Facebook

I have reviewed many Facebook papers in this blog before (see the links at the bottom for full list). Facebook papers are simple (in the good sense of the word) and interesting due to the huge scale of operation at Facebook. They are testaments of high scalability in practice. This one here is no different.

This paper investigates the consistency of the Facebook TAO system. The TAO system is a replicated storage system for Facebook's social graph of a billion vertices. In theory, TAO provides only eventual consistency. The interesting result in this paper is this: In practice, Facebook's TAO system is highly consistent, with only about 4 consistency violations out of a million read requests (0.0004%)!

Facebook's TAO system

TAO is basically a 2-level memcached architecture backed by a database, as first discussed in the NSDI'13 paper. The TAO architecture is given in Figure 1. As a prerequisite to the TAO consistency discussion in the next section, it will be enough to review how TAO serves read and write requests below.

Read requests from web servers are served by the corresponding leaf cache in the local region (the cache-hit ratios are very high), and if that fails it progresses down the stack to be served by the root cache, and if that also fails, by the local database.

Write requests are complicated. The update needs to be reflected at the master database so the write is routed all the way to the master database and back, following the path 1, 2, 3, 4, 5, 6, 7, 8 in Figure 1. Each of those caches in that route applies the write when it forwards the database's acknowledgment back towards the client.

When the data is updated at the master database, the old data in other caches (those not in the route to the writing client) needs to be updated as well. TAO does not go for the update, but goes for the lazy strategy: invalidation of the old data in the other caches. When a read comes from those other paths, the read will cause a miss, and later populate the caches with the updated value learned from the regional database. This lazy strategy also has the advantage of being simple and avoiding inadvertently messing things up (such as the ordering of writes). TAO chooses to delete cached data instead of updating it in cache because deletes are idempotent. The database is the authoritative copy, and caches are just, well, caches.

So, how does the invalidation of caches in other routes work? This proceeds in an asynchronous fashion. The root caches in the master (6') and originating regions (7') both asynchronously invalidate the other leaf caches in their region. The master database asynchronously replicates the write to the slave regions (5'). When a slave database in a region that did not originate the write receives it, the database asynchronously invalidates its root cache (6'') that in turn asynchronously invalidates all its leaf caches (7'').

This asynchronous invalidation of caches results in a "vulnerability time window" where inconsistent reads can happen. In theory, TAO provides per-object sequential consistency and read-after-write consistency within a cache, and only eventual consistency across caches.
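Here is a deliberately tiny Python sketch of the invalidation idea and the vulnerability window. It is not TAO's code: the `Cache` class, its method names, and the single-dictionary "database" are hypothetical stand-ins, and the real system has leaf and root caches across regions.

```python
class Cache:
    """Hypothetical look-aside cache in front of a dict acting as the database."""

    def __init__(self, db):
        self.db = db
        self.data = {}

    def read(self, key):
        if key not in self.data:          # miss: repopulate from the database
            self.data[key] = self.db[key]
        return self.data[key]

    def apply_write(self, key, value):
        self.data[key] = value            # a cache on the write path applies the write

    def invalidate(self, key):
        self.data.pop(key, None)          # delete is idempotent: repeating it is harmless


db = {"x": 1}
on_path, other = Cache(db), Cache(db)
on_path.read("x")
other.read("x")                           # both caches now hold x = 1

db["x"] = 2                               # the write reaches the database...
on_path.apply_write("x", 2)               # ...and the caches on the write path

stale = other.read("x")                   # vulnerability window: still the old value
other.invalidate("x")                     # the asynchronous invalidation arrives
other.invalidate("x")                     # a duplicate invalidation does no damage
fresh = other.read("x")                   # miss, refill from the database
print(stale, fresh)                       # 1 2
```

Note how a duplicated or reordered invalidation cannot install a wrong value, which is the appeal of deleting instead of updating.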

Well, that is in theory, but what about in practice? How can one quantify the consistency of Facebook TAO?

Consistency analysis setup

To answer this question, the authors develop an offline checker-based consistency analysis. A random subset of the Facebook graph is chosen for monitoring. Random sampling is done for 1 vertex out of a million, so this is for ~1000 vertices. Every request for these ~1000 vertices is logged during their 12-day experiment (the trace contains over 2.7 billion requests), and later analyzed for consistency violations by the offline checker they developed.

Here are the consistency properties considered (going from stronger to weaker): linearizability, per-object sequential consistency, read-after-write consistency, and eventual consistency. Linearizability means that there exists a total order over all operations in the system (where each operation appears to take effect instantaneously at some point between when the client invokes the operation and it receives the response), and that this order is consistent with the real-time order of operations. Per-object sequential consistency means that there exists a legal, total order over all requests to each object that is consistent with client’s orders. Read-after-write means that when a write request has committed, all following read requests to that cache always reflect this write or later writes.

The offline checker converts dependencies between reads and writes into a dependency graph and checks for cycles in the dependency graph. A cycle means a linearizability anomaly. The same technique is also used for checking weaker local-consistency models, per-object consistency, and read-after-write consistency. They check for total ordering versus real-time ordering and can process these weaker local-consistency models accordingly.
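The cycle check itself is standard graph fare. Below is a minimal Python sketch using depth-first search; the function name `has_cycle`, the dictionary-based graph encoding, and the three-operation example are my own simplifications, not the paper's checker, which builds its edges from the logged request trace.

```python
def has_cycle(graph):
    """Detect a cycle in a directed graph given as {node: [successors]}."""
    WHITE, GRAY, BLACK = 0, 1, 2          # unvisited, on the DFS stack, finished
    color = {}

    def visit(node):
        color[node] = GRAY
        for nxt in graph.get(node, ()):
            state = color.get(nxt, WHITE)
            if state == GRAY:             # back edge: we returned to an ancestor
                return True
            if state == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in graph)


# w1 writes x=1, then w2 writes x=2, then read r starts after w2 finished
# but returns 1. Real time says w2 -> r; the stale value says r -> w2.
anomalous = {"w1": ["w2"], "w2": ["r"], "r": ["w2"]}
# If r had overlapped with w2, the real-time edge w2 -> r would not exist.
legal = {"w1": ["w2", "r"], "w2": [], "r": ["w2"]}
print(has_cycle(anomalous), has_cycle(legal))  # True False
```

The recursion is fine for a sketch; a checker over billions of logged requests would need an iterative traversal and a per-object partitioning of the trace.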

Consistency evaluation results 

Check Table 3 for the consistency evaluation results. It turns out the linearizability inconsistencies are very low: 0.0004%. And this gets even lower if you consider read-after-write within a single region/cluster: 0.00006%.

How come inconsistencies are so rare considering TAO provides only eventual consistency in theory? What gives? This stems from the following feature of Facebook TAO: "writes are rare". (Check Table 2.) In the traces you need both writes and reads to an object to see inconsistency. And only 10%-25% of objects have both. Even then, a write is rare, and a read does not immediately follow a write. Even when a read immediately follows a write there is access locality: the read comes from the same region where the cache is already updated. These all contribute to keeping the inconsistency rate very low, at 4 in a million.

The paper also considers edge update consistency, which returns similar results to vertex consistency. An interesting finding here is that 60% of all observed edge anomalies are related to ‘like’ edges.
The high update and request rate of “likes” explains their high contribution to the number of overall anomalies. The high update rate induces many vulnerability windows during which anomalies could occur and the high request rate increases the likelihood a read will happen during that window. The top 10 types (of edges) together account for ~95% of the anomalies. These most-anomalous edge types have implications for the design of systems with strong consistency.

Online practical consistency monitoring for Facebook TAO

The offline checker is expensive, so it cannot be used online. For online monitoring, Facebook uses a heuristic to track the consistency of their TAO system in production: phi consistency checking. This is a snapshot query to check if all caches return the same result for a given object. This is basically a measure of how closely synchronized the caches are within regions (phi(R)) and globally (phi(G)).

Phi consistency is not comparable with linearizability and the other local-consistency models, because it is an instantaneous query and does not depend on history as they do. So what good is it? It is still good for catching errors. If for some reason (bad configuration, operator error) caches are not maintained properly (e.g., cache invalidations screw up, etc.), these phi queries will catch that problem in real time and warn.
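As a rough illustration of what such a snapshot query computes, here is a small Python sketch. The function names `phi_consistent` and `phi_rate`, the use of plain dictionaries as caches, and the choice to treat a missing key as `None` are all assumptions of mine; the production check queries live caches.

```python
def phi_consistent(caches, key):
    """True if every cache in the set returns the same value for `key`.

    Hypothetical helper: caches are plain dicts, and a missing key reads as None.
    Values are assumed to be hashable.
    """
    return len({cache.get(key) for cache in caches}) <= 1


def phi_rate(caches, sampled_keys):
    """Fraction of sampled keys on which all caches agree at this instant."""
    agree = sum(phi_consistent(caches, k) for k in sampled_keys)
    return agree / len(sampled_keys)


region = [{"a": 1, "b": 2}, {"a": 1, "b": 3}]   # two caches, one stale entry for "b"
print(phi_rate(region, ["a", "b"]))             # 0.5
```

Passing the caches of one region gives something like phi(R), and passing all caches gives something like phi(G). A sudden drop in either rate is the alarm signal.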

So, what did we learn? 

Although TAO provides only eventual consistency in theory, it turns out TAO is highly consistent in practice, with only about 4 out of a million read requests (0.0004%) resulting in a linearizability violation.

How generalizable is this finding to other eventual-consistency systems? Of course we need to understand this finding with its limitations. This finding applies to a social network system updated with real-time human actions, so the data does not change very fast. And when it does, usually there is no immediate read request from other regions due to access locality, so we don't see much inconsistency. Another limitation of the study is that the way they sample ~1000 vertices out of a billion may not capture the power law model of the social graph. What about celebrities that have many followers? Their posts are bound to see more traffic and be prone to more inconsistency problems, no? I think the paper should have investigated the vertices with respect to their popularity tiers: high, medium, low.

Finally another limitation is this. This work considers only local consistency models. Local consistency means whenever each individual object provides C, the entire system also provides C. This property is satisfied by linearizability, per-object sequential consistency, read-after-write consistency, and eventual consistency, but is not satisfied by causally-consistent and transactional consistency models. The paper has this to say on the topic: "Transactional isolation is inherently a non-local property and so we cannot measure it accurately using only a sample of the full graph. This unfortunately means we cannot quantify the benefits of consistency models that include transactions, e.g., serializability and snapshot isolation, or the benefit of even read-only transactions on other consistency models. For instance, while our results for causal consistency bound the benefit of the COPS system, they do not bound the benefit of the COPS-GT system that also includes read-only transactions."

Related links

Conference presentation video of this paper

Probabilistically bounded staleness

Facebook's software architecture 

Facebook's Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services 

Holistic Configuration Management at Facebook

Scaling Memcache at Facebook

Finding a Needle in Haystack: Facebook's Photo Storage

One Trillion Edges, Graph Processing at Facebook-Scale

Monday, February 15, 2016

Paper review: Implementing Linearizability at Large Scale and Low Latency (SOSP'15)

Motivation for the paper

Remember the 2-phase commit problem?  The 2-phase commit blocks when the initiator crashes. Or, after a server crash the client/initiator may not be able to determine whether the transaction is completed. And transaction blocks. Because if we retry we may end up giving inconsistent result or redoing the transaction which messes up linearizability.

You need 3-phase commit to fix this. A new leader/recovery agent comes in and tries to recommit things. Unfortunately, 3-phase commit solutions are complicated, and there are a lot of corner cases. Lamport and Gray recommended that a Paxos consensus box be used to remember the initiator's abort/commit decision to achieve consistency, or more precisely they recommended that the Paxos box remember each participant's decision, for the sake of shaving off a message in the critical latency path.

What this paper recommends is an alternative approach. Don't use a Paxos box to remember the decision; instead use a durable log at the participant to remember the decision. In this log the participant stores a completion record, which includes any results that are returned to the client. So if the initiator is confused and retries, or if the client retries, or a recovery agent from one participating server comes and retries, the querying party is not going to get an answer/decision that is inconsistent with what the transaction committed/returned earlier.

How is the log at the participant durable against the crash of the participant? In other words, how do we ensure that the completion record is preserved? This is where the assumption about fast log-based recovery and RAMCloud-specific features comes into play. RAMCloud provides log-structured replication and quick recovery, which ensure that the completion record is not lost.

The paper presents this durable log-based transaction serializability idea with a single participant, i.e., a single-object transaction, and then shows that it can be extended to multi-participant transactions.

That was my plot line for motivating the approach in the paper. The paper takes what I think is an indirect way to motivate the problem, by first pointing out a problem with linearizability: exactly-once semantics. The figure illustrates that "at least once + idempotence != exactly-once". The paper then presents the completion record idea to achieve exactly-once semantics, and then builds linearizability on top of it, and in turn builds transactions on top of it.
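
To see why "at least once + idempotence != exactly-once", here is a minimal Python sketch of one interleaving that breaks it. The client names, the values, and the ToyServer class are my own illustrative assumptions, not a reproduction of the paper's figure. The write is idempotent in the usual sense (applying the same write twice in a row leaves the same state), yet a retry that arrives after another client's write makes a reader see the old value come back.

```python
class ToyServer:
    """Hypothetical single-object store; write(v) is idempotent on its own."""

    def __init__(self):
        self.value = 0

    def write(self, value):
        self.value = value
        return "ok"

    def read(self):
        return self.value


def run_scenario():
    server = ToyServer()
    observed = []  # what Client A sees on successive reads

    # Client B writes 2. The server applies it, but assume the reply is lost.
    server.write(2)
    observed.append(server.read())  # Client A reads 2

    server.write(3)                 # Client A writes 3
    observed.append(server.read())  # Client A reads 3

    # Client B never got its reply, so it retries the same write (at-least-once).
    server.write(2)
    observed.append(server.read())  # Client A reads 2 again

    return observed


print(run_scenario())
```

Client A observes 2, then 3, then 2 again, even though only one "write 2" was ever requested. No linearizable history with a single write of 2 explains that sequence, which is why the retry has to be recognized as a retry and not re-executed.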

Another idea in the paper is "implementing transactions on top of a lightweight linearizability layer". The paper argues that once a lightweight linearizability layer is in place, transactions in fact become easier to implement. We will revisit this idea at the end of the review to see how it holds up.

RIFL architecture

The lightweight linearizability layer the paper suggests is named RIFL (Reusable Infrastructure for Linearizability).
In order to implement exactly-once semantics, RIFL must solve 4 problems: RPC identification, completion record durability, retry rendezvous, and garbage collection.
1) In order to detect redundant RPCs, each RPC must have a unique identifier, which is present in all invocations of that RPC.
2) RIFL assumes that the underlying system provides durable storage for completion records keyed with the RPC identifier.
3) If an RPC completes and is then retried at a later time, the retries must find the completion record to avoid re-executing the operation. To achieve this  each operation is associated with a particular object in the underlying system, and the completion record is stored wherever that object is stored.
4) For garbage collection, a completion record should not be reclaimed until it is certain that the corresponding request will never be retried. This can happen in two ways. First, once the client has received a response, it will never retry the request. Clients provide acknowledgments to the servers about which requests have successfully completed, and RIFL uses the acknowledgments to delete completion records. Second, a client that has crashed will never retry; RIFL detects this through the expiration of the client's lease.
RIFL appears to the rest of the system as three modules. The first, RequestTracker, runs on client machines to manage sequence numbers for outstanding RPCs (Figure 3). The second module, LeaseManager, runs on both clients and servers to manage client leases (Figure 4). On clients, LeaseManager creates and renews the client’s lease, which also yields a unique identifier for the client. On servers, LeaseManager detects the expiration of client leases. The third module, ResultTracker, runs only on servers: it keeps track of currently executing RPCs and manages the completion records for RPCs that have finished (Figure 5).
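
The four pieces above can be sketched in a few lines of Python. This is a toy under stated assumptions, not RIFL's actual interface: the class name ToyResultTracker, the handle method, and the first_incomplete parameter (the lowest sequence number the client has not yet seen a reply for, piggybacked on each request as the acknowledgment) are all names I made up for illustration. The records live in an in-memory dict, whereas the real system must make the completion record durable together with the object it belongs to. Leases are left out.

```python
class ToyResultTracker:
    """Hypothetical server-side table of completion records."""

    def __init__(self):
        self.records = {}           # (client_id, seq) -> saved result
        self.first_incomplete = {}  # client_id -> highest acknowledgment seen
        self.executions = 0         # how many times an operation really ran

    def handle(self, client_id, seq, first_incomplete, operation):
        # Garbage collection: everything below the acknowledged point
        # will never be retried, so its completion record can go.
        acked = max(first_incomplete, self.first_incomplete.get(client_id, 0))
        self.first_incomplete[client_id] = acked
        for key in [k for k in self.records if k[0] == client_id and k[1] < acked]:
            del self.records[key]

        if seq < acked:
            return "STALE"  # a delayed duplicate of an already-acknowledged RPC

        rpc_id = (client_id, seq)  # unique RPC identifier
        if rpc_id in self.records:
            return self.records[rpc_id]  # retry: return the saved result

        result = operation()  # first arrival: execute exactly once
        self.executions += 1
        self.records[rpc_id] = result
        return result


counter = {"n": 0}

def increment():
    counter["n"] += 1
    return counter["n"]

tracker = ToyResultTracker()
first = tracker.handle("c1", 1, 1, increment)  # executes the increment
retry = tracker.handle("c1", 1, 1, increment)  # same RPC id, not re-executed
tracker.handle("c1", 2, 2, increment)          # acknowledges seq 1, runs seq 2
print(first, retry, counter["n"], sorted(tracker.records))
```

The retry returns the saved result without touching the counter, and the record for sequence number 1 disappears as soon as the client's next request acknowledges it.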

Implementing transactions over RIFL

The paper shows how Sinfonia minitransactions can be implemented over the RIFL layer. You can read a summary of Sinfonia minitransactions here. The implementation of Sinfonia transactions over RIFL requires a long description, so I will avoid summarizing it myself, and instead quote a couple of paragraphs verbatim from the paper to give you an idea about this.
"No side-effects" is the key idea when implementing transactions. The Transaction object defers all updates to the key-value store until commit is invoked. Commit must atomically verify that each object has the required version number, then apply all of the write and delete operations. If any of the version checks fail, the commit aborts and no updates occur.
Commit is implemented using a two-phase protocol where the client serves as coordinator. In the first phase, the client issues one prepare RPC for each object involved in the transaction (see Figure 6). The server storing the object (called a participant) locks the object and checks its version number. If it doesn't match the desired version then the participant unlocks the object and returns ABORT; it also returns ABORT if the object was already locked by another transaction. Otherwise the participant stores information about the lock in a transaction lock table and creates a durable record of the lock in its log. It then returns PREPARED to the client. The client issues all of the prepare RPCs concurrently and it batches requests to the same participant. If all of the prepare RPCs return PREPARED, then the commit will succeed; if any of the prepare RPCs return ABORT, then the transaction will abort. In either case, the client then enters the second phase, where it issues a decision RPC for each object. The participant for each object checks whether the RPC indicates “commit” or “abort”. If the decision was to commit, it applies the mutation for that object, if any. Then, whether committing or aborting, it removes the lock table entry and adds a tombstone record to the RAMCloud log to nullify the lock record. The transaction is effectively committed once a durable lock record has been written for each of the objects. At this point, regardless of the client's actions, the mutations will eventually be applied in the crash recovery.
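
As a rough illustration of the two phases just quoted, here is a small Python sketch. It is not the paper's code: the ToyParticipant class, the commit helper, and the tuple layout are hypothetical, the "log" is a plain list standing in for the durable RAMCloud log, and there are no real RPCs, retries, or RIFL deduplication (everything runs sequentially in one process).

```python
PREPARED, ABORT = "PREPARED", "ABORT"


class ToyParticipant:
    """Hypothetical server holding versioned objects."""

    def __init__(self):
        self.objects = {}  # key -> (version, value)
        self.locks = {}    # lock table: key -> (tx_id, pending value)
        self.log = []      # stand-in for the durable log

    def prepare(self, tx_id, key, expected_version, new_value):
        if key in self.locks:
            return ABORT  # already locked by another transaction
        version, _ = self.objects.get(key, (0, None))
        if version != expected_version:
            return ABORT  # version check failed
        self.locks[key] = (tx_id, new_value)
        self.log.append(("lock", tx_id, key))  # durable record of the lock
        return PREPARED

    def decision(self, tx_id, key, commit):
        lock = self.locks.get(key)
        if lock is None or lock[0] != tx_id:
            return  # this transaction holds no lock here
        if commit:
            version, _ = self.objects.get(key, (0, None))
            self.objects[key] = (version + 1, lock[1])  # apply the mutation
        del self.locks[key]
        self.log.append(("tombstone", tx_id, key))  # nullify the lock record


def commit(tx_id, writes):
    """Client as coordinator. writes: (participant, key, expected_version, value)."""
    votes = [p.prepare(tx_id, key, ver, val) for p, key, ver, val in writes]
    ok = all(vote == PREPARED for vote in votes)
    for p, key, _, _ in writes:
        p.decision(tx_id, key, ok)
    return ok


a, b = ToyParticipant(), ToyParticipant()
a.objects["x"] = (1, "old")
b.objects["y"] = (1, "old")
r1 = commit("t1", [(a, "x", 1, "new"), (b, "y", 1, "new")])      # both versions match
r2 = commit("t2", [(a, "x", 1, "newer"), (b, "y", 2, "newer")])  # stale version of x
print(r1, r2, a.objects["x"], b.objects["y"])
```

The second transaction aborts because it read a stale version of x, and neither object changes: b's lock is released in the decision phase without applying the pending value.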

If the client is suspected to have crashed, the participant for the transaction's first object acts as recovery coordinator. The recovery coordinator executes a two-phase protocol similar to that of the client, except that its goal is to abort the transaction unless it has already committed (in general, there may not be enough information to complete an incomplete transaction, since the client may have crashed before issuing all of the prepare RPCs). In the first phase the recovery coordinator issues a requestAbort RPC for each object, whose participant will agree to abort unless it has already accepted a prepare for the object. If requestAbort returns PREPARED for every object in the transaction, then the transaction has already committed. Otherwise, the recovery coordinator will abort the transaction. In either case, the recovery coordinator then sends decision RPCs for each object. In order to provide enough information for crash recovery, the client includes identifiers for all objects in the transaction as part of each prepare. This information serves two purposes. First, it allows each participant to identify the recovery coordinator (the server for the first object in the list). Second, the object list is needed by the recovery coordinator to identify participants for its requestAbort and decision RPCs. The recovery coordinator may not have received a prepare if the client crashed, so when a participant invokes startRecovery it includes the list of objects that it received in its prepare.
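
The recovery rule in this paragraph (abort unless every object was already prepared) can also be sketched briefly in Python. The names below are hypothetical, and version checks, locks, and the decision RPCs are omitted. The sketch also assumes that a prepare arriving after a requestAbort for the same object gets the abort answer, so that the two can never disagree; that is my reading of how the pieces fit together, not something the quoted text states.

```python
PREPARED, ABORTED = "PREPARED", "ABORTED"


class ToyParticipant:
    """Hypothetical participant that remembers its first answer per (tx, object)."""

    def __init__(self):
        self.state = {}  # (tx_id, key) -> PREPARED or ABORTED

    def prepare(self, tx_id, key):
        # Assumption: a late prepare loses if recovery already aborted this object.
        return self.state.setdefault((tx_id, key), PREPARED)

    def request_abort(self, tx_id, key):
        # Agree to abort unless a prepare was already accepted for this object.
        return self.state.setdefault((tx_id, key), ABORTED)


def recover(tx_id, object_list, participant_for):
    """Run by the participant of the first object; True means already committed."""
    replies = [participant_for[key].request_abort(tx_id, key) for key in object_list]
    # Decision RPCs to every participant would follow here.
    return all(reply == PREPARED for reply in replies)


p1, p2 = ToyParticipant(), ToyParticipant()
owners = {"x": p1, "y": p2}

p1.prepare("t1", "x")  # client crashed before preparing y
outcome_t1 = recover("t1", ["x", "y"], owners)
late_prepare = p2.prepare("t1", "y")  # a delayed prepare shows up after recovery

p1.prepare("t2", "x")
p2.prepare("t2", "y")  # client crashed after all prepares
outcome_t2 = recover("t2", ["x", "y"], owners)
print(outcome_t1, late_prepare, outcome_t2)
```

The half-prepared transaction t1 is aborted, while t2, whose prepares all landed before the crash, is recognized as already committed.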

Transactions over Linearizability vs. Linearizability over Transactions

Implementing transactions over linearizability certainly provided a lot of benefit in terms of simplifying the complex recovery protocol in the original Sinfonia system. By implementing Sinfonia over RIFL, the paper did not need to implement the recovery protocol of Sinfonia. On the other hand, we don't have results from the paper to see if implementing Sinfonia over RIFL helped improve performance. The paper does not include any performance comparison with base Sinfonia transactions. The paper could have at least measured the throughput of base RAMCloud transactions against RAMCloud with RIFL transactions to see if there is a throughput increase. That comparison is also sorely missing from the paper.

As far as general transactions and general linearizability are concerned: the RIFL paper doesn't compare with Calvin, a system that made the case for transactions over linearizability. This is a big omission, as the Calvin system set out to do just that, implementing transactions over a linearized log: “By first writing transaction requests to a durable, replicated log, and then using a concurrency control mechanism that emulates a deterministic serial execution of the log's transaction requests, Calvin supports strongly consistent replication and fully ACID distributed transactions, and manages to incur lower inter-partition transaction coordination costs than traditional distributed database systems.”

There is also the question of throughput vs. latency for transactions. The Calvin paper suggests that transactions over linearizability improve throughput but latency suffers. I think the reason is as follows: linearizability-first avoids coordination headaches/contention. So even though it may add to latency, it should help improve throughput overall because coordination contention is avoided. Things are all predecided by the linearizability layer, aka the log.

Evaluation section

The evaluation section shows the practical implications of the lightweight linearizability shim layer and demonstrates that it provides low-overhead transactions even as the number of participants increases.
The RIFL+RAMCloud stack is compared/contrasted with the H-Store main-memory database system using the TPC-C benchmark. The RAMCloud implementation of RIFL exhibits high performance: it adds less than 4% to the 13.5 μs base cost for writes, and simple distributed transactions execute in about 20 μs. RAMCloud transactions outperform H-Store on the TPC-C benchmark, providing at least 10x lower latency and 1.35x–7x as much throughput.
To contextualize these comparison results, keep in mind that H-Store is a totally different beast from RAMCloud. H-Store is a row-store-based main-memory relational database management system for OLTP applications. So H-Store is definitely more heavyweight, as it has to deal with general transactions and relational databases.

Figure 12 graphs the throughput of RIFL-based transactions. For transactions involving a single participant, a cluster with 10 servers can support about 440k transactions per second. With more participants per transaction, throughput drops roughly in proportion to the number of participants: with five participants in each transaction, the cluster throughput is about 70k transactions per second. Figure 12 also shows that the single-server optimization improves throughput by about 40%.
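
A quick back-of-the-envelope check of "roughly in proportion", using only the two numbers quoted above and assuming that "in proportion" means throughput scales as 1/participants (the variable names are mine):

```python
single_participant_tps = 440_000  # 10-server cluster, 1 participant per transaction
participants = 5
observed_tps = 70_000             # reported for 5 participants per transaction

# If throughput dropped exactly in proportion to the number of participants:
ideal_tps = single_participant_tps / participants
ratio = observed_tps / ideal_tps

print(f"ideal: {ideal_tps:.0f} tps, observed/ideal: {ratio:.2f}")
```

So with five participants the cluster delivers about 80% of what perfect inverse scaling from the single-participant number would give (88k transactions per second).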

Related links

The paper: http://dl.acm.org/citation.cfm?id=2815416
Conference video: https://www.youtube.com/watch?v=MkF2wuHaf04

The case for RAMClouds: http://muratbuffalo.blogspot.com/2010/12/case-for-ramclouds-scalable-high.html
RAMCloud Reloaded: http://muratbuffalo.blogspot.com/2013/06/ramcloud-reloaded-log-structured-memory.html