Thursday, March 31, 2011

The Competitors

I don't normally spend much time worrying about our competitors. I figure keeping the customers happy is a better use of my time. But no set of blog posts about performance would be complete without a final showdown...

There aren't all that many standalone JTA transaction managers out there. Most function only as part of an application server and are not a viable option for embedding in other environments. Others are incomplete or largely abandoned. In my opinion there are currently only two serious challengers to JBossTS for use outside a full Java EE app server. Let's see how they do.

results in transactions per second, using quad core i7, jdk6, in-memory logging i.e. we're testing the CPU not the disk.


threads  JBossTS  Competitor A
1        93192    9033
2        164744   20242
3        224114   22649
4        249781   21148
100      239865   19978


umm, guys, "synchronized ( Foo.class )" is really not a good idea, ok? Come on, put up a bit more of a fight please, this is no fun.
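For anyone wondering why a class-wide monitor hurts so much: every thread entering such a block queues on one JVM-wide lock, whichever object it happens to be working on. Here's a minimal sketch of the difference. The class names (GlobalLockCounter, PerInstanceCounter, LockScopeDemo) are made up for illustration and are not taken from any real transaction manager's source.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only: none of these classes come from a real product.
class GlobalLockCounter {
    private long value;

    // All instances share the single monitor of GlobalLockCounter.class,
    // so threads using unrelated counters still queue behind one another.
    void increment() {
        synchronized (GlobalLockCounter.class) {
            value++;
        }
    }

    long get() {
        synchronized (GlobalLockCounter.class) {
            return value;
        }
    }
}

class PerInstanceCounter {
    // Per-instance, lock-free state: threads on different counters never contend.
    private final AtomicLong value = new AtomicLong();

    void increment() { value.incrementAndGet(); }

    long get() { return value.get(); }
}

class LockScopeDemo {
    // Starts `threads` workers, each incrementing its own counter `perThread` times,
    // and returns the sum of all counters.
    static long run(int threads, int perThread, boolean global) {
        Thread[] workers = new Thread[threads];
        GlobalLockCounter[] g = new GlobalLockCounter[threads];
        PerInstanceCounter[] p = new PerInstanceCounter[threads];
        for (int i = 0; i < threads; i++) {
            g[i] = new GlobalLockCounter();
            p[i] = new PerInstanceCounter();
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    if (global) g[id].increment(); else p[id].increment();
                }
            });
            workers[i].start();
        }
        long total = 0;
        try {
            for (int i = 0; i < threads; i++) {
                workers[i].join();
                total += global ? g[i].get() : p[i].get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total;
    }
}
```

Both variants produce the same totals. The difference only shows up when you time them with several threads, where the class-level lock forces everything through one queue.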

Next contender, please....


threads  JBossTS  Competitor B
1        93192    102270
2        164744   145011
3        224114   185528
4        249781   193199
100      239865   29528


Ahh, that's more like it. Good to have a worthy opponent to keep us on our toes. Some kind of scaling problem there though. Looks like contention on the data structures used to queue transaction timeouts. Been there, done that. Feel free to browse the source code for inspiration guys :-)

Since nobody in their right mind actually runs a transaction manager with logging turned off, this is really all just a bit of fun. Well, for some of us anyhow. So where is the competitive comparison for the real world scenario?

Well, I do have the comparison numbers for all three systems with logging enabled, but I'd hesitate to describe them as competitive. You'll have to try it for yourself or you just won't believe the difference. Suffice it to say there is only one Clebert, and we've got him. Sorry guys.

Now if this had been an ease of use contest I'd be a bit more worried. Sure we have a ton of detailed and comprehensive reference documentation, but the learning curve is near vertical. Time to work on some intro level material: quick starts, config examples etc. Not as much fun as performance tuning, but probably more useful at this point.

Monday, March 28, 2011

fast enough?

Remember the performance tuning we've been doing? Well, the powers that be sat up and took notice too. I think the phrase was 'more than enough to satisfy requirements' with a none-too-subtle subtext of 'so now put down that profiler and do some real work'. Fortunately I'm not too good at taking hints...

After some more tweaking today (finalizers are evil and must die), I thought I'd see how we were doing. Here is trunk, soon to become JBossTS 4.15 and be used in EAP6, against JBossTS 4.6.1.CP11, used in the current EAP5.1 platform release. 4.6.1 originally dates from September 2009, but we backported some performance improvements into it in mid 2010.

results in transactions per second, using quad core i7, jdk6, in-memory objectstore i.e. we're testing the CPU not the disk.


threads  4.6.1.CP12  4.15.SNAPSHOT  improvement
1        37285       93192          2.50x
2        55309       164744         2.98x
3        65070       224114         3.44x
4        66172       249781         3.77x
100      29027       239865         8.26x


hmm, ok, so I guess maybe we did exceed the requirements a bit.

Saturday, March 19, 2011

Concurrency control

We managed to gloss over a few important points during the discussion on Isolation. Let's dig into them here.

How can you ensure isolation? Well typically this is covered by what's referred to as concurrency control for the resources within the transaction. A very simple and widely used approach is to regard all operations on resources (objects) to be of type ‘read’ or ‘write’, which follow the synchronization rule permitting ‘concurrent reads' but exclusive ‘writes’. This rule is imposed by requiring that any computation intending to perform an operation that is of type read (write) on an object, first acquire a ‘read lock’ (‘write lock’) associated with that object. A read lock on an object can be held concurrently by many computations provided no computation is holding a write lock on that object. A write lock on an object, on the other hand, can only be held by a computation provided no other computation is holding a read or a write lock. (Note, although we'll talk in terms of locks, this should not be used to infer a specific implementation. Timestamp-based concurrency control could just as easily be used, for example.)
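As a small illustration of the read/write rule, the sketch below uses the JDK's standard ReentrantReadWriteLock. The GuardedAccount class and its probeWhileReading helper are hypothetical names introduced here; this is not how JBossTS implements its own lock management.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical lock-guarded object; ReentrantReadWriteLock is the standard JDK class.
class GuardedAccount {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long balance;

    // Many readers may hold the read lock at once.
    long read() {
        lock.readLock().lock();
        try { return balance; } finally { lock.readLock().unlock(); }
    }

    // A writer needs exclusive access: no other readers or writers.
    void write(long newBalance) {
        lock.writeLock().lock();
        try { balance = newBalance; } finally { lock.writeLock().unlock(); }
    }

    // While the calling thread holds a read lock, a second thread probes (without
    // blocking) whether it could obtain a read lock and then a write lock.
    // Returns { secondReaderAdmitted, writerAdmitted }.
    static boolean[] probeWhileReading() {
        final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        final boolean[] result = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();
                if (result[0]) rw.readLock().unlock();
                result[1] = rw.writeLock().tryLock();
                if (result[1]) rw.writeLock().unlock();
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }
}
```

The probe shows the rule directly: the second reader is admitted while the first read lock is held, but the writer is refused.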

In order to ensure the atomicity property, all computations must follow a ‘two–phase’ locking policy. During the first phase, termed the growing phase, a computation can acquire locks, but not release them. The tail end of the computation constitutes the shrinking phase, during which time held locks can be released but no locks can be acquired. Now suppose that a computation in its shrinking phase is to be rolled back, and that some objects with write locks have already been released. If some of these objects have been locked by other computations, then abortion of the computation will require these computations to be aborted as well. To avoid this cascade roll back problem, it is necessary to make the shrinking phase ‘instantaneous’.

Most transaction systems utilize what is commonly referred to as pessimistic concurrency control mechanisms: in essence, whenever a data structure or other transactional resource is accessed, a lock is obtained on it as described earlier. This lock will remain held on that resource for the duration of the transaction and the benefit of this is that other users will not be able to modify (and possibly not even observe) the resource until the holding transaction has terminated. There are a number of disadvantages of this style: (i) the overhead of acquiring and maintaining concurrency control information in an environment where conflict or data sharing is not high, (ii) deadlocks may occur, where one user waits for another to release a lock not realizing that that user is waiting for the release of a lock held by the first.

Therefore, optimistic concurrency control assumes that conflicts are not high and tries to ensure locks are held only for brief periods of time: essentially locks are only acquired at the end of the transaction when it is about to terminate. This kind of concurrency control requires a means to detect if an update to a resource does conflict with any updates that may have occurred in the interim and how to recover from such conflicts. Typically detection will happen using timestamps, whereby the system takes a snapshot of the timestamps associated with resources it is about to use or modify and compares them with the timestamps available when the transaction commits.
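A minimal sketch of timestamp-style conflict detection follows. It assumes a simple integer version counter stands in for the timestamp, and the VersionedCell class is a hypothetical name used only for illustration. The lock (here the object's monitor) is held only for the brief moment of the commit check.

```java
// Hypothetical optimistic cell: a version counter plays the role of the timestamp.
class VersionedCell {
    private long version;
    private int value;

    // Returns { version, value } as seen at this instant.
    synchronized long[] snapshot() {
        return new long[] { version, value };
    }

    // Commits only if nobody else has committed since `seenVersion` was read;
    // otherwise reports a conflict and leaves the state untouched.
    synchronized boolean commit(long seenVersion, int newValue) {
        if (version != seenVersion) {
            return false;              // conflict: the caller must retry with fresh data
        }
        value = newValue;
        version++;
        return true;
    }
}
```

Two transactions that both start from the same snapshot cannot both commit: the first succeeds and the second sees a changed version, so it has to re-read and try again.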

Resolution of conflicts is a different problem entirely, since doing so requires semantic information about the resources concerned. Therefore, most transaction systems that offer optimistic schemes will typically cause the detecting transaction to roll back and the application must retry, this time with new data. Obviously this may result in a lot of work being lost, especially if the transaction that rolls back has been running for some time.

Assuming both optimistic and pessimistic concurrency control are available to you (and they may not be), then which one to use is up to you. A close examination of the environment in which the application and transactional resources reside is necessary to determine whether a) shared access to resources occurs and b) the relative probability that sharing will cause a transaction to roll back. This might very well not be a black or white choice and may change over the lifetime of your objects or application. Certainly the use of different concurrency control schemes can be important when trying to improve the throughput of user requests and committed transactions, so it’s well worth considering and understanding the issues involved.

Type specific concurrency control

Another possible enhancement is to introduce type specific concurrency control, which is a particularly attractive means of increasing the concurrency in a system (and yes, it's supported in JBossTS). Concurrent read/write or write/write operations are permitted on an object from different transactions provided these operations can be shown to be non-interfering (for example, for a directory object, reading and deleting different entries can be permitted to take place simultaneously). Object-oriented systems are well suited to this approach, since semantic knowledge about the operations of objects can be exploited to control permissible concurrency within objects. Additional work may be needed when working with procedural systems.

Finally, what about deadlocks? When multiple transactions compete for the same resources in conflicting modes (locks), it is likely that some of them will fail to acquire those resources. If a transaction that cannot acquire a lock on a resource waits for it to be released, then that transaction is blocked – no forward progress can be made until the lock has been acquired. In some environments, it is possible for some transactions to be waiting for each other, where each of them is blocked and is also blocking another transaction. In this situation, none of the transactions can proceed and the system is deadlocked.

For example, let’s consider two transactions T1 and T2 that operate on two resources X and Y. Let’s assume that the execution of the operations involved in these transactions is:

T1: read(X); write(Y)
T2: read(Y); write(X)

Suppose the interleaved execution of these transactions were to result in:

readT1(X); readT2(Y); writeT2(X); writeT1(Y)

Note, readT1 means the read operation performed by T1 etc.

Assume that T1 obtained a read lock on X and then T2 gets a read lock on Y – possible because these operations aren’t conflicting and can thus occur in parallel. However, when T2 comes to write to X its attempt to get a write lock on X will block because T1 still holds its read lock. Likewise, T1’s attempt to get a write lock on Y will block because of the read lock that T2 holds. Each transaction is blocked waiting for the release of the other’s read lock before it can progress: they are deadlocked.

The only way for the deadlock to be resolved is for at least one of the transactions to release its locks that are blocking another transaction. Obviously such a transaction cannot commit (it has not been able to perform all of its work since it was blocked); therefore, it must roll back.

Deadlock detection and prevention is complicated enough in a non-distributed environment without then including the extra complexity of distribution. In general, most transaction systems allow deadlocks to occur, simply because to do otherwise can be too restrictive for applications. There are several techniques for deadlock detection, but the two most popular are:

  • Timeout-based: if a transaction has been waiting longer than a specified period of time, the transaction system will automatically roll back the transaction on the assumption it is deadlocked. The main advantage of this approach is that it is easy to implement in a distributed environment; the main disadvantage is that some transactions may execute for longer than expected and be rolled back when they are not in fact deadlocked.

  • Graph-based: this explicitly tracks waiting transaction dependencies by constructing a waits-for graph: nodes are waiting transactions and edges are waiting situations. The main advantage of this approach is that it is guaranteed to detect all deadlocks, whereas the main disadvantage is that in a distributed environment it can be costly to execute.
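
The graph-based approach can be sketched as a cycle search over a waits-for graph. The WaitsForGraph class below is a hypothetical, single-process illustration using a depth-first search; a real detector would also need to pick a victim to roll back and, in a distributed system, to assemble the graph across nodes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical waits-for graph: an edge A -> B means "transaction A waits for B".
class WaitsForGraph {
    private final Map<String, Set<String>> waitsFor = new HashMap<>();

    void addWait(String waiter, String holder) {
        waitsFor.computeIfAbsent(waiter, k -> new HashSet<>()).add(holder);
    }

    // A deadlock exists exactly when the graph contains a cycle.
    boolean hasDeadlock() {
        Set<String> done = new HashSet<>();
        for (String tx : waitsFor.keySet()) {
            if (onCycle(tx, new HashSet<>(), done)) {
                return true;
            }
        }
        return false;
    }

    // Depth-first search; `path` holds the transactions on the current chain of waits,
    // `done` holds transactions already proven not to lead into a cycle.
    private boolean onCycle(String tx, Set<String> path, Set<String> done) {
        if (path.contains(tx)) return true;
        if (done.contains(tx)) return false;
        path.add(tx);
        for (String next : waitsFor.getOrDefault(tx, Collections.<String>emptySet())) {
            if (onCycle(next, path, done)) return true;
        }
        path.remove(tx);
        done.add(tx);
        return false;
    }
}
```

For the T1/T2 example above, adding the edges T1 -> T2 and T2 -> T1 produces a cycle, so the detector reports a deadlock.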


A slight variation on the timeout-based approach exists in some transaction systems, where timeouts can be associated with lock acquisition, such that the system will only block for the specified period of time. If the lock has not been acquired by the time this period elapses, it returns control to the application indicating that the lock has not been obtained. It is then up to the application to determine what to do; for example, it may be possible to acquire the required data elsewhere or to ask for a lock in a different mode. The advantage of this approach is that a transaction is not automatically rolled back if it cannot acquire a lock, possibly saving the application lots of valuable time; the disadvantage is that it requires additional effort on behalf of the application to resolve lock acquisition failures. However, for many objects and applications, this is precisely the best place to resolve lock conflicts because this is the only place where the semantic information exists to know what (if anything) can be done to resolve such conflicts.
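
A minimal sketch of timed lock acquisition, using the JDK's ReentrantLock.tryLock(timeout, unit). The TimedLockDemo class and its method names are hypothetical; the point is only that failure to get the lock is reported back to the caller rather than forcing a rollback.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper showing lock acquisition with a timeout.
class TimedLockDemo {
    // Waits at most `millis` for the lock. Returns false (without running `work`)
    // if the lock could not be obtained, leaving the decision to the caller.
    static boolean tryUpdate(ReentrantLock lock, long millis, Runnable work) {
        try {
            if (!lock.tryLock(millis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Holds the lock in the calling thread while a second thread attempts a timed
    // acquisition; returns what that second thread observed.
    static boolean attemptWhileHeldElsewhere(long millis) {
        final ReentrantLock lock = new ReentrantLock();
        final boolean[] got = new boolean[1];
        lock.lock();
        try {
            Thread t = new Thread(() -> got[0] = tryUpdate(lock, millis, () -> { }));
            t.start();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return got[0];
    }
}
```

On a false return the application can choose what to do next: retry, fetch the data elsewhere, or give up and roll back.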

Durability

D is for Dog, Design and Durability, i.e., the effects of a committed transaction are never lost (except by a catastrophic failure).

The durability (or persistence) property means that any state changes that occur during the transaction must be saved in a manner such that a subsequent failure will not cause them to be lost. How these state changes are made persistent is typically dependent on the implementation of the transaction system and the resources that are ultimately used to commit the work done by the transactional objects. For example, the database will typically maintain its state on hard disk in order to ensure that a machine failure (e.g., loss of power) does not result in loss of data.

Although most users of transactions will see durability from their application’s point-of-view, there is also an aspect within the transaction system implementation itself. In order to guarantee atomicity in the presence of failures (both transaction coordinator and participant), it is necessary for the transaction service itself to maintain state. For example, in some implementations the coordinator must remember the point in the protocol it has reached (i.e., whether it is committing or aborting), the identity of all participants that are registered with the transaction and where they have reached in the protocol (e.g., whether they have received the prepare message). This is typically referred to as the transaction log, though this should not be interpreted as implying a specific implementation. Some implementations may maintain a separate log (file) per transaction, with this information recorded within it and removed when it is no longer needed. Another possible implementation has a single log for all transactions and the transaction information is appended to the end of the log and pruned from the log when the respective transaction completes.

Let’s look at what happens at the participant in terms of durability, when it’s driven through the two-phase commit protocol. When the participant receives a prepare message from the coordinator it must decide whether it can commit or roll back. If it decides to roll back then it must undo any state changes that it may control and inform the coordinator; there is no requirement for durable access at this point. If the participant can commit, it must write its intentions to a durable store (participant log) along with sufficient information to either commit or roll back the state changes it controls. The format of this information is typically dependent on the type of participant, but may include the entire original and new states.

Once successfully recorded, the participant informs the coordinator that it can commit and awaits the coordinator’s decision. When this second phase message arrives, the participant will either cancel all state changes (the coordinator wants to roll back), or if the coordinator wants to commit, make those state changes the current state (e.g., overwrite the original state with the new state). It can then delete the participant log and inform the coordinator.
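
The participant's side of the protocol described above can be sketched as follows. SketchParticipant is a hypothetical class, and an in-memory list stands in for the durable participant log, so this shows the ordering of the steps rather than real durability.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant; the `log` list is a stand-in for a durable store.
class SketchParticipant {
    private int committedState;
    private Integer pendingState;                  // change made inside the transaction
    final List<String> log = new ArrayList<>();    // NOT durable: illustration only

    SketchParticipant(int initial) {
        committedState = initial;
    }

    void update(int newState) {
        pendingState = newState;
    }

    // Phase 1: either undo and vote no (no log write needed), or record
    // both the original and new states before voting yes.
    boolean prepare(boolean canCommit) {
        if (!canCommit) {
            pendingState = null;
            return false;
        }
        log.add("PREPARED old=" + committedState + " new=" + pendingState);
        return true;
    }

    // Phase 2 (commit): make the new state current, then delete the log record.
    void commit() {
        if (pendingState != null) {
            committedState = pendingState;
            pendingState = null;
        }
        log.clear();
    }

    // Phase 2 (roll back): discard the new state, then delete the log record.
    void rollback() {
        pendingState = null;
        log.clear();
    }

    int state() {
        return committedState;
    }
}
```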

Isolation

I is for Iguana, Igloo and Isolation, i.e., intermediate states produced while a transaction is executing are not visible to others. Furthermore transactions appear to execute serially, even if they are actually executed concurrently.

This property is often referred to as serializability. If we assume that objects and services can be shared between various programs then it is necessary to ensure that concurrent executions of programs are free from interference, i.e., concurrent executions should be equivalent to some serial order of execution. We have already seen a fairly trivial example of this through the online bank account, but it is worth formalizing this requirement. Consider the following two programs (where w, x, y and z are distinct state variables):

P1 : z := 10; x := x+1; y := y+1
P2 : w := 7; x := x * 2; y := y * 2

Assume that x=y=2 initially. Then, a serial execution order 'P1;P2' will produce the result z=10, w=7, x=y=6, and the execution order 'P2;P1' will produce the result z=10, w=7, x=y=5. The partly concurrent execution order given below will be termed interference free or serializable, since it is equivalent to the serial order 'P1;P2':

(z := 10 || w := 7); x := x+1; y := y+1; x := x * 2; y := y * 2

However, an execution order such as the one below is not free from interference since it cannot be shown to be equivalent to any serial order.

(z := 10 || w := 7); x := x+1; x := x * 2; y := y * 2; y := y+1

Programs that possess the above-mentioned serializable property are said to be atomic with respect to concurrency. The serializability property is extremely important, especially in an environment where multiple concurrent users may be attempting to use the same resources consistently.
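
The arithmetic in the example above is easy to check mechanically. The hypothetical ScheduleCheck class below replays a sequence of steps on x and y, starting from x=y=2 (z and w are omitted because their assignments are independent of everything else).

```java
// Hypothetical helper that replays the example's steps on the state {x, y}.
class ScheduleCheck {
    // Accepts steps named "x+1", "y+1", "x*2" and "y*2"; returns the final { x, y }.
    static int[] run(String... steps) {
        int x = 2;
        int y = 2;
        for (String step : steps) {
            switch (step) {
                case "x+1": x = x + 1; break;
                case "y+1": y = y + 1; break;
                case "x*2": x = x * 2; break;
                case "y*2": y = y * 2; break;
                default: throw new IllegalArgumentException("unknown step: " + step);
            }
        }
        return new int[] { x, y };
    }
}
```

Running the serial order P1;P2 and the first interleaving both give x=y=6, and P2;P1 gives x=y=5. The second interleaving gives x=6, y=5, which matches neither serial order, so it is not serializable.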

Consistency

C is for Crayon, Cup and Consistency, i.e., transactions produce consistent results and preserve application specific invariants.

A transactional application should maintain the consistency of the resources (e.g., databases, file-systems, etc.) that it uses. In essence, transactional applications should move from one consistent state to another. However, unlike the other transactional properties (A, I and D) this is something that the transaction system cannot achieve by itself since it does not possess any semantic information about the resources it manipulates: it would be impossible for a transaction processing system to assert that the resources are moving to (or from) consistent states. All a transaction system can ensure is that any state changes that do occur are performed in a manner that is guaranteed despite failures. It is the application programmers’ responsibility to ensure consistency (in whatever way makes sense for the resources concerned.)

Atomicity

We've already covered some of this in previous postings, but in the next four entries let's explicitly look at all ACID properties. And of course that means we start here with ... A is for Apple, Aardvark and Atomic, i.e., the transaction completes successfully (commits) or if it fails (aborts) all of its effects are undone (rolled back).

In order to ensure that a transaction has an atomic outcome, the two-phase commit protocol that we discussed earlier is typically used. This protocol is used to guarantee consensus between participating members of the transaction. When each participant receives the coordinator’s phase 1 message, they record sufficient information on stable storage to either commit or abort changes made during the action. After returning the phase 1 response, each participant that returned a commit response must remain blocked until it has received the coordinator’s phase 2 message. Until they receive this message, these resources are unavailable for use by other actions. If the coordinator fails before delivery of this message, these resources remain blocked. However, if failed machines eventually recover, crash recovery mechanisms can be employed to unblock the protocol and terminate the transaction.
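
A minimal sketch of the coordinator's side of two-phase commit is shown below. The TwoPhaseSketch class, its Participant interface and the Recording test participant are all hypothetical names. Logging, failures and recovery are deliberately left out, so this only illustrates how the votes determine a single outcome for everyone.

```java
import java.util.List;

// Hypothetical, failure-free coordinator: no logging and no crash recovery.
class TwoPhaseSketch {
    interface Participant {
        boolean prepare();   // phase 1: vote yes (true) or no (false)
        void commit();       // phase 2: make changes permanent
        void rollback();     // phase 2: undo changes
    }

    // Returns true if the transaction committed, false if it rolled back.
    static boolean complete(List<? extends Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                // A single "no" vote decides the outcome. The voter has already
                // undone its own work, so every other participant is told to roll back.
                for (Participant q : participants) {
                    if (q != p) q.rollback();
                }
                return false;
            }
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    // Simple participant that records what happened to it.
    static final class Recording implements Participant {
        private final boolean vote;
        String outcome = "active";

        Recording(boolean vote) {
            this.vote = vote;
        }

        public boolean prepare() {
            outcome = vote ? "prepared" : "rolled back";
            return vote;
        }

        public void commit() { outcome = "committed"; }

        public void rollback() { outcome = "rolled back"; }
    }
}
```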

Tuesday, March 15, 2011

fiddling with knobs

Get your mind out the gutter you. I'm talking about control knobs. Also buttons, switches, levers and anything else that allows for interesting behavioural tweaks.

Messing around with settings on software is a great way to pass the time. For those who enjoy such things there are over 100 configuration properties in JBossTS. Which admittedly may be a little too much of a good thing. Fortunately they mostly have sensible default values.

I recently discussed the performance benefits of using one of those properties to swap in a new ObjectStore implementation for transaction logging in JBossTS. In the last few days I've been adding some more configuration options to allow overriding of the settings in the underlying HornetQ Journal. It turns out that there are also a bunch of interesting knobs to fiddle with in that code. By judicious tweaking of the settings on the Journal and the file system I've got a nice speedup over the last iteration:

60119 tx/second with in-memory logging, no disk I/O

43661 tx/second with the new Journal on SSD

43k is a big step up from the 30-36k or so we were seeing previously, but still not saturating the CPU. However, a new test case shows the disk is not to blame:

89221 log writes / second.

That's taking most of the transaction manager code out of the picture and writing an equivalent amount of data direct to the Journal based ObjectStore. So, plenty of disk write capacity to spare now.

So why can't we use it all?

The log write done by an individual transaction is very small. To get an efficient batch size you need to aggregate the writes for a significant number of transactions. Whilst waiting for that batch to be written, the transactions are blocked. So, to keep the CPU busy you need to throw more threads / concurrent transactions at it. In fact the disk is so quick you need a LOT more threads. At which point lock contention elsewhere in the transaction manager code becomes a problem, despite the significant improvements we made to it anyhow for pure in-memory transaction cases.

Naturally it's possible we can get some additional benefit from another iteration of lock refactoring. We've already picked off most of the reasonably tractable cases though and what's left is going to be complex to change.

Another possibility is to actually run the Journal out of process and use IPC so several JVMs can share it. The additional communication overhead may just be worthwhile. It's a similar model to what we do with the SQL db based objectstore, but without the overhead of SQL. Although naturally this multi process architecture is attractive only where the application workload is amenable to partitioning between JVMs.

It may have to wait a while though. We're entering the final weeks of 4.15 development and it's time to start cleaning up all the loose ends and getting things in shape for use by JBossAS 7 GA. Onward.

Sunday, March 13, 2011

Slightly alkaline transactions if you please ...

Given that the traditional ACID transaction model is not appropriate for long running/loosely coupled interactions, let’s pose the question, “what type of model or protocol is appropriate?” The answer to that question is that no one specific protocol is likely to be sufficient, given the wide range of situations that transactions are likely to be deployed within. In fact trying to shoe-horn ACID transactions into a wide range of situations for which they were never designed is one of the reasons they've gotten such a bad reputation over the years.

There are a number of different extensions to the standard transaction model that have been proposed to address specific application needs, that may not be easily or efficiently addressed through the use of traditional transactions:

Nested transactions: permits a finer control over recovery and concurrency. The outermost transaction of such a hierarchy is referred to as the top-level transaction. The permanence of effect property is only possessed by the top-level transaction, whereas the commits of nested transactions (subtransactions) are provisional upon the commit/abort of an enclosing transaction. And yes, JBossTS supports nested transactions!

Type specific concurrency control: concurrent read/write or write/write operations are permitted on an object from different transactions provided these operations can be shown to be non-interfering. Oh and yes, we support this too!

Independent top-level transactions: with this model it is possible to invoke a top-level transaction from within another (possibly deeply nested) transaction. If the logically enclosing transaction rolls back, this does not lead to the rollback of the independent top-level transaction, which can commit or rollback independently. In the event that the enclosing transaction rolls back, compensation may be required, but this is typically left up to the application. Yes, we've got this one covered as well.

Structured top-level transactions: long-running top-level transactions can be structured as many independent, short-duration top-level transactions. This allows an activity to acquire and use resources for only the required duration. In the event of failures, to obtain transactional semantics for the entire duration may require compensations for forward or backward recovery. Now although we don't support this directly within JBossTS since it is really workflow, JBossTS has been used to implement workflow systems for many years.

What this range of extended transaction models illustrates is that a single model is not sufficient for all applications. Therefore, is it possible to develop a framework within which all of these models can be supported, and also facilitate the development of other models? This was the question asked by the Object Management Group when it began its work on attempting to standardise extended transaction models. In this post we shall give an overview of the results of the work we performed with IBM, Iona and others in producing the final Activity Service OMG specification that attempts to answer that question. This also became a Java specification with JSR 95.

Now I'm not going to go into this standard in any detail, since that could be the subject of several other blog posts, but I will summarise it as this: have a generic coordination infrastructure that allows the intelligence behind the protocol (e.g., two-phase or three-phase) as well as the trigger points for the protocol (e.g., at the end of the transaction or during the transaction) to be defined at runtime and in a pluggable manner. But why did I mention all of this? Because this standard formed the basis of the Web Services transactions standard (and the various competing specifications that came before it), with WS-Coordination as the pluggable infrastructure.

At this point in time there are only two types of transaction protocol supported by the Web Services standard and the REST-based approach that we're working on:

• ACID transactions: Web services are for interoperability in closely coupled environments such as corporate intranets as much as they are for the Web. Interoperability between heterogeneous transaction service implementations is a requirement and yet has been difficult to achieve in practice. This transaction model is designed to support interoperability of existing transaction processing systems via Web services, given such systems already form the backbone of enterprise class applications. Although ACID transactions may not be suitable for all Web services, they are most definitely suitable for some, and particularly high-value interactions such as those involved in the finance sector. For example, in a J2EE environment, JTA-to-JTA interoperability is supported through the JTS specification, but this is neither mandated nor universally implemented.

• Forward compensation based transactions: this model is designed for those business interactions that are long in duration, where traditional ACID transactions are inappropriate. With this model, all work performed within the scope of an application should be able to be compensated such that an application’s work is either performed successfully or undone. However, how individual Web services perform their work and ensure it can be undone if compensation is required, is an implementation choice. The model simply defines the triggers for compensation actions and the conditions under which those triggers are executed.

And the nice thing about our implementations of both of these is that they're based on the exact same transaction engine: ArjunaCore. Because it was designed with extensibility in mind and also with the requirement to be able to relax the various ACID properties, over the last decade we've been able to use it relatively untouched as the basis of pretty much every transaction protocol and implementation that's been out there. So if we do decide that we need to add another extended transaction protocol for other use cases, I'm pretty confident that it won't require us to start from scratch!

When ACID is too strong

Transaction processing itself is not by any means a new discipline. Though much innovative and interesting work continues in the field, many of the fundamental techniques and algorithms are well-known and for several decades transactions have been supported on various platforms. However, as Web services have evolved as a means to integrate processes and applications at an inter-enterprise level, traditional transaction semantics and protocols have proven to be inappropriate. Web services or REST-based transactions, colloquially termed Business Transactions, differ from traditional transactions in that they execute over long periods, they require commitments to the transaction to be “negotiated” at runtime, and isolation levels have to be relaxed. Business Transactions require an extended transaction model that builds on existing standards where possible and defines interoperable transaction protocols and message flows that help negotiate transaction guarantees at the inter-enterprise level.

Structuring certain activities from long-running transactions can reduce the amount of concurrency within an application or (in the event of failures) require work to be performed again. For example, there are certain types of application where it is known that resources acquired within a transaction can be released “early”, rather than having to wait until the transaction terminates; in the event of the transaction rolling back, however, certain compensation activities may be necessary to restore the system to a consistent state.

Long-running activities can be structured as many independent, short-duration transactions, to form a “logical” long-running transaction. This structuring allows an activity to acquire and use resources for only the required duration of this long-running activity, as shown.



In the figure, an application activity (shown by the dotted ellipse) has been split into different, coordinated short-duration transactions. Assume that the application activity is concerned with booking a taxi (t1), reserving a table at a restaurant (t2), reserving a seat at the theatre (t3), and then booking a room at a hotel (t4), and so on. If all of these operations were performed as a single transaction then resources acquired during t1 would not be released until the transaction has terminated. If subsequent activities t2, t3 etc. do not require those resources, then they will be needlessly unavailable to other clients.

However, if failures and concurrent access occur during the lifetime of these individual transactional activities then the behavior of the entire “logical long-running transaction” may not possess ACID properties. Therefore, some form of compensation may be required to attempt to return the state of the system to consistency. For example, let us assume that t4 aborts. Further assume that the application can continue to make forward progress, but in order to do so must now undo some state changes made prior to the start of t4 (by t1, t2 or t3). Therefore, new activities are started; tc1 which is a compensation activity that will attempt to undo state changes performed by, say, t2, and t3 which will continue the application once tc1 has completed. tc5’ and tc6’ are new activities that continue after compensation, e.g., since it was not possible to reserve the theatre, restaurant and hotel, it is decided to book tickets at the cinema.
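
The idea of compensating already-committed short transactions can be sketched as below. This hypothetical CompensationSketch class uses the simplest possible policy, which is an assumption of the sketch rather than something the text prescribes: when a step aborts, every previously committed step is compensated in reverse order. The scenario described above is more selective (compensate only some steps, then continue forward with new activities), and that choice belongs to the application.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: each step is an independent short transaction.
class CompensationSketch {
    static final class Step {
        final String name;
        final boolean succeeds;    // whether this short transaction commits

        Step(String name, boolean succeeds) {
            this.name = name;
            this.succeeds = succeeds;
        }
    }

    // Runs the steps in order and returns a trace of what happened. On the first
    // abort, all committed steps are compensated, most recent first.
    static List<String> run(List<Step> steps) {
        List<String> trace = new ArrayList<>();
        Deque<Step> committed = new ArrayDeque<>();
        for (Step step : steps) {
            if (step.succeeds) {
                trace.add("commit " + step.name);
                committed.push(step);
            } else {
                trace.add("abort " + step.name);
                while (!committed.isEmpty()) {
                    trace.add("compensate " + committed.pop().name);
                }
                break;
            }
        }
        return trace;
    }
}
```

With the steps taxi, restaurant, theatre and a failing hotel booking, the trace shows three commits, one abort and then the three compensations in reverse order.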

Previous transaction processing systems shared a great deal of commonality in terms of the crux of the problem that they address and the abstractions they use to address it. Specifically, transaction processing systems were developed for particular platforms and each system assumes that it is in sole control of the transaction domain and hence does not generally have to interoperate with other transaction processing systems (though interoperability with lower-level components like databases is generally well supported via interfaces like X/Open XA). Early attempts at transaction interoperability (e.g., the Object Transaction Service from the Object Management Group) did not manage to get past the “vendor lock-in” barrier, and attempts at using transactions across enterprise boundaries failed because in such systems transactions are assumed to exhibit ACID properties.

Web services, or REST-based applications, present a different kind of problem: they are specifically about fostering systems interoperability as well as long duration interactions. This presents some interesting problems from a transaction management point of view. What makes Web services so interesting is the fact that the architecture is deliberately not prescriptive about what happens behind service endpoints – Web services are ultimately only concerned with the transfer of structured data between parties, plus any meta-level information to safeguard such transfers (e.g. by encrypting or digitally signing messages) – yet it is behind service endpoints that we find traditional transaction processing architectures supporting business activities.

Thus we are presented with a paradox. The Web services or REST approaches provide a service-oriented, loosely coupled, and potentially asynchronous means of propagating information between parties, whilst in the background we have traditional transaction processing infrastructures whose behavior is not mutually interoperable. Furthermore, the fact that transactions in these systems are assumed to exhibit ACID properties potentially leads to problems when exposing resources to third parties, since it presents opportunities to those parties to tie up resources and prevent transactions from making progress. Thus if transactions were to be supported in either of these architectural approaches then it is clear that some re-addressing of the problem is required.

Drum roll ... stay tuned for the next thrilling instalment in the series!

Non-atomic outcome

We've already touched on the concept of heuristic outcomes (where atomicity cannot be guaranteed). But let's delve into this in a bit more detail and consider the types of heuristic that may occur.

So far we've seen that in order to guarantee atomicity, the two-phase commit protocol is necessarily blocking. What this means is that as a result of failures, participants may remain blocked for an indefinite period of time even if failure recovery mechanisms exist. Some applications and participants simply cannot tolerate this blocking. To break this blocking nature, participants that have got past the prepare phase are allowed to make autonomous decisions as to whether they commit or roll back: such a participant must record this decision in case it is eventually contacted to complete the original transaction. If the coordinator eventually informs the participant of the transaction outcome and it is the same as the choice the participant made, then there’s no problem. However, if it is contrary, then a non-atomic outcome has obviously happened: a heuristic outcome. How this heuristic outcome is reported to the application and resolved is usually the domain of complex, manually driven system administration tools, since attempting an automatic resolution requires semantic information about the nature of the participants involved in the transaction.

Precisely when a participant makes a heuristic decision is obviously implementation dependent. Likewise, the choice the participant makes (to commit or to roll back) will depend upon the implementation and possibly the application/environment in which it finds itself. The possible heuristic outcomes are:

• Heuristic rollback: the commit operation failed because some or all of the participants unilaterally rolled back the transaction.
• Heuristic commit: an attempted rollback operation failed because all of the participants unilaterally committed. This may happen if, for example, the coordinator was able to successfully prepare the transaction but then decided to roll it back (e.g., it could not update its log) but in the meanwhile the participants decided to commit.
• Heuristic mixed: some updates (participants) were committed while others were rolled back.
• Heuristic hazard: the disposition of some of the updates is unknown. For those which are known, they have either all been committed or all rolled back.
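One way to read the four definitions above is as a classification over what the coordinator decided and what each participant actually did. The sketch below is only that, a reading of the list: the `HeuristicClassifier` class and its enums are invented for illustration and are not taken from any transaction API.

```java
import java.util.List;

// Hypothetical sketch: classify the overall outcome from the coordinator's
// decision and the final state reported by (or unknown for) each participant.
class HeuristicClassifier {
    enum Decision { COMMIT, ROLLBACK }
    enum ParticipantState { COMMITTED, ROLLED_BACK, UNKNOWN }
    enum Outcome { ATOMIC, HEURISTIC_ROLLBACK, HEURISTIC_COMMIT, HEURISTIC_MIXED, HEURISTIC_HAZARD }

    static Outcome classify(Decision decision, List<ParticipantState> states) {
        boolean anyCommitted = states.contains(ParticipantState.COMMITTED);
        boolean anyRolledBack = states.contains(ParticipantState.ROLLED_BACK);
        boolean anyUnknown = states.contains(ParticipantState.UNKNOWN);

        // Some committed, some rolled back: definitely mixed.
        if (anyCommitted && anyRolledBack) return Outcome.HEURISTIC_MIXED;
        // Known participants agree with each other, but some are unknown.
        if (anyUnknown) return Outcome.HEURISTIC_HAZARD;
        // Everyone is known and unanimous, but contrary to the coordinator.
        if (decision == Decision.COMMIT && anyRolledBack) return Outcome.HEURISTIC_ROLLBACK;
        if (decision == Decision.ROLLBACK && anyCommitted) return Outcome.HEURISTIC_COMMIT;
        return Outcome.ATOMIC;
    }
}
```

For example, a COMMIT decision with participant states COMMITTED and UNKNOWN classifies as a hazard, while COMMITTED and ROLLED_BACK classifies as mixed regardless of the decision.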

Heuristic decisions should be used with care and only in exceptional circumstances since there is the possibility that the decision will differ from that determined by the transaction service and will thus lead to a loss of integrity in the system. Having to perform resolution of heuristics is something you should try to avoid, either by working with services/participants that don’t cause heuristics, or by using a transaction service that provides assistance in the resolution process.

Optimisations to the transaction protocol

There are several variants to the standard two-phase commit protocol that are worth knowing about because they can have an impact on performance and failure recovery. We shall briefly describe those that are the most common variants on the protocol:

• Presumed abort: if a transaction is going to roll back then it may simply record this information locally and tell all enlisted participants. Failure to contact a participant has no effect on the transaction outcome; the transaction is effectively informing participants as a courtesy. Once all participants have been contacted the information about the transaction can be removed. If a subsequent request for the status of the transaction occurs there will be no information available and the requestor can assume that the transaction has aborted (rolled back). This optimization has the benefit that no information about participants need be made persistent until the transaction has decided to commit (i.e., progressed to the end of the prepare phase), since any failure prior to this point will be assumed to be an abort of the transaction.

• One-phase: if there is only a single participant involved in the transaction, the coordinator need not drive it through the prepare phase. Thus, the participant will simply be told to commit and the coordinator need not record information about the decision since the outcome of the transaction is solely down to the participant.

• Read-only: when a participant is asked to prepare, it can indicate to the coordinator that no information or data that it controls has been modified during the transaction. Such a participant does not need to be informed about the outcome of the transaction since the fate of the participant has no effect on the transaction. As such, a read-only participant can be omitted from the second phase of the commit protocol.

So these optimisations let us get around some concerns that two-phase commit, particularly in a distributed environment, is overkill. Most modern transaction manager implementations will support all of these and when you consider that many transactional applications use only a single participant, or don't modify state, then you can see how the above considerations can dramatically reduce (or even remove) any overhead that may appear to be there at first glance. We'll go into this a bit more in a subsequent posting. But next up for consideration is what happens when we can't guarantee transactional semantics?
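The three optimisations are easier to see side by side in code. What follows is a toy coordinator, not the JBossTS implementation: `OptimisedCoordinator`, its `Participant` interface and the `recording` helper are all invented here, the "log" is just an in-memory map, and failures during phase two are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of presumed abort, one-phase and read-only handling.
class OptimisedCoordinator {
    enum Vote { COMMIT, ROLLBACK, READ_ONLY }
    enum Status { COMMITTED, ROLLED_BACK }

    interface Participant {
        Vote prepare();
        void commit();
        void rollback();
        boolean commitOnePhase(); // true if the participant committed
    }

    // Stand-in for the durable log: only commit decisions are ever written.
    private final Map<String, Status> log = new HashMap<>();

    Status complete(String txId, List<Participant> participants) {
        // One-phase: a lone participant decides the outcome; no prepare, no log record.
        if (participants.size() == 1) {
            return participants.get(0).commitOnePhase() ? Status.COMMITTED : Status.ROLLED_BACK;
        }
        List<Participant> prepared = new ArrayList<>();
        for (int i = 0; i < participants.size(); i++) {
            Vote vote = participants.get(i).prepare();
            if (vote == Vote.READ_ONLY) continue; // read-only: omitted from phase two
            if (vote == Vote.ROLLBACK) {
                // Presumed abort: tell the others as a courtesy, write nothing to the log.
                for (Participant p : prepared) p.rollback();
                for (Participant p : participants.subList(i + 1, participants.size())) p.rollback();
                return Status.ROLLED_BACK;
            }
            prepared.add(participants.get(i));
        }
        if (!prepared.isEmpty()) {
            log.put(txId, Status.COMMITTED); // the decision is recorded before phase two
            for (Participant p : prepared) p.commit();
        }
        return Status.COMMITTED;
    }

    /** Presumed abort: no record means the transaction is assumed rolled back. */
    Status statusOf(String txId) {
        return log.getOrDefault(txId, Status.ROLLED_BACK);
    }

    /** Test helper: a participant that votes as told and records each call made on it. */
    static Participant recording(final String name, final Vote vote, final List<String> calls) {
        return new Participant() {
            public Vote prepare() { calls.add(name + ".prepare"); return vote; }
            public void commit() { calls.add(name + ".commit"); }
            public void rollback() { calls.add(name + ".rollback"); }
            public boolean commitOnePhase() { calls.add(name + ".commitOnePhase"); return vote != Vote.ROLLBACK; }
        };
    }
}
```

In this sketch a one-phase transaction never appears in the log at all, so `statusOf` is only meaningful for transactions that went through prepare. A real coordinator would also garbage collect commit records once every participant has acknowledged.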

Synchronizations: what are they and why do you need them?

As well as the two-phase commit protocol, traditional transaction processing systems employ an additional protocol, often referred to as the synchronization protocol. If you recall the original ACID properties, then you’ll remember that Durability is important in the case where state changes have to be available despite failures. What this means is that applications interact with a persistence store of some kind (e.g., a database) and this can impose a significant overhead – disk access is orders of magnitude slower than access to main computer memory.

One apparently obvious solution to this problem would be to cache the state in main memory and only operate on that for the duration of a transaction. Unfortunately you’d then need some way of being able to flush the state back to the persistent store before the transaction terminates, or risk losing the full ACID properties. This is what the synchronization protocol does, with Synchronization participants.

Synchronizations are informed that a transaction is about to commit, so they can, for example, flush cached state, which may be being used to improve performance of an application, to a durable representation prior to the transaction committing. They are then informed when the transaction has completed and in what state it completed.

Synchronizations essentially turn the two-phase commit protocol into a four-phase protocol:

• Before the transaction starts the two-phase commit, all registered Synchronizations are informed. Any failure at this point will cause the transaction to roll back.
• The coordinator then conducts the normal two-phase commit protocol.
• Once the transaction has terminated, all registered Synchronizations are informed. However, this is a courtesy invocation because any failures at this stage are ignored: the transaction has terminated so there’s nothing to affect.

Unlike the two-phase commit protocol, the synchronization protocol does not have the same failure requirements. For example, Synchronization participants don’t need to make sure they can recover in the event of failures; this is because any failure before the two-phase commit protocol completes means the transaction will roll back, and failures after it has completed can’t affect the data the Synchronization participants were managing.
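Here is a rough sketch of how those phases fit together. The callback names are loosely modelled on the before/after completion style used by JTA, but the interfaces below are defined locally for illustration, and the two-phase commit in the middle is deliberately simplified (for instance, rollback is sent to every participant, including one that voted no).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: synchronizations wrapped around a simplified two-phase commit.
class SyncCoordinator {
    enum Status { COMMITTED, ROLLED_BACK }

    interface Synchronization {
        void beforeCompletion();             // e.g. flush cached state
        void afterCompletion(Status status); // courtesy notification
    }

    interface Participant {
        boolean prepare();
        void commit();
        void rollback();
    }

    private final List<Synchronization> synchronizations = new ArrayList<>();
    private final List<Participant> participants = new ArrayList<>();

    void register(Synchronization s) { synchronizations.add(s); }
    void enlist(Participant p) { participants.add(p); }

    Status commit() {
        boolean ok = true;
        // Phase A: a failure here forces the transaction to roll back.
        try {
            for (Synchronization s : synchronizations) s.beforeCompletion();
        } catch (RuntimeException e) {
            ok = false;
        }
        // Phases B and C: the normal two-phase commit.
        if (ok) {
            for (Participant p : participants) {
                if (!p.prepare()) { ok = false; break; }
            }
        }
        for (Participant p : participants) {
            if (ok) p.commit(); else p.rollback();
        }
        // Phase D: failures are ignored, the outcome is already fixed.
        Status status = ok ? Status.COMMITTED : Status.ROLLED_BACK;
        for (Synchronization s : synchronizations) {
            try { s.afterCompletion(status); } catch (RuntimeException ignored) { }
        }
        return status;
    }

    /** Example synchronization: an in-memory value flushed just before commit. */
    static class CachedValue implements Synchronization {
        int cached;
        int stored;        // stand-in for the persistent copy
        Status lastStatus;
        public void beforeCompletion() { stored = cached; }
        public void afterCompletion(Status status) { lastStatus = status; }
    }
}
```

In a real system the store that `CachedValue` flushes to would itself be enlisted as a participant, so a later rollback would also undo the flushed state.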

So we've now looked at ACID and Synchronizations. What about optimisations to the protocol when we don't need two-phase commit? Stay tuned.

Transactions 101

When I was at QCon the other day I was asked a number of questions around transactions that made me realise that I really need to take my JavaOne presentation and turn it into some blog entries as I promised last year. So over the next indeterminate amount of time (hey, I'm busy with things like JUDCon too), we'll take a tour through transactions and try to dispel some of the myths that surround them.

So let's start here with some of the basics. And you don't get more basic than defining what we mean by transaction. Put simply, a transaction provides an “all-or-nothing” (atomic) property to work that is conducted within its scope, whilst at the same time ensuring that shared resources are isolated from concurrent users. Importantly application programmers typically only have to start and end a transaction; all of the complex work necessary to provide the transaction’s properties is hidden by the transaction system, leaving the programmer free to concentrate on the more functional aspects of the application at hand.

Let’s take a look at just how a transaction system could help in a real-world application environment. Consider the case of an on-line cinema reservation system. The cinema has many seats that can be reserved individually, and the state of a seat is either RESERVED or UNRESERVED. The cinema service exports two operations, reserveSeat and unreserveSeat (we’ll ignore the other operations that are obviously required to make this service truly usable). Finally we’ll assume that there is a transaction manager service that will be used to manage any transactions that the cinema may require in order to process the user’s requests.

Let’s consider a very simple example: imagine that Mr. Doe wants to reserve a block of seats for his family (1A, 1B and 1C). Now, the service only allows a single seat to be reserved through the reserveSeat operation, so this will require Mr. Doe to call it 3 times, once for each seat. Unfortunately the reservation process may be affected by failures of software or hardware that could affect the overall consistency of the system in a number of ways. For example, if a failure occurs after reserving 1A, then obviously none of the other seats will have been reserved. Mr. Doe can try to complete the reservation when (assuming) the cinema service eventually recovers, but by this time someone else may have reserved the seats.

What Mr. Doe really wants is the ability to reserve multiple seats as an atomic (indivisible) block. This means that despite failures and concurrent access, either all of the seats Mr. Doe requires will be reserved for him, or none will. At first glance this may seem like a fairly straightforward thing to achieve, but it actually requires a lot of effort to ensure that these requirements can be guaranteed. Fortunately atomic transactions possess the following (ACID) properties that make them suitable for this kind of scenario:

• Atomicity: The transaction completes successfully (commits) or if it fails (aborts) all of its effects are undone (rolled back).
• Consistency: Transactions produce consistent results and preserve application specific invariants.
• Isolation: Intermediate states produced while a transaction is executing are not visible to others. Furthermore transactions appear to execute serially, even if they are actually executed concurrently.
• Durability: The effects of a committed transaction are never lost (except by a catastrophic failure).

A transaction can be terminated in two ways: committed or aborted (rolled back). When a transaction is committed, all changes made within it are made durable (forced on to stable storage, e.g., disk). When a transaction is aborted, all of the changes are undone. Atomic transactions can also be nested, in which case the effects of a nested action are provisional upon the commit/abort of the outermost (top-level) atomic transaction.
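To show what "all or nothing" means for Mr. Doe, here is a toy version of the cinema service. It is purely illustrative: the `Cinema` class and its `reserveBlock` method are invented, everything lives in memory (so there is no durability), and isolation is approximated with `synchronized` rather than a real transaction manager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: reserve a block of seats atomically using an undo log.
class Cinema {
    enum SeatState { RESERVED, UNRESERVED }

    private final Map<String, SeatState> seats = new HashMap<>();

    Cinema(String... seatIds) {
        for (String id : seatIds) seats.put(id, SeatState.UNRESERVED);
    }

    synchronized void reserveSeat(String id) {
        if (seats.get(id) != SeatState.UNRESERVED) {
            throw new IllegalStateException("seat unavailable: " + id);
        }
        seats.put(id, SeatState.RESERVED);
    }

    synchronized void unreserveSeat(String id) {
        seats.put(id, SeatState.UNRESERVED);
    }

    synchronized SeatState stateOf(String id) {
        return seats.get(id);
    }

    /** Reserves every seat in the block, or none of them. */
    synchronized boolean reserveBlock(List<String> ids) {
        List<String> undoLog = new ArrayList<>();
        try {
            for (String id : ids) {
                reserveSeat(id);
                undoLog.add(id); // remember what must be undone on failure
            }
            return true;         // "commit"
        } catch (IllegalStateException e) {
            for (String id : undoLog) unreserveSeat(id); // "roll back"
            return false;
        }
    }
}
```

If someone else already holds 1B, a request for 1A, 1B and 1C fails as a whole and 1A is released again rather than left reserved.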

Associated with every transaction is a coordinator, which is responsible for governing the outcome of the transaction. The coordinator may be implemented as a separate service or may be co-located with the user for improved performance. It communicates with enlisted participants to inform them of the desired termination requirements, i.e., whether they should accept (commit) or reject (rollback) the work done within the scope of the given transaction. For example, whether to purchase the (provisionally reserved) flight tickets for the user or to release them. A transaction manager factory is typically responsible for managing coordinators for many transactions. The initiator of the transaction (e.g., the client) communicates with a transaction manager and asks it to start a new transaction and associate a coordinator with the transaction.

Traditional transaction systems use a two-phase protocol to achieve atomicity between participants (a three-phase protocol may also be supported, but it rarely is these days). During the first (preparation) phase, an individual participant must make durable any state changes that occurred during the scope of the transaction, such that these changes can either be rolled back or committed later once the transaction outcome has been determined. Assuming no failures occurred during the first phase, in the second (commitment) phase participants may “overwrite” the original state with the state made durable during the first phase.

In order to guarantee consensus, two-phase commit is necessarily a blocking protocol: after returning the first phase response, each participant who returned a commit response must remain blocked until it has received the coordinator’s phase 2 message. Until they receive this message, any resources used by the participant are unavailable for use by other transactions, since to do so may result in non-ACID behavior. If the coordinator fails before delivery of the second phase message these resources remain blocked until it recovers.
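From the participant's side, the two phases and the blocking window between them might look something like the sketch below. It is an assumption-laden illustration: `ShadowParticipant` is an invented class, the "durable" shadow copy is just a field, and a single `int` stands in for the participant's state.

```java
// Hypothetical sketch: one participant's view of two-phase commit.
class ShadowParticipant {
    enum Phase { ACTIVE, PREPARED, COMMITTED, ROLLED_BACK }

    private int original;   // last committed state
    private int working;    // changes made inside the transaction
    private Integer shadow; // stand-in for the copy forced to disk at prepare
    private Phase phase = Phase.ACTIVE;

    ShadowParticipant(int initial) {
        original = initial;
        working = initial;
    }

    void update(int value) {
        requirePhase(Phase.ACTIVE);
        working = value;
    }

    /** Phase one: persist the new state without touching the original. */
    boolean prepare() {
        requirePhase(Phase.ACTIVE);
        shadow = working;
        phase = Phase.PREPARED;
        return true; // vote to commit
    }

    /** Phase two, commit: overwrite the original with the prepared state. */
    void commit() {
        requirePhase(Phase.PREPARED);
        original = shadow;
        shadow = null;
        phase = Phase.COMMITTED;
    }

    /** Phase two, rollback: discard the prepared state; the original is intact. */
    void rollback() {
        shadow = null;
        working = original;
        phase = Phase.ROLLED_BACK;
    }

    /** True while the participant has voted and is waiting for the coordinator. */
    boolean blockedAwaitingOutcome() { return phase == Phase.PREPARED; }

    int committedValue() { return original; }

    private void requirePhase(Phase expected) {
        if (phase != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + phase);
        }
    }
}
```

While `blockedAwaitingOutcome()` is true, the participant can neither expose the new value nor safely release the old one, which is the blocking behaviour described above.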

As we’ve mentioned, transactions are required to provide fault tolerance. What this means is that information about running transactions (often referred to as in-flight transactions) and the participants involved must survive failures and be accessible during recovery. This information (the transaction log) is held in some durable state-store. Typically the transaction log is scanned to determine whether there are transactions mentioned in it that require recovery to be performed. If there are, then the information within the log is used to recreate the transaction and the recovery subsystem will then continue to complete the transaction.
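As a very rough illustration of that scan, consider the sketch below. The record types and the rule for unfinished transactions are assumptions made for the example (a prepared transaction with no recorded commit decision is rolled back, in the spirit of presumed abort); `RecoveryScan` is an invented class and the log is an in-memory map rather than a real durable store.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: scan a transaction log and decide what recovery must do.
class RecoveryScan {
    enum Record { PREPARED, COMMIT_DECIDED, COMPLETED }

    // Stand-in for the durable log: transaction id -> most recent record.
    private final Map<String, Record> log = new LinkedHashMap<>();

    void write(String txId, Record record) {
        log.put(txId, record);
    }

    /** Returns one action per transaction that still needs work after a crash. */
    List<String> recover() {
        List<String> actions = new ArrayList<>();
        for (Map.Entry<String, Record> entry : log.entrySet()) {
            switch (entry.getValue()) {
                case COMMIT_DECIDED:
                    // The decision was made durable, so drive phase two to the end.
                    actions.add("commit " + entry.getKey());
                    break;
                case PREPARED:
                    // No commit decision on record: assume the transaction aborted.
                    actions.add("rollback " + entry.getKey());
                    break;
                default:
                    // COMPLETED transactions need no recovery.
                    break;
            }
        }
        return actions;
    }
}
```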

Failures aren’t restricted to just the transaction coordinator. Therefore, participants must retain sufficient information in durable store so that they too can be recovered in the event of a failure. What information is recorded will obviously depend upon the participant implementation.

OK, so that's ACID in a nutshell. Next time we'll move on to take a look at the protocol that may run either side of two-phase commit: synchronizations.

Saturday, March 12, 2011

Large scale transactions (ACID, BASE and CAP)

I've been working with transactions for quite a while and in the area of large-scale (numbers of participants, physical distance) since the original work on the Additional Structuring Mechanisms for the OTS (aka Activity Service). However, it wasn't until Web Services transactions, BTP, WS-CAF and WS-TX that theory started to get put into practice. We first started to talk about relaxing the ACID properties back with the CORBA Activity Service, but it was with the initial submissions to BTP that things started to be made more explicit and directly relevant.

Within the specifications/standards and associated papers or presentations, we made statements along the lines that isolation should be a back-end issue for services or the transaction model (remembering that one-size does not fit all). The notions of global consistency and global atomicity were relaxed by all of the standards. For instance, sometimes it is necessary to commit some participants in a transaction and roll back others (similar to what nested transactions would give us). Likewise, globally consistent updates and a globally consistent view of the transaction outcome have to be relaxed as you scale up and out.

Now I didn't find this as much of a leap of faith as some others, but I think that's because when I was doing my PhD I spent a lot of time working with weak consistency replication protocols. There's always been a close relationship between transactions and replication. Traditional replica consistency protocols are strongly consistent: all of the replicas are kept identical and this is fine for closely coupled groups, but it doesn't scale. Therefore, weak consistency replication protocols evolved in the 1980s and 1990s, where the states of replicas are allowed to diverge, either forever or for a defined period of time (see gossip protocols for some background). You trade off consistency for performance and availability. For many kinds of applications, this works really well.

It turns out that the same is true for transactions: in fact, it's necessary in Web Services if you want to glue together disparate services and domains, some of which may not be using the same transaction implementation behind the service boundary. I still think the best specification to illustrate this relaxation of the various properties is WS-BusinessProcess, part of WS-TransactionManager (OASIS WS-CAF). Although Eric and I came up with the original concept, we were never able to sell it to our co-authors on WS-TX (so far). I think one of our failings was to not write enough papers, articles or blogs about the benefits it offered and the practicalities it fit. However, every time I explained it to people in the field it was such an easy sell for them to understand how it fit into the Web Services world so much better than other approaches. (The original idea behind WS-BP came from some of the RESTful transactions work we did in HP, where it was code-named the JFDI-transaction implementation.)

I still find it a pleasant surprise that although our co-authors from Microsoft on WS-TX didn't get the reasons behind WS-BP, other friends and colleagues such as Pat Helland started to write about the necessity to relax transactionality. I like Pat's use of relativity to explain some of the problems. However, when I had to come and talk about what we'd been doing in the world of transactions for the past decade I thought Heisenberg's Uncertainty Principle was perhaps slightly better: you can either know the state that all participants will have, but not when; or vice versa.

Transactions and TorqueBox

About a year ago Bob McWhirter and I were talking about TorqueBox and the subject of transactions came up. Bob and team are doing a great job of exposing the enterprise capabilities that Java developers take for granted to the Ruby community, so once they'd added messaging I think transactions were the next obvious step. Since I was looking for an opportunity to improve my Ruby skills and Bob was looking to add transactions, I think it was definitely a Casablanca moment.

We scoped out the requirements for transactions, both short term and long, using JBossTS within JBossAS. Short term was pretty simple: just adding the ability to start and end transactions, with the assumption that the underlying components written in Java, such as a messaging system, would use any transaction context associated with the thread to "do the right thing" as far as transactional resources are concerned. So really this is no more than adding a Ruby client interface to something like UserTransaction in the JTA. Useful, but limiting. That work was done and should hopefully be coming to a TorqueBox release near you soon.

Longer term we want to give the Ruby developer full access to a toolkit to write transactional applications from the ground up. That means writing transactional objects and participants, not just being able to start or stop transactions. And because the Ruby community isn't as tied to the legacy of XA, we don't need to mandate XAResources, top-level transactions only or two-phase commit. Now this is where it really gets interesting. I'm hoping to use a lot of the underlying ArjunaCore implementation (and possibly the STM implementation too, which reminds me that I need to finish the blog entries around that soon). I want to provide something that is incredibly simple and yet also incredibly powerful if you want to really get into the details.

I like the way that the TorqueBox team have architected their system and I like the opportunities that it and Ruby in general provide. Transaction support should be something that fits into that ethos and I think that having a relatively clean slate, which we don't really have with Java, is a good thing. It's almost back to the old days when we were writing Arjuna in C++, which at that time hadn't even been released officially! Happy days ahead!

Heuristics and why you need to know they can happen!

Imagine you walk into a bank and want to perform a transaction (banks are very useful things in transaction examples). That transaction involves you transferring money from one account (current) to another (savings). You obviously want this to happen with some kind of guarantee, so for the sake of this example let's assume we use an ACID transaction.

To ensure atomicity between multiple participants, a two phase commit mechanism is required: during the first (preparation) phase, an individual participant must make durable any state changes that occurred during the scope of the atomic transaction, such that these changes can either be rolled back (undone) or committed later once consensus to the transaction outcome has been determined amongst all participants, i.e., any original state must not be lost at this point as the atomic transaction could still roll back. Assuming no failures occurred during the first phase (in which case all participants will be forced to undo their changes), in the second (commitment) phase participants may “overwrite” the original state with the state made durable during the first phase.

In order to guarantee atomicity, the two-phase protocol is necessarily blocking. If the coordinator fails, for example, any prepared participants must remain in their prepared state until they hear the transaction outcome from the coordinator. All commercial transaction systems incorporate a failure recovery component that ensures transactions that were active when a failure occurred are eventually completed. However, in order for recovery to occur, machines and processes obviously need to recover! In addition, even if recovery does happen, the time it takes can be arbitrarily long.

So, in our bank example, despite the fact that we're using transactions and assuming that the transaction system is reliable, certain failures will always occur given enough time. The kinds of failure we're interested in for this example are those that occur after the participants in the two-phase commit transaction have said they will do the work requested of them (transfer the money), i.e., during the second (commit) phase. So, the money has been moved out of the current account (it's really gone) and is being added to the savings account, when the disk hosting the savings account dies, as shown in the diagram. Usually what this means is that we have a non-atomic outcome, or a heuristic outcome: the transaction coordinator has said commit, one participant (current account) has said “done”, but the second one (savings account) has said “oops!”. There's no going back with the work the current account participant has done, so this transaction isn't going to be atomic (all or nothing).

Imagine that this error happens and you don't know about it! Or at least don't know about it until the next time you check your account. Not good. Personally I'd like to know if there's been an error as soon as possible. In our bank scenario, I can go and talk to someone in the branch. If I was doing this via the internet there's usually a number I can call to talk to someone.

But fortunately most enterprise transaction specifications, such as the OMG’s Object Transaction Service and the X/Open XA specification, and implementations such as JBossTS allow for heuristics to occur. This basically means that the transaction system can be informed (and hence can inform) that such an error has happened. There's not a lot that can be done automatically to fix these types of error. They often require semantic information about the application in order to restore consistency, so have to be handled by a system administrator. However, the important thing is that someone knows there's been a problem.

REST, Cloud and transactions

REST has grown in popularity recently for a variety of reasons. Developers are attracted to the simplicity of the interfaces created. Since HTTP is such a ubiquitous protocol, developers get lightweight interoperability out of the box because most languages and platforms support both client and server interactions with their built-in HTTP support. REST also provides developers with a strong set of architectural guidelines and constraints. As developers explore these techniques, they are finding that their distributed interfaces become more decoupled, usable, and maintainable over time.

It is true that the Web and REST have progressed well without transactions. However, we believe that there are circumstances and particular applications where the use of transactions, or at least atomicity, would be beneficial. As we have evangelized REST, we have found that a frequent question is: how can application developers leverage transactions? As the movement to the Cloud (public or private) gathers momentum, these same questions arise more and more, either when cloud-based applications need to interact with clients or services outside of the Cloud or within the Cloud, where HTTP is often the only guaranteed means of communication.

These questions are often the result of having tried to do without transactions initially and found the resulting systems inadequate. Sometimes those users have come from backgrounds such as Java Enterprise Edition, where they expect such capabilities and have architected for them. Of course it could be that some of these applications were designed inappropriately and the apparent need for transactions would disappear through a careful redesign. However, this cannot account for all of these use cases.

However, you might ask: why not simply use Web Services transactions? After all, WS-Transactions defines atomic and compensation based models and has demonstrated interoperability between all of the major transaction vendors. There are several reasons for this:

• The typical Web Services stack is often too large and complex. By leveraging HTTP as both a rich protocol and message format we can reduce the footprint at both the client and the server.
• The HTTP protocol already has a rich vocabulary. For instance, we use Links to convey to clients different ways in which they can interact with the transaction manager.

So how do we go about creating a RESTful transaction protocol? Understanding state and how it relates to transactions has influenced our approach to the REST transaction protocol. We have tried to ensure that the protocol embraces HATEOAS principles rather than just using HTTP as a means of conveying message protocols. For instance, if we consider the two-phase commit protocol, one way of instructing a participant to prepare and commit would be through the use of multiple URIs, such as /participant/prepare and /participant/commit, where the root of the URI (/participant) is the actual participant resource on which the protocol is ultimately operating and whose state is ultimately being changed as a result. A POST request on these URIs could then be used to trigger the relevant operation.

However, we took a different approach; one which is intimately tied to state management and which we believe is more in the HATEOAS approach. Rather than define a URI per operation, our protocol requires a single URI for each participant (as well as coordinator) and the invoker (e.g., the coordinator) requests that the participant change its state to the relevant value via PUT, e.g., to prepare a participant the coordinator would PUT the status Prepare to the URI.

As mentioned previously, working with HTTP gives us a lot of flexibility to address transactional and distributed system faults without having to resort to ad hoc approaches. For instance, attempting to DELETE any transaction will return a 403 response code. Requesting that a transaction commit may return either 200 or 202 HTTP codes (OK and Accepted, respectively). In the latter case the Location header should contain a URI upon which a GET may be performed to obtain the asynchronous transaction outcome later. If the operation fails, e.g., because a participant cannot be committed, then the protocol requires that implementations return the 409 code, i.e., Conflict. If the participant is not in the correct state for the requested operation, e.g., it receives a Prepare when it has already been prepared, then HTTP gives us another convenient code to return: 412, i.e., Precondition Failed.
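The participant side of this can be pictured as a small state machine that answers each PUT with a status code. The sketch below leaves out the HTTP plumbing entirely and makes several assumptions beyond what is described above: the class `RestParticipant` is invented, only the "Prepare" status string comes from the text ("Commit" and "Rollback" are guesses at the other values), a commit on an unprepared participant is treated as a wrong-state request, and 400 for an unrecognised status is my own choice.

```java
// Hypothetical sketch: map state-change requests on a participant URI to HTTP codes.
class RestParticipant {
    enum State { ACTIVE, PREPARED, COMMITTED, ROLLED_BACK }

    private State state = State.ACTIVE;
    private final boolean canCommit; // simulates a participant that fails to commit

    RestParticipant(boolean canCommit) {
        this.canCommit = canCommit;
    }

    /** Handles a PUT of the requested status to the participant's single URI. */
    int put(String requested) {
        switch (requested) {
            case "Prepare":
                if (state != State.ACTIVE) return 412; // Precondition Failed: wrong state
                state = State.PREPARED;
                return 200;
            case "Commit":
                if (state != State.PREPARED) return 412;
                if (!canCommit) return 409;            // Conflict: could not be committed
                state = State.COMMITTED;
                return 200;
            case "Rollback":
                if (state == State.COMMITTED) return 412;
                state = State.ROLLED_BACK;
                return 200;
            default:
                return 400;                            // assumption: unknown status value
        }
    }

    State state() { return state; }
}
```

A second Prepare on an already prepared participant gets 412, and a Commit that the participant cannot honour gets 409, mirroring the two cases described above.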

But just as with WS-Transactions, we believe that ACID semantics, and specifically atomic outcomes, are not appropriate for all applications. The use of ACID transactions within a REST application can break REST principles. Fortunately we also believe that there is a solution in what are commonly referred to as extended transactions. In a compensation-based model each service does its work immediately rather than holding it provisionally; if the transaction needs to undo then it instructs the services to perform some compensation work. This means that in a loosely coupled environment there is no retaining of locks or provisional state change for long durations. We have been working on a compensation transaction model for REST based on Sagas. At this stage the compensation protocol is still under development but the goal is to provide something that is not only a good REST citizen but also does not turn a RESTful application that uses it into one that cannot claim to be RESTful.

Friday, March 11, 2011

Uncaging Cassandra

The ongoing debate between proponents of locking and multi-version concurrency control (MVCC) is almost as heated as that between supporters of vi and emacs. There is nothing fundamentally wrong with doing concurrency control in your data store using multiple reader, single writer locking. Except that multi-version concurrency control is often better.

In an MVCC system with ongoing transactions there may be several copies of a given item of data, representing its state at different points in time. A consistent read over multiple data items can be obtained by using the version of each item that most recently pre-dates the start of the transaction. In other words, the read simply ignores updates from others that occur after its transaction began. Using this system, readers never block each other, nor do they block writers.
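
As a rough illustration of that read rule, here is a minimal single-item sketch. The class and method names are made up for the example, and it assumes versions are identified by a simple numeric commit timestamp; a real store would also need to deal with uncommitted writes and garbage collection of old versions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Minimal MVCC sketch for one data item. Every write is kept as a separate
// version keyed by its commit timestamp. All names are illustrative only.
class VersionedValue {
    private final ConcurrentSkipListMap<Long, String> versions = new ConcurrentSkipListMap<>();

    // Record a new version; older versions are preserved, not overwritten.
    void write(long commitTimestamp, String value) {
        versions.put(commitTimestamp, value);
    }

    // Return the newest version committed strictly before the reading
    // transaction began, or null if the item did not exist at that time.
    String read(long txStartTimestamp) {
        Map.Entry<Long, String> entry = versions.lowerEntry(txStartTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

A reader that started at time 15 keeps seeing the version written at time 10, no matter how many later versions other transactions add.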

Compared to locking, an MVCC approach places very little extra latency on reads. Instead of adding distributed communication to manage a global lock, there is just some extra data managed in the read - multiple copies of the value, from which one is selected for use according to the versioning information. Writes are a bit more involved, as they must preserve the old value and timestamp before updating the new one. So, the reduced read cost comes at the price of more storage space and slightly more activity on writes. Which, as it happens, is exactly the design philosophy on which Cassandra is built...

In most systems built using MVCC, the versioning is entirely server side and not exposed to the client. We don't currently have that luxury with Cassandra, but it does have some features we can use to layer MVCC on top of the existing storage engine. Since it's a schemaless environment, we can simply inline the versions as additional Columns in the existing row, rather than requiring distinct storage. A get becomes a get_slice and the client uses the timestamps to decide which value to use. Of course we may need to consider a more involved approach to storage if we're sharing the store with clients that are not MVCC aware, but that kind of problem is true of external locking solutions too. With the new expiring columns support in 0.7 we can also avoid doing explicit garbage collection of older values - just set the expiry to an interval longer than we expect any transaction to take.

Of course we can't use MVCC as a complete replacement for locking on Cassandra, as there is no CAS support. We're stuck with locking on write to avoid lost updates. Nevertheless, the mixed approach has got some interesting possibilities to explore. Now I just need a few extra hours in each day so that I have time to actually try it...

Thursday, March 10, 2011

NoSQL != NoTx

The extent of support for transaction concepts in NoSQL systems is almost as diverse as the systems themselves. Some have not only full support for 'native' i.e. local transactions, but also support distributed transactions using XA. Others barely support CAS or durable write, much less any higher level constructs. This is not necessarily a bad thing, but does make life 'interesting' for those of us trying to build consistent abstractions over various systems, or to use them as components in the composition of large, reliable systems.

My current preference in NoSQL is Cassandra, which tends towards the 'no support for transactions' end of the spectrum and therefore presents a more interesting challenge than systems that come with XA already baked in.

Cassandra is architected as a BASE rather than an ACID system. In CAP terms it favours availability and partition tolerance over consistency. That's not to say there is no hope of consistency, just that you have to make some compromises if you want it.

There is durability (but don't forget to switch CommitLogSync from periodic to batch), and atomicity in so far as operations on a single row key are atomic. Batch mutates are non-atomic and there is no locking primitive so you can't do much in the way of consistency over multiple API calls.

None of which is necessarily a problem if you're aware of it and writing your app from the ground up with those constraints in mind. In which case, lucky you. The rest of us get to endure some degree of pain, particularly when migrating, either mentally or physically, from a SQL environment.

There are several ongoing efforts to put a JPA like abstraction on top of Cassandra, thus making it readily accessible to the hordes of JPA trained Java EE programmers out there. Thus far these focus mainly on the data mapping and indexing considerations for query support, whilst avoiding or deferring the tricky transactional bits. That's not to say there is no appreciation of the looming problem, just that it seems nobody has stepped up and tackled it seriously yet.

In order to build an even semi-credible OGM façade over Cassandra or some other approximately key-value oriented store, we need support for certain transactional characteristics, of which repeatable read is perhaps the most fundamental.

The expectation of repeatable read is that if I retrieve the value for a given key twice in the same transaction, I expect it to have the same value, regardless of other users manipulating the database in the meanwhile. If I perform some form of search or index based query multiple times I expect the result set to be consistent within the transaction. If I update a value and then retrieve it, I expect to see the result of the update within the transaction even before I make a commit.

In an RDBMS it's the database engine's job to deal with this and all of them do, using mechanisms from locking to MVCC. All the ORM layer has to do is delegate the problem to the store. In a NoSQL world where the store has no such support and no plans to add any, the client has to do some additional work.

For simple key lookups the client can cache the result locally and use that copy to serve subsequent reads in the same transaction, effectively doing its own MVCC. Most ORMs do this already for performance, although they'll flush modifications and use the relational db engine to perform queries rather than compute the result over the cached copies, which avoids reimplementing large chunks of a db in the ORM but is not feasible when there is no support for it in the store layer.
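
A minimal sketch of that client-side cache follows. The class name is hypothetical and the store is modelled as a plain lookup function rather than a real Cassandra client call; it covers simple key lookups only, not queries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of a per-transaction read cache giving repeatable reads on key lookups.
// The store is modelled as a lookup function; a real client would call its driver here.
class TxReadCache {
    private final Function<String, String> store;
    private final Map<String, String> seen = new HashMap<>();

    TxReadCache(Function<String, String> store) {
        this.store = store;
    }

    // The first read of a key goes to the store; later reads in the same
    // transaction are served from the private copy, so they repeat.
    String get(String key) {
        if (!seen.containsKey(key)) {
            seen.put(key, store.apply(key));
        }
        return seen.get(key);
    }

    // Updates are buffered locally, so the transaction sees its own writes
    // before commit. Flushing them to the store at commit time is omitted.
    void put(String key, String value) {
        seen.put(key, value);
    }
}
```

One cache instance would be created per transaction and discarded at commit or rollback.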

The client cache approach is fine for small tx, but becomes a problem when the read size is large. Although they are arguably an anti-pattern, tx on large sets can be supported by using a backing store, perhaps the NoSQL engine itself, to store the cache copies when they overspill the available client RAM. However, the client needs some way of isolating these private copies and of rewriting queries to use them.

Alternatively, it can use an external lock manager to perform read/write locking on keys to prevent them being changed for the duration of the transaction. However, this only works where all access to the stores is going via clients that are lockmanager aware. In siloed deployments where a single app exclusively owns the data that is not unreasonable. In enterprise deployments where a single store is shared by multiple apps, each perhaps using a different client library, potentially in a different language, it gets a bit hairy.

The Cassandra community's best effort in this direction thus far is Cages, written by Dominic Williams. Cages is a Java client solution that uses ZooKeeper as a distributed lock manager. It's not a general purpose solution though, as it assumes the complete list of locks required is known in advance. That's fine for apps written to Cassandra's native usage model, where the assumption is that you've laid out your data based on knowing exactly what queries you're going to be performing.

Where you are trying to use Cassandra to support arbitrary JPA-QL style queries it's going to fall short though. The transaction systems in traditional databases allow for gradual acquisition of locks on demand over the course of multiple operations in the transaction and JPA or any other interface that supports ad-hoc querying is going to assume that support. The locking model used by Cages also assumes relatively low contention and is likely to scale poorly compared to an MVCC based solution. Fair enough - it's written by an end user to scratch their own itch, so it's simpler and less powerful than middleware written for the general case.

Whilst a good locking system allows for the provision of several transactional characteristics, it's not the only piece we need to provide full ACID behaviour. In particular, it's not going to help us with durable atomic updates over multiple keys. Nor does it address the issues of making updates of the NoSQL store atomic with respect to actions in other systems the app may be using, such as a message queue or relational databases. There is an expectation that such components support XA, allowing them to participate in distributed transactions coordinated by a JTA.

We're still a long way from being able to use Cassandra as a full member of the transactional resource ecosystem and it's going to be an interesting journey to get there, even assuming we should try. Sounds like it could be a fun trip for those who like adventures though.

SQL != NoSQL

The new menagerie of NoSQL databases do not have the same characteristics as traditional SQL databases, much less each other. Which is kind of the point of using them in the first place.

So why are so many otherwise sane and smart people doing their best to ignore this fact? And why is the middleware community pandering to their delusions?

There is a dangerously seductive attraction in the idea that you can improve an existing system just by swapping out some component implementation for another using the same interface. A java.util.Map may abstract a Hashtable, HashMap, ConcurrentHashMap or distributed, replicated in-memory data grid like Infinispan. Choose wisely and your system works better. Choose poorly and things unravel pretty fast. It is necessary to look beyond the interface to the underlying implementation and understand the details in order to know how to drive the interface in an optimal, or even correct, manner.

The JPA may abstract not only one of several ORMs, but one of an even larger number of relational database engines behind them. It may even be implemented using an OGM backed by a key-value store such as Infinispan or Cassandra. The ability to reuse existing JPA code or programming skills when migrating from relational to non-relational storage is attractive to both developers and management. The middleware community responds to this user demand with solutions like Kundera and Hibernate OGM, which developers lap up in ever increasing numbers. Unfortunately they often do this with an inadequate understanding of the underlying implementation details.

As middleware developers we are guilty of doing too good a job of abstracting away the underlying implementation detail. Many users willingly buy into this delusion, being all too keen to believe we can magically shield them from having to understand those details.

There are two approaches to dealing with this problem: Improve the abstraction so it becomes less important to understand the implementation details, and provide material to help the users understand those details in cases where they must.

These tasks are going to occupy a big chunk of my time in the future, as I shift attention towards providing transaction management capability for the new generation of cloud environments, where data is managed and manipulated in both SQL and NoSQL stores. Interesting times ahead I think.

Wednesday, March 9, 2011

Is it turned On?

Question #0 on any tech troubleshooting checklist is, as you well know, 'Is it plugged in?'. This is followed closely by question #1, 'Is it turned On?'. Sometimes I have to relearn this the hard way.

Those joining the party late will have missed yesterday's exciting episode, at the end of which the intrepid hero is left scratching his head at the failure of his Shiny New SSD to outperform his clunky HDD. Let's move the incredibly riveting plot forward a bit with some hot command line action scenes:

$ dd if=/dev/zero of=/tmp/ssd/foo bs=4k count=1000000 oflag=direct

32.9 MB/s

This sucks. Let's swap in the noop scheduler, downgrade the journalled ext3 to ext2 and change the mount to noatime:

$ dd if=/dev/zero of=/tmp/ssd/foo bs=4k count=1000000 oflag=direct

35.0 MB/s

Better, but still sucking.

$ dd if=/dev/zero of=/tmp/ssd/foo bs=8k count=500000 oflag=direct

58.2 MB/s

$ dd if=/dev/zero of=/tmp/ssd/foo bs=16k count=250000 oflag=direct


87.6 MB/s

ok, so the performance is a direct function of the block size. Ramp the block size up high enough and we saturate the drive at over 200MB/s. But what is limiting the number of blocks we can throw at the device? The hardware spec rates it at 50000 x 4k IOPS, which would be 195MB/s. Let's throw a few more processor cores at the problem just for the hell of it:

$ dd if=/dev/zero of=/tmp/ssd/foo1 bs=4k count=1000000 oflag=direct &

$ dd if=/dev/zero of=/tmp/ssd/foo2 bs=4k count=1000000 oflag=direct &


$ dd if=/dev/zero of=/tmp/ssd/foo3 bs=4k count=1000000 oflag=direct &


$ dd if=/dev/zero of=/tmp/ssd/foo4 bs=4k count=1000000 oflag=direct &


10.5 MB/s


10.5 MB/s


10.5 MB/s


10.5 MB/s


Well, 42 > 35, but nowhere near a linear speedup. Something is fishy here. SATA should do NCQ, which would allow all four of those processes (actually up to 32) to have outstanding requests, so we should be soaking up a lot more of that lovely bandwidth.

Unless...

$ lsmod | grep libata
libata 209361 1 piix


umm, oops.

The Intel ICH10R on our P6X58D-E is running in legacy IDE mode, because someone didn't check the BIOS settings carefully enough when building the machine. Not that I have any clue who that may have been. No, Sir, not at all.

Ahem. Let's reboot shall we...

$ lsmod | grep libata
libata 209361 1 ahci


Right, that's better. Off we go again:

$ dd if=/dev/zero of=/tmp/ssd/foo bs=4k count=1000000 oflag=direct
76.8 MB/s


Double the speed. Not too bad for five minutes' work, even if it did require walking all the way down the hall to the machine room.

$ dd if=/dev/zero of=/tmp/ssd/foo1 bs=4k count=1000000 oflag=direct &

$ dd if=/dev/zero of=/tmp/ssd/foo2 bs=4k count=1000000 oflag=direct &

34.5 MB/s

34.5 MB/s

Huh?

With libata now correctly driving the SSD with all its features supported, those concurrent processes should be getting 70+MB/s each, not sharing it. Grrr.

Oh well, let's see how the transaction system is doing, shall we? It's writing a single file at a time anyhow. Since we were already running at over 36k tx/s against a theoretical max of 55k, we can't expect the 2x speedup that the raw dd numbers would suggest, but we should see some improvement...

30116 tx/second.

Some days it's just not worth getting out of bed.

Change request to Clebert: make the block size in the Journal a config option.

Tuesday, March 8, 2011

hardware p0rn

Like most geeks I adore shiny new tech toys. I recently built two machines to host additional virtualized slave nodes for our hudson cluster. Thanks to the drop in RAM prices I had enough budget to stuff them full (i7 based workstations, so triple channel RAM on a 6 slot board topping out at 24GB). There was even enough left over for an SSD. yay. So today I fired one up and dumped my infamous XA transaction microbenchmark onto it.

The SSD totally failed to outperform the HDD.

So I checked the disk mount points, ran some diagnostics with dd and was still left scratching my head. The test code is basically write only, that being the nature of the beast when working with transaction recovery logs. Fortunately I'd opted for an SSD with the Sandforce controller, which provides for much more symmetric read/write performance compared to some of the competition. The manufacturer specs are 285MB/s read and 275MB/s write. Let's give it a try:

$ dd if=/dev/zero of=/tmp/ssd/foo bs=1M count=4000 oflag=direct
4000+0 records in
4000+0 records out
4194304000 bytes (4.2 GB) copied, 19.0998 seconds, 220 MB/s

ok, close enough. The HDD does not do so well:

$ dd if=/dev/zero of=/tmp/hdd/foo bs=1M count=4000 oflag=direct
4000+0 records in
4000+0 records out
4194304000 bytes (4.2 GB) copied, 30.5214 seconds, 137 MB/s

So if I can push around 2x the data onto it, I should be able to log 2x the number of transactions per second, right? cool.

Umm, No.

25106 tx/second to HDD log.

24996 tx/second to SSD log.

hmmm.

A little bit of fiddling around with the ObjectStore provides another useful data point: a single transaction log record for this test case is a little over 600 bytes - basically two Xids (one for each XAResource) plus transaction id and overhead.
So, 25k tx/s * 600 bytes = 15MB/s give or take. Less than 1/10th of what the SSD should handle.

We know the benchmark will run at over 50k tx/s with an in-memory store, so we're definitely bottlenecked on the I/O somewhere, but it's not write bandwidth. With a conventional HDD I'd put my money on the number of physical writes (syncs/forces) a drive head can handle. High performance log stores are designed to do contiguous append to a single file to eliminate head seek latency, so it's something to do with the number of events the device can handle in series. Let's use the filesystem's native block size:

$ dd if=/dev/zero of=/tmp/hdd/foo bs=4k count=1000000 oflag=direct
1000000+0 records in
1000000+0 records out
4096000000 bytes (4.1 GB) copied, 139.289 seconds, 29.4 MB/s

$ dd if=/dev/zero of=/tmp/ssd/foo bs=4k count=1000000 oflag=direct
1000000+0 records in
1000000+0 records out
4096000000 bytes (4.1 GB) copied, 158.286 seconds, 25.9 MB/s

and there we have it - the block management is killing us.

Looks like I need to spend some time down in the native code and kernel to understand the I/O paths at work here. My feeling is that SSD technology just invalidated a lot of design assumptions that were probably a bit outdated already. SSDs don't do head seeks. The unit of parallelism is the memory bank, not the drive head. A log store designed for SSDs should probably stripe writes over multiple files for lock concurrency and not care about contiguous blocks. Unless the SSD drive firmware is written to assume and optimise for legacy HDD usage patterns. Not that a modern HDD actually does so much in the way of truly contiguous writes anyhow - there is an awful lot of abstraction between the filesystem blocks and the physical layout these days.

Even setting aside the redesign for SSD optimization it's clear there is still room for improvement here. On the HDD we should be able to trade transaction latency against batch size to reduce the number of write events and drive up the overall throughput until we saturate the drive bandwidth. On the SSD I'm not so sure we can actually saturate a properly tuned drive on this machine - the CPU can run 50k tx/s but that's only ~30MB/s of log data. Naturally a server class machine is going to have more CPU power without necessarily increasing the number of drives, but if we can retain a ratio of say one SSD to between 8-16 cores then we should be firmly in CPU bound territory. Welcome to the new era. Your design assumptions are now invalid and your transaction manager needs redesigning to be more CPU efficient. Have a nice day.

More Speed!

"You don't have to outrun the bear, you just have to outrun the guy next to you."

Nevertheless it's always a good idea to run as fast as possible - there is no telling when the guy next to you may put on a sudden burst of acceleration.

So we did some more performance improvements on JBossTS.

For transactions that involve multiple XAResources, the protocol overhead is divided between network round trips to communicate between the TM and RMs, and disk writes to create a recovery log. There are a limited number of tricks that can be done with the network I/O (issuing the prepares in parallel rather than in series springs to mind), but the disk I/O is another matter.

JBossTS writes its log through an abstract storage API called the ObjectStore. There are several ObjectStore implementations, including one that uses a relational database. Most however are filesystem based. The default store is an old, reliable piece of code that has worked well for years, but recent hardware evolution has led us to question some of the design decisions.

For years now, multicore chips have meant that the number of concurrent threads in a typical production app server deployment keeps rising, and that processor capability in general outstrips disk I/O capability. Relatively recently SSDs have rebalanced things a bit, but we still see a large number of threads (transactions) contending for relatively limited I/O capability.

Time to break out the profiler again...

For total reliability a transaction log write must be forced to disk before the transaction can proceed. No in-memory fs block buffering by the O/S, thank you. This limits the ability of the O/S and disk to batch and reorder writes for efficiency. Essentially there are a fixed number of syncs() a drive can perform per second and for small data like tx logs it's not bottlenecked on the I/O bandwidth.
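
In plain Java a forced write looks roughly like the sketch below. The class name is made up for illustration and this is not the ObjectStore code; it only shows the write followed by FileChannel.force(), which is the step that limits throughput.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative append-only log writer that forces every record to the device.
class ForcedLogWriter implements AutoCloseable {
    private final FileChannel channel;

    ForcedLogWriter(Path file) throws IOException {
        channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    // Append one record and do not return until the O/S reports it has been
    // written to the storage device. One call costs one sync.
    void append(byte[] record) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(record);
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        channel.force(false); // false: content must be forced, metadata updates need not be
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

With one append() per transaction, throughput is capped by how many force() calls the device can complete per second, regardless of how small the records are.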

The design of the current default store is such that it syncs once for each transaction. This becomes a problem when the number of transactions your CPUs and RMs can handle exceeds the number of syncs your disk array can handle. Which is pretty much what's happened. So, back to the drawing board.

Clearly what we need is some way for multiple transaction log writes to be batched into a single sync. Which in turn means putting the log records in a single file, with all the packing and garbage collection problems that entails. Then you have the thread management problems, making sure individual writes block until the shared sync is done. That's a lot of fairly complex code and testing. Nightmare.
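
To give a feel for the thread management part, here is a stripped down group commit sketch. Everything in it is hypothetical: the class name, the flusher callback that stands in for the write plus sync, and the leader election scheme. It ignores file packing and garbage collection entirely and is not how the HornetQ Journal is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Group commit sketch: many writer threads share one forced write.
// All names are hypothetical; this is not the HornetQ Journal API.
class GroupCommitLog {
    private final Object lock = new Object();
    private final Consumer<List<byte[]>> flusher; // writes a batch and forces it to disk
    private List<byte[]> pending = new ArrayList<>();
    private long appended = 0;   // sequence number of the newest queued record
    private long synced = 0;     // newest sequence number known to be flushed
    private boolean syncing = false;

    GroupCommitLog(Consumer<List<byte[]>> flusher) {
        this.flusher = flusher;
    }

    // Blocks until the record has been included in a completed flush.
    void write(byte[] record) throws InterruptedException {
        long mySeq;
        synchronized (lock) {
            pending.add(record);
            mySeq = ++appended;
        }
        while (true) {
            List<byte[]> batch;
            long upTo;
            synchronized (lock) {
                while (syncing && synced < mySeq) {
                    lock.wait();             // another thread is flushing; wait for it
                }
                if (synced >= mySeq) {
                    return;                  // a flush has covered our record
                }
                syncing = true;              // become the leader for the next flush
                batch = pending;
                pending = new ArrayList<>();
                upTo = appended;
            }
            boolean ok = false;
            try {
                flusher.accept(batch);       // slow part, done outside the lock
                ok = true;
            } finally {
                synchronized (lock) {
                    if (ok) {
                        synced = upTo;
                    } else {
                        pending.addAll(0, batch); // re-queue so a later leader retries
                    }
                    syncing = false;
                    lock.notifyAll();
                }
            }
        }
    }
}
```

While one thread is busy in the flusher, other writers keep queueing records, so the next flush carries all of them in one sync. The busier the system, the larger the batches get.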

So we found someone who has already done it and borrowed their implementation. Got to love this open source thing :-)

The new ObjectStore implementation is based on Clebert's brilliant Journal code from HornetQ. The same bit of code that makes persistent messaging in HornetQ so staggeringly quick.

The Journal API is not a perfect fit for what we want, but nothing that can't be fixed with another layer of abstraction. One adaptor layer later and we're ready to run another microbenchmark.

First let us get a baseline by using an in-memory ObjectStore (basically a ConcurrentHashMap). Useless for production use, but helpful to establish the runtime needed to execute a basic transaction with two dummy resources and all the log serialization overhead but no actual disk write.

53843 tx/second using 400 threads.

ok, that will do for starters - thanks to lock reduction and other performance optimizations made earlier we're pretty much saturating the quad core i7 CPU. We could probably improve matters a bit by tweaking the ConcurrentHashMap lock striping, but let's move on and swap in the default ObjectStore:

1650 tx/second using 100 threads.

iowait through the roof, CPU mostly idle. dear oh dear. Look on the bright side: that means we've got a lot of scope for improvement.

Adding more threads just causes more scheduling and locking overhead - we're bottlenecked on the number of disk syncs the 2x HDD RAID-1 can handle.

Let's wheel out the secret weapon and plug it in...

20306 tx/second at 400 threads.

yup, you read that right. Don't get too excited though - you won't see that kind of performance improvement in production. We're running an empty transaction with dummy resources - no business logic and no RM communication overhead. Still, pretty sweet huh? Better buy Clebert a beer next time you see him. And one for me too of course.

And if you thought that was good...

Linux has a nifty little library for doing efficient asynchronous I/O operations. If you are running the One True Operating System and you don't mind polluting your Pure Java with a little bit of native code you can drop a .so file into LD_LIBRARY_PATH and leave the other guy to be eaten by that bear:

36491 tx/second at 400 threads.

Coming Soon to a JBossTS release near you. Enjoy.

Must Go Faster

Transaction management systems, like much other middleware, embody a very simple tradeoff: they make life easier for developers at the cost of a runtime overhead. Maximising the ease of use and minimising the overhead are things we spend a fair bit of time thinking about.

Conventional wisdom in the transaction community is that the runtime cost of a classic ACID transaction is dominated by the disk writing needed to create a log for crash recovery purposes. As with much folklore there is some truth in this, but it's not the whole story. For starters, there are some use cases where we don't need to write a log at all.

In transactions that have only one resource it's easy to optimise the 2PC protocol to a 1PC and avoid much of the overhead. But here is the snag: the design of the transaction API in Java EE does not allow for developers to communicate metadata about the number of resource managers expected in the transaction ahead of time. In some cases it's not actually known, as it may depend on runtime data values. However, it's disappointing that you still need some parts of the XA protocol overhead even where the transaction is known at design time to be local (native) to a single RM.

xaresource1.start(xid, XAResource.TMNOFLAGS);

xaresource1.end(xid, XAResource.TMSUCCESS);

xaresource1.commit(xid, true); // onePhase = true


is still three network round trips and one disk sync. That's a huge improvement on the eight trips and three syncs needed for a 2PC, but it's nevertheless more than the single trip and sync in the native case with

connection.commit();

Fortunately there is a workaround: define both XA and non-XA ('local-tx' in JBossAS -ds.xml terminology) datasources in the app server and use the local one wherever you know the transaction won't involve other RMs.

Perhaps one day we'll have a less clunky solution - maybe being able to define a single datasource as supporting both XA and non-XA cases, then annotating transactional methods with e.g. @OneResource or @MultiResource to tell the JCA and TM how to manage the connection. Or even being able to escalate an RM local tx to an XA one on demand rather than having to choose in advance, although that would need RM support as well as changes to the XA protocol and Java APIs. Dream On.

Even where it's running with 1PC optimization for a single RM, the transaction manager still provides benefits over the native connection based transaction management. The most critical is the ability to handle certain lifecycle events, in particular beforeCompletion(), a notification phase that allows in-memory caches such as an ORM session to be flushed to stable store and its companion afterCompletion() which allows for resource cleanup. The TM's ability to manage transaction timeouts is also important to prevent poorly written operations from locking up resources for a protracted period.

As with writing logs for 2PC recovery, the management of timeouts is one of those activities we have to do every time, even though it turns out to be required in only a tiny minority of cases. Efficiently managing the rollback of transactions that have exceeded their allotted lifetime is a seemingly trivial overhead compared to the log write and as a result the code for it received little attention until fairly recently. This is where the folklore came to bite us: conventional wisdom dictated that we should focus the performance tuning effort on the I/O paths and not worry too much about functions that just operated on in-memory structures.

WRONG.

For the reasons outlined above, in a typical app server workload there are an awful lot of transactions containing just a single resource and hence not doing a log write. D'Oh. For those use cases the overhead of the in-memory activity in the TM is actually significant. So we sat down, wrote a highly concurrent 1PC microbenchmark test scenario, put it through a profiler, shuddered and went down the pub.

When we'd recovered a bit we tuned the transaction reaper, a background process responsible for timing out transactions. By deferring much of the work as long as possible it turns out to be possible to skip it entirely for many transactions. A short lived tx does not always need to be inserted into the time ordered reaper queue - it may terminate normally long before the reaper needs to take any action. By being lazy we saved a lot of list sorting and the associated locking overhead.
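
The idea can be sketched as follows. This is a hypothetical illustration of lazy insertion, not the actual JBossTS reaper code: the class, its fields and the sweep() method are all invented for the example, and it assumes a single reaper thread that sweeps more often than the shortest timeout.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Lazy reaper sketch: transactions register cheaply and are only sorted by
// deadline if they are still running when the reaper next wakes up.
class LazyReaper {
    private static final class Entry {
        final String txId;
        final long deadline;
        Entry(String txId, long deadline) {
            this.txId = txId;
            this.deadline = deadline;
        }
    }

    private final Set<String> live = ConcurrentHashMap.newKeySet();
    private final ConcurrentLinkedQueue<Entry> unsorted = new ConcurrentLinkedQueue<>();
    // Time ordered queue, touched only by the reaper thread in sweep().
    private final PriorityQueue<Entry> byDeadline =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deadline));
    int sortedInserts = 0; // how many transactions actually paid for sorting

    // Hot path: no sorting and no shared lock.
    void begin(String txId, long deadline) {
        live.add(txId);
        unsorted.add(new Entry(txId, deadline));
    }

    // Hot path: normal termination just drops the transaction from the live set.
    void end(String txId) {
        live.remove(txId);
    }

    // Reaper thread: sort only the survivors, then collect expired transactions.
    List<String> sweep(long now) {
        List<String> timedOut = new ArrayList<>();
        Entry e;
        while ((e = unsorted.poll()) != null) {
            if (live.contains(e.txId)) {
                byDeadline.add(e);
                sortedInserts++;
            }
        }
        while ((e = byDeadline.peek()) != null && e.deadline <= now) {
            byDeadline.poll();
            if (live.remove(e.txId)) {
                timedOut.add(e.txId); // still running past its deadline
            }
        }
        return timedOut;
    }
}
```

A transaction that begins and ends between two sweeps never touches the sorted queue at all, which is where the saving comes from.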

As a result the recent JBossTS releases substantially outperform their predecessors in the single resource case, particularly when scaling to a large number of threads. Upgrade and Enjoy.

Next time: More Speed! Very fast I/O for 2PC logging.