This is an exploration of a generic way to offload data from a transactional data store into alternative storage such as S3, using the transactional system for metadata storage and the remote system as the object store for larger bulk data. The goal is to gain the simplicity and durability of S3 for bulk storage, while keeping the mechanism simple enough that data can be offloaded from records as it becomes clear that the data set is too large or that an individual field is prohibitively big.
The core concept is that fields in the metadata store contain reference URIs, where each reference encodes a protocol, a path, additional arguments, and a field reference: enough to pull the bulk object back and extract the necessary information from it. We propose investigating the Lance file format for random access, or Parquet if only fetching selected fields in bulk; performance profiling will reveal which format performs better on a given workload.
Examples:
Record = {
PrimaryKey: int
Field_b: int
Field_c: encrypted_blob
Field_d: varchar(128MB)
}
After offloading Field_c and Field_d, this becomes:
Record = {
PrimaryKey: 1
Field_b: int
Field_c: s3://path/to/records/1/blob?field=Field_c
Field_d: s3://path/to/records/1/blob?field=Field_d
}
Alternatively, we could group records in remote storage by a common sharding key, so the record becomes:
Record = {
PrimaryKey: 1
ShardKey: 2
Field_c: s3://path/to/records_by_shard_key/2/blob?k=1,field=Field_c
Field_d: s3://path/to/records_by_shard_key/2/blob?k=1,field=Field_d
}
This optimization works well if we almost always query by the ShardKey instead of the primary key, though the transactional data access layer still supports direct PK lookups regardless. Grouping by shard key batches more data per S3 object, which is better for block sizes, but requires more complexity in the remote file format (Lance or Parquet).
Read pathway for point lookup:
- Fetch transactional record
- Determine record(s) and field(s) to fetch and deduplicate/coalesce them
- Consider a short local caching mechanism
- Use the reference values to extract from the blob
- Hydrate struct and pass record back to caller
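The point-lookup steps above could be sketched roughly as follows. The in-memory TXN_STORE and OBJECT_STORE dictionaries are stand-ins for the transactional store and S3, and is_field_ref is a placeholder for however references are actually detected:

```python
from urllib.parse import urlparse, parse_qs

# Hypothetical in-memory stand-ins for the transactional store and S3.
TXN_STORE = {
    1: {"PrimaryKey": 1, "Field_b": 42,
        "Field_c": "s3://path/to/records/1/blob?field=Field_c"},
}
OBJECT_STORE = {
    "path/to/records/1/blob": {"Field_c": b"large-blob-bytes"},
}

def is_field_ref(value):
    # Placeholder check: a field reference is any string with a known scheme.
    return isinstance(value, str) and value.startswith(("s3://", "mysql://", "redis://"))

def point_lookup(pk):
    record = dict(TXN_STORE[pk])                # 1. fetch transactional record
    wanted = {}                                 # 2. coalesce refs by blob
    for field, value in record.items():
        if is_field_ref(value):
            parsed = urlparse(value)
            blob_key = parsed.netloc + parsed.path
            remote_field = parse_qs(parsed.query).get("field", [field])[0]
            wanted.setdefault(blob_key, []).append((field, remote_field))
    for blob_key, fields in wanted.items():
        blob = OBJECT_STORE[blob_key]           # 3. one fetch per blob (cacheable)
        for field, remote_field in fields:
            record[field] = blob[remote_field]  # 4. hydrate and return
    return record
```

Coalescing by blob key before fetching means a record with several offloaded fields in the same blob costs a single remote read, and a short-lived cache can sit in front of the blob fetch.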
Batch read lookups:
- Fetch the full set of transactional records
- Build a bulk query that fetches many ids of the same shape at once
- Fetch with pagination and hydrate the data into the transactional records
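A sketch of the coalescing and pagination steps for batch reads, assuming the records have already been fetched from the transactional store; coalesce_refs and paginate are hypothetical helpers:

```python
from urllib.parse import urlparse
from itertools import islice

def coalesce_refs(records, ref_fields):
    """Group (pk, field) pairs by remote blob so one bulk fetch serves many rows."""
    groups = {}
    for rec in records:
        for field in ref_fields:
            parsed = urlparse(rec[field])
            blob_key = parsed.netloc + parsed.path
            groups.setdefault(blob_key, []).append((rec["PrimaryKey"], field))
    return groups

def paginate(items, page_size):
    """Yield fixed-size pages so each remote request stays bounded."""
    it = iter(items)
    while page := list(islice(it, page_size)):
        yield page
```

With shard-key grouping, rows sharing a shard key collapse into one blob key, so the bulk fetch count is bounded by the number of distinct blobs rather than the number of rows.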
Key design considerations are:
- Ensuring data consistency across two data stores
- Propagating data deletion through the multiple data stores
- Performance and durability of the disaster recovery implementation
Design
Field reference paths are a generic specification and follow the URI (Uniform Resource Identifier) structure defined in RFC 3986.
Eg: http://domain.com/path?args=one#fragment
We leave the problem space open to readers but expect the following to be useful reference types:
s3://path/to/record?k=1,field=Field_c
mysql://cluster-reference/database/table?k=1,field=Field_c
redis://cluster-reference/database/namespace?k=1,field=Field_c
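A minimal parser for these reference types might look like the following. Note that the refs in this document use comma-separated query params rather than the &-separated form parse_qs expects, so the params are split by hand:

```python
from urllib.parse import urlparse

def parse_field_ref(ref):
    """Split a field-reference URI into (scheme, location, params).
    The refs in this design use comma-separated query params,
    so they are split manually instead of with parse_qs."""
    parsed = urlparse(ref)
    params = {}
    if parsed.query:
        for pair in parsed.query.split(","):
            key, _, value = pair.partition("=")
            params[key] = value
    return parsed.scheme, parsed.netloc + parsed.path, params
```

The scheme selects the backend driver (S3, MySQL, Redis, ...), the location addresses the object or row within it, and the params carry the key/field selectors.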
Migration
To migrate fields out to the secondary store:
- Fetch record from txn store
- Persist record to object store (possibly in bulk via a shard key batching)
- Compare-and-set the original record and, if successful, update the respective fields with their reference values
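A rough sketch of the migration flow with in-memory stores. In a real system the compare-and-set in step 3 would be an atomic operation of the transactional store, not a Python equality check, and the blob path scheme here is an assumption matching the earlier examples:

```python
def migrate_fields(txn_store, object_store, pk, fields):
    """Move the given fields of record pk into the object store,
    then swap field references into the transactional record."""
    original = dict(txn_store[pk])                             # 1. fetch record
    blob_key = f"path/to/records/{pk}/blob"
    object_store[blob_key] = {f: original[f] for f in fields}  # 2. persist blob first
    updated = dict(original)
    for f in fields:
        updated[f] = f"s3://{blob_key}?field={f}"
    # 3. compare-and-set: only install the refs if the row is unchanged.
    # In a real store this is an atomic CAS, not a dict comparison.
    if txn_store[pk] == original:
        txn_store[pk] = updated
        return True
    return False  # concurrent write; the blob is orphaned but harmless, retry later
```

Writing the blob before the CAS means a failed CAS leaves only an unreferenced object behind, never a dangling reference, which keeps the two stores consistent without a distributed transaction.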
Future opportunities
Encode a magic prefix at the start of the ref?
We could prefix every field-reference string with fr (for field reference) as a means to signal the content type more generically than trying to parse for each type of protocol.
fr:s3://path/to….
Query params
Query params are flexible metadata, but we recommend the following fields where relevant: k (key) for point lookup, and field for field lookup inside the record, which is optional if the remote field name matches the local one.
Compact Reference Format
Compact forms of the field-ref URI make sense for large datasets.
Store the verbose form once in the application layer / config servers and use the compact form in records:
fr:c:slug?k=1,f=c
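The magic prefix and the compact form could be decoded together along these lines; COMPACT_SLUGS is a hypothetical mapping served by the application layer / config servers, and the slug name is illustrative:

```python
# Hypothetical config-server mapping from compact slug to verbose base URI.
COMPACT_SLUGS = {"recs": "s3://path/to/records"}

def expand_ref(value):
    """Expand 'fr:c:<slug>?...' compact refs to verbose URIs; pass through
    verbose 'fr:' refs and leave non-reference values untouched."""
    if not value.startswith("fr:"):
        return value                     # ordinary field value, not a reference
    body = value[3:]
    if body.startswith("c:"):            # compact form: look the slug up in config
        slug, _, query = body[2:].partition("?")
        return f"{COMPACT_SLUGS[slug]}?{query}"
    return body                          # verbose form: just strip the magic prefix
```

The prefix check gives a single cheap test for "is this field offloaded?", and the slug lookup keeps per-row storage small while the verbose path lives in one place.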
Versioning
Use versioned objects as a way to guarantee transactionality across multiple data stores. We version the object stored in S3 on the backend via a unique content hash each time, or a literal version number.
s3://path/to/records_by_shard_key/2/sha256hashOfContent?pk=1,field=Field_d
OR
s3://path/to/records_by_shard_key/2/v1?pk=1,field=Field_d
If internal records change in the multi-record blob, we would need to update many refs in the transactional store, but the old refs remain correct (pointing at the prior version) until they are updated.
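The content-hash variant could derive paths as follows; the path layout mirrors the examples above:

```python
import hashlib

def versioned_blob_path(shard_key, content: bytes):
    """Content-addressed blob path: the ref changes if and only if the bytes
    change, so a stale ref keeps pointing at the still-present prior version."""
    digest = hashlib.sha256(content).hexdigest()
    return f"s3://path/to/records_by_shard_key/{shard_key}/{digest}"
```

Because writes land at a fresh path, readers holding old refs never observe a torn update; garbage collection of superseded versions becomes a separate background concern.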
Hedged Reads
Store multiple replica paths in the reference, separated by |:
s3://path/1/to/records_by_shard_key/2/sha256hashOfContent?pk=1,field=Field_d|s3://path/2/to/records_by_shard_key/2/sha256hashOfContent?pk=1,field=Field_d
Allows for using multiple buckets or express zones (availability)
Allows for hedged reads: dual reads to improve tail latency
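A sketch of a hedged read over the |-separated replica paths: fire the first replica, and only if it has not answered within a small delay fan out to the rest, taking whichever finishes first. Here fetch stands in for whatever function retrieves one path:

```python
import concurrent.futures as cf

def hedged_fetch(ref, fetch, hedge_delay=0.05):
    """Read from the first replica path; if it has not answered within
    hedge_delay seconds, also fire the remaining replicas and return
    whichever request finishes first."""
    paths = ref.split("|")
    with cf.ThreadPoolExecutor(max_workers=len(paths)) as pool:
        futures = [pool.submit(fetch, paths[0])]
        done, _ = cf.wait(futures, timeout=hedge_delay)
        if not done:  # primary is slow: hedge to the other replicas
            futures += [pool.submit(fetch, p) for p in paths[1:]]
            done, _ = cf.wait(futures, return_when=cf.FIRST_COMPLETED)
        winner = next(iter(done)).result()
        for f in futures:  # best-effort cancel of the losers
            f.cancel()
        return winner
```

Delaying the hedge keeps the extra-request overhead near zero in the common case while still cutting the tail when the primary bucket or zone stalls.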
Build out an erasure coding backend
Ceph RADOS as a k/v store, or Ceph RGW for an S3 interface that fits the rest of the design, at a scale where self-hosting is worthwhile