X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fchannel.go;h=1b5e77e4a23b75dbbc4620bf00bd447ee63b6999;hb=df67791c6ffc96331f75aec7d3addfe2efca7739;hp=5b69eabfb30eb6a2b68c8dda573052424617eb0b;hpb=6476a2b64a2e1ea6c0d695127d726a348cc5c99b;p=govpp.git diff --git a/core/channel.go b/core/channel.go index 5b69eab..1b5e77e 100644 --- a/core/channel.go +++ b/core/channel.go @@ -21,8 +21,10 @@ import ( "strings" "time" - "git.fd.io/govpp.git/api" "github.com/sirupsen/logrus" + + "git.fd.io/govpp.git/adapter" + "git.fd.io/govpp.git/api" ) var ( @@ -35,6 +37,8 @@ type MessageCodec interface { EncodeMsg(msg api.Message, msgID uint16) ([]byte, error) // DecodeMsg decodes binary-encoded data of a message into provided Message structure. DecodeMsg(data []byte, msg api.Message) error + // DecodeMsgContext decodes context from message data. + DecodeMsgContext(data []byte, msg api.Message) (context uint32, err error) } // MessageIdentifier provides identification of generated API messages. @@ -82,7 +86,7 @@ type subscriptionCtx struct { msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification } -// channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests +// Channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines // concurrently, otherwise the responses could mix! Use multiple channels instead. @@ -98,19 +102,21 @@ type Channel struct { lastSeqNum uint16 // sequence number of the last sent request - delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery - replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout + delayedReply *vppReply // reply already taken from ReplyChan, buffered for later delivery + replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout + receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply } func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel { return &Channel{ - id: id, - conn: conn, - msgCodec: codec, - msgIdentifier: identifier, - reqChan: make(chan *vppRequest, reqSize), - replyChan: make(chan *vppReply, replySize), - replyTimeout: DefaultReplyTimeout, + id: id, + conn: conn, + msgCodec: codec, + msgIdentifier: identifier, + reqChan: make(chan *vppRequest, reqSize), + replyChan: make(chan *vppReply, replySize), + replyTimeout: DefaultReplyTimeout, + receiveReplyTimeout: ReplyChannelTimeout, } } @@ -118,34 +124,49 @@ func (ch *Channel) GetID() uint16 { return ch.id } +func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { + req := ch.newRequest(msg, false) + ch.reqChan <- req + return &requestCtx{ch: ch, seqNum: req.seqNum} +} + +func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + req := ch.newRequest(msg, true) + ch.reqChan <- req + return &multiRequestCtx{ch: ch, seqNum: req.seqNum} +} + func (ch *Channel) nextSeqNum() uint16 { ch.lastSeqNum++ return ch.lastSeqNum } -func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx { - seqNum := ch.nextSeqNum() - ch.reqChan <- &vppRequest{ +func (ch *Channel) newRequest(msg api.Message, multi bool) *vppRequest { + return &vppRequest{ msg: msg, - seqNum: seqNum, + seqNum: ch.nextSeqNum(), + multi: multi, } - return &requestCtx{ch: ch, seqNum: seqNum} } -func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx { - seqNum := ch.nextSeqNum() - ch.reqChan <- &vppRequest{ - msg: msg, - seqNum: seqNum, - multi: true, +func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error { + var comperr api.CompatibilityError + for _, msg := range msgs { + _, err := ch.msgIdentifier.GetMessageID(msg) + if err != nil { + if uerr, ok := err.(*adapter.UnknownMsgError); ok { + comperr.IncompatibleMessages = append(comperr.IncompatibleMessages, getMsgID(uerr.MsgName, uerr.MsgCrc)) + continue + } + // other errors return immediatelly + return err + } + comperr.CompatibleMessages = append(comperr.CompatibleMessages, getMsgNameWithCrc(msg)) } - return &multiRequestCtx{ch: ch, seqNum: seqNum} -} - -func getMsgFactory(msg api.Message) func() api.Message { - return func() api.Message { - return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + if len(comperr.IncompatibleMessages) == 0 { + return nil } + return &comperr } func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) { @@ -180,10 +201,7 @@ func (ch *Channel) SetReplyTimeout(timeout time.Duration) { } func (ch *Channel) Close() { - if ch.reqChan != nil { - close(ch.reqChan) - ch.reqChan = nil - } + close(ch.reqChan) } func (req *requestCtx) ReceiveReply(msg api.Message) error { @@ -221,6 +239,8 @@ func (sub *subscriptionCtx) Unsubscribe() error { for i, item := range sub.ch.conn.subscriptions[sub.msgID] { if item == sub { + // close notification channel + close(sub.ch.conn.subscriptions[sub.msgID][i].notifChan) // remove i-th item in the slice sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...) return nil @@ -254,16 +274,23 @@ func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (last case vppReply := <-ch.replyChan: ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg) if ignore { + log.WithFields(logrus.Fields{ + "expSeqNum": expSeqNum, + "channel": ch.id, + }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName()) continue } return lastReplyReceived, err case <-timer.C: + log.WithFields(logrus.Fields{ + "expSeqNum": expSeqNum, + "channel": ch.id, + }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName()) err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout) return false, err } } - return } func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) { @@ -271,8 +298,8 @@ func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum) if cmpSeqNums == -1 { // reply received too late, ignore the message - logrus.WithField("seqNum", reply.seqNum).Warn( - "Received reply to an already closed binary API request") + log.WithField("seqNum", reply.seqNum). + Warn("Received reply to an already closed binary API request") ignore = true return } @@ -308,9 +335,9 @@ func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Messa msgNameCrc = getMsgNameWithCrc(replyMsg) } - err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+ + err = fmt.Errorf("received unexpected message (seqNum=%d), expected %s (ID %d), but got %s (ID %d) "+ "(check if multiple goroutines are not sharing single GoVPP channel)", - reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc) + reply.seqNum, msg.GetMessageName(), expMsgID, msgNameCrc, reply.msgID) return }