Implementation of Invalidation with GRPC
Summary
We can build a cache invalidation mechanism using GRPC streaming as a messaging layer to notify standby nodes of pending writes. Standby nodes can detect when the read is propagated by sending along a storage index after the corresponding write. When the subsequent storage index passes on their local database index, the invalidation mechanism can fire as expected. Because this is built at an application layer, rather than at the database level, we can introduce other distribution topologies in the future.
Notably, standby nodes which do not have GRPC connectivity to the primary (and only have database-level indirect connectivity) cannot become read-enabled.
Problem Statement
The original PostgreSQL operationalization RFC noted that OpenBao's requirements for PostgreSQL use may cause issues for read replication in the future. While designing and implementing read replication, we noted that cache invalidation was the core design challenge, and drafted a dedicated RFC on invalidation, noting some of the challenges with a backend-agnostic implementation at the time.
Notably, we had a few restrictions on PostgreSQL use:
- While PostgreSQL has a WAL, this is only available on the primary node;
applications cannot listen to WAL events if they're connected to secondary
nodes and thus need to consume a replication slot on the primary.
LISTEN/NOTIFYhave the same challenge.
- It is up to the application to manage replication slots; if a replication slot is not consumed (but reserved), it will continue to hold back WALs, causing resource usage. As OpenBao with PostgreSQL-based HA is more ephemeral than Raft and supports auto-scaling, it would be ideal if we can avoid long-lived resource management.
- PostgreSQL's default WAL level,
replica, is not sufficient to enable the feature as discussed in the earlier RFC.
This lead to our initial RFC proposing a table-based design, which has many of the same shortcomings, though allowing the active OpenBao node to track standbys and cleanup the WAL table.
Ultimately though, as long as cache invalidation occurs somehow, we should be able to combine future header-based retry mechanisms with any form of eventual consistency to achieve read scalability. Additional mechanisms can be added in the future as well if necessary.
We wish to tie into the existing storage invalidation mechanism as efficiently as possible; this means mirroring the data that would've otherwise been present in a Raft log in this new invalidation mechanism.
User-facing Description
This feature will allow PostgreSQL backend users to achieve similar read scalability as Raft backends.
Technical Description
In order to implement GRPC-based invalidation, we'll want to introduce three new objects:
- A new storage layer,
GRPCInvalidation, which tracks writes on the active node and notifies Core which write occurred. - A standby tracking layer,
core.connectedInvalidationPeers, which keeps track of invalidation streaming requests by the standby nodes. - The invalidation index tracking layer,
core.standbyAwaitedInvalidations, which dispatches invalidations on the standby once the index from the active node has been seen on the local database instance.
These pieces are tied together by the existing forwarding GRPC layer; in the
future this layer should probably be renamed as a result of its expanding
scope. Notably, this mechanism is strictly standby->active; this
necessitates using a streaming GRPC endpoint rather than having the active
node simply push invalidations down. This complicates the design slightly,
though perhaps less than the effort that establishing an active->standby
mechanism would be.
GRPCInvalidation
This layer is a physical.Backend which can be layered on top of the
underlying physical.Backend to track all writes. This should be the closest
layer the real storage layer and thus sit below physical.Cache. On an
error-free write, we'll call the Core's selected GRPC-based invalidation
notification function:
func (g *grpcInvalidator) Put(ctx context.Context, entry *Entry) error {
err := g.backend.Put(ctx, entry)
if err == nil {
g.logger.Error("notifying of write", "key", entry.Key)
g.notifyWrite(entry.Key)
}
return err
}
The same will hold true for Delete(...); in a transaction, all writes will
be batched until Commit(...) is called for the first time.
Core will provide its hook to this storage layer, which internally will tie to the connected invalidation peers.
connectedInvalidationPeers
This member on Core is used on the active node and tracks stream requests from standby nodes. On a new invalidation stream request, the active node's core:
- Issues the peer a UUID identifying it internally in logs on both this active node and the remote standby peer.
- Sends the current storage index of the active to the remote peer, so it can wait until it sees that index to begin read-only unseal and read-request handling.
- Immediately starts sending all subsequent invalidations to the remote peer for queuing until startup is finished.
- Ejects peers which have not had a GRPC heartbeat request in a recent enough time. This causes the peer to need to be restarted on subsequent connections.
By backing this object by a JobManager, we can ensure limit the number of
parallel invalidations running at a given time to prevent high outbound
traffic load on the active node.
standbyAwaitedInvalidations
This member on Core is used on standby nodes to handle events from its invalidation streams request. This member will enqueue all invalidations, periodically checking the underlying storage mechanism's index, and dequeuing all invalidations whose index has been reached or exceeded. Intermittent issues reading the index can be ignored, though if queue pressure grows too large (and invalidations have been in the queue for too long), we'll consider the node stale and step down from read-enabled status.
Atomic Active Context
This change also requires some adjustments to how the active context is accessible. Because storage write operations and incoming GRPC streamed invalidations do not necessarily hold or not hold the state lock, we cannot safely access the active context. We'll move the active context and its cancellation function to an atomic pointer, letting us acquire it without grabbing the state lock.
Rationale and Alternatives
See above past RFCs (in the Problem Statement section) for most alternatives.
In PR#2952, a RFC for TTL-based cache invalidation was proposed. This was subject to the limitation that LIST style updates were not easily performed: only entries which the standby node knew about and had read were invalidated; entries which were added or deleted were not invalidated. Additionally, the overhead of continually refreshing the cluster meant that performance was low due to reloading of the auth, mount, and namespace tables.
Downsides
As noted above, this currently requires a direct standby->active node connection; without writing to the underlying database table, this is impossible to avoid. Even in that case, we'd still have to require some form of connection to allow efficient cleanup of the invalidation table.
However, in the future we could make this more of a tree network topology by binding the cluster listener address on standby nodes and allowing other standby nodes to connect to receive invalidations.
Security Implications
This should have no net-new security implications beyond those imposed by eventual consistency and mirror the existing Raft-based semantics.
User/Developer Experience
n/a
Unresolved Questions
- Only writes and deletes which do not error are propagated at the current moment; it may be safer to propagate all such writes regardless of whether or not they've erred.
Related Issues
n/a
Proof of Concept
See proof of concept branch.