Changelog
- 11/23/2020: Initial draft
- 10/06/2022: Introduce plugin system based on hashicorp/go-plugin
- 10/14/2022:
  - Add ListenCommit, flatten the state writes in a block to a single batch.
  - Remove listeners from cache stores; listeners should only listen to rootmulti.Store.
  - Remove HaltAppOnDeliveryError(); errors are propagated by default, and implementations should return nil if they do not want to propagate errors.
- Add
Status
Proposed
Abstract
This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers.
Context
Currently, KVStore data can be remotely accessed through Queries which proceed either through Tendermint and the ABCI, or through the gRPC server. In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time.
Decision
We will modify the CommitMultiStore interface and its concrete (rootmulti) implementations and introduce a new listenkv.Store to allow listening to state changes in underlying KVStores. We do not need to listen to cache stores: their writes are not guaranteed to be committed, and the writes that are committed are eventually duplicated in rootmulti.Store. We should therefore listen only to rootmulti.Store.
We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations.
Listening
In a new file, store/types/listening.go, we will create a MemoryListener struct for streaming out protobuf-encoded KV pair state changes from a KVStore.
The MemoryListener will be used internally by the concrete rootmulti implementation to collect state changes from KVStores.
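As a minimal sketch of what such a listener could look like: the StoreKVPair type below is a hand-written stand-in for the protobuf-generated type, and its field names, the OnWrite signature and the NewMemoryListener constructor are assumptions made for illustration. Only the MemoryListener and PopStateCache names come from this ADR.

```go
package main

import "fmt"

// StoreKVPair is a stand-in for the protobuf-encoded KV pair; the field
// names are assumptions made for this sketch.
type StoreKVPair struct {
	StoreKey string // name of the store the write belongs to
	Delete   bool   // true if the key was deleted
	Key      []byte
	Value    []byte
}

// MemoryListener accumulates state writes in memory.
type MemoryListener struct {
	stateCache []*StoreKVPair
}

// NewMemoryListener returns an empty listener (hypothetical constructor).
func NewMemoryListener() *MemoryListener {
	return &MemoryListener{}
}

// OnWrite records a single set or delete operation.
func (l *MemoryListener) OnWrite(storeKey string, key, value []byte, isDelete bool) {
	l.stateCache = append(l.stateCache, &StoreKVPair{
		StoreKey: storeKey,
		Delete:   isDelete,
		Key:      key,
		Value:    value,
	})
}

// PopStateCache returns the accumulated writes and resets the cache.
func (l *MemoryListener) PopStateCache() []*StoreKVPair {
	res := l.stateCache
	l.stateCache = nil
	return res
}

func main() {
	l := NewMemoryListener()
	l.OnWrite("bank", []byte("k1"), []byte("v1"), false)
	l.OnWrite("bank", []byte("k1"), nil, true)
	fmt.Println(len(l.PopStateCache())) // 2
	fmt.Println(len(l.PopStateCache())) // 0
}
```

Popping the cache rather than merely reading it keeps each batch of writes tied to exactly one consumer call.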
ListenKVStore
We will create a new Store type listenkv.Store that the rootmulti store will use to wrap a KVStore to enable state listening.
We will configure the Store with a MemoryListener which will collect state changes for output to specific destinations.
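The wrapping idea can be sketched as follows. This is an illustration under assumptions, not the listenkv.Store implementation: the KVStore interface is reduced to Get, Set and Delete, and memStore, write, listener and listenStore are hypothetical names.

```go
package main

import "fmt"

// KVStore is a reduced stand-in for the SDK's KVStore interface
// (assumption: only Get, Set and Delete are modelled here).
type KVStore interface {
	Get(key []byte) []byte
	Set(key, value []byte)
	Delete(key []byte)
}

// memStore is a trivial in-memory KVStore used only for this sketch.
type memStore map[string][]byte

func (m memStore) Get(key []byte) []byte { return m[string(key)] }
func (m memStore) Set(key, value []byte) { m[string(key)] = value }
func (m memStore) Delete(key []byte)     { delete(m, string(key)) }

// write is one recorded state change.
type write struct {
	StoreKey string
	Key      []byte
	Value    []byte
	Delete   bool
}

// listener collects writes in memory, playing the role of MemoryListener.
type listener struct{ writes []write }

// listenStore wraps a KVStore and reports every write to the listener.
type listenStore struct {
	parent   KVStore
	storeKey string
	listener *listener
}

// Get is passed through untouched; reads are not reported.
func (s *listenStore) Get(key []byte) []byte { return s.parent.Get(key) }

// Set applies the write to the parent, then records it.
func (s *listenStore) Set(key, value []byte) {
	s.parent.Set(key, value)
	s.listener.writes = append(s.listener.writes, write{s.storeKey, key, value, false})
}

// Delete applies the deletion to the parent, then records it.
func (s *listenStore) Delete(key []byte) {
	s.parent.Delete(key)
	s.listener.writes = append(s.listener.writes, write{s.storeKey, key, nil, true})
}

func main() {
	l := &listener{}
	var store KVStore = &listenStore{parent: memStore{}, storeKey: "acc", listener: l}
	store.Set([]byte("a"), []byte("1"))
	store.Delete([]byte("a"))
	fmt.Println(len(l.writes), l.writes[1].Delete) // 2 true
}
```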
MultiStore interface updates
We will update the CommitMultiStore interface to allow us to attach a MemoryListener to a specific KVStore.
Note that the MemoryListener will be attached internally by the concrete rootmulti implementation.
MultiStore implementation updates
We will adjust the rootmulti GetKVStore method to wrap the returned KVStore with a listenkv.Store if listening is turned on for that Store.
We will implement AddListeners to manage KVStore listeners internally and implement PopStateCache
as a means of retrieving the current state.
We will also adjust the rootmulti CacheMultiStore and CacheMultiStoreWithVersion methods to enable listening in
the cache layer.
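The interplay of AddListeners, GetKVStore and PopStateCache could be sketched as below. The rootStore type is a toy counterpart of rootmulti.Store, store keys are plain strings, and ListeningEnabled, newRootStore and all lower-case types are hypothetical helpers introduced for this example.

```go
package main

import "fmt"

// kvStore is a reduced stand-in for the SDK KVStore (assumption: writes only).
type kvStore interface {
	Set(key, value []byte)
}

type memStore map[string][]byte

func (m memStore) Set(key, value []byte) { m[string(key)] = value }

// change is one recorded write.
type change struct {
	StoreKey   string
	Key, Value []byte
}

// memoryListener accumulates changes across all listened stores.
type memoryListener struct{ cache []change }

// listenStore forwards writes to the parent and records them.
type listenStore struct {
	parent   kvStore
	storeKey string
	listener *memoryListener
}

func (s listenStore) Set(key, value []byte) {
	s.parent.Set(key, value)
	s.listener.cache = append(s.listener.cache, change{s.storeKey, key, value})
}

// rootStore is a toy counterpart of rootmulti.Store.
type rootStore struct {
	stores    map[string]kvStore
	listening map[string]bool // store keys with listening enabled
	listener  *memoryListener // shared, so all writes form one ordered batch
}

func newRootStore(keys ...string) *rootStore {
	rs := &rootStore{
		stores:    map[string]kvStore{},
		listening: map[string]bool{},
		listener:  &memoryListener{},
	}
	for _, k := range keys {
		rs.stores[k] = memStore{}
	}
	return rs
}

// AddListeners turns on listening for the given store keys.
func (rs *rootStore) AddListeners(keys []string) {
	for _, k := range keys {
		rs.listening[k] = true
	}
}

// ListeningEnabled reports whether writes to the store are being recorded.
func (rs *rootStore) ListeningEnabled(key string) bool { return rs.listening[key] }

// GetKVStore wraps the store only if listening is turned on for it.
func (rs *rootStore) GetKVStore(key string) kvStore {
	store := rs.stores[key]
	if rs.ListeningEnabled(key) {
		return listenStore{parent: store, storeKey: key, listener: rs.listener}
	}
	return store
}

// PopStateCache returns the recorded changes and resets the cache.
func (rs *rootStore) PopStateCache() []change {
	res := rs.listener.cache
	rs.listener.cache = nil
	return res
}

func main() {
	rs := newRootStore("bank", "staking")
	rs.AddListeners([]string{"bank"})
	rs.GetKVStore("bank").Set([]byte("k"), []byte("v"))    // recorded
	rs.GetKVStore("staking").Set([]byte("k"), []byte("v")) // not recorded
	fmt.Println(len(rs.PopStateCache()))                   // 1
}
```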
Exposing the data
Streaming Service
We will introduce a new ABCIListener interface that plugs into the BaseApp and relays ABCI requests and responses
so that the service can group the state changes with the ABCI requests.
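One possible shape for such an interface is sketched below. The request and response structs are placeholders for the ABCI protobuf messages, and the method names other than ListenCommit, as well as all signatures, are assumptions for illustration. The countingListener type is a hypothetical implementation that only counts what it receives.

```go
package main

import (
	"context"
	"fmt"
)

// Placeholder ABCI types for this sketch; the SDK would use the real ABCI
// request and response messages instead.
type (
	RequestBeginBlock  struct{ Height int64 }
	ResponseBeginBlock struct{}
	RequestEndBlock    struct{ Height int64 }
	ResponseEndBlock   struct{}
	RequestDeliverTx   struct{ Tx []byte }
	ResponseDeliverTx  struct{ Code uint32 }
	ResponseCommit     struct{ Data []byte }
)

// StoreKVPair is a stand-in for the protobuf-encoded KV pair.
type StoreKVPair struct {
	StoreKey string
	Key      []byte
	Value    []byte
	Delete   bool
}

// ABCIListener receives each ABCI request/response pair, plus the batch of
// state changes for the block at commit time.
type ABCIListener interface {
	ListenBeginBlock(ctx context.Context, req RequestBeginBlock, res ResponseBeginBlock) error
	ListenEndBlock(ctx context.Context, req RequestEndBlock, res ResponseEndBlock) error
	ListenDeliverTx(ctx context.Context, req RequestDeliverTx, res ResponseDeliverTx) error
	ListenCommit(ctx context.Context, res ResponseCommit, changeSet []*StoreKVPair) error
}

// countingListener counts the messages and KV pairs it is handed.
type countingListener struct {
	requests int
	pairs    int
}

func (c *countingListener) ListenBeginBlock(context.Context, RequestBeginBlock, ResponseBeginBlock) error {
	c.requests++
	return nil
}

func (c *countingListener) ListenEndBlock(context.Context, RequestEndBlock, ResponseEndBlock) error {
	c.requests++
	return nil
}

func (c *countingListener) ListenDeliverTx(context.Context, RequestDeliverTx, ResponseDeliverTx) error {
	c.requests++
	return nil
}

func (c *countingListener) ListenCommit(_ context.Context, _ ResponseCommit, changeSet []*StoreKVPair) error {
	c.pairs += len(changeSet)
	return nil
}

// Compile-time check that countingListener satisfies the interface.
var _ ABCIListener = (*countingListener)(nil)

func main() {
	ctx := context.Background()
	l := &countingListener{}
	_ = l.ListenBeginBlock(ctx, RequestBeginBlock{Height: 1}, ResponseBeginBlock{})
	_ = l.ListenDeliverTx(ctx, RequestDeliverTx{Tx: []byte("tx")}, ResponseDeliverTx{})
	_ = l.ListenEndBlock(ctx, RequestEndBlock{Height: 1}, ResponseEndBlock{})
	_ = l.ListenCommit(ctx, ResponseCommit{}, []*StoreKVPair{{StoreKey: "bank"}, {StoreKey: "bank", Delete: true}})
	fmt.Println(l.requests, l.pairs) // 3 2
}
```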
BaseApp Registration
We will add a new method to the BaseApp to enable the registration of StreamingServices:
BaseApp struct:
ABCI Event Hooks
We will modify the BeginBlock, EndBlock, DeliverTx and Commit methods to pass ABCI requests and responses
to any streaming service hooks registered with the BaseApp.
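To illustrate the hook pattern for Commit only, here is a reduced sketch. The app type stands in for BaseApp, registerListener is a hypothetical registration method, and the listener interface is cut down to a single method with simplified arguments. It follows the changelog note that listener errors are propagated by default.

```go
package main

import "fmt"

// commitListener is a reduced listener interface for this sketch; only the
// commit hook is modelled.
type commitListener interface {
	ListenCommit(height int64, changeSet []string) error
}

// app is a toy stand-in for BaseApp.
type app struct {
	height    int64
	pending   []string // state changes collected during the current block
	listeners []commitListener
}

// registerListener is a hypothetical registration method.
func (a *app) registerListener(l commitListener) {
	a.listeners = append(a.listeners, l)
}

// Commit finalizes the block, then hands the flattened batch of state changes
// to every registered listener. A non-nil listener error is propagated.
func (a *app) Commit() error {
	a.height++
	changeSet := a.pending
	a.pending = nil
	for _, l := range a.listeners {
		if err := l.ListenCommit(a.height, changeSet); err != nil {
			return err
		}
	}
	return nil
}

// recorder stores what it receives and never fails.
type recorder struct{ seen []string }

func (r *recorder) ListenCommit(_ int64, changeSet []string) error {
	r.seen = append(r.seen, changeSet...)
	return nil
}

// failing always reports an error, which Commit propagates.
type failing struct{}

func (failing) ListenCommit(height int64, _ []string) error {
	return fmt.Errorf("sink unavailable at height %d", height)
}

func main() {
	r := &recorder{}
	a := &app{pending: []string{"bank/k1", "bank/k2"}}
	a.registerListener(r)
	fmt.Println(a.Commit(), r.seen) // <nil> [bank/k1 bank/k2]

	a.registerListener(failing{})
	fmt.Println(a.Commit()) // sink unavailable at height 2
}
```

A listener that wants to swallow its own failures would return nil instead of the error.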
Go Plugin System
We propose a plugin architecture to load and run Streaming plugins and other types of implementations. We will introduce a plugin
system over gRPC that is used to load and run Cosmos-SDK plugins. The plugin system uses hashicorp/go-plugin.
Each plugin must have a struct that implements the plugin.Plugin interface and an Impl interface for processing messages over gRPC.
Each plugin must also have a message protocol defined for the gRPC service:
The plugin.Plugin interface has two methods, Client and Server. For our gRPC service these are GRPCClient and GRPCServer.
The Impl field holds the concrete implementation of our baseapp.ABCIListener interface written in Go.
Note: this is only used for plugin implementations written in Go.
The advantage of having such a plugin system is that within each plugin authors can define the message protocol in a way that fits their use case.
For example, when state change listening is desired, the ABCIListener message protocol can be defined as below (for illustrative purposes only).
When state change listening is not desired, then ListenCommit can be omitted from the protocol.
Impl (this is only used for plugins that are written in Go):
(interface{}, error).
This provides the advantage of using versioned plugins where the plugin interface and gRPC protocol change over time.
In addition, it allows for building independent plugins that can expose different parts of the system over gRPC.
We will introduce a RegisterStreamingPlugin function for the App to register NewStreamingPlugins with the App’s BaseApp.
Streaming plugins can be of any type; therefore, the function takes in an interface rather than a concrete type.
For example, we could have plugins of ABCIListener, WasmListener or IBCListener. Note that the RegisterStreamingPlugin function
is a helper function and not a requirement. Plugin registration can easily be moved from the App to the BaseApp directly.
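A registration helper that accepts an interface{} and dispatches on the concrete plugin type might look roughly like this. The function signature, the reduced ABCIListener interface and the BaseApp field are assumptions made for the sketch. Further cases (for example a WasmListener) would be added to the same switch.

```go
package main

import "fmt"

// ABCIListener is reduced to one method for this sketch.
type ABCIListener interface {
	ListenCommit(changeSet []string) error
}

// BaseApp is a toy stand-in holding the registered listeners.
type BaseApp struct {
	abciListeners []ABCIListener
}

// RegisterStreamingPlugin inspects the dynamic type of the loaded plugin and
// registers it with the matching part of the app. Unknown types are rejected.
func RegisterStreamingPlugin(app *BaseApp, pluginLookup interface{}) error {
	switch p := pluginLookup.(type) {
	case ABCIListener:
		app.abciListeners = append(app.abciListeners, p)
		return nil
	default:
		return fmt.Errorf("unexpected plugin type %T", pluginLookup)
	}
}

// noopListener is a hypothetical plugin implementation that does nothing.
type noopListener struct{}

func (noopListener) ListenCommit([]string) error { return nil }

func main() {
	app := &BaseApp{}
	fmt.Println(RegisterStreamingPlugin(app, noopListener{}))  // <nil>
	fmt.Println(RegisterStreamingPlugin(app, "not a plugin")) // unexpected plugin type string
	fmt.Println(len(app.abciListeners))                        // 1
}
```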
The NewStreamingPlugin and RegisterStreamingPlugin functions are used to register a plugin with the App’s BaseApp,
e.g. in NewSimApp.
Configuration
The plugin system will be configured within an App’s TOML configuration files.
The ABCIListener plugin is configured with four parameters: streaming.abci.plugin, streaming.abci.keys, streaming.abci.async and streaming.abci.stop-node-on-err.
streaming.abci.plugin is the name of the plugin we want to use for streaming, streaming.abci.keys is the set of store keys for the stores it listens to,
streaming.abci.async is a bool that enables asynchronous listening, and streaming.abci.stop-node-on-err is a bool that, when true, stops the node on a listener error when operating
in synchronous mode (streaming.abci.async=false). Note that streaming.abci.stop-node-on-err=true will be ignored if streaming.abci.async=true.
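The interaction between the async and stop-node-on-err settings can be expressed as a small predicate. The struct below is a hypothetical in-memory mirror of the four parameters, and "my_plugin" is a placeholder plugin name. How the values are actually read from TOML is not shown.

```go
package main

import "fmt"

// ABCIStreamingConfig mirrors the four streaming.abci.* parameters
// (hypothetical struct for this sketch).
type ABCIStreamingConfig struct {
	Plugin        string   // streaming.abci.plugin
	Keys          []string // streaming.abci.keys
	Async         bool     // streaming.abci.async
	StopNodeOnErr bool     // streaming.abci.stop-node-on-err
}

// errListener is a sample listener failure used in the example.
var errListener = fmt.Errorf("listener failed")

// ShouldStopNode reports whether a listener error should halt the node.
// StopNodeOnErr only takes effect in synchronous mode.
func (c ABCIStreamingConfig) ShouldStopNode(err error) bool {
	return err != nil && !c.Async && c.StopNodeOnErr
}

func main() {
	syncCfg := ABCIStreamingConfig{Plugin: "my_plugin", Keys: []string{"bank"}, StopNodeOnErr: true}
	asyncCfg := syncCfg
	asyncCfg.Async = true

	fmt.Println(syncCfg.ShouldStopNode(errListener))  // true
	fmt.Println(asyncCfg.ShouldStopNode(errListener)) // false
	fmt.Println(syncCfg.ShouldStopNode(nil))          // false
}
```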
The configuration above supports additional streaming plugins by adding the plugin to the [streaming] configuration section
and registering the plugin with the RegisterStreamingPlugin helper function.
Note that each plugin must include the streaming.{service}.plugin property, as it is a requirement for doing the lookup and registration of the plugin
with the App. All other properties are unique to the individual services.
Encoding and decoding streams
ADR-038 introduces the interfaces and types for streaming state changes out from KVStores, associating this data with their related ABCI requests and responses, and registering a service for consuming this data and streaming it to some destination in a final format. Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format. We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example, the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic.
Consequences
These changes will provide a means of subscribing to KVStore state changes in real time.
Backwards Compatibility
- This ADR changes the CommitMultiStore interface; implementations supporting the previous version of this interface will not support the new one
Positive
- Ability to listen to KVStore state changes in real time and expose these events to external consumers
Negative
- Changes the CommitMultiStore interface and its implementations
Neutral
- Introduces additional, but optional, complexity to configuring and running a Cosmos application
- If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application