# Msg Services

A Msg Service processes messages. Msg Services are specific to the module in which they are defined, and only process messages defined within the said module. They are called from BaseApp during DeliverTx.

# Pre-requisite Readings

# Implementation of a module Msg service

All Msg processing is done by a Msg protobuf service. Each module should define a Msg service, which will be responsible for request and response serialization.

As further described in ADR 031, this approach has the advantage of clearly specifying return types and generating server and client code.

Based on the definition of the Msg service, Protobuf generates a MsgServer interface. It is the role of the module developer to implement this interface, by implementing the state transition logic that should happen upon receival of each Msg. As an example, here is the generated MsgServer interface for x/bank, which exposes two Msgs:

Copy // MsgServer is the server API for Msg service. type MsgServer interface { // Send defines a method for sending coins from one account to another account. Send(context.Context, *MsgSend) (*MsgSendResponse, error) // MultiSend defines a method for sending coins from some accounts to other accounts. MultiSend(context.Context, *MsgMultiSend) (*MsgMultiSendResponse, error) }

When possible, the existing module's Keeper should implement MsgServer, otherwise a msgServer struct that embeds the Keeper can be created, typically in ./keeper/msg_server.go:

Copy type msgServer struct { Keeper }

msgServer methods can retrieve the sdk.Context from the context.Context parameter method using the sdk.UnwrapSDKContext:

Copy package keeper import ( "context" "github.com/armon/go-metrics" "github.com/cosmos/cosmos-sdk/telemetry" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/x/bank/types" ) type msgServer struct { Keeper } // NewMsgServerImpl returns an implementation of the bank MsgServer interface // for the provided Keeper. func NewMsgServerImpl(keeper Keeper) types.MsgServer { return &msgServer{Keeper: keeper} } var _ types.MsgServer = msgServer{} func (k msgServer) Send(goCtx context.Context, msg *types.MsgSend) (*types.MsgSendResponse, error) { ctx := sdk.UnwrapSDKContext(goCtx) if err := k.SendEnabledCoins(ctx, msg.Amount...); err != nil { return nil, err } from, err := sdk.AccAddressFromBech32(msg.FromAddress) if err != nil { return nil, err } to, err := sdk.AccAddressFromBech32(msg.ToAddress) if err != nil { return nil, err } if k.BlockedAddr(to) { return nil, sdkerrors.Wrapf(sdkerrors.ErrUnauthorized, "%s is not allowed to receive funds", msg.ToAddress) } err = k.SendCoins(ctx, from, to, msg.Amount) if err != nil { return nil, err } defer func() { for _, a := range msg.Amount { telemetry.SetGaugeWithLabels( []string{"tx", "msg", "send"}, float32(a.Amount.Int64()), []metrics.Label{telemetry.NewLabel("denom", a.Denom)}, ) } }() ctx.EventManager().EmitEvent( sdk.NewEvent( sdk.EventTypeMessage, sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory), ), ) return &types.MsgSendResponse{}, nil } func (k msgServer) MultiSend(goCtx context.Context, msg *types.MsgMultiSend) (*types.MsgMultiSendResponse, error) { ctx := sdk.UnwrapSDKContext(goCtx) // NOTE: totalIn == totalOut should already have been checked for _, in := range msg.Inputs { if err := k.SendEnabledCoins(ctx, in.Coins...); err != nil { return nil, err } } for _, out := range msg.Outputs { accAddr, err := sdk.AccAddressFromBech32(out.Address) if err != nil { panic(err) } if k.BlockedAddr(accAddr) { return nil, sdkerrors.Wrapf(sdkerrors.ErrUnauthorized, "%s is not allowed to receive transactions", out.Address) } } err := k.InputOutputCoins(ctx, msg.Inputs, msg.Outputs) if err != nil { return nil, err } ctx.EventManager().EmitEvent( sdk.NewEvent( sdk.EventTypeMessage, sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory), ), ) return &types.MsgMultiSendResponse{}, nil }

Msg processing usually follows these 2 steps:

  • First, they perform stateful checks to make sure the message is valid. At this stage, the message's ValidateBasic() method has already been called, meaning stateless checks on the message (like making sure parameters are correctly formatted) have already been performed. Checks performed in the msgServer method can be more expensive and require access to the state. For example, a msgServer method for a transfer message might check that the sending account has enough funds to actually perform the transfer. To access the state, the msgServer method needs to call the keeper's getter functions.
  • Then, if the checks are successful, the msgServer method calls the keeper's setter functions to actually perform the state transition.

Before returning, msgServer methods generally emit one or more events via the EventManager held in the ctx:

Copy ctx.EventManager().EmitEvent( sdk.NewEvent( eventType, // e.g. sdk.EventTypeMessage for a message, types.CustomEventType for a custom event defined in the module sdk.NewAttribute(attributeKey, attributeValue), ), )

These events are relayed back to the underlying consensus engine and can be used by service providers to implement services around the application. Click here to learn more about events.

The invoked msgServer method returns a proto.Message response and an error. These return values are then wrapped into an *sdk.Result or an error using sdk.WrapServiceResult(ctx sdk.Context, res proto.Message, err error):

Copy package baseapp import ( "context" "fmt" gogogrpc "github.com/gogo/protobuf/grpc" "github.com/gogo/protobuf/proto" "google.golang.org/grpc" codectypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) // MsgServiceRouter routes fully-qualified Msg service methods to their handler. type MsgServiceRouter struct { interfaceRegistry codectypes.InterfaceRegistry routes map[string]MsgServiceHandler } var _ gogogrpc.Server = &MsgServiceRouter{} // NewMsgServiceRouter creates a new MsgServiceRouter. func NewMsgServiceRouter() *MsgServiceRouter { return &MsgServiceRouter{ routes: map[string]MsgServiceHandler{}, } } // MsgServiceHandler defines a function type which handles Msg service message. type MsgServiceHandler = func(ctx sdk.Context, req sdk.MsgRequest) (*sdk.Result, error) // Handler returns the MsgServiceHandler for a given query route path or nil // if not found. func (msr *MsgServiceRouter) Handler(methodName string) MsgServiceHandler { return msr.routes[methodName] } // RegisterService implements the gRPC Server.RegisterService method. sd is a gRPC // service description, handler is an object which implements that gRPC service. // // This function PANICs: // - if it is called before the service `Msg`s have been registered using // RegisterInterfaces, // - or if a service is being registered twice. func (msr *MsgServiceRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) { // Adds a top-level query handler based on the gRPC service name. for _, method := range sd.Methods { fqMethod := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName) methodHandler := method.Handler // Check that the service Msg fully-qualified method name has already // been registered (via RegisterInterfaces). If the user registers a // service without registering according service Msg type, there might be // some unexpected behavior down the road. Since we can't return an error // (`Server.RegisterService` interface restriction) we panic (at startup). serviceMsg, err := msr.interfaceRegistry.Resolve(fqMethod) if err != nil || serviceMsg == nil { panic( fmt.Errorf( "type_url %s has not been registered yet. "+ "Before calling RegisterService, you must register all interfaces by calling the `RegisterInterfaces` "+ "method on module.BasicManager. Each module should call `msgservice.RegisterMsgServiceDesc` inside its "+ "`RegisterInterfaces` method with the `_Msg_serviceDesc` generated by proto-gen", fqMethod, ), ) } // Check that each service is only registered once. If a service is // registered more than once, then we should error. Since we can't // return an error (`Server.RegisterService` interface restriction) we // panic (at startup). _, found := msr.routes[fqMethod] if found { panic( fmt.Errorf( "msg service %s has already been registered. Please make sure to only register each service once. "+ "This usually means that there are conflicting modules registering the same msg service", fqMethod, ), ) } msr.routes[fqMethod] = func(ctx sdk.Context, req sdk.MsgRequest) (*sdk.Result, error) { ctx = ctx.WithEventManager(sdk.NewEventManager()) interceptor := func(goCtx context.Context, _ interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { goCtx = context.WithValue(goCtx, sdk.SdkContextKey, ctx) return handler(goCtx, req) } // Call the method handler from the service description with the handler object. // We don't do any decoding here because the decoding was already done. res, err := methodHandler(handler, sdk.WrapSDKContext(ctx), noopDecoder, interceptor) if err != nil { return nil, err } resMsg, ok := res.(proto.Message) if !ok { return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidType, "Expecting proto.Message, got %T", resMsg) } return sdk.WrapServiceResult(ctx, resMsg, err) } } } // SetInterfaceRegistry sets the interface registry for the router. func (msr *MsgServiceRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) { msr.interfaceRegistry = interfaceRegistry } func noopDecoder(_ interface{}) error { return nil }

This method takes care of marshaling the res parameter to protobuf and attaching any events on the ctx.EventManager() to the sdk.Result.

Copy // Result is the union of ResponseFormat and ResponseCheckTx. message Result { option (gogoproto.goproto_getters) = false; // Data is any data returned from message or handler execution. It MUST be // length prefixed in order to separate data from multiple message executions. bytes data = 1; // Log contains the log information from message or handler execution. string log = 2; // Events contains a slice of Event objects that were emitted during message // or handler execution. repeated tendermint.abci.Event events = 3 [(gogoproto.nullable) = false]; }

# Legacy Amino Msgs

# handler type

The handler type defined in the Cosmos SDK will be deprecated in favor of Msg Services.

Here is the typical structure of a handler function:

Copy package types // Handler defines the core of the state transition function of an application. type Handler func(ctx Context, msg Msg) (*Result, error) // AnteHandler authenticates transactions, before their internal messages are handled. // If newCtx.IsZero(), ctx is used instead. type AnteHandler func(ctx Context, tx Tx, simulate bool) (newCtx Context, err error) // AnteDecorator wraps the next AnteHandler to perform custom pre- and post-processing. type AnteDecorator interface { AnteHandle(ctx Context, tx Tx, simulate bool, next AnteHandler) (newCtx Context, err error) } // ChainDecorator chains AnteDecorators together with each AnteDecorator // wrapping over the decorators further along chain and returns a single AnteHandler. // // NOTE: The first element is outermost decorator, while the last element is innermost // decorator. Decorator ordering is critical since some decorators will expect // certain checks and updates to be performed (e.g. the Context) before the decorator // is run. These expectations should be documented clearly in a CONTRACT docline // in the decorator's godoc. // // NOTE: Any application that uses GasMeter to limit transaction processing cost // MUST set GasMeter with the FIRST AnteDecorator. Failing to do so will cause // transactions to be processed with an infinite gasmeter and open a DOS attack vector. // Use `ante.SetUpContextDecorator` or a custom Decorator with similar functionality. // Returns nil when no AnteDecorator are supplied. func ChainAnteDecorators(chain ...AnteDecorator) AnteHandler { if len(chain) == 0 { return nil } // handle non-terminated decorators chain if (chain[len(chain)-1] != Terminator{}) { chain = append(chain, Terminator{}) } return func(ctx Context, tx Tx, simulate bool) (Context, error) { return chain[0].AnteHandle(ctx, tx, simulate, ChainAnteDecorators(chain[1:]...)) } } // Terminator AnteDecorator will get added to the chain to simplify decorator code // Don't need to check if next == nil further up the chain // ______ // <((((((\\\ // / . }\ // ;--..--._|} // (\ '--/\--' ) // \\ | '-' :'| // \\ . -==- .-| // \\ \.__.' \--._ // [\\ __.--| // _/'--. // \ \\ .'-._ ('-----'/ __/ \ // \ \\ / __>| | '--. | // \ \\ | \ | / / / // \ '\ / \ | | _/ / // \ \ \ | | / / // snd \ \ \ / type Terminator struct{} // Simply return provided Context and nil error func (t Terminator) AnteHandle(ctx Context, _ Tx, _ bool, _ AnteHandler) (Context, error) { return ctx, nil }

Let us break it down:

  • The Msg is the actual object being processed.
  • The Context contains all the necessary information needed to process the msg, as well as a branch of the latest state. If the msg is successfully processed, the branched version of the state contained in the ctx will be written to the main state (branch).
  • The [*Result] returned to BaseApp contains (among other things) information on the execution of the handler and events.

Module handlers are typically implemented in a ./handler.go file inside the module's folder. The module manager is used to add the module's handlers to the application's router via the Route() method. Typically, the manager's Route() method simply constructs a Route that calls a NewHandler() method defined in handler.go.

Copy // Route returns the message routing key for the gov module. func (am AppModule) Route() sdk.Route { return sdk.NewRoute(types.RouterKey, NewHandler(am.keeper)) }

# Implementation

NewHandler function dispatches a Msg to appropriate handler function, usually by using a switch statement:

Copy // NewHandler returns a handler for "bank" type messages. func NewHandler(k keeper.Keeper) sdk.Handler { return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) { ctx = ctx.WithEventManager(sdk.NewEventManager()) switch msg := msg.(type) { case *types.MsgSend: return handleMsgSend(ctx, k, msg) case *types.MsgMultiSend: return handleMsgMultiSend(ctx, k, msg) default: return nil, sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "unrecognized bank message type: %T", msg) } } }

First, NewHandler function sets a new EventManager to the context to isolate events per msg. Then, a simple switch calls the appropriate handler based on the Msg type.

In this regard, handlers functions need to be implemented for each module Msg. This will also involve manual handler registration of Msg types. handlers functions should return a *Result and an error.

# Telemetry

New telemetry metrics can be created from msgServer methods when handling messages.

This is an example from the x/auth/vesting module:

Copy defer func() { telemetry.IncrCounter(1, "new", "account") for _, a := range msg.Amount { if a.Amount.IsInt64() { telemetry.SetGaugeWithLabels( []string{"tx", "msg", "create_vesting_account"}, float32(a.Amount.Int64()), []metrics.Label{telemetry.NewLabel("denom", a.Denom)}, ) } } }()

# Next

Learn about query services