From f1bef4a3c66f4408afdeb64cda62ccd8562d0fc6 Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Tue, 3 Jul 2018 10:39:21 +0200 Subject: [PATCH] make api.Channel as interface Change-Id: I052d241ab09043b1195beebeee99df4d8536621f Signed-off-by: Vladimir Lavor --- Makefile | 1 - adapter/mock/mock_adapter.go | 27 ++- api/api.go | 314 +++++----------------------- api/doc.go | 94 +-------- codec/doc.go | 2 + {core => codec}/msg_codec.go | 30 +-- core/channel.go | 276 ++++++++++++++++++++++++ api/api_test.go => core/channel_test.go | 27 ++- core/{core.go => connection.go} | 57 ++--- core/{core_test.go => connection_test.go} | 55 ++--- core/doc.go | 91 ++++++++ core/notification_handler.go | 7 +- core/request_handler.go | 18 +- examples/cmd/perf-bench/perf-bench.go | 16 +- examples/cmd/simple-client/simple_client.go | 22 +- examples/cmd/stats-client/stats_client.go | 6 +- 16 files changed, 547 insertions(+), 496 deletions(-) create mode 100644 codec/doc.go rename {core => codec}/msg_codec.go (82%) create mode 100644 core/channel.go rename api/api_test.go => core/channel_test.go (97%) rename core/{core.go => connection.go} (90%) rename core/{core_test.go => connection_test.go} (90%) diff --git a/Makefile b/Makefile index 4eed58d..cfc99f7 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,6 @@ build: test: @cd cmd/binapi-generator && go test -cover . - @cd api && go test -cover ./... @cd core && go test -cover . install: diff --git a/adapter/mock/mock_adapter.go b/adapter/mock/mock_adapter.go index 959fd86..a5cb62d 100644 --- a/adapter/mock/mock_adapter.go +++ b/adapter/mock/mock_adapter.go @@ -25,8 +25,8 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/adapter/mock/binapi" "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/core" + "git.fd.io/govpp.git/codec" "github.com/lunixbochs/struc" ) @@ -48,10 +48,10 @@ type VppAdapter struct { binAPITypes map[string]reflect.Type access sync.RWMutex - replies []reply // FIFO queue of messages - replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses - repliesLock sync.Mutex // mutex for the queue - mode replyMode // mode in which the mock operates + replies []reply // FIFO queue of messages + replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses + repliesLock sync.Mutex // mutex for the queue + mode replyMode // mode in which the mock operates } // defaultReply is a default reply message that mock adapter returns for a request. @@ -79,7 +79,7 @@ type MsgWithContext struct { Multipart bool /* set by mock adapter */ - hasCtx bool + hasCtx bool } // ReplyHandler is a type that allows to extend the behaviour of VPP mock. @@ -178,7 +178,7 @@ func (a *VppAdapter) ReplyBytes(request MessageDTO, reply api.Message) ([]byte, log.Println("ReplyBytes ", replyMsgID, " ", reply.GetMessageName(), " clientId: ", request.ClientID) buf := new(bytes.Buffer) - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: replyMsgID, Context: request.ClientID}) + struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: replyMsgID, Context: request.ClientID}) struc.Pack(buf, reply) return buf.Bytes(), nil @@ -238,7 +238,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { replyHandler := a.replyHandlers[i] buf := bytes.NewReader(data) - reqHeader := core.VppRequestHeader{} + reqHeader := codec.VppRequestHeader{} struc.Unpack(buf, &reqHeader) a.access.Lock() @@ -273,13 +273,13 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { context = setSeqNum(context, msg.SeqNum) } if msg.Msg.GetMessageType() == api.ReplyMessage { - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: context}) + struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: context}) } else if msg.Msg.GetMessageType() == api.EventMessage { - struc.Pack(buf, &core.VppEventHeader{VlMsgID: msgID, Context: context}) + struc.Pack(buf, &codec.VppEventHeader{VlMsgID: msgID, Context: context}) } else if msg.Msg.GetMessageType() == api.RequestMessage { - struc.Pack(buf, &core.VppRequestHeader{VlMsgID: msgID, Context: context}) + struc.Pack(buf, &codec.VppRequestHeader{VlMsgID: msgID, Context: context}) } else { - struc.Pack(buf, &core.VppOtherHeader{VlMsgID: msgID}) + struc.Pack(buf, &codec.VppOtherHeader{VlMsgID: msgID}) } struc.Pack(buf, msg.Msg) a.callback(context, msgID, buf.Bytes()) @@ -299,7 +299,7 @@ func (a *VppAdapter) SendMsg(clientID uint32, data []byte) error { // return default reply buf := new(bytes.Buffer) msgID := uint16(defaultReplyMsgID) - struc.Pack(buf, &core.VppReplyHeader{VlMsgID: msgID, Context: clientID}) + struc.Pack(buf, &codec.VppReplyHeader{VlMsgID: msgID, Context: clientID}) struc.Pack(buf, &defaultReply{}) a.callback(clientID, msgID, buf.Bytes()) } @@ -392,4 +392,3 @@ func setMultipart(context uint32, isMultipart bool) (newContext uint32) { } return context } - diff --git a/api/api.go b/api/api.go index 34e17c1..9c68ab9 100644 --- a/api/api.go +++ b/api/api.go @@ -15,11 +15,7 @@ package api import ( - "errors" - "fmt" "time" - - "github.com/sirupsen/logrus" ) // MessageType represents the type of a VPP message. @@ -61,11 +57,11 @@ type DataType interface { type ChannelProvider interface { // NewAPIChannel returns a new channel for communication with VPP via govpp core. // It uses default buffer sizes for the request and reply Go channels. - NewAPIChannel() (*Channel, error) + NewAPIChannel() (Channel, error) // NewAPIChannelBuffered returns a new channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. - NewAPIChannelBuffered() (*Channel, error) + NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (Channel, error) } // MessageDecoder provides functionality for decoding binary data to generated API messages. @@ -82,26 +78,57 @@ type MessageIdentifier interface { LookupByID(ID uint16) (string, error) } -// Channel is the main communication interface with govpp core. It contains two Go channels, one for sending the requests -// to VPP and one for receiving the replies from it. The user can access the Go channels directly, or use the helper -// methods provided inside of this package. Do not use the same channel from multiple goroutines concurrently, -// otherwise the responses could mix! Use multiple channels instead. -type Channel struct { - ID uint16 // channel ID - - ReqChan chan *VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider - ReplyChan chan *VppReply // channel where VPP replies are delivered to - - NotifSubsChan chan *NotifSubscribeRequest // channel for sending notification subscribe requests - NotifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to - - MsgDecoder MessageDecoder // used to decode binary data to generated API messages - MsgIdentifier MessageIdentifier // used to retrieve message ID of a message - - lastSeqNum uint16 // sequence number of the last sent request - - delayedReply *VppReply // reply already taken from ReplyChan, buffered for later delivery - replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout +// Channel provides methods for direct communication with VPP channel. +type Channel interface { + // SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. + // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). + SendRequest(msg Message) RequestCtx + // SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP. + // Returns a multipart request context, that can be used to call ReceiveReply. + // In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). + SendMultiRequest(msg Message) MultiRequestCtx + // SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. + // Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's + // buffer is full, the notifications will not be delivered into it. + SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) + // UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription. + UnsubscribeNotification(subscription *NotifSubscription) error + // CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP + // which the library is connected to. + CheckMessageCompatibility(messages ...Message) error + // SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply + // from VPP before returning an error. + SetReplyTimeout(timeout time.Duration) + // GetRequestChannel returns request go channel of the VPP channel + GetRequestChannel() chan<- *VppRequest + // GetReplyChannel returns reply go channel of the VPP channel + GetReplyChannel() <-chan *VppReply + // GetNotificationChannel returns notification go channel of the VPP channel + GetNotificationChannel() chan<- *NotifSubscribeRequest + // GetNotificationReplyChannel returns notification reply go channel of the VPP channel + GetNotificationReplyChannel() <-chan error + // GetMessageDecoder returns message decoder instance + GetMessageDecoder() MessageDecoder + // GetID returns channel's ID + GetID() uint16 + // Close closes the API channel and releases all API channel-related resources in the ChannelProvider. + Close() +} + +// RequestCtx is helper interface which allows to receive reply on request context data +type RequestCtx interface { + // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). + // The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded. + ReceiveReply(msg Message) error +} + +// MultiRequestCtx is helper interface which allows to receive reply on multi-request context data +type MultiRequestCtx interface { + // ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). + // The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is + // set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. + // Error will be returned if the response cannot be received or decoded. + ReceiveReply(msg Message) (lastReplyReceived bool, err error) } // VppRequest is a request that will be sent to VPP. @@ -131,238 +158,3 @@ type NotifSubscription struct { NotifChan chan Message // channel where notification messages will be delivered to MsgFactory func() Message // function that returns a new instance of the specific message that is expected as a notification } - -// RequestCtx is a context of a ongoing request (simple one - only one response is expected). -type RequestCtx struct { - ch *Channel - seqNum uint16 -} - -// MultiRequestCtx is a context of a ongoing multipart request (multiple responses are expected). -type MultiRequestCtx struct { - ch *Channel - seqNum uint16 -} - -const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout - -// NewChannelInternal returns a new channel structure. -// Note that this is just a raw channel not yet connected to VPP, it is not intended to be used directly. -// Use ChannelProvider to get an API channel ready for communication with VPP. -func NewChannelInternal(id uint16) *Channel { - return &Channel{ - ID: id, - replyTimeout: defaultReplyTimeout, - } -} - -// SetReplyTimeout sets the timeout for replies from VPP. It represents the maximum time the API waits for a reply -// from VPP before returning an error. -func (ch *Channel) SetReplyTimeout(timeout time.Duration) { - ch.replyTimeout = timeout -} - -// Close closes the API channel and releases all API channel-related resources in the ChannelProvider. -func (ch *Channel) Close() { - if ch.ReqChan != nil { - close(ch.ReqChan) - } -} - -// SendRequest asynchronously sends a request to VPP. Returns a request context, that can be used to call ReceiveReply. -// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). -func (ch *Channel) SendRequest(msg Message) *RequestCtx { - ch.lastSeqNum++ - ch.ReqChan <- &VppRequest{ - Message: msg, - SeqNum: ch.lastSeqNum, - } - return &RequestCtx{ch: ch, seqNum: ch.lastSeqNum} -} - -// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. Error will be returned if the response cannot be received or decoded. -func (req *RequestCtx) ReceiveReply(msg Message) error { - if req == nil || req.ch == nil { - return errors.New("invalid request context") - } - - lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) - - if lastReplyReceived { - err = errors.New("multipart reply recieved while a simple reply expected") - } - return err -} - -// SendMultiRequest asynchronously sends a multipart request (request to which multiple responses are expected) to VPP. -// Returns a multipart request context, that can be used to call ReceiveReply. -// In case of any errors by sending, the error will be delivered to ReplyChan (and returned by ReceiveReply). -func (ch *Channel) SendMultiRequest(msg Message) *MultiRequestCtx { - ch.lastSeqNum++ - ch.ReqChan <- &VppRequest{ - Message: msg, - Multipart: true, - SeqNum: ch.lastSeqNum, - } - return &MultiRequestCtx{ch: ch, seqNum: ch.lastSeqNum} -} - -// ReceiveReply receives a reply from VPP (blocks until a reply is delivered from VPP, or until an error occurs). -// The reply will be decoded into the msg argument. If the last reply has been already consumed, lastReplyReceived is -// set to true. Do not use the message itself if lastReplyReceived is true - it won't be filled with actual data. -// Error will be returned if the response cannot be received or decoded. -func (req *MultiRequestCtx) ReceiveReply(msg Message) (lastReplyReceived bool, err error) { - if req == nil || req.ch == nil { - return false, errors.New("invalid request context") - } - - return req.ch.receiveReplyInternal(msg, req.seqNum) -} - -// receiveReplyInternal receives a reply from the reply channel into the provided msg structure. -func (ch *Channel) receiveReplyInternal(msg Message, expSeqNum uint16) (lastReplyReceived bool, err error) { - var ignore bool - if msg == nil { - return false, errors.New("nil message passed in") - } - - if ch.delayedReply != nil { - // try the delayed reply - vppReply := ch.delayedReply - ch.delayedReply = nil - ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) - if !ignore { - return lastReplyReceived, err - } - } - - timer := time.NewTimer(ch.replyTimeout) - for { - select { - // blocks until a reply comes to ReplyChan or until timeout expires - case vppReply := <-ch.ReplyChan: - ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) - if ignore { - continue - } - return lastReplyReceived, err - - case <-timer.C: - err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) - return false, err - } - } - return -} - -func (ch *Channel) processReply(reply *VppReply, expSeqNum uint16, msg Message) (ignore bool, lastReplyReceived bool, err error) { - // check the sequence number - cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) - if cmpSeqNums == -1 { - // reply received too late, ignore the message - logrus.WithField("sequence-number", reply.SeqNum).Warn( - "Received reply to an already closed binary API request") - ignore = true - return - } - if cmpSeqNums == 1 { - ch.delayedReply = reply - err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) - return - } - - if reply.Error != nil { - err = reply.Error - return - } - if reply.LastReplyReceived { - lastReplyReceived = true - return - } - - // message checks - var expMsgID uint16 - expMsgID, err = ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - return - } - - if reply.MessageID != expMsgID { - var msgNameCrc string - if nameCrc, err := ch.MsgIdentifier.LookupByID(reply.MessageID); err != nil { - msgNameCrc = err.Error() - } else { - msgNameCrc = nameCrc - } - - err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+ - "(check if multiple goroutines are not sharing single GoVPP channel)", - reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) - return - } - - // decode the message - err = ch.MsgDecoder.DecodeMsg(reply.Data, msg) - return -} - -// compareSeqNumbers returns -1, 0, 1 if sequence number precedes, equals to, -// or succeeds seq. number . -// Since sequence numbers cycle in the finite set of size 2^16, the function -// must assume that the distance between compared sequence numbers is less than -// (2^16)/2 to determine the order. -func compareSeqNumbers(seqNum1, seqNum2 uint16) int { - // calculate distance from seqNum1 to seqNum2 - var dist uint16 - if seqNum1 <= seqNum2 { - dist = seqNum2 - seqNum1 - } else { - dist = 0xffff - (seqNum1 - seqNum2 - 1) - } - if dist == 0 { - return 0 - } else if dist <= 0x8000 { - return -1 - } - return 1 -} - -// SubscribeNotification subscribes for receiving of the specified notification messages via provided Go channel. -// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's -// buffer is full, the notifications will not be delivered into it. -func (ch *Channel) SubscribeNotification(notifChan chan Message, msgFactory func() Message) (*NotifSubscription, error) { - subscription := &NotifSubscription{ - NotifChan: notifChan, - MsgFactory: msgFactory, - } - ch.NotifSubsChan <- &NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: true, - } - return subscription, <-ch.NotifSubsReplyChan -} - -// UnsubscribeNotification unsubscribes from receiving the notifications tied to the provided notification subscription. -func (ch *Channel) UnsubscribeNotification(subscription *NotifSubscription) error { - ch.NotifSubsChan <- &NotifSubscribeRequest{ - Subscription: subscription, - Subscribe: false, - } - return <-ch.NotifSubsReplyChan -} - -// CheckMessageCompatibility checks whether provided messages are compatible with the version of VPP -// which the library is connected to. -func (ch *Channel) CheckMessageCompatibility(messages ...Message) error { - for _, msg := range messages { - _, err := ch.MsgIdentifier.GetMessageID(msg) - if err != nil { - return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", - msg.GetMessageName(), msg.GetCrcString()) - } - } - return nil -} diff --git a/api/doc.go b/api/doc.go index e1abffe..044a820 100644 --- a/api/doc.go +++ b/api/doc.go @@ -1,94 +1,2 @@ -// Package api provides API for communication with govpp core using Go channels, -// without the need of importing the govpp core package itself. -// -// The API offers two ways of communication with govpp core: using Go channels, or using convenient function -// wrappers over the Go channels. The latter should be sufficient for most of the use cases. -// -// The entry point to the API is the Channel structure, that can be obtained from the existing connection using -// the NewAPIChannel or NewAPIChannelBuffered functions: -// -// conn, err := govpp.Connect() -// if err != nil { -// // handle error! -// } -// defer conn.Disconnect() -// -// ch, err := conn.NewAPIChannel() -// if err != nil { -// // handle error! -// } -// defer ch.Close() -// -// -// Simple Request-Reply API -// -// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request -// message is sent to VPP and a single reply message is filled in when the reply comes from VPP: -// -// req := &acl.ACLPluginGetVersion{} -// reply := &acl.ACLPluginGetVersionReply{} -// -// err := ch.SendRequest(req).ReceiveReply(reply) -// // process the reply -// -// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error. -// -// -// Multipart Reply API -// -// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used: -// -// req := &interfaces.SwInterfaceDump{} -// reqCtx := ch.SendMultiRequest(req) -// -// for { -// reply := &interfaces.SwInterfaceDetails{} -// stop, err := reqCtx.ReceiveReply(reply) -// if stop { -// break // break out of the loop -// } -// // process the reply -// } -// -// Note that if the last reply has been already consumed, stop boolean return value is set to true. -// Do not use the message itself if stop is true - it won't be filled with actual data. -// -// -// Go Channels API -// -// The blocking API introduced above may be not sufficient for some management applications that strongly -// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g. -// the following replacement of the SendRequest / ReceiveReply API: -// -// req := &acl.ACLPluginGetVersion{} -// // send the request to the request go channel -// ch.ReqChan <- &api.VppRequest{Message: req} -// -// // receive a reply from the reply go channel -// vppReply := <-ch.ReplyChan -// -// // decode the message -// reply := &acl.ACLPluginGetVersionReply{} -// err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) -// -// // process the reply -// -// -// Notifications API -// -// to subscribe for receiving of the specified notification messages via provided Go channel, use the -// SubscribeNotification API: -// -// // subscribe for specific notification message -// notifChan := make(chan api.Message, 100) -// subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) -// -// // receive one notification -// notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) -// -// ch.UnsubscribeNotification(subs) -// -// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's -// buffer is full, the notifications will not be delivered into it. -// +// Package api defines interfaces required by every file generated with binapi-generator package api diff --git a/codec/doc.go b/codec/doc.go new file mode 100644 index 0000000..eb18e15 --- /dev/null +++ b/codec/doc.go @@ -0,0 +1,2 @@ +// Package codec provides methods allowing to encode and decode message structs to/from binary format accepted by VPP. +package codec diff --git a/core/msg_codec.go b/codec/msg_codec.go similarity index 82% rename from core/msg_codec.go rename to codec/msg_codec.go index e32916b..7ba8771 100644 --- a/core/msg_codec.go +++ b/codec/msg_codec.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package codec import ( "bytes" @@ -20,10 +20,8 @@ import ( "fmt" "reflect" - "github.com/lunixbochs/struc" - logger "github.com/sirupsen/logrus" - "git.fd.io/govpp.git/api" + "github.com/lunixbochs/struc" ) // MsgCodec provides encoding and decoding functionality of `api.Message` structs into/from @@ -82,22 +80,14 @@ func (*MsgCodec) EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) { } err := struc.Pack(buf, header) if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "header": header, - }).Error("Unable to encode the message header: ", err) - return nil, fmt.Errorf("unable to encode the message header: %v", err) + return nil, fmt.Errorf("unable to encode message: header: %v, error %v", header, err) } // encode message content if reflect.Indirect(reflect.ValueOf(msg)).NumField() > 0 { err := struc.Pack(buf, msg) if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "message": msg, - }).Error("Unable to encode the message: ", err) - return nil, fmt.Errorf("unable to encode the message: %v", err) + return nil, fmt.Errorf("unable to encode message: header %v, error %v", header, err) } } @@ -127,11 +117,7 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error { // decode message header err := struc.Unpack(buf, header) if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "data": data, - }).Error("Unable to decode header of the message.") - return fmt.Errorf("unable to decode the message header: %v", err) + return fmt.Errorf("unable to decode message: data %v, error %v", data, err) } // get rid of the message header @@ -148,11 +134,7 @@ func (*MsgCodec) DecodeMsg(data []byte, msg api.Message) error { // decode message content err = struc.Unpack(buf, msg) if err != nil { - log.WithFields(logger.Fields{ - "error": err, - "data": buf, - }).Error("Unable to decode the message.") - return fmt.Errorf("unable to decode the message: %v", err) + return fmt.Errorf("unable to decode message: data %v, error %v", data, err) } return nil diff --git a/core/channel.go b/core/channel.go new file mode 100644 index 0000000..87b3e29 --- /dev/null +++ b/core/channel.go @@ -0,0 +1,276 @@ +// Copyright (c) 2018 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + "time" + + "errors" + + "git.fd.io/govpp.git/api" + "github.com/sirupsen/logrus" +) + +const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout + +// requestCtxData is a context of a ongoing request (simple one - only one response is expected). +type requestCtxData struct { + ch *channel + seqNum uint16 +} + +// multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected). +type multiRequestCtxData struct { + ch *channel + seqNum uint16 +} + +func (req *requestCtxData) ReceiveReply(msg api.Message) error { + if req == nil || req.ch == nil { + return errors.New("invalid request context") + } + + lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum) + + if lastReplyReceived { + err = errors.New("multipart reply recieved while a simple reply expected") + } + return err +} + +func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) { + if req == nil || req.ch == nil { + return false, errors.New("invalid request context") + } + + return req.ch.receiveReplyInternal(msg, req.seqNum) +} + +// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests +// to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels +// via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines +// concurrently, otherwise the responses could mix! Use multiple channels instead. +type channel struct { + id uint16 // channel ID + + reqChan chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider + replyChan chan *api.VppReply // channel where VPP replies are delivered to + + notifSubsChan chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests + notifSubsReplyChan chan error // channel where replies to notification subscribe requests are delivered to + + msgDecoder api.MessageDecoder // used to decode binary data to generated API messages + msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message + + lastSeqNum uint16 // sequence number of the last sent request + + delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery + replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout +} + +func (ch *channel) SendRequest(msg api.Message) api.RequestCtx { + ch.lastSeqNum++ + ch.reqChan <- &api.VppRequest{ + Message: msg, + SeqNum: ch.lastSeqNum, + } + return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum} +} + +func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + ch.lastSeqNum++ + ch.reqChan <- &api.VppRequest{ + Message: msg, + Multipart: true, + SeqNum: ch.lastSeqNum, + } + return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum} +} + +func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) { + subscription := &api.NotifSubscription{ + NotifChan: notifChan, + MsgFactory: msgFactory, + } + ch.notifSubsChan <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: true, + } + return subscription, <-ch.notifSubsReplyChan +} + +func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error { + ch.notifSubsChan <- &api.NotifSubscribeRequest{ + Subscription: subscription, + Subscribe: false, + } + return <-ch.notifSubsReplyChan +} + +func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error { + for _, msg := range messages { + _, err := ch.msgIdentifier.GetMessageID(msg) + if err != nil { + return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + } + } + return nil +} + +func (ch *channel) SetReplyTimeout(timeout time.Duration) { + ch.replyTimeout = timeout +} + +func (ch *channel) GetRequestChannel() chan<- *api.VppRequest { + return ch.reqChan +} + +func (ch *channel) GetReplyChannel() <-chan *api.VppReply { + return ch.replyChan +} + +func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest { + return ch.notifSubsChan +} + +func (ch *channel) GetNotificationReplyChannel() <-chan error { + return ch.notifSubsReplyChan +} + +func (ch *channel) GetMessageDecoder() api.MessageDecoder { + return ch.msgDecoder +} + +func (ch *channel) GetID() uint16 { + return ch.id +} + +func (ch *channel) Close() { + if ch.reqChan != nil { + close(ch.reqChan) + } +} + +// receiveReplyInternal receives a reply from the reply channel into the provided msg structure. +func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) { + var ignore bool + if msg == nil { + return false, errors.New("nil message passed in") + } + + if ch.delayedReply != nil { + // try the delayed reply + vppReply := ch.delayedReply + ch.delayedReply = nil + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if !ignore { + return lastReplyReceived, err + } + } + + timer := time.NewTimer(ch.replyTimeout) + for { + select { + // blocks until a reply comes to ReplyChan or until timeout expires + case vppReply := <-ch.replyChan: + ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) + if ignore { + continue + } + return lastReplyReceived, err + + case <-timer.C: + err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) + return false, err + } + } + return +} + +func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { + // check the sequence number + cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum) + if cmpSeqNums == -1 { + // reply received too late, ignore the message + logrus.WithField("sequence-number", reply.SeqNum).Warn( + "Received reply to an already closed binary API request") + ignore = true + return + } + if cmpSeqNums == 1 { + ch.delayedReply = reply + err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum) + return + } + + if reply.Error != nil { + err = reply.Error + return + } + if reply.LastReplyReceived { + lastReplyReceived = true + return + } + + // message checks + var expMsgID uint16 + expMsgID, err = ch.msgIdentifier.GetMessageID(msg) + if err != nil { + err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to", + msg.GetMessageName(), msg.GetCrcString()) + return + } + + if reply.MessageID != expMsgID { + var msgNameCrc string + if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil { + msgNameCrc = err.Error() + } else { + msgNameCrc = nameCrc + } + + err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+ + "(check if multiple goroutines are not sharing single GoVPP channel)", + reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc) + return + } + + // decode the message + err = ch.msgDecoder.DecodeMsg(reply.Data, msg) + return +} + +// compareSeqNumbers returns -1, 0, 1 if sequence number precedes, equals to, +// or succeeds seq. number . +// Since sequence numbers cycle in the finite set of size 2^16, the function +// must assume that the distance between compared sequence numbers is less than +// (2^16)/2 to determine the order. +func compareSeqNumbers(seqNum1, seqNum2 uint16) int { + // calculate distance from seqNum1 to seqNum2 + var dist uint16 + if seqNum1 <= seqNum2 { + dist = seqNum2 - seqNum1 + } else { + dist = 0xffff - (seqNum1 - seqNum2 - 1) + } + if dist == 0 { + return 0 + } else if dist <= 0x8000 { + return -1 + } + return 1 +} diff --git a/api/api_test.go b/core/channel_test.go similarity index 97% rename from api/api_test.go rename to core/channel_test.go index 7cbd9f0..d573f29 100644 --- a/api/api_test.go +++ b/core/channel_test.go @@ -12,27 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -package api_test +package core import ( "testing" "time" "git.fd.io/govpp.git/adapter/mock" - "git.fd.io/govpp.git/api" - "git.fd.io/govpp.git/core" "git.fd.io/govpp.git/core/bin_api/vpe" "git.fd.io/govpp.git/examples/bin_api/interfaces" "git.fd.io/govpp.git/examples/bin_api/memif" "git.fd.io/govpp.git/examples/bin_api/tap" + "git.fd.io/govpp.git/api" . "github.com/onsi/gomega" ) type testCtx struct { mockVpp *mock.VppAdapter - conn *core.Connection - ch *api.Channel + conn *Connection + ch api.Channel } func setupTest(t *testing.T) *testCtx { @@ -43,7 +42,7 @@ func setupTest(t *testing.T) *testCtx { } var err error - ctx.conn, err = core.Connect(ctx.mockVpp) + ctx.conn, err = Connect(ctx.mockVpp) Expect(err).ShouldNot(HaveOccurred()) ctx.ch, err = ctx.conn.NewAPIChannel() @@ -196,7 +195,7 @@ func TestMultiRequestReplySwInterfaceTapDump(t *testing.T) { defer ctx.teardownTest() // mock reply - msgs := []api.Message{} + var msgs []api.Message for i := 1; i <= 10; i++ { msgs = append(msgs, &tap.SwInterfaceTapDetails{ SwIfIndex: uint32(i), @@ -225,7 +224,7 @@ func TestMultiRequestReplySwInterfaceMemifDump(t *testing.T) { defer ctx.teardownTest() // mock reply - msgs := []api.Message{} + var msgs []api.Message for i := 1; i <= 10; i++ { msgs = append(msgs, &memif.MemifDetails{ SwIfIndex: uint32(i), @@ -348,7 +347,7 @@ func TestSetReplyTimeoutMultiRequest(t *testing.T) { ctx.ch.SetReplyTimeout(time.Millisecond) - msgs := []api.Message{} + var msgs []api.Message for i := 1; i <= 3; i++ { msgs = append(msgs, &interfaces.SwInterfaceDetails{ SwIfIndex: uint32(i), @@ -392,19 +391,19 @@ func TestReceiveReplyNegative(t *testing.T) { defer ctx.teardownTest() // invalid context 1 - reqCtx1 := &api.RequestCtx{} + reqCtx1 := &requestCtxData{} err := reqCtx1.ReceiveReply(&vpe.ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid request context")) // invalid context 2 - reqCtx2 := &api.MultiRequestCtx{} + reqCtx2 := &multiRequestCtxData{} _, err = reqCtx2.ReceiveReply(&vpe.ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid request context")) // NU - reqCtx3 := &api.RequestCtx{} + reqCtx3 := &requestCtxData{} err = reqCtx3.ReceiveReply(nil) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("invalid request context")) @@ -415,7 +414,7 @@ func TestMultiRequestDouble(t *testing.T) { defer ctx.teardownTest() // mock reply - msgs := []mock.MsgWithContext{} + var msgs []mock.MsgWithContext for i := 1; i <= 3; i++ { msgs = append(msgs, mock.MsgWithContext{ Msg: &interfaces.SwInterfaceDetails{ @@ -543,7 +542,7 @@ func TestReceiveReplyAfterTimeoutMultiRequest(t *testing.T) { Expect(cnt).To(BeEquivalentTo(0)) // simulating late replies - msgs := []mock.MsgWithContext{} + var msgs []mock.MsgWithContext for i := 1; i <= 3; i++ { msgs = append(msgs, mock.MsgWithContext{ Msg: &interfaces.SwInterfaceDetails{ diff --git a/core/core.go b/core/connection.go similarity index 90% rename from core/core.go rename to core/connection.go index 052eb0b..a44d0c4 100644 --- a/core/core.go +++ b/core/connection.go @@ -27,6 +27,7 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/codec" "git.fd.io/govpp.git/core/bin_api/vpe" ) @@ -71,13 +72,13 @@ type ConnectionEvent struct { type Connection struct { vpp adapter.VppAdapter // VPP adapter connected uint32 // non-zero if the adapter is connected to VPP - codec *MsgCodec // message codec + codec *codec.MsgCodec // message codec msgIDsLock sync.RWMutex // lock for the message IDs map msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*api.Channel // map of all API channels indexed by the channel ID + channelsLock sync.RWMutex // lock for the channels map + channels map[uint16]*channel // map of all API channels indexed by the channel ID notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID @@ -197,8 +198,8 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { conn = &Connection{ vpp: vppAdapter, - codec: &MsgCodec{}, - channels: make(map[uint16]*api.Channel), + codec: &codec.MsgCodec{}, + channels: make(map[uint16]*channel), msgIDs: make(map[string]uint16), notifSubscriptions: make(map[uint16][]*api.NotifSubscription), } @@ -268,7 +269,7 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // it continues with connectLoop and tries to reconnect. func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // create a separate API channel for health check probes - ch, err := conn.NewAPIChannelBuffered(1, 1) + ch, err := conn.newAPIChannelBuffered(1, 1) if err != nil { log.Error("Failed to create health check API channel, health check will be disabled:", err) return @@ -290,18 +291,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { // try draining probe replies from previous request before sending next one select { - case <-ch.ReplyChan: + case <-ch.replyChan: log.Debug("drained old probe reply from reply channel") default: } // send the control ping request - ch.ReqChan <- &api.VppRequest{Message: msgControlPing} + ch.reqChan <- &api.VppRequest{Message: msgControlPing} for { // expect response within timeout period select { - case vppReply := <-ch.ReplyChan: + case vppReply := <-ch.replyChan: err = vppReply.Error case <-time.After(healthCheckReplyTimeout): @@ -349,32 +350,34 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.connectLoop(connChan) } -// NewAPIChannel returns a new API channel for communication with VPP via govpp core. -// It uses default buffer sizes for the request and reply Go channels. -func (c *Connection) NewAPIChannel() (*api.Channel, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) +} + +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize) } // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) { +func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) { if c == nil { return nil, errors.New("nil connection passed in") } chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := api.NewChannelInternal(chID) - ch.MsgDecoder = c.codec - ch.MsgIdentifier = c + ch := &channel{ + id: chID, + replyTimeout: defaultReplyTimeout, + } + ch.msgDecoder = c.codec + ch.msgIdentifier = c // create the communication channels - ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize) - ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize) - ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) - ch.NotifSubsReplyChan = make(chan error, replyChanBufSize) + ch.reqChan = make(chan *api.VppRequest, reqChanBufSize) + ch.replyChan = make(chan *api.VppReply, replyChanBufSize) + ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) + ch.notifSubsReplyChan = make(chan error, replyChanBufSize) // store API channel within the client c.channelsLock.Lock() @@ -388,13 +391,13 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) } // releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *api.Channel) { +func (c *Connection) releaseAPIChannel(ch *channel) { log.WithFields(logger.Fields{ - "ID": ch.ID, + "ID": ch.id, }).Debug("API channel closed.") // delete the channel from channels map c.channelsLock.Lock() - delete(c.channels, ch.ID) + delete(c.channels, ch.id) c.channelsLock.Unlock() } diff --git a/core/core_test.go b/core/connection_test.go similarity index 90% rename from core/core_test.go rename to core/connection_test.go index e4fbf63..b7c3aa0 100644 --- a/core/core_test.go +++ b/core/connection_test.go @@ -24,13 +24,14 @@ import ( "git.fd.io/govpp.git/examples/bin_api/interfaces" "git.fd.io/govpp.git/examples/bin_api/stats" + "git.fd.io/govpp.git/codec" . "github.com/onsi/gomega" ) type testCtx struct { mockVpp *mock.VppAdapter conn *core.Connection - ch *api.Channel + ch api.Channel } func setupTest(t *testing.T, bufferedChan bool) *testCtx { @@ -68,14 +69,14 @@ func TestSimpleRequest(t *testing.T) { reply := &vpe.ControlPingReply{} // send the request and receive a reply - ctx.ch.ReqChan <- &api.VppRequest{Message: req} - vppReply := <-ctx.ch.ReplyChan + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: req} + vppReply := <-ctx.ch.GetReplyChannel() Expect(vppReply).ShouldNot(BeNil()) Expect(vppReply.Error).ShouldNot(HaveOccurred()) // decode the message - err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) Expect(err).ShouldNot(HaveOccurred()) Expect(reply.Retval).To(BeEquivalentTo(-5)) @@ -93,12 +94,12 @@ func TestMultiRequest(t *testing.T) { ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) // send multipart request - ctx.ch.ReqChan <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &interfaces.SwInterfaceDump{}, Multipart: true} cnt := 0 for { // receive a reply - vppReply := <-ctx.ch.ReplyChan + vppReply := <-ctx.ch.GetReplyChannel() if vppReply.LastReplyReceived { break // break out of the loop } @@ -106,7 +107,7 @@ func TestMultiRequest(t *testing.T) { // decode the message reply := &interfaces.SwInterfaceDetails{} - err := ctx.ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + err := ctx.ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) Expect(err).ShouldNot(HaveOccurred()) cnt++ } @@ -124,11 +125,11 @@ func TestNotifications(t *testing.T) { NotifChan: notifChan, MsgFactory: interfaces.NewSwInterfaceSetFlags, } - ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{ + ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ Subscription: subscription, Subscribe: true, } - err := <-ctx.ch.NotifSubsReplyChan + err := <-ctx.ch.GetNotificationReplyChannel() Expect(err).ShouldNot(HaveOccurred()) // mock the notification and force its delivery @@ -144,11 +145,11 @@ func TestNotifications(t *testing.T) { Expect(notif.SwIfIndex).To(BeEquivalentTo(3)) // unsubscribe notification - ctx.ch.NotifSubsChan <- &api.NotifSubscribeRequest{ + ctx.ch.GetNotificationChannel() <- &api.NotifSubscribeRequest{ Subscription: subscription, Subscribe: false, } - err = <-ctx.ch.NotifSubsReplyChan + err = <-ctx.ch.GetNotificationReplyChannel() Expect(err).ShouldNot(HaveOccurred()) } @@ -207,15 +208,15 @@ func TestFullBuffer(t *testing.T) { // send multiple requests, only one reply should be read for i := 0; i < 20; i++ { ctx.mockVpp.MockReply(&vpe.ControlPingReply{}) - ctx.ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + ctx.ch.GetRequestChannel() <- &api.VppRequest{Message: &vpe.ControlPing{}} } - vppReply := <-ctx.ch.ReplyChan + vppReply := <-ctx.ch.GetReplyChannel() Expect(vppReply).ShouldNot(BeNil()) var received bool select { - case <-ctx.ch.ReplyChan: + case <-ctx.ch.GetReplyChannel(): received = true // this should not happen default: received = false // no reply to be received @@ -226,35 +227,35 @@ func TestFullBuffer(t *testing.T) { func TestCodec(t *testing.T) { RegisterTestingT(t) - codec := &core.MsgCodec{} + msgCodec := &codec.MsgCodec{} // request - data, err := codec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11) + data, err := msgCodec.EncodeMsg(&interfaces.CreateLoopback{MacAddress: []byte{1, 2, 3, 4, 5, 6}}, 11) Expect(err).ShouldNot(HaveOccurred()) Expect(data).ShouldNot(BeEmpty()) msg1 := &interfaces.CreateLoopback{} - err = codec.DecodeMsg(data, msg1) + err = msgCodec.DecodeMsg(data, msg1) Expect(err).ShouldNot(HaveOccurred()) Expect(msg1.MacAddress).To(BeEquivalentTo([]byte{1, 2, 3, 4, 5, 6})) // reply - data, err = codec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22) + data, err = msgCodec.EncodeMsg(&vpe.ControlPingReply{Retval: 55}, 22) Expect(err).ShouldNot(HaveOccurred()) Expect(data).ShouldNot(BeEmpty()) msg2 := &vpe.ControlPingReply{} - err = codec.DecodeMsg(data, msg2) + err = msgCodec.DecodeMsg(data, msg2) Expect(err).ShouldNot(HaveOccurred()) Expect(msg2.Retval).To(BeEquivalentTo(55)) // other - data, err = codec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33) + data, err = msgCodec.EncodeMsg(&stats.VnetIP4FibCounters{VrfID: 77}, 33) Expect(err).ShouldNot(HaveOccurred()) Expect(data).ShouldNot(BeEmpty()) msg3 := &stats.VnetIP4FibCounters{} - err = codec.DecodeMsg(data, msg3) + err = msgCodec.DecodeMsg(data, msg3) Expect(err).ShouldNot(HaveOccurred()) Expect(msg3.VrfID).To(BeEquivalentTo(77)) } @@ -262,21 +263,21 @@ func TestCodec(t *testing.T) { func TestCodecNegative(t *testing.T) { RegisterTestingT(t) - codec := &core.MsgCodec{} + msgCodec := &codec.MsgCodec{} // nil message for encoding - data, err := codec.EncodeMsg(nil, 15) + data, err := msgCodec.EncodeMsg(nil, 15) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("nil message")) Expect(data).Should(BeNil()) // nil message for decoding - err = codec.DecodeMsg(data, nil) + err = msgCodec.DecodeMsg(data, nil) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("nil message")) // nil data for decoding - err = codec.DecodeMsg(nil, &vpe.ControlPingReply{}) + err = msgCodec.DecodeMsg(nil, &vpe.ControlPingReply{}) Expect(err).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("EOF")) } @@ -285,7 +286,7 @@ func TestSimpleRequestsWithSequenceNumbers(t *testing.T) { ctx := setupTest(t, false) defer ctx.teardownTest() - var reqCtx []*api.RequestCtx + var reqCtx []api.RequestCtx for i := 0; i < 10; i++ { ctx.mockVpp.MockReply(&vpe.ControlPingReply{Retval: int32(i)}) req := &vpe.ControlPing{} @@ -524,7 +525,7 @@ func TestCycleOverSetOfSequenceNumbers(t *testing.T) { defer ctx.teardownTest() numIters := 0xffff + 100 - reqCtx := make(map[int]*api.RequestCtx) + reqCtx := make(map[int]api.RequestCtx) for i := 0; i < numIters+30; /* receiver is 30 reqs behind */ i++ { if i < numIters { diff --git a/core/doc.go b/core/doc.go index a4ecd50..5b0b40e 100644 --- a/core/doc.go +++ b/core/doc.go @@ -17,4 +17,95 @@ // defer ch.Close() // // Note that one application can open only one connection, that can serve multiple API channels. +// +// The API offers two ways of communication with govpp core: using Go channels, or using convenient function +// wrappers over the Go channels. The latter should be sufficient for most of the use cases. +// +// The entry point to the API is the Channel structure, that can be obtained from the existing connection using +// the NewAPIChannel or NewAPIChannelBuffered functions: +// +// conn, err := govpp.Connect() +// if err != nil { +// // handle error! +// } +// defer conn.Disconnect() +// +// ch, err := conn.NewAPIChannel() +// if err != nil { +// // handle error! +// } +// defer ch.Close() +// +// +// Simple Request-Reply API +// +// The simple version of the API is based on blocking SendRequest / ReceiveReply calls, where a single request +// message is sent to VPP and a single reply message is filled in when the reply comes from VPP: +// +// req := &acl.ACLPluginGetVersion{} +// reply := &acl.ACLPluginGetVersionReply{} +// +// err := ch.SendRequest(req).ReceiveReply(reply) +// // process the reply +// +// Note that if the reply message type that comes from VPP does not match with provided one, you'll get an error. +// +// +// Multipart Reply API +// +// If multiple messages are expected as a reply to a request, SendMultiRequest API must be used: +// +// req := &interfaces.SwInterfaceDump{} +// reqCtx := ch.SendMultiRequest(req) +// +// for { +// reply := &interfaces.SwInterfaceDetails{} +// stop, err := reqCtx.ReceiveReply(reply) +// if stop { +// break // break out of the loop +// } +// // process the reply +// } +// +// Note that if the last reply has been already consumed, stop boolean return value is set to true. +// Do not use the message itself if stop is true - it won't be filled with actual data. +// +// +// Go Channels API +// +// The blocking API introduced above may be not sufficient for some management applications that strongly +// rely on usage of Go channels. In this case, the API allows to access the underlying Go channels directly, e.g. +// the following replacement of the SendRequest / ReceiveReply API: +// +// req := &acl.ACLPluginGetVersion{} +// // send the request to the request go channel +// ch.GetRequestChannel <- &api.VppRequest{Message: req} +// +// // receive a reply from the reply go channel +// vppReply := <-ch.GetReplyChannel +// +// // decode the message +// reply := &acl.ACLPluginGetVersionReply{} +// err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) +// +// // process the reply +// +// +// Notifications API +// +// to subscribe for receiving of the specified notification messages via provided Go channel, use the +// SubscribeNotification API: +// +// // subscribe for specific notification message +// notifChan := make(chan api.Message, 100) +// subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) +// +// // receive one notification +// notif := (<-notifChan).(*interfaces.SwInterfaceSetFlags) +// +// ch.UnsubscribeNotification(subs) +// +// Note that the caller is responsible for creating the Go channel with preferred buffer size. If the channel's +// buffer is full, the notifications will not be delivered into it. +// package core diff --git a/core/notification_handler.go b/core/notification_handler.go index 89c16a4..c0e8687 100644 --- a/core/notification_handler.go +++ b/core/notification_handler.go @@ -18,13 +18,12 @@ import ( "fmt" "reflect" - logger "github.com/sirupsen/logrus" - "git.fd.io/govpp.git/api" + logger "github.com/sirupsen/logrus" ) // processNotifSubscribeRequest processes a notification subscribe request. -func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.NotifSubscribeRequest) error { +func (c *Connection) processNotifSubscribeRequest(ch *channel, req *api.NotifSubscribeRequest) error { var err error // subscribe / unsubscribe @@ -36,7 +35,7 @@ func (c *Connection) processNotifSubscribeRequest(ch *api.Channel, req *api.Noti // send the reply into the go channel select { - case ch.NotifSubsReplyChan <- err: + case ch.notifSubsReplyChan <- err: // reply sent successfully default: // unable to write into the channel without blocking diff --git a/core/request_handler.go b/core/request_handler.go index 3bec38d..8681963 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -31,10 +31,10 @@ var ( ) // watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel) { +func (c *Connection) watchRequests(ch *channel) { for { select { - case req, ok := <-ch.ReqChan: + case req, ok := <-ch.reqChan: // new request on the request channel if !ok { // after closing the request channel, release API channel and return @@ -43,7 +43,7 @@ func (c *Connection) watchRequests(ch *api.Channel) { } c.processRequest(ch, req) - case req := <-ch.NotifSubsChan: + case req := <-ch.notifSubsChan: // new request on the notification subscribe channel c.processNotifSubscribeRequest(ch, req) } @@ -51,7 +51,7 @@ func (c *Connection) watchRequests(ch *api.Channel) { } // processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error { +func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.connected) == 0 { err := ErrNotConnected @@ -78,7 +78,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error if err != nil { err = fmt.Errorf("unable to encode the messge: %v", err) log.WithFields(logger.Fields{ - "channel": ch.ID, + "channel": ch.id, "msg_id": msgID, "seq_num": req.SeqNum, }).Error(err) @@ -88,7 +88,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ - "channel": ch.ID, + "channel": ch.id, "msg_id": msgID, "msg_size": len(data), "msg_name": req.Message.GetMessageName(), @@ -97,7 +97,7 @@ func (c *Connection) processRequest(ch *api.Channel, req *api.VppRequest) error } // send the request to VPP - context := packRequestContext(ch.ID, req.Multipart, req.SeqNum) + context := packRequestContext(ch.id, req.Multipart, req.SeqNum) err = c.vpp.SendMsg(context, data) if err != nil { err = fmt.Errorf("unable to send the message: %v", err) @@ -189,9 +189,9 @@ func msgCallback(context uint32, msgID uint16, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { +func sendReply(ch *channel, reply *api.VppReply) { select { - case ch.ReplyChan <- reply: + case ch.replyChan <- reply: // reply sent successfully case <-time.After(time.Millisecond * 100): // receiver still not ready diff --git a/examples/cmd/perf-bench/perf-bench.go b/examples/cmd/perf-bench/perf-bench.go index f3ff752..b1f4dcf 100644 --- a/examples/cmd/perf-bench/perf-bench.go +++ b/examples/cmd/perf-bench/perf-bench.go @@ -95,7 +95,7 @@ func main() { fmt.Printf("Requests per second: %.0f\n", float64(cnt)/elapsed.Seconds()) } -func syncTest(ch *api.Channel, cnt int) { +func syncTest(ch api.Channel, cnt int) { fmt.Printf("Running synchronous perf test with %d requests...\n", cnt) for i := 0; i < cnt; i++ { @@ -110,7 +110,7 @@ func syncTest(ch *api.Channel, cnt int) { } } -func asyncTest(ch *api.Channel, cnt int) { +func asyncTest(ch api.Channel, cnt int) { fmt.Printf("Running asynchronous perf test with %d requests...\n", cnt) // start a new go routine that reads the replies @@ -125,20 +125,20 @@ func asyncTest(ch *api.Channel, cnt int) { wg.Wait() } -func sendAsyncRequests(ch *api.Channel, cnt int) { +func sendAsyncRequests(ch api.Channel, cnt int) { for i := 0; i < cnt; i++ { - ch.ReqChan <- &api.VppRequest{ + ch.GetRequestChannel() <- &api.VppRequest{ Message: &vpe.ControlPing{}, } } } -func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) { +func readAsyncReplies(ch api.Channel, expectedCnt int, wg *sync.WaitGroup) { cnt := 0 for { // receive a reply - reply := <-ch.ReplyChan + reply := <-ch.GetReplyChannel() if reply.Error != nil { log.Println("Error in reply:", reply.Error) os.Exit(1) @@ -146,7 +146,7 @@ func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) { // decode the message msg := &vpe.ControlPingReply{} - err := ch.MsgDecoder.DecodeMsg(reply.Data, msg) + err := ch.GetMessageDecoder().DecodeMsg(reply.Data, msg) if reply.Error != nil { log.Println("Error by decoding:", err) os.Exit(1) @@ -159,4 +159,4 @@ func readAsyncReplies(ch *api.Channel, expectedCnt int, wg *sync.WaitGroup) { return } } -} \ No newline at end of file +} diff --git a/examples/cmd/simple-client/simple_client.go b/examples/cmd/simple-client/simple_client.go index 67dc14b..7b7dbcd 100644 --- a/examples/cmd/simple-client/simple_client.go +++ b/examples/cmd/simple-client/simple_client.go @@ -66,7 +66,7 @@ func main() { // compatibilityCheck shows how an management application can check whether generated API messages are // compatible with the version of VPP which the library is connected to. -func compatibilityCheck(ch *api.Channel) { +func compatibilityCheck(ch api.Channel) { err := ch.CheckMessageCompatibility( &interfaces.SwInterfaceDump{}, &interfaces.SwInterfaceDetails{}, @@ -78,7 +78,7 @@ func compatibilityCheck(ch *api.Channel) { } // aclVersion is the simplest API example - one empty request message and one reply message. -func aclVersion(ch *api.Channel) { +func aclVersion(ch api.Channel) { req := &acl.ACLPluginGetVersion{} reply := &acl.ACLPluginGetVersionReply{} @@ -92,7 +92,7 @@ func aclVersion(ch *api.Channel) { } // aclConfig is another simple API example - in this case, the request contains structured data. -func aclConfig(ch *api.Channel) { +func aclConfig(ch api.Channel) { req := &acl.ACLAddReplace{ ACLIndex: ^uint32(0), Tag: []byte("access list 1"), @@ -127,7 +127,7 @@ func aclConfig(ch *api.Channel) { } // aclDump shows an example where SendRequest and ReceiveReply are not chained together. -func aclDump(ch *api.Channel) { +func aclDump(ch api.Channel) { req := &acl.ACLDump{} reply := &acl.ACLDetails{} @@ -143,17 +143,17 @@ func aclDump(ch *api.Channel) { // tapConnect example shows how the Go channels in the API channel can be accessed directly instead // of using SendRequest and ReceiveReply wrappers. -func tapConnect(ch *api.Channel) { +func tapConnect(ch api.Channel) { req := &tap.TapConnect{ TapName: []byte("testtap"), UseRandomMac: 1, } // send the request to the request go channel - ch.ReqChan <- &api.VppRequest{Message: req} + ch.GetRequestChannel() <- &api.VppRequest{Message: req} // receive a reply from the reply go channel - vppReply := <-ch.ReplyChan + vppReply := <-ch.GetReplyChannel() if vppReply.Error != nil { fmt.Println("Error:", vppReply.Error) return @@ -161,7 +161,7 @@ func tapConnect(ch *api.Channel) { // decode the message reply := &tap.TapConnectReply{} - err := ch.MsgDecoder.DecodeMsg(vppReply.Data, reply) + err := ch.GetMessageDecoder().DecodeMsg(vppReply.Data, reply) if err != nil { fmt.Println("Error:", err) @@ -171,7 +171,7 @@ func tapConnect(ch *api.Channel) { } // interfaceDump shows an example of multipart request (multiple replies are expected). -func interfaceDump(ch *api.Channel) { +func interfaceDump(ch api.Channel) { req := &interfaces.SwInterfaceDump{} reqCtx := ch.SendMultiRequest(req) @@ -191,7 +191,7 @@ func interfaceDump(ch *api.Channel) { // interfaceNotifications shows the usage of notification API. Note that for notifications, // you are supposed to create your own Go channel with your preferred buffer size. If the channel's // buffer is full, the notifications will not be delivered into it. -func interfaceNotifications(ch *api.Channel) { +func interfaceNotifications(ch api.Channel) { // subscribe for specific notification message notifChan := make(chan api.Message, 100) subs, _ := ch.SubscribeNotification(notifChan, interfaces.NewSwInterfaceSetFlags) @@ -218,4 +218,4 @@ func interfaceNotifications(ch *api.Channel) { // unsubscribe from delivery of the notifications ch.UnsubscribeNotification(subs) -} \ No newline at end of file +} diff --git a/examples/cmd/stats-client/stats_client.go b/examples/cmd/stats-client/stats_client.go index 17c7956..5f9966f 100644 --- a/examples/cmd/stats-client/stats_client.go +++ b/examples/cmd/stats-client/stats_client.go @@ -101,7 +101,7 @@ loop: } // subscribeNotifications subscribes for interface counters notifications. -func subscribeNotifications(ch *api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) { +func subscribeNotifications(ch api.Channel) (*api.NotifSubscription, *api.NotifSubscription, chan api.Message) { notifChan := make(chan api.Message, 100) simpleCountersSubs, _ := ch.SubscribeNotification(notifChan, interfaces.NewVnetInterfaceSimpleCounters) @@ -111,7 +111,7 @@ func subscribeNotifications(ch *api.Channel) (*api.NotifSubscription, *api.Notif } // requestStatistics requests interface counters notifications from VPP. -func requestStatistics(ch *api.Channel) { +func requestStatistics(ch api.Channel) { ch.SendRequest(&stats.WantStats{ Pid: uint32(os.Getpid()), EnableDisable: 1, @@ -141,4 +141,4 @@ func processCombinedCounters(counters *interfaces.VnetInterfaceCombinedCounters) counters.FirstSwIfIndex+i, counterNames[counters.VnetCounterType], counters.Data[i].Packets, counterNames[counters.VnetCounterType], counters.Data[i].Bytes) } -} \ No newline at end of file +} -- 2.16.6