X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=api%2Fapi.go;h=93f2b4210c920bfea91a4f6bbb1a2b7bdb3d7fc8;hb=d0b973030fe07dc7875da72f5ebe42d8bd9544b1;hp=3c2c7ec67fee040725c628d60035aa61083d2188;hpb=8adb6cdcb496f05169263d32a857791faf8baee1;p=govpp.git diff --git a/api/api.go b/api/api.go index 3c2c7ec..93f2b42 100644 --- a/api/api.go +++ b/api/api.go @@ -1,368 +1,131 @@ -// Copyright (c) 2017 Cisco and/or its affiliates. +// Copyright (c) 2020 Cisco and/or its affiliates. // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package api import ( - "errors" - "fmt" + "context" "time" - - "github.com/sirupsen/logrus" ) -// MessageType represents the type of a VPP message. -type MessageType int - -const ( - // RequestMessage represents a VPP request message - RequestMessage MessageType = iota - // ReplyMessage represents a VPP reply message - ReplyMessage - // EventMessage represents a VPP notification event message - EventMessage - // OtherMessage represents other VPP message (e.g. counters) - OtherMessage -) - -// Message is an interface that is implemented by all VPP Binary API messages generated by the binapigenerator. -type Message interface { - // GetMessageName returns the original VPP name of the message, as defined in the VPP API. - GetMessageName() string - - // GetMessageType returns the type of the VPP message. - GetMessageType() MessageType +// Connection represents the client connection to VPP API. +// +// NOTE: This API is EXPERIMENTAL. +type Connection interface { + // NewStream creates a new stream for sending and receiving messages. + // Context can be used to close the stream using cancel or timeout. + NewStream(ctx context.Context, options ...StreamOption) (Stream, error) - // GetCrcString returns the string with CRC checksum of the message definition (the string represents a hexadecimal number). - GetCrcString() string + // Invoke can be used for a simple request-reply RPC. + // It creates stream and calls SendMsg with req and RecvMsg with reply. + Invoke(ctx context.Context, req Message, reply Message) error } -// DataType is an interface that is implemented by all VPP Binary API data types by the binapi_generator. -type DataType interface { - // GetTypeName returns the original VPP name of the data type, as defined in the VPP API. - GetTypeName() string - - // GetCrcString returns the string with CRC checksum of the data type definition (the string represents a hexadecimal number). - GetCrcString() string -} +// Stream provides low-level access for sending and receiving messages. +// Users should handle correct type and ordering of messages. +// +// NOTE: This API is EXPERIMENTAL. +type Stream interface { + // SendMsg sends a message to the client. + // It blocks until message is sent to the transport. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not safe + // to call SendMsg on the same stream in different goroutines. + SendMsg(Message) error + + // RecvMsg blocks until a message is received or error occurs. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not safe + // to call SendMsg on the same stream in different goroutines. + RecvMsg() (Message, error) + + // Close closes the stream. Calling SendMsg and RecvMsg will return error + // after closing stream. + Close() error +} + +// StreamOption allows customizing a Stream. Available options are: +// - WithRequestSize +// - WithReplySize +// - WithReplyTimeout +type StreamOption func(Stream) // ChannelProvider provides the communication channel with govpp core. type ChannelProvider interface { // NewAPIChannel returns a new channel for communication with VPP via govpp core. // It uses default buffer sizes for the request and reply Go channels. - NewAPIChannel() (*Channel, error) + NewAPIChannel() (Channel, error) // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. - NewAPIChannelBuffered() (*Channel, error) -} - -// MessageDecoder provides functionality for decoding binary data to generated API messages. -type MessageDecoder interface { - // DecodeMsg decodes binary-encoded data of a message into provided Message structure. - DecodeMsg(data []byte, msg Message) error -} - -// MessageIdentifier provides identification of generated API messages. -type MessageIdentifier interface { - // GetMessageID returns message identifier of given API message. - GetMessageID(msg Message) (uint16, error) - // LookupByID looks up message name and crc by ID - LookupByID(ID uint16) (string, error) -} - -// Channel is the main communication interface with govpp core. It contains two Go channels, one for sending the requests -// to VPP and one for receiving the replies from it. The user can access the Go channels directly, or use the helper -// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently, -// otherwise the responses could mix! Use multiple channels instead. -type Channel struct { - ID uint16 // channel ID - - ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider - ReplyChan chan *VppReply // channel where VPP replies are delivered to - - NotifSubsChan chan *NotifSubscribeRequest // channel for sending notification subscribe requests - NotifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to - - MsgDecoder MessageDecoder // used to decode binary data to generated API messages - MsgIdentifier MessageIdentifier // used to retrieve message ID of a message - - lastSeqNum uint16 // sequence number of the last sent request - - delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery - replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout -} - -// VppRequest is a request that will be sent to VPP. -type VppRequest struct { - SeqNum uint16 // sequence number - Message Message // binary API message to be send to VPP - Multipart bool // true if multipart response is expected, false otherwise -} - -// VppReply is a reply received from VPP. -type VppReply struct { - MessageID uint16 // ID of the message - SeqNum uint16 // sequence number - Data []byte // encoded data with the message - MessageDecoder can be used for decoding - LastReplyReceived bool // in case of multipart replies, true if the last reply has been already received and this one should be ignored - Error error // in case of error, data is nil and this member contains error description -} - -// NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages. -type NotifSubscribeRequest struct { - Subscription *NotifSubscription // subscription details - Subscribe bool // true if this is a request to subscribe, false if unsubscribe -} - -// NotifSubscription represents a subscription for delivery of specific notification messages. -type NotifSubscription struct { - NotifChan chan Message // channel where notification messages will be delivered to - MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification -} - -// RequestCtx is a context of a ongoing request (simple one - only one response is expected). -type RequestCtx struct { - ch *Channel - seqNum uint16 -} - -// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected). -type MultiRequestCtx struct { - ch *Channel - seqNum uint16 -} - -const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout - -// NewChannelInternal returns a new channel structure. -// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly. -// Use ChannelProvider to get an API channel ready for communication with VPP. -func NewChannelInternal(id uint16) *Channel { - return &Channel{ - ID: id, - replyTimeout: defaultReplyTimeout, - } -} - -// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply -// from VPP before returning an error. -func (ch *Channel) SetReplyTimeout(timeout time.Duration) { - ch.replyTimeout = timeout -} - -// Close closes the API channel and releases all API channel-related resources in the ChannelProvider. -func (ch *Channel) Close() { - if ch.ReqChan != nil { - close(ch.ReqChan) - } -} - -// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. -// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). -func (ch *Channel) SendRequest(msg Message) *RequestCtx { - ch.lastSeqNum++ - ch.ReqChan <- &VppRequest{ - Message: msg, - SeqNum: ch.lastSeqNum, - } - return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum} -} - -// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded. -func (req *RequestCtx) ReceiveReply(msg Message) error { - if req == nil || req.ch == nil { - return errors.New("invalid request context") - } - - lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - - if lastReplyReceived { - err = errors.New("multipart reply recieved while a simple reply expected") - } - return err -} - -// SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP. -// Returns a multipart request context, that can be used to call ReceiveReply. -// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). -func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx { - ch.lastSeqNum++ - ch.ReqChan <- &VppRequest{ - Message: msg, - Multipart: true, - SeqNum: ch.lastSeqNum, - } - return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum} -} - -// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is -// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. -// Error will be returned if the response cannot be received or decoded. -func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) { - if req == nil || req.ch == nil { - return false, errors.New("invalid request context") - } - - return req.ch.receiveReplyInternal(msg, req.seqNum) -} - -// receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) { - var ignore bool - if msg == nil { - return false, errors.New("nil message passed in") - } - - if ch.delayedReply != nil { - // try the delayed reply - vppReply := ch.delayedReply - ch.delayedReply = nil - ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) - if !ignore { - return lastReplyReceived, err - } - } - - timer := time.NewTimer(ch.replyTimeout) - for { - select { - // blocks until a reply comes to ReplyChan or until timeout expires - case vppReply := <-ch.ReplyChan: - ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) - if ignore { - continue - } - return lastReplyReceived, err - - case <-timer.C: - err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) - return false, err - } - } - return -} - -func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) { - // check the sequence number - cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) - if cmpSeqNums == -1 { - // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.SeqNum).Warn( - "Received reply to an already closed binary API request") - ignore = true - return - } - if cmpSeqNums == 1 { - ch.delayedReply = reply - err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) - return - } - - if reply.Error != nil { - err = reply.Error - return - } - if reply.LastReplyReceived { - lastReplyReceived = true - return - } - - // message checks - var expMsgID uint16 - expMsgID, err = ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - return - } - - if reply.MessageID != expMsgID { - var msgNameCrc string - if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil { - msgNameCrc = err.Error() - } else { - msgNameCrc = nameCrc - } - - err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) " + - "(check if multiple goroutines are not sharing single GoVPP channel)", - reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) - return - } - - // decode the message - err = ch.MsgDecoder.DecodeMsg(reply.Data, msg) - return -} - -// compareSeqNumbers returns -1, 0, 1 if sequence number precedes, equals to, -// or succeeds seq. number . -// Since sequence numbers cycle in the finite set of size 2^16, the function -// must assume that the distance between compared sequence numbers is less than -// (2^16)/2 to determine the order. -func compareSeqNumbers(seqNum1, seqNum2 uint16) int { - // calculate distance from seqNum1 to seqNum2 - var dist uint16 - if seqNum1 <= seqNum2 { - dist = seqNum2 - seqNum1 - } else { - dist = 0xffff - (seqNum1 - seqNum2 - 1) - } - if dist == 0 { - return 0 - } else if dist <= 0x8000 { - return -1 - } - return 1 -} - -// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. -// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's -// buffer is full, the notifications will not be delivered into it. -func (ch *Channel) SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) { - subscription := &NotifSubscription{ - NotifChan: notifChan, - MsgFactory: msgFactory, - } - ch.NotifSubsChan <- &NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, - } - return subscription, <-ch.NotifSubsReplyChan -} - -// UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription. -func (ch *Channel) UnsubscribeNotification(subscription *NotifSubscription) error { - ch.NotifSubsChan <- &NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, - } - return <-ch.NotifSubsReplyChan -} - -// CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP -// which the library is connected to. -func (ch *Channel) CheckMessageCompatibility(messages ...Message) error { - for _, msg := range messages { - _, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - } - } - return nil + NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error) +} + +// Channel provides methods for direct communication with VPP channel. +type Channel interface { + // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. + // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). + SendRequest(msg Message) RequestCtx + + // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP. + // Returns a multipart request context, that can be used to call ReceiveReply. + // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). + SendMultiRequest(msg Message) MultiRequestCtx + + // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. + // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's + // buffer is full, the notifications will not be delivered into it. + SubscribeNotification(notifChan chan Message, event Message) (SubscriptionCtx, error) + + // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply + // from VPP before returning an error. + SetReplyTimeout(timeout time.Duration) + + // CheckCompatibility checks the compatiblity for the given messages. + // It will return an error if any of the given messages are not compatible. + CheckCompatiblity(msgs ...Message) error + + // Close closes the API channel and releases all API channel-related resources + // in the ChannelProvider. + Close() +} + +// RequestCtx is helper interface which allows to receive reply on request. +type RequestCtx interface { + // ReceiveReply receives a reply from VPP (blocks until a reply is delivered + // from VPP, or until an error occurs). The reply will be decoded into the msg + // argument. Error will be returned if the response cannot be received or decoded. + ReceiveReply(msg Message) error +} + +// MultiRequestCtx is helper interface which allows to receive reply on multi-request. +type MultiRequestCtx interface { + // ReceiveReply receives a reply from VPP (blocks until a reply is delivered + // from VPP, or until an error occurs).The reply will be decoded into the msg + // argument. If the last reply has been already consumed, lastReplyReceived is + // set to true. Do not use the message itself if lastReplyReceived is + // true - it won't be filled with actual data.Error will be returned if the + // response cannot be received or decoded. + ReceiveReply(msg Message) (lastReplyReceived bool, err error) +} + +// SubscriptionCtx is helper interface which allows to control subscription for +// notification events. +type SubscriptionCtx interface { + // Unsubscribe unsubscribes from receiving the notifications tied to the + // subscription context. + Unsubscribe() error }