# ADR 038: KVStore state listening
# Changelog
- 11/23/2020: Initial draft
- 10/14/2022:
- Add
ListenCommit
, flatten the state writes in a block to a single batch. - Remove listeners from cache stores, should only listen to
rootmulti.Store
. - Remove
HaltAppOnDeliveryError()
, the errors are propogated by default, the implementations should return nil if don't want to propogate errors.
- Add
# Status
Proposed
# Abstract
This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers.
# Context
Currently, KVStore data can be remotely accessed through Queries (opens new window) which proceed either through Tendermint and the ABCI, or through the gRPC server. In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time.
# Decision
We will modify the CommitMultiStore
interface and its concrete (rootmulti
) implementations and introduce a new listenkv.Store
to allow listening to state changes in underlying KVStores. We don't need to listen to cache stores, because we can't be sure that the writes will be committed eventually, and the writes are duplicated in rootmulti.Store
eventually, so we should only listen to rootmulti.Store
.
We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations.
# Listening interface
In a new file, store/types/listening.go
, we will create a WriteListener
interface for streaming out state changes from a KVStore.
# Listener type
We will create two concrete implementations of the WriteListener
interface in store/types/listening.go
, that writes out protobuf
encoded KV pairs to an underlying io.Writer
, and simply accumulate them in memory.
This will include defining a simple protobuf type for the KV pairs. In addition to the key and value fields this message will include the StoreKey for the originating KVStore so that we can write out from separate KVStores to the same stream/file and determine the source of each KV pair.
# ListenKVStore
We will create a new Store
type listenkv.Store
that the MultiStore
wraps around a KVStore
to enable state listening.
We can configure the Store
with a set of WriteListener
s which stream the output to specific destinations.
# MultiStore interface updates
We will update the CommitMultiStore
interface to allow us to wrap a set of listeners around a specific KVStore
.
# MultiStore implementation updates
We will modify all of the CommitMultiStore
implementations to satisfy these new interfaces, and adjust the rootmulti
GetKVStore
method
to wrap the returned KVStore
with a listenkv.Store
if listening is turned on for that Store
.
We will also adjust the rootmulti
CacheMultiStore
method to wrap the stores with listenkv.Store
to enable listening when the cache layer writes.
# Exposing the data
# Streaming service
We will introduce a new StreamingService
interface for exposing WriteListener
data streams to external consumers.
In addition to streaming state changes as StoreKVPair
s, the interface satisfies an ABCIListener
interface that plugs
into the BaseApp and relays ABCI requests and responses so that the service can observe those block metadatas as well.
The WriteListener
s of StreamingService
listens to the rootmulti.Store
, which is only written into at commit event by the cache store of deliverState
.
# BaseApp registration
We will add a new method to the BaseApp
to enable the registration of StreamingService
s:
We will also modify the BeginBlock
, EndBlock
, and DeliverTx
methods to pass ABCI requests and responses to any streaming service hooks registered
with the BaseApp
.
# Error Handling And Async Consumers
ABCIListener
s are called synchronously inside the consensus state machine, the returned error causes panic which in turn halt the consensus state machine. The implementer should be careful not to break consensus unexpectedly or slow down it too much.
For some async use cases, one can spawn a go-routine internanlly to avoid slow down consensus state machine, and handle the errors internally and always returns nil
to avoid halting consensus state machine on error.
Furthermore, for most of the cases, we only need to use the builtin file streamer to listen to state changes directly inside cosmos-sdk, the other consumers should subscribe to the file streamer output externally.
# File Streamer
We provide a minimal filesystem based implementation inside cosmos-sdk, and provides options to write output files reliably, the output files can be further consumed by external consumers, so most of the state listeners actually don't need to live inside the sdk and node, which improves the node robustness and simplify sdk internals.
The file streamer can be wired in app like this:
# Plugin system
We propose a plugin architecture to load and run StreamingService
implementations. We will introduce a plugin
loading/preloading system that is used to load, initialize, inject, run, and stop Cosmos-SDK plugins. Each plugin
must implement the following interface:
The Name
method returns a plugin's name.
The Version
method returns a plugin's version.
The Init
method initializes a plugin with the provided AppOptions
.
The io.Closer is used to shut down the plugin service.
For the purposes of this ADR we introduce a single kind of plugin- a state streaming plugin.
We will define a StateStreamingPlugin
interface which extends the above Plugin
interface to support a state streaming service.
The Register
method is used during App construction to register the plugin's streaming service with an App's BaseApp using the BaseApp's SetStreamingService
method.
The Start
method is used during App construction to start the registered plugin streaming services and maintain synchronization with them.
e.g. in NewSimApp
:
# Configuration
The plugin system will be configured within an app's app.toml file.
There will be three parameters for configuring the plugin system: plugins.on
, plugins.enabled
and plugins.dir
.
plugins.on
is a bool that turns on or off the plugin system at large, plugins.dir
directs the system to a directory
to load plugins from, and plugins.enabled
provides opt-in
semantics to plugin names to enable (including preloaded plugins).
Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here:
Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. plugins.streaming
).
Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. plugins.streaming.file
).
It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys
(e.g. plugins.streaming.file.keys
) for the stores it listens to and a flag (e.g. plugins.streaming.file.halt_app_on_delivery_error
)
that signifies whether the service operates in a fire-and-forget capacity, or stop the BaseApp when an error occurs in
any of ListenBeginBlock
, ListenEndBlock
and ListenDeliverTx
.
e.g.
# Encoding and decoding streams
ADR-038 introduces the interfaces and types for streaming state changes out from KVStores, associating this data with their related ABCI requests and responses, and registering a service for consuming this data and streaming it to some destination in a final format. Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format. We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example, the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic.
# Consequences
These changes will provide a means of subscribing to KVStore state changes in real time.
# Backwards Compatibility
- This ADR changes the
CommitMultiStore
interface, implementations supporting the previous version of these interfaces will not support the new ones
# Positive
- Ability to listen to KVStore state changes in real time and expose these events to external consumers
# Negative
- Changes
CommitMultiStore
interface
# Neutral
- Introduces additional- but optional- complexity to configuring and running a cosmos application
- If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application