Archive for the “Distributed Systems” Category

The difficulty of constructing remote services is often not in writing them but testing and debugging whilst ensuring that some of the nastier types of failure (e.g. packet loss or machine failure) are adequately handled.

The norm for these kinds of testing scenarios is to have a full, mocked-up test environment with a bunch of servers. Such a setup needs sysadmin support and repeated deployment steps, which in most organisations are slow and ponderous. Incremental test cycles in such an environment become costly, which leads to onerous, last-minute testing and the late discovery of difficult-to-fix bugs that cause endless release delays.

Over the years I’ve developed an approach for pushing all these testing scenarios back toward the unit level so they can be run regularly per build as they take mere minutes to complete. The core philosophy is to design the software in such a fashion that it runs on a single machine using all the network protocols it would use when deployed across many servers (ah, the power of localhost/127.0.0.1).

Preliminaries

Putting this philosophy into practice requires that we adopt certain design practices:

  1. Clean separation between the transport/remote layers and the core service logic. This makes it easy to develop tests that verify the core logic without any remoteness concerns and a second set of tests that perform the more heavyweight remote tests. The benefit is that we can more easily isolate issues when they occur. For example, if the core logic tests pass but the remote tests fail we can be pretty confident the issue is in the remote layers.
  2. Clean separation of configuration source from core service and transport/remote layer. This ensures all our software requests configuration using a consistent API which could then be implemented via LDAP, flat-files, in-memory etc. Such a setup allows us to easily build up configuration inside of our tests and make it available to the services we’re building.
  3. Runtime discovery of endpoints. This allows us to dynamically allocate port/IP combinations and make them available to whichever services require them. One can achieve this via the abstracted configuration source but it’s often cleaner to have a dynamic lookup/discovery mechanism.
  4. Configurable log file locations. So that we can avoid path clashes between services.

Once these things are in place, unit tests can construct transports, endpoints and configuration dynamically at run time in whatever combination is required for a test. It is thus possible to instantiate a collection of services inside of a single process and have them talk to each other as if they were all running remotely. This is somewhat at odds with other design practices where we typically look to remove remoteness when running services locally for purposes of performance.
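As a minimal sketch of the above, a test can start several services in one process, each bound to an OS-assigned port on the loopback address, and let them find one another through an in-memory directory. The names InMemoryDirectory, EchoService and LoopbackClient are hypothetical and chosen purely for illustration, and the echo protocol stands in for whatever a real service would speak:

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory directory: service name -> dynamically allocated endpoint.
final class InMemoryDirectory {
    private final Map<String, InetSocketAddress> endpoints = new ConcurrentHashMap<>();

    void register(String name, InetSocketAddress address) { endpoints.put(name, address); }

    InetSocketAddress lookup(String name) { return endpoints.get(name); }
}

// Hypothetical one-line-per-connection echo service standing in for a real remote service.
final class EchoService implements Closeable {
    private final ServerSocket server;

    EchoService(String name, InMemoryDirectory directory) throws IOException {
        // Port 0 asks the OS for any free port, so services in one process never clash.
        server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        directory.register(name,
                new InetSocketAddress(server.getInetAddress(), server.getLocalPort()));
        Thread acceptor = new Thread(this::serve, name + "-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    private void serve() {
        while (!server.isClosed()) {
            try (Socket client = server.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(
                         client.getInputStream(), StandardCharsets.UTF_8));
                 PrintWriter out = new PrintWriter(new OutputStreamWriter(
                         client.getOutputStream(), StandardCharsets.UTF_8), true)) {
                String line = in.readLine();
                if (line != null) {
                    out.println("echo:" + line);
                }
            } catch (IOException e) {
                // Closing the server socket ends up here; the loop condition then exits.
            }
        }
    }

    // Closing the service is how a test simulates a machine failure.
    @Override
    public void close() throws IOException { server.close(); }
}

final class LoopbackClient {
    // Looks the endpoint up at call time and talks real TCP to it over 127.0.0.1.
    static String call(InMemoryDirectory directory, String name, String request)
            throws IOException {
        InetSocketAddress address = directory.lookup(name);
        try (Socket socket = new Socket()) {
            socket.connect(address, 1000);
            socket.setSoTimeout(1000);
            PrintWriter out = new PrintWriter(new OutputStreamWriter(
                    socket.getOutputStream(), StandardCharsets.UTF_8), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    socket.getInputStream(), StandardCharsets.UTF_8));
            out.println(request);
            return in.readLine();
        }
    }
}
```

A test would construct two or three EchoService instances against one directory, call them via LoopbackClient, then close one of them and assert that callers see the failure.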

Failure Scenarios

By virtue of the unit tests having control of all the services and their transports/endpoints, it becomes possible to stop or disable services, thus simulating machine failures. It’s also possible to extend the approach to cover problems such as packet loss, corruption or increased latency.

These more advanced scenarios are more readily handled with server construction toolkits such as Netty which allows tight control of packet processing and protocol. Using Netty, one can build up the protocol stack per service exactly as required and introduce Decoder/Encoder pairs, Handlers or wrappers around core service implementation that can randomly (and silently without severing the connection) lose messages or packets, break connections etc.
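The idea of a wrapper that silently loses messages can be sketched without any toolkit at all. What follows is plain Java, not Netty's handler API; LossyHandler is a hypothetical name, and the seeded random source is my own assumption to make a failure scenario reproducible from one test run to the next:

```java
import java.util.Random;
import java.util.function.Consumer;

// Hypothetical decorator: drops a configurable fraction of messages before they
// reach the wrapped handler. Nothing is closed or signalled, so the loss is silent.
final class LossyHandler<M> implements Consumer<M> {
    private final Consumer<M> delegate;
    private final double dropProbability;
    private final Random random;
    private int dropped;

    LossyHandler(Consumer<M> delegate, double dropProbability, long seed) {
        if (dropProbability < 0.0 || dropProbability > 1.0) {
            throw new IllegalArgumentException("dropProbability must be within [0, 1]");
        }
        this.delegate = delegate;
        this.dropProbability = dropProbability;
        this.random = new Random(seed); // fixed seed gives a repeatable loss pattern
    }

    @Override
    public synchronized void accept(M message) {
        // nextDouble() is in [0, 1), so 0.0 never drops and 1.0 always drops.
        if (random.nextDouble() < dropProbability) {
            dropped++;
            return;
        }
        delegate.accept(message);
    }

    synchronized int droppedCount() { return dropped; }
}
```

Because the drop rate and seed are constructor arguments, a test can choose between a reliable path (0.0), total loss (1.0) and anything in between at run time.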

Example

I’ve been working on a Paxos implementation which breaks down into:

  • State machines – Leader, Acceptor and Learner and associated elements such as leader election and failure detection.
  • Persistent storage layer – as various state must be remembered across Paxos instances.
  • Remote communications layer – including cluster membership and remote communications.

The state machines accept messages, make appropriate state transitions and produce messages. These are then passed around between participants via the remote communications layer. The persistent storage layer allows file locations to be specified at construction time, which lets test code allocate separate directories on a single disk to hold each participant’s state.

The remote layer is built such that none of the members need static/well-known ports to operate off. There is one exception which is a fixed multicast address that is used to do initial cluster discovery. It is implemented using Netty and consists of some codecs for the various messages and a handler that passes messages to and from the state machines.

There are several different implementations of the handler. There is the normal version that dispatches messages reliably and several others that randomly drop messages or lose them at critical moments in an instance of Paxos. The exact behaviour of these handlers is configured at runtime which allows unit tests to construct random or specific failure scenarios and ensure the state machines behave appropriately.

All these elements together allow unit tests to construct, in a single process, fully remote services that communicate via TCP and UDP/Multicast as if they were running on a network and simulate failure scenarios. Alongside these tests is a collection that verifies correct behaviour of the state machines and a set that validates their failure handling via timeouts, leader election behaviours etc. The entire suite, including the failure scenarios, runs in less than five minutes. That leaves one long-running test that exercises a collection of state machines concurrently for long periods, a necessary soak test that is run separately.

Alternative Implementations

A similar testing approach is possible with the likes of Jetty 7 as the lower IO layers are open enough to be customised to support these test scenarios. This can be a better option than Netty if services are Servlet based.

More challenging are the RPC-based services as these tend to run atop closed stacks that limit the amount of customisation possible and often have horrid configuration methods. However, Thrift, by virtue of its Processor/Protocol abstraction, can be readily modified to support such testing.

Sidenotes:

  1. Applications that use databases for state storage can make this sort of testing tricky but it’s not impossible. One solution to the problem is to make use of virtual machines, where one instantiates an image containing a pre-defined database and shuts it down afterwards, alongside some scripts to prepare and tear down data within the database.
  2. I’ve recently applied this approach to several other systems including a trade management system written in Clojure, a trading platform written in Scala and a gossip-based directory service also written in Scala.


Queues and Threads

An actor is essentially a queue of requests fed into some piece of single-threaded logic. Only one thread at any moment in time can be dispatching a request through the actor.

In essence this is a single-threaded executor, as created by Executors.newSingleThreadExecutor() in java.util.concurrent.
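To make the comparison concrete, here is a minimal sketch of an actor built on a single-threaded executor. CounterActor and its counting behaviour are hypothetical; the point is only that the executor's queue is the mailbox and its one thread is the sole toucher of the actor's state:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical actor: requests queue up in the executor and are dispatched
// one at a time by its single thread.
final class CounterActor {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private long count; // only ever read or written on the mailbox thread

    // Any thread may send a request; the reply arrives via the Future.
    Future<Long> increment() {
        Callable<Long> request = () -> ++count;
        return mailbox.submit(request);
    }

    void shutdown() { mailbox.shutdown(); }
}
```

Note that the count field needs no locking: callers never touch it directly, and the executor orders every request behind the previous one.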

The model is wonderfully simple at the surface but is hiding a number of challenging issues which aren’t easily solved without assistance from the actor/app developer.

Flow Control

The speed at which an actor’s queue is drained is related to the time taken for the actor to dispatch a request taken from the queue. There are also a number of runtime factors that can cause the queue to empty more slowly than might be expected including:

  • Limited size of the actor thread pool.
  • The work being performed within the actor takes a long time generally.
  • An actor doing I/O is slowed down due to contention on the device or because of a fault (e.g. a RAID enclosure slowing down whilst it restores a disk).

If actor queues fill faster than they can be emptied we will eventually exhaust resources and fail. It must be possible to do explicit flow-control and generate back-pressure on other components in a system which requires an actor infrastructure to expose metrics about queue sizes or provide other feedback about the state of the system (e.g. by posting events to a listener).
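One way to get explicit flow-control out of a stock executor is to bound its queue and surface rejection to the sender. The sketch below assumes a hypothetical BoundedMailbox wrapper; the capacity and the choice to reject (rather than block the sender) are illustrative decisions, not a recommendation for every system:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical actor mailbox with a hard upper bound on queued requests.
final class BoundedMailbox {
    private final ThreadPoolExecutor executor;

    BoundedMailbox(int capacity) {
        // One thread, a bounded queue, and rejection once the queue is full.
        executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(capacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Returns false instead of queueing without limit; this is the back-pressure
    // signal the sender must act upon (slow down, shed load, retry later).
    boolean offer(Runnable request) {
        try {
            executor.execute(request);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    // Metrics a monitoring or flow-control component can poll.
    int queuedRequests() { return executor.getQueue().size(); }

    int remainingCapacity() { return executor.getQueue().remainingCapacity(); }

    void shutdown() { executor.shutdown(); }
}
```

A sender that receives false knows immediately that the actor is overloaded, which is rather more useful than discovering it via an out-of-memory error some hours later.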

Partitioning

If we partition our actors by function then only one actor can perform some set of actions against some set of state owned by that function which represents a throughput limitation. An alternative is to partition the state across multiple actors each capable of performing the function. Or we can pass all state to be worked upon into an actor as part of the request such that we can run many stateless actors. The resultant and prior state must then be stored somewhere else (database or global memory or some other actor). In summary, we must either:

Ensure state is partitioned inside of actors in sufficiently granular fashion that we can provide enough throughput of operations to prevent overload.

or:

Ensure that we have sufficient actors capable of performing some function and can spread state across requests without contention.

[Sidenote: Achieving this efficiently in a system with dynamic load is not easy; one can of course take the easy option of over-provisioning ]
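The first option, partitioning state across several actors that all perform the same function, might be sketched as follows. PartitionedCounters is a hypothetical name, the per-key counter is a stand-in for real state, and hashing the key to pick a partition is just one simple assignment scheme (it does nothing to rebalance under dynamic load):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical set of identical actors, each owning a disjoint slice of the state.
final class PartitionedCounters {
    private final List<ExecutorService> actors = new ArrayList<>();
    private final List<Map<String, Long>> state = new ArrayList<>();

    PartitionedCounters(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            actors.add(Executors.newSingleThreadExecutor());
            state.add(new HashMap<>()); // touched only by actor i's thread
        }
    }

    // The same key always maps to the same actor, so its state is never contended.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), actors.size());
    }

    Future<Long> increment(String key) {
        int partition = partitionFor(key);
        Callable<Long> request = () -> state.get(partition).merge(key, 1L, Long::sum);
        return actors.get(partition).submit(request);
    }

    void shutdown() { actors.forEach(ExecutorService::shutdown); }
}
```

Requests for different keys can now proceed in parallel on different actors, while requests for any one key remain strictly ordered.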

Network

The kind of network used for actor communication has significant effects on system characteristics. For the purposes of this conversation we’ll consider two kinds of network:

  1. Processor/Memory bus – essentially guaranteed delivery, high throughput and low latency. Actors and requesters communicating via a bus are local to each other.
  2. Ethernet – no delivery guarantee (a machine failure now doesn’t stop the entire system and packets can be lost silently for variable periods of time), low throughput, higher and more variable latency as compared to processor/memory bus. Actors and requesters communicating via a network are remote from each other.

[Sidenote: A lot of the literature considers both communication over buses and networks to be styles of distributed system. Generally, algorithms for the bus-based style are more plentiful, considerably less complex and easier to construct than for the network-based style; see e.g. Lynch ]

An actor that is remote from an entity passing it requests requires two queues. One at the originator and one at the actor. The outbound queue in particular has substantially different characteristics from local actors and requesters:

  1. Throughput – maximum throughput of the queue is closely related to the throughput of the ethernet between originating and remote machines. This is considerably lower than the throughput for a local queue.
  2. Latency – the remote queue has the latency of the underlying ethernet and transport stack. Again, substantially different from the local queue.

Why does this matter? Because the number of requests one can dispatch through a remote queue per unit time is substantially different from a local queue. One cannot write a naive algorithm to dispatch requests evenly across a set of actors and assume they make progress at the same rate. Care must be taken to control flow in and out of queues to prevent resource exhaustion. Actor infrastructures that encourage these flow-naive algorithms are likely to exhibit difficult-to-debug failures in the face of unanticipated load.

If one chooses an approach of passing state into an actor, then one must ensure the state is not large because it will take significant time to pull it across the network from storage (or another machine) and put it back. Further, whilst the data is being transferred it cannot safely be modified by some other entity, for fear of working from old state and losing changes; thus some form of conflict resolution is required (locks, vector clock based merge etc).

Availability

By virtue of machine and network failures (packet loss, outright failure etc), remote actors may become unavailable temporarily or permanently. A stateless actor can be respawned on another machine with little effort but a stateful actor requires some storage that is available across a number of nodes. Note that use of stateless actors will only eradicate the need for storage in the case where nothing else is required to hold state and pass it to actors for processing.

[Sidenote: The need for storage and the costly disk-syncing/checkpointing required to look after state so it can be made available to relocated actors impacts the relevance of actor benchmarks that concentrate on request throughput particularly in the multi-machine scenario. This is because storage and network become the dominant performance factors rather than the speed of request dispatch in real-world scenarios. Such a change may require us to adjust our actor/state partitioning approach ]

Should a machine be running a large number of actors and become unavailable, there is now a need to do significant and costly work to get these actors onto another machine. State must be moved, actors must be re-started and potentially re-balanced across remaining machines. Over the course of a re-balancing, various actor queues must be temporarily suspended which can lead to an increased backlog of requests which as above can cause resource exhaustion. Note also that when a machine becomes unavailable all queues on remote machines that were feeding it must be paused until new resource becomes available.

[Sidenote: Once one is running actors across multiple machines, it becomes necessary to have some form of directory service which can be used to find an actor. Without this feature it becomes difficult to relocate an actor (how do you find its new address?). We must now keep this service up to date, make it tolerant of failures and scalable to potentially large numbers of actors ]

Summary

In a single-machine/multi-core environment, actor models can work well subject to being able to achieve an appropriate partitioning of function and/or state. This mode of operation also permits substantial simplicity in APIs but flow-control cannot be ignored.

Actor models will only work well in the multi-machine environment if they trade their simplicity for more complicated APIs and heightened developer awareness (see e.g. A Note on Distributed Computing) or are deployed into a very strictly controlled environment (increasingly difficult as a system grows).

Actors, like many other concurrency models, have their uses (though I’d claim they aren’t anything new and can be built in e.g. Java using an executor) but are not a panacea and their much-vaunted simplicity can be a double-edged sword.


Those specifying requirements often express them without consideration for the passing of time, assuming that actions are instantaneous. A naive development team with limited experience in distributed systems will then make the classic mistake of attempting to implement those requirements to the letter. This can lead to a bunch of undesirable outcomes including:

  • Brittleness in the face of failure.
  • High cost solutions.
  • Poor scaling properties.
  • Disappointment as the expectations of the requirements source aren’t met.

Consider a system where we have two (network) hops to an observer and one hop to the initiator of an action (assuming uniform network latency for each hop). Potentially for every two actions there will be a single observation. Thus each observation of the system is out of date by the time it reaches the observer.

Administrative actions can suffer similar problems, in that it could take several hops for the request to arrive at the system. A user may be only one hop away and could be performing many operations in the time it takes for one of our actions to reach the system. For example if we wish to block a user, whilst our request is in transit they might perform several operations.

Things are made worse by network failures which can further delay or prevent execution of an action and slow down the rate of updates to an observer.

How then do we account for these troubles when specifying requirements? By qualifying them with appropriate SLAs. In the example above, appropriate SLAs might include:

  • Time for propagation of an administrative action.
  • Maximum acceptable time after the action is triggered for a user to be blocked.

SLAs such as the above:

  1. Help us to identify appropriate solutions (e.g. do we need to pay for multiple independent routes between data-centres).
  2. Allow us to make appropriate use of asynchronous operations and eventual consistency.

Since SLAs have significant impact on the way in which a requirement will be implemented, it is essential to perform appropriate expectation management, discussing and communicating the implications with the requirements source. They cannot be solely the domain of techies. Remember also that in many situations customers prefer availability over consistency.


Neglecting to account for failure is an age-old problem. Consider this common error (Purify anybody?):

#include <stdio.h>
#include <stdlib.h>
struct rhubarb {
  int aVal;
  int anotherVal;
  char* aString;
};
......
  struct rhubarb* mystruct;
  mystruct = malloc(sizeof(struct rhubarb));
  /* BUG: malloc may have returned NULL, yet the pointer is dereferenced unchecked. */
  mystruct->aVal = 55;
......

Of course the following code should have been included after the malloc:

/*
  If memory wasn't allocated, do something appropriate.
*/
if (mystruct == NULL) {
  .....
}

An equivalent mistake is easily made when building a distributed system in HTTP or RMI by ignoring error codes or exceptions that are designed to communicate failures that we ought to handle. It’s similarly easy to ignore latency, implement brittle and dumb retry logic, or assume something is reliable (like a message queue) when it isn’t. Many have managed to concoct systems with HTTP that breach the idempotency “constraints” of REST and, whilst Erlang provides link() and receive timeouts, we’re not forced to use them.
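For contrast with the dumb variety, here is a hedged sketch of retry logic that at least bounds its attempts, backs off between them and surfaces the final failure rather than swallowing it. Retry.withBackoff is a hypothetical helper, the doubling delay is an arbitrary choice, and the whole thing is only safe if the wrapped call is idempotent:

```java
import java.util.concurrent.Callable;

final class Retry {
    // Hypothetical helper: tries an idempotent call up to maxAttempts times,
    // doubling the pause after each failure, and rethrows the last failure.
    static <T> T withBackoff(Callable<T> idempotentCall, int maxAttempts,
                             long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return idempotentCall.call();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Never retry through an interrupt; restore the flag and give up.
                    Thread.currentThread().interrupt();
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last; // the caller still has to decide what failure means
    }
}
```

Even this is only a starting point: a real system would also want to distinguish retryable from non-retryable failures and cap the total time spent.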

In essence there is no way to ensure developers do the right thing in a single-process or distributed context. No technology, tool or design approach can prevent developers from making poor implementation decisions which limits the value in re-hashing (Steve, Steve and Stu) RPC rights and wrongs.

I believe the best chance we have for doing distributed right is not by providing some de-facto standard toolset, rather it’s through education[1] and mentoring to encourage the correct mindset. Such a mindset allows a developer building a distributed system to choose the most appropriate tools and use them right.

[1] Material to be covered would be substantially broader than the fallacies, failure handling and latency, and should probably include: logical time, FLP, failure detectors, global snapshots and Paxos.


Amazon has had a few problems of late, one of the more interesting ones being something S3 users encountered. It took Amazon a little while to identify the root cause:

We’ve isolated this issue to a single load balancer that was brought into service at 10:55pm PDT on Friday, 6/20. It was taken out of service at 11am PDT Sunday, 6/22. While it was in service it handled a small fraction of Amazon S3’s total requests in the US. Intermittently, under load, it was corrupting single bytes in the byte stream.

Perhaps they had anticipated this scenario as the S3 API features explicit support for software-level check-summing via MD5:

For all PUT requests, Amazon S3 computes its own MD5, stores it with the object, and then returns the computed MD5 as part of the PUT response code in the ETag. By validating the ETag returned in the response, customers can verify that Amazon S3 received the correct bytes even if the Content MD5 header wasn’t specified in the PUT request. Because network transmission errors can occur at any point between the customer and Amazon S3, we recommend that all customers use the Content-MD5 header and/or validate the ETag returned on a PUT request to ensure that the object was correctly transmitted. This is a best practice that we’ll emphasize more heavily in our documentation to help customers build applications that can handle this situation.

Some developers were surprised that any of this was necessary, expecting TCP/UDP checksums to be sufficient. However, Stevens points out in TCP/IP Illustrated Vol I:

Also, if your data is valuable, you might not want to trust the UDP or the TCP checksum, since these are simple checksums and were not meant to catch all possible errors.
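An application-level check of the kind Amazon describes needs nothing beyond the JDK. The sketch below assumes that the Content-MD5 header carries the Base64 encoding of the digest and that the ETag for a simple PUT is the same digest in hex, possibly wrapped in double quotes; Md5Check and its method names are hypothetical, and no actual S3 client call is shown:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class Md5Check {
    static byte[] md5(byte[] payload) {
        try {
            return MessageDigest.getInstance("MD5").digest(payload);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available on this JVM", e);
        }
    }

    // Value to send with the request (assumed: Base64 of the raw digest).
    static String contentMd5Header(byte[] payload) {
        return Base64.getEncoder().encodeToString(md5(payload));
    }

    static String hex(byte[] digest) {
        StringBuilder sb = new StringBuilder(digest.length * 2);
        for (byte b : digest) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Compares the locally computed digest with the ETag returned by the server
    // (assumed: hex digest, optionally surrounded by double quotes).
    static boolean matchesETag(byte[] payload, String eTag) {
        String expected = eTag.replace("\"", "");
        return hex(md5(payload)).equalsIgnoreCase(expected);
    }
}
```

A client would compute contentMd5Header before the PUT and call matchesETag on the response, treating a mismatch as a failed upload to be retried.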

Takeaways:

  1. Not all types of failure are binary – working or not working.
  2. Leaving the responsibility of data-safety to software layers further down the stack may not be best.
  3. Mechanisms for failure handling must be embedded in APIs.
