// EventEmitter is a type that describes event emitter functions
// This should be defined in `types/events.go`
type EventEmitter func(context.Context, client.Context, ...EventHandler)
error
// EventHandler is a type of function that handles events coming out of the event bus
// This should be defined in `types/events.go`
type EventHandler func(proto.Message)
error
// Sample use of the functions below
func main() {
ctx, cancel := context.WithCancel(context.Background())
if err := TxEmitter(ctx, client.Context{
}.WithNodeURI("tcp://localhost:26657"), SubmitProposalEventHandler); err != nil {
cancel()
panic(err)
}
return
}
// SubmitProposalEventHandler is an example of an event handler that prints proposal details
// when any EventSubmitProposal is emitted.
func SubmitProposalEventHandler(ev proto.Message) (err error) {
switch event := ev.(type) {
// Handle governance proposal events creation events
case govtypes.EventSubmitProposal:
// Users define business logic here e.g.
fmt.Println(ev.FromAddress, ev.ProposalId, ev.Proposal)
return nil
default:
return nil
}
}
// TxEmitter is an example of an event emitter that emits just transaction events. This can and
// should be implemented somewhere in the Cosmos SDK. The Cosmos SDK can include an EventEmitters for tm.event='Tx'
// and/or tm.event='NewBlock' (the new block events may contain typed events)
func TxEmitter(ctx context.Context, cliCtx client.Context, ehs ...EventHandler) (err error) {
// Instantiate and start CometBFT RPC client
client, err := cliCtx.GetNode()
if err != nil {
return err
}
if err = client.Start(); err != nil {
return err
}
// Start the pubsub bus
bus := pubsub.NewBus()
defer bus.Close()
// Initialize a new error group
eg, ctx := errgroup.WithContext(ctx)
// Publish chain events to the pubsub bus
eg.Go(func()
error {
return PublishChainTxEvents(ctx, client, bus, simapp.ModuleBasics)
})
// Subscribe to the bus events
subscriber, err := bus.Subscribe()
if err != nil {
return err
}
// Handle all the events coming out of the bus
eg.Go(func()
error {
var err error
for {
select {
case <-ctx.Done():
return nil
case <-subscriber.Done():
return nil
case ev := <-subscriber.Events():
for _, eh := range ehs {
if err = eh(ev); err != nil {
break
}
}
}
}
return nil
})
return group.Wait()
}
// PublishChainTxEvents events using cmtclient. Waits on context shutdown signals to exit.
func PublishChainTxEvents(ctx context.Context, client cmtclient.EventsClient, bus pubsub.Bus, mb module.BasicManager) (err error) {
// Subscribe to transaction events
txch, err := client.Subscribe(ctx, "txevents", "tm.event='Tx'", 100)
if err != nil {
return err
}
// Unsubscribe from transaction events on function exit
defer func() {
err = client.UnsubscribeAll(ctx, "txevents")
}()
// Use errgroup to manage concurrency
g, ctx := errgroup.WithContext(ctx)
// Publish transaction events in a goroutine
g.Go(func()
error {
var err error
for {
select {
case <-ctx.Done():
break
case ed := <-ch:
switch evt := ed.Data.(type) {
case cmttypes.EventDataTx:
if !evt.Result.IsOK() {
continue
}
// range over events, parse them using the basic manager and
// send them to the pubsub bus
for _, abciEv := range events {
typedEvent, err := sdk.ParseTypedEvent(abciEv)
if err != nil {
return er
}
if err := bus.Publish(typedEvent); err != nil {
bus.Close()
return
}
continue
}
}
}
}
return err
})
// Exit on error or context cancelation
return g.Wait()
}