RESTBase/StorageDesign
Requirements
Current (low latency access)
- Storage of current revisions (the most up-to-date render of the most current revision)
- Resilient in the face of non-linearized writes; Precedence defined by revision ID and render time-stamp, not write time
- Storage of past revisions for a TTL period (at least) after they have been superseded by something newer (aka recent)
- Storage of arbitrarily old revisions (on request), for a TTL period (at least), from the time of the request (aka historical)[1]
- 50p read latencies of 5ms, 99p of <100ms
Archive
- Read latencies on the order of 10x that of current.
Recency
One of the requirements is for a window of recently superseded values; Values must be preserved for a predefined period after they have been replaced by something newer. This sliding window of recent history is needed to support application concurrency (see: MVCC, multiversion concurrency control). An example use-case is Visual Editor: A user begins an edit by retrieving the HTML of the most recent revision of a document, Rn. While they are working on their edit, another user commits a change, making the most recent revision Rn+1. The first user then attempts to commit their change, requiring the Parsoid meta-data for Rn, despite it having now been superseded by Rn+1.
An open question remains regarding the latency requirements for recent data. For example: Should access be comparable to that of current? Is the aggregate of current and a secondary lookup of comparable latency acceptable (2x current)? Is the aggregate of current and a secondary lookup of archive storage acceptable (current + (10x current))?
Retention policies using application-enforced TTLs
This approach uses a schema identical to that of the current storage model, one that utilizes wide rows to model a one-to-many relationship between a title and its revisions, and a one-to-many relationship between each revision and its corresponding renders. It differs only in how it approaches retention.
-- Strawman Cassandra schema
CREATE TABLE current_and_recent (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title), rev, tid)
) WITH CLUSTERING ORDER BY (rev DESC, tid DESC);
-- Querying for...
-- Latest
SELECT rev, tid, value FROM current_and_recent WHERE "_domain" = ? AND title = ? LIMIT 1;
-- Latest render for a specific revision
SELECT tid, value FROM current_and_recent WHERE "_domain" = ? AND title = ? AND rev = ? LIMIT 1;
-- Specific render
SELECT value FROM current_and_recent WHERE "_domain" = ? AND title = ? AND rev = ? AND tid = ?;
Retention
Culling of obsolete data is accomplished using range deletes. For example:
-- When storing a new revision, a DELETE to clean up older revisions can be batched with the INSERT. The
-- 'rev' predicate is one that was superseded/replaced at least TTL seconds ago.
BEGIN BATCH
INSERT INTO current_and_recent ("_domain", title, rev, tid, value) VALUES (?, ?, ?, ?, ?);
DELETE FROM current_and_recent WHERE "_domain" = ? AND title = ? AND rev <= ?;
APPLY BATCH;
-- When storing a new render of an extant revision, a DELETE to clean up older renders can be batched with
-- the INSERT (the 'tid' predicate is synthesized using a timestamp derived from the time a matching tid
-- was replaced by a superseding one).
BEGIN BATCH
INSERT INTO current_and_recent ("_domain", title, rev, tid, value) VALUES (?, ?, ?, ?, ?);
DELETE FROM current_and_recent WHERE "_domain" = ? AND title = ? AND rev = ? AND tid <= ?;
APPLY BATCH;
In order to obtain the predicates used in these range deletes, revisions and renders must be indexed by a timestamp that represents when they were superseded, or replaced, by a newer revision and/or render respectively.
Records in the index tables are partitioned by a compound key of domain and title. Updates can be performed probabilistically, if necessary. TTLs can be applied to prevent unbounded partition growth.
CREATE TABLE revision_timeline (
"domain" text,
title text,
ts timestamp,
rev int,
PRIMARY KEY(("domain", title), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy'}
AND default_time_to_live = 2592000; -- 30 days(?)
CREATE TABLE render_timeline (
"domain" text,
title text,
ts timestamp,
rev int,
tid timeuuid,
PRIMARY KEY(("domain", title), rev, ts)
) WITH CLUSTERING ORDER BY (rev DESC, ts DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy'}
AND default_time_to_live = 2592000; -- 30 days(?)
-- Sample queries:
-- Find a revision that corresponds to a timestamp outside of the retention period
SELECT ts, rev FROM revision_timeline WHERE "domain" = ? AND title = ? AND ts < ? LIMIT 1;
-- Find a render that corresponds to a timestamp outside of the retention period
SELECT ts, tid FROM render_timeline WHERE "domain" = ? AND title = ? AND rev = ? AND ts < ? LIMIT 1;
-- When storing a new revision, a DELETE to clean up older revisions can be batched with the INSERT (the
-- 'rev' predicate comes from the SELECT above).
BEGIN BATCH
INSERT INTO current_and_recent ("_domain", title, rev, tid, value) VALUES (?, ?, ?, ?, ?);
DELETE FROM current_and_recent WHERE "_domain" = ? AND title = ? AND rev <= ?;
APPLY BATCH;
DELETEs of revisions or renders can be (should be) applied probabilistically.
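For illustration only, probabilistic application of the range delete might look like the following sketch (Python-flavoured; DELETE_PROBABILITY and the execute_cql helper are assumed names standing in for whichever driver wrapper is in use, not part of the existing implementation):

import random

DELETE_PROBABILITY = 0.1  # assumed tunable; trades tombstone volume against prompt culling

def maybe_range_delete(domain, title, rev_predicate):
    # Apply the range delete for only a fraction of updates; a skipped delete is
    # simply picked up by a later update, so correctness is unaffected.
    if random.random() < DELETE_PROBABILITY:
        execute_cql(
            'DELETE FROM current_and_recent WHERE "_domain" = ? AND title = ? AND rev <= ?',
            (domain, title, rev_predicate))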
Index/Time-line storage
Only a single index for each (revisions and renders) is needed for all logical tables (e.g. parsoid html, data, and section offsets, mobileapps, etc), so to eliminate update duplication, these indices are separately maintained by change-propagation.
Properties
TODO: Suss out properties of both indices (rates, size, etc)
The distribution of edit frequencies across Wikimedia projects is quite extreme, ranging from approximately 1/day, to nearly 200K/day. Without sampling, the lowest edit frequencies are sufficient to manage retentions of not less than 24 hours efficiently. The highest frequencies (again, without sampling) could place an unnecessary burden on storage in exchange for a resolution that vastly exceeds what is needed. Sampling applied to all time-line updates to correct for high frequency edits would render indexing of domains with lower edit frequencies less efficient. Ideally, rate-limiting by domain can be employed to sample writes from the few high edit frequency projects without affecting those with lower edit frequencies.
| wiki | edit frequency | 30d retention (index entries) | 10d retention (index entries) |
|---|---|---|---|
| mkwikimedia | 1.3/day (0.000015046/s) | 39 | 13 |
| cebwiki | 29578/day (0.34/s) | 887340 | 295780 |
| commonswiki | 151434/day (1.75/s) | 4543020 | 1514340 |
| enwiki | 186497/day (2.16/s) | 5594910 | 1864970 |
Implementation
The examples above demonstrate storage of the time-line in Cassandra, but there is no requirement to do so; Redis, for example, would likely prove adequate for this use case. The contents of the index/time-line need not be perfectly durable: a catastrophic loss of all entries would merely delay the culling of past entries, and only for a period equal to that of the configured retention. The index can be replicated to remote data-centers, but this is not a requirement; it could, for example, be generated independently in each without impacting correctness.
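As a rough illustration of such an alternative (a sketch only, assuming redis-py and an invented key scheme; none of this reflects existing code), the revision time-line could be kept as a per-title sorted set scored by supersession time:

import time
import redis

r = redis.StrictRedis(host='localhost', port=6379)

def record_superseded(domain, title, rev, ts=None):
    # Record that 'rev' was superseded at time 'ts' (seconds since the epoch).
    key = 'revision_timeline:%s:%s' % (domain, title)
    r.zadd(key, {str(rev): ts or time.time()})
    # Bound growth, analogous to the Cassandra default_time_to_live: drop
    # entries older than 30 days.
    r.zremrangebyscore(key, '-inf', time.time() - 30 * 86400)

def range_delete_predicate(domain, title, retention_seconds):
    # Return the newest revision superseded at least 'retention_seconds' ago,
    # suitable as the 'rev <= ?' predicate of a range delete (or None).
    key = 'revision_timeline:%s:%s' % (domain, title)
    cutoff = time.time() - retention_seconds
    entries = r.zrevrangebyscore(key, cutoff, '-inf', start=0, num=1)
    return int(entries[0]) if entries else None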
Secondary Storage
This proposal in its native form does not address the 4th requirement (storage of arbitrarily old revisions for a TTL from the time of request/generation). Ultimately, it may be possible to address this requirement by falling back to a lookup of archival storage, but as that is a longer-term goal, a near-term solution is needed.
Option: Use pre-existing tables
Existing RESTBase key-rev-value tables are utilized as secondary storage. Updates to these tables are disabled, and revision retention policies are not used. When a request against current_and_recent results in a miss, these existing storage tables are consulted. On a secondary storage miss, the content is generated in-line and persisted where it will live in perpetuity (or until the completion of archival storage replaces this as an intermediate solution). Since current_and_recent misses are presumed to be exceptional, the amount of accumulated data should be manageable.
Option: Dedicated table with default TTL
An additional table of similar schema is used; this table utilizes a default Cassandra TTL.
-- Strawman Cassandra schema
CREATE TABLE historical (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title), rev, tid)
) WITH CLUSTERING ORDER BY (rev DESC, tid DESC)
AND default_time_to_live = 86400;
When a request against current_and_recent results in a miss, secondary storage is consulted. If a secondary storage request results in a miss, the content is generated in-line and persisted where it will live for a period of (at least) default_time_to_live seconds.
The secondary table can be removed after archival storage is in place.
Writes (updates)
1. Append the updated value to the current_and_recent table
2. If the update created a new revision, query the revision_timeline table for a revision that was superseded TTL seconds or longer from the current time
3. Otherwise, if the update created a new render for an existing revision, query the render_timeline table for a render that was superseded TTL seconds or longer from the current time
4. Perform a range delete of either revisions or renders, using the information obtained in #2 or #3 above (a sketch of this flow follows the note below)
NOTE: Steps 2 through 4 can be performed asynchronously from step 1; Failure to perform the range delete does not affect correctness
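The write path might be sketched as follows, in the same spirit as the pseudo-code used later on this page (execute_cql, now, and the hard-coded retention are assumed placeholders, not the actual RESTBase table-module API; maintenance of the time-line indices themselves is handled by change-propagation and omitted):

RETENTION = 864000  # recency window in seconds (10 days here, purely illustrative)

def update(domain, title, rev, tid, value, is_new_revision):
    # Step 1: append the updated value
    execute_cql('INSERT INTO current_and_recent ("_domain", title, rev, tid, value)'
                ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, value))

    # Steps 2-4 may run asynchronously; a failure here only delays culling.
    cutoff = now() - RETENTION
    if is_new_revision:
        # Step 2: newest revision superseded at least RETENTION seconds ago
        row = execute_cql('SELECT ts, rev FROM revision_timeline WHERE "domain" = ?'
                          ' AND title = ? AND ts < ? LIMIT 1', (domain, title, cutoff))
        if row:
            # Step 4: range delete of that revision and everything older
            execute_cql('DELETE FROM current_and_recent WHERE "_domain" = ?'
                        ' AND title = ? AND rev <= ?', (domain, title, row.rev))
    else:
        # Step 3: newest render of this revision superseded at least RETENTION seconds ago
        row = execute_cql('SELECT ts, tid FROM render_timeline WHERE "domain" = ?'
                          ' AND title = ? AND rev = ? AND ts < ? LIMIT 1',
                          (domain, title, rev, cutoff))
        if row:
            # Step 4: range delete of that render and everything older for this revision
            execute_cql('DELETE FROM current_and_recent WHERE "_domain" = ?'
                        ' AND title = ? AND rev = ? AND tid <= ?',
                        (domain, title, rev, row.tid))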
Reads
1. The current_and_recent table is consulted
2. On a miss, the historical table is consulted
3. On a miss, the content is generated in-line and written to the historical table (see the sketch below)
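A minimal sketch of this read path (again using the assumed execute_cql placeholder, plus an assumed generate helper standing in for in-line generation, e.g. a call out to Parsoid):

def read(domain, title, rev, tid):
    # 1. current_and_recent
    row = execute_cql('SELECT value FROM current_and_recent WHERE "_domain" = ?'
                      ' AND title = ? AND rev = ? AND tid = ?', (domain, title, rev, tid))
    if row:
        return row.value

    # 2. historical (the TTL'd secondary table)
    row = execute_cql('SELECT value FROM historical WHERE "_domain" = ?'
                      ' AND title = ? AND rev = ? AND tid = ?', (domain, title, rev, tid))
    if row:
        return row.value

    # 3. Miss: generate in-line and persist to historical, where the table's
    #    default TTL bounds its lifetime.
    value = generate(domain, title, rev, tid)
    execute_cql('INSERT INTO historical ("_domain", title, rev, tid, value)'
                ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, value))
    return value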
Pros
- Easy to implement (low code delta)
- Least risk; Inherits correctness from current (well tested) implementation
- Minimal read / write amplification
- Low latency access to values in recency window (identical to that of current)
Cons
- Creates a hard-dependency on Cassandra 3.x (needed to create range tombstones using inequality operators)
- Requires the indexing of revisions and renders by the time they were superseded by newer values
- Corner case: A fully qualified lookup against current_and_recent is a hit, but is subsequently removed by policy in less than TTL from the time of the request. In other words, a request that corresponds with requirement #4, but is incidentally recent enough to be found in current_and_recent at the time of the request.
- Unclear how this will generalize, and which changes to the table storage interface would be needed.
Ideas for generalizing the storage module
Currently we have 3 layers of abstraction: application level, bucket level (provides a set of generic key-value and key-rev-value interfaces to put/get/list content) and table level (abstracts away the particular storage technology and provides a generic API to describe table schemas, including retention policies, put/get and list operations). The proposal is to keep the 3-layer approach, but shuffle the responsibilities of each layer. Application layer responsibilities are unchanged.
The table layer would lose the retention policies and secondary indexing support, but in exchange its API would expand to support deletes (including range deletes) and batching statements together. An example is below; this is pseudocode intended only to illustrate the idea, so please don't take it literally.
table_module.batch([
table_module.put({ table: 'parsoid.html', data: some_data }),
table_module.put({ table: 'parsoid.data_parsoid', data: some_data }),
table_module.put({ table: 'parsoid.retention_index', data: some_data }),
table_module.delete({ table: 'parsoid.html', condition: some_condition }),
table_module.delete({ table: 'parsoid.data_parsoid', condition: some_condition })
]);
The responsibilities of the bucket layer would expand to include creating the proper tables and handling retention.
For Parsoid, there will be a special bucket, parsoid_bucket.
It will expose Parsoid-specific endpoints to store and retrieve content, while internally it will create all the necessary tables, maintain indexes and manage range deletes. The parsoid bucket is, obviously, not useful in any other use-case, because of the specific requirements we have for Parsoid storage.
Along with it we can design other types of buckets that are useful in general, or just replicate the functionality of the key_value and key_rev_value buckets with certain retention policies. For example, for mobile apps, we could use the key_rev_value bucket with the `latest_hash` policy, or even design a special `mobile_bucket` if we want to batch together the lead and remaining section updates.
Two-table: Range delete-maintained latest, TTL-maintained history
This approach uses two tables with schema identical to that of the current storage model, utilizing wide rows to model a one-to-many relationship between a title and its revisions, and a one-to-many relationship between each revision and its corresponding renders. It differs though in how it approaches retention.
-- Strawman Cassandra schema
CREATE TABLE current (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title), rev, tid)
) WITH CLUSTERING ORDER BY (rev DESC, tid DESC);
CREATE TABLE recent_and_historical (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title), rev, tid)
) WITH CLUSTERING ORDER BY (rev DESC, tid DESC)
AND default_time_to_live = 864000;
The first of the two tables (current) uses (probabilistic) deletes of all previous revisions and/or renders on update in order to maintain a view of current versions. The second table (recent_and_historical) uses Cassandra TTLs to automatically expire records; it stores recently superseded values, along with any historical values that had to be generated in-line.
Writes (updates)
1. Read the latest render from the current table
2. Write the value read above to the recent_and_historical table
3. Write the updated render to the current table
4. Write the updated render to the recent_and_historical table[2]
5. Apply any range deletes for previous renders of the revision (and for previous revisions if the latest_hash policy is used); a sketch of this sequence follows
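For illustration, the sequence might look like the following sketch (execute_cql and DELETE_PROBABILITY are assumed placeholders, as elsewhere on this page; this is not the actual module API):

import random

DELETE_PROBABILITY = 0.1  # assumed tunable for the probabilistic range delete

def update(domain, title, rev, tid, value):
    # 1. Read the current latest render
    latest = execute_cql('SELECT rev, tid, value FROM current WHERE "_domain" = ?'
                         ' AND title = ? LIMIT 1', (domain, title))

    # 2. Copy it into the TTL-managed table before it is overwritten
    if latest:
        execute_cql('INSERT INTO recent_and_historical ("_domain", title, rev, tid, value)'
                    ' VALUES (?, ?, ?, ?, ?)',
                    (domain, title, latest.rev, latest.tid, latest.value))

    # 3. Write the updated render to current
    execute_cql('INSERT INTO current ("_domain", title, rev, tid, value)'
                ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, value))

    # 4. ... and to recent_and_historical (avoids losing it to a concurrent
    #    update; see footnote 2)
    execute_cql('INSERT INTO recent_and_historical ("_domain", title, rev, tid, value)'
                ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, value))

    # 5. Probabilistic range delete of older renders of this revision (and of
    #    older revisions, if the latest_hash policy is in effect)
    if random.random() < DELETE_PROBABILITY:
        execute_cql('DELETE FROM current WHERE "_domain" = ? AND title = ?'
                    ' AND rev = ? AND tid < ?', (domain, title, rev, tid))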
Reads
1. The current table is consulted
2. On a miss, the recent_and_historical table is consulted
  - On a hit, the TTL may be refreshed if there is insufficient time remaining[3]
  - On a miss, the content is generated in-line and written to both current and recent_and_historical (see the sketch below)
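A minimal sketch of this read path (assumed placeholders as before; ttl_remaining could be implemented with a SELECT TTL(value) query, and MIN_TTL is an invented threshold):

MIN_TTL = 3600  # assumed threshold below which the TTL is refreshed

def read(domain, title, rev, tid):
    # 1. current
    row = execute_cql('SELECT value FROM current WHERE "_domain" = ? AND title = ?'
                      ' AND rev = ? AND tid = ?', (domain, title, rev, tid))
    if row:
        return row.value

    # 2. recent_and_historical
    row = execute_cql('SELECT value FROM recent_and_historical WHERE "_domain" = ?'
                      ' AND title = ? AND rev = ? AND tid = ?', (domain, title, rev, tid))
    if row:
        if ttl_remaining(row) < MIN_TTL:
            # Refresh by re-writing the value (footnote 3)
            execute_cql('INSERT INTO recent_and_historical ("_domain", title, rev, tid, value)'
                        ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, row.value))
        return row.value

    # Miss on both: generate in-line and persist to both tables
    value = generate(domain, title, rev, tid)
    execute_cql('INSERT INTO current ("_domain", title, rev, tid, value)'
                ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, value))
    execute_cql('INSERT INTO recent_and_historical ("_domain", title, rev, tid, value)'
                ' VALUES (?, ?, ?, ?, ?)', (domain, title, rev, tid, value))
    return value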
Longer-term alternatives to the recent_and_historical table
Unlike the application-enforced TTL proposal above, the TTL table in this proposal doubles as storage of both recently superseded data and historical records that were (re)generated in-line (requirements #3 and #4 above, respectively). Nevertheless, it has been suggested that, once in place, archival storage could be used in place of this table. This would have the advantage of reducing transactional complexity, eliminating the need for the extra read and writes on update. It has the disadvantage of higher read latency for all requests in the recent window. Additionally, it creates the requirement that archival storage contain the full history within the window of recency, according to the semantics dictated by requirement #3.
Pros
- Avoids the need to index revisions and renders by the time of their replacement
Cons
- Transactional complexity (read → 3x write → range delete)
- Read latency; Misses on current seem unexceptional, and make latency within the recency window the aggregate of two requests instead of one
  - Aggregate of current and archive (current + (10 x current)) if archive is used as fall-back for recent history instead of a dedicated TTL table
- Corner case: A fully qualified lookup against current is a hit despite the values copied to recent_and_historical having since expired; after the successful read, a probabilistically applied range delete removes the record. The likelihood of this happening can be reduced by increasing the range delete probability (at the expense of generating more tombstones, obviously), but the possibility cannot be entirely eliminated while the range delete probability is < 1.0.
- If archive is used as a fall-back for values in the recency window, this pushes an additional requirement on archive (namely, that it maintain the window of recency)
Table-per-query
This approach materializes views of results using distinct tables, each corresponding to a query.
Queries
- The most current render of the most current revision (table: current)
- The most current render of a specific revision (table: by_rev)
- A specific render of a specific revision (table: by_tid)
Algorithm
Data in the current table must be durable, but the contents of by_rev and by_tid can be ephemeral (should be, to prevent unbounded growth), lasting only for a time-to-live after the corresponding value in current has been superseded by something more recent. There are three ways of accomplishing this:
a) idempotent writes; write-through to all tables on update
b) copying the values on a read from current, or
c) copying them on update, prior to replacing a value in current.
With non-VE use-cases, copy-on-read is problematic due to the write amplification it creates (think: HTML dumps). Additionally, in order to fulfill the VE contract, the copy must be done in-line to ensure the values are there for the forthcoming save, introducing additional transactional complexity and latency. Copy-on-update over-commits by default, copying from current for every new render regardless of the probability it will be edited, but it happens asynchronously without impacting user requests, and can be done reliably. This proposal uses the copy-on-update approach.
Update logic pseudo-code:
# Read current latest values from 'current' table
latest_html = read('html', 'current')
latest_data_parsoid = read('data-parsoid', 'current')
latest_section_offsets = read('section_offsets', 'current')
# section_offsets ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Copy the current latest, and new/proposed latest to the TTL-managed tables
write(new_latest_section_offsets, 'section_offsets', 'by_tid')
write(new_latest_section_offsets, 'section_offsets', 'by_rev')
write(latest_section_offsets, 'section_offsets', 'by_tid')
write(latest_section_offsets, 'section_offsets', 'by_rev')
# Write the new value to the current table last
write(new_latest_section_offsets, 'section_offsets', 'current')
# data-parsoid ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Copy the current latest, and new/proposed latest to the TTL-managed tables
write(new_latest_data_parsoid, 'data-parsoid', 'by_tid')
write(new_latest_data_parsoid, 'data-parsoid', 'by_rev')
write(latest_data_parsoid, 'data-parsoid', 'by_tid')
write(latest_data_parsoid, 'data-parsoid', 'by_rev')
# Write the new value to the current table last
write(new_latest_data_parsoid, 'data-parsoid', 'current')
# html ~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Copy the current latest, and new/proposed latest to the TTL-managed tables
write(new_latest_html, 'html', 'by_tid')
write(new_latest_html, 'html', 'by_rev')
write(latest_html, 'html', 'by_tid')
write(latest_html, 'html', 'by_rev')
# Write the new value to the current table last
write(new_latest_html, 'html', 'current')
Option A
edit
Precedence is first by revision, then by render; The current table must always return the latest render for the latest revision, even in the face of out-of-order writes. This presents a challenge for a table modeled as strictly key-value, since Cassandra is last-write-wins. As a work-around, this option proposes using a constant write timestamp, effectively disabling the database's built-in conflict resolution. Since Cassandra falls back to a lexical comparison of values when it encounters identical timestamps, a binary value encoded first with the revision, and then with a type-1 UUID, is used to satisfy precedence requirements.
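A minimal sketch of such an encoding (Python; the exact layout is an assumption drawn from the schema comment below: a 32-bit big-endian revision, followed by the 16 raw bytes of a type-1 UUID, followed by the content):

import struct
import uuid

def encode_value(rev, tid, content):
    # Pack rev (32-bit big-endian) + tid (16 raw UUID bytes) + content, so that
    # a lexical comparison of two such blobs orders primarily by revision.
    return struct.pack('>I', rev) + tid.bytes + content

def decode_value(blob):
    rev, = struct.unpack('>I', blob[:4])
    tid = uuid.UUID(bytes=blob[4:20])
    return rev, tid, blob[20:]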
-- Strawman Cassandra schema
-- value is binary encoded; rev (as 32-bit big-endian), tid (as 128-bit type-1 UUID), and content
CREATE TABLE current (
"_domain" text,
title text,
value blob,
PRIMARY KEY ("_domain", title)
);
CREATE TABLE by_rev (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title, rev))
);
CREATE TABLE by_tid (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title, rev, tid))
);
Option B
Identical to the Option A proposal above, except for how the current table is implemented; In this approach, current is modeled as "wide rows", utilizing a revision-based clustering key. For any given rev, re-renders result in the tid and value attributes being overwritten each time. To prevent unbounded growth of revisions, range deletes are batched with the INSERT.
Strawman Cassandra schema:
CREATE TABLE current (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title), rev)
);
-- Same as Option A above
CREATE TABLE by_rev (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title, rev))
);
CREATE TABLE by_tid (
"_domain" text,
title text,
rev int,
tid timeuuid,
value blob,
PRIMARY KEY (("_domain", title, rev, tid))
);
Example: Batched INSERT+DELETE
BEGIN BATCH
INSERT INTO current ("_domain", title, rev, tid, value) VALUES (?, ?, 10000, ?, ?);
DELETE FROM current WHERE "_domain" = ? AND title = ? AND rev < 10000;
APPLY BATCH;
Note:
- The DELETE is only needed when creating a new revision; Re-renders can be performed with an unbatched/standalone INSERT
- DELETEs can be done probabilistically; Allowing a small number of revisions to accumulate to limit write amplification is likely a worthwhile optimization.
Pros
- Expiration using Cassandra TTL mechanism
Cons
- Write amplification (4 additional writes for the copy scheme, 2 for the write-through)
- Read-on-write (for copy schemes)
- Race conditions (copy schemes)
- Semantics of write-through scheme do not result in expiration after a value has been superseded (the clock on expiration starts at the time of the update)
Option A
- Breaks DELETE semantics (without timestamps, tombstones do not have precedence)
- Defeats a read optimization designed to exclude SSTables from reads (optimization relies on timestamps)
- Defeats a compaction optimization meant to eliminate overlaps for tombstone GC (optimization relies on timestamps)
- Is an abuse of the tie-breaker mechanism
  - Lexical value comparison is only meant as a fall-back for something considered a rare occurrence (coincidentally identical timestamps)
  - Lexical value comparison is not part of the contract, and could change in the future without warning (it has changed in the past without warning)
- Cassandra semantics are explicitly last write wins; This pattern is a violation of intended use/best-practice, and is isolating in nature
Option B
- Introduces a dependency on Cassandra 3.x (option B only)
Notes
Cassandra 3.x
At the time of this writing, the production cluster is running Cassandra 2.2.6, so any of the solutions above that rely on feature(s) in Cassandra 3.x call this out as a drawback. However, there are compelling reasons to move to Cassandra 3.x beyond just the capabilities that enable the proposals cited above:
- Proper support for JBOD configurations (CASSANDRA-6696) allows us to address the blast radius that having a single large RAID-0 creates
- A side-effect of how CASSANDRA-6696 was implemented enables us to partition the compaction workload, improving key locality, and reducing read latency
- Changes to how row indexing is handled drastically reduce partition overhead on the heap, making wider partitions possible
- Storage in versions >= Cassandra 3.0.0 is more compact on disk (often more compact without compression than older versions with it).
Comparison: Operation counts
| | App-enforced TTL w/ Parsoid-specific abstraction | App-enforced TTL w/o Parsoid-specific abstraction | Two-table |
|---|---|---|---|
| Reads | prob(1 read, 1.0) → prob(1 read, ?.??)[4]. 1 to satisfy requests for current and recent, 1 additional on miss to satisfy historical | prob(1 read, 1.0) → prob(1 read, ?.??)[4]. 1 to satisfy requests for current and recent, 1 additional on miss to satisfy historical | prob(1 read, 1.0) → prob(1 read, ?.??)[4]. 1 to satisfy requests for current, 1 additional on miss to satisfy recent and historical. |
| Writes (updates) | 2 writes (batched) → prob(1 read → 1 delete, 0.10). 1 write to current and 1 write to an index (indexing values are small and can be batched). For some fraction of all requests, this is followed by one read from an index, and a subsequent range delete to current. | 2 writes (batched) → 1 read → 1 delete. 1 write to current and 1 write to an index (indexing values are small and can be batched). This is followed by one read from an index, and a subsequent range delete to current. | 1 read → 3 writes (concurrent) → 1 delete. 1 read from current, followed by 3 writes, one to current and 2 to recent_and_historical, followed by 1 range delete to current. The 3 writes are for values that are likely too large to consider batching, but could be performed concurrently. |
Decision
After much deliberation, a decision was made to move forward with the option entitled Retention policies using application-enforced TTLs (aka "app-enforced TTLs"), over the option entitled Two-table: Range delete-maintained latest, TTL-maintained history (or "two-table"). A high-level summary of the reasoning follows:
Algorithmically, this option is the most iterative; From the read/write perspective, it inherits the correctness properties of the current system. It differs only in how retention is managed (the aspect of the current system that is intractable). Like the current system, retention can be decoupled and opportunistic; A failure in applying a delete for retention does not affect correctness (it only delays the culling of obsolete data), nor does it need to fail the entire request.
When compared to the two-table approach, app-enforced TTLs has less transactional complexity, requiring fewer reads to satisfy requests, fewer writes for updates, and less complexity in the sequencing of operations needed to satisfy correctness.
The primary point of contention here related to generalization. For use-cases with multiple, referential tables, the app-enforced TTL approach requires deletes of affected tables to be performed in batches. This violates encapsulation when using the current interfaces. However, during discussion, consensus was that use-cases with tables needing referential integrity would be better served by a different abstraction anyway, one that can encapsulate the batching and/or sequencing of interdependent writes, and where retention can act upon the sum of affected tables.
Footnotes
- ↑ This is technically not current; A stop-gap until archival storage is in place
- ↑ This is necessary to avoid a race condition between concurrent updates resulting in lost writes
- ↑ Refreshing simply writes the data in the historical table again, potentially with a new TID
- ↑ It is understood that by volume, most requests are for current, followed by recent, with historical accesses deemed "exceptional", but we do not have numbers for the distribution of requests.