X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Frequest_handler.go;h=29685f6995747068ed1fa25adf29c96b345d3fc8;hb=8d3131f90f71271835e5fed91831565797894614;hp=ddd53075495b1869e2b71ace1fee393ef5480c2e;hpb=58601b470bbd4e5ef534fed83511aa5a7f1c2d1e;p=govpp.git diff --git a/core/request_handler.go b/core/request_handler.go index ddd5307..29685f6 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -21,6 +21,8 @@ import ( "time" logger "github.com/sirupsen/logrus" + + "git.fd.io/govpp.git/api" ) var ReplyChannelTimeout = time.Millisecond * 100 @@ -42,18 +44,72 @@ func (c *Connection) watchRequests(ch *Channel) { return } if err := c.processRequest(ch, req); err != nil { - sendReplyError(ch, req, err) + sendReply(ch, &vppReply{ + seqNum: req.seqNum, + err: fmt.Errorf("unable to process request: %w", err), + }) } } } } +// processRequest processes a single request received on the request channel. +func (c *Connection) sendMessage(context uint32, msg api.Message) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.vppConnected) == 0 { + return ErrNotConnected + } + + /*log := log.WithFields(logger.Fields{ + "context": context, + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + })*/ + + // retrieve message ID + msgID, err := c.GetMessageID(msg) + if err != nil { + //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg) + return err + } + + //log = log.WithField("msg_id", msgID) + + // encode the message + data, err := c.codec.EncodeMsg(msg, msgID) + if err != nil { + log.WithError(err).Debugf("unable to encode message: %#v", msg) + return err + } + + //log = log.WithField("msg_length", len(data)) + + if log.Level >= logger.DebugLevel { + log.Debugf("--> SEND: MSG %T %+v", msg, msg) + } + + // send message to VPP + err = c.vppClient.SendMsg(context, data) + if err != nil { + log.WithError(err).Debugf("unable to send message: %#v", msg) + return err + } + + return nil +} + // processRequest processes a single request received on the request channel. func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.vppConnected) == 0 { err := ErrNotConnected - log.Errorf("processing request failed: %v", err) + log.WithFields(logger.Fields{ + "channel": ch.id, + "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "error": err, + }).Warnf("Unable to process request") return err } @@ -61,12 +117,13 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { msgID, err := c.GetMessageID(req.msg) if err != nil { log.WithFields(logger.Fields{ + "channel": ch.id, "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, "error": err, - }).Errorf("failed to retrieve message ID") - return fmt.Errorf("unable to retrieve message ID: %v", err) + }).Warnf("Unable to retrieve message ID") + return err } // encode the message into binary @@ -76,35 +133,42 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "channel": ch.id, "msg_id": msgID, "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, "error": err, - }).Errorf("failed to encode message: %#v", req.msg) - return fmt.Errorf("unable to encode the message: %v", err) + }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg) + return err } context := packRequestContext(ch.id, req.multi, req.seqNum) - if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled + if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "channel": ch.id, - "context": context, - "is_multi": req.multi, "msg_id": msgID, - "msg_size": len(data), - "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), - }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg) + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg) } // send the request to VPP err = c.vppClient.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "seq_num": req.seqNum, - }).Error(err) + "channel": ch.id, + "msg_id": msgID, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + "error": err, + }).Warnf("Unable to send message") return err } @@ -112,20 +176,24 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // send a control ping to determine end of the multipart response pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID) - log.WithFields(logger.Fields{ - "channel": ch.id, - "context": context, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - "seq_num": req.seqNum, - }).Debug("--> sending control ping") + if log.Level >= logger.DebugLevel { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": c.pingReqID, + "msg_name": c.msgControlPing.GetMessageName(), + "msg_crc": c.msgControlPing.GetCrcString(), + "seq_num": req.seqNum, + "context": context, + "data_len": len(pingData), + }).Debugf(" -> SEND MSG: %T", c.msgControlPing) + } if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ "context": context, - "msg_id": msgID, "seq_num": req.seqNum, - }).Warnf("unable to send control ping: %v", err) + "error": err, + }).Warnf("unable to send control ping") } } @@ -135,13 +203,15 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // msgCallback is called whenever any binary API message comes from VPP. func (c *Connection) msgCallback(msgID uint16, data []byte) { if c == nil { - log.Warn("Already disconnected, ignoring the message.") + log.WithField( + "msg_id", msgID, + ).Warn("Connection already disconnected, ignoring the message.") return } - msg, ok := c.msgMap[msgID] - if !ok { - log.Warnf("Unknown message received, ID: %d", msgID) + msgType, name, crc, err := c.getMessageDataByID(msgID) + if err != nil { + log.Warnln(err) return } @@ -150,12 +220,14 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // - replies that don't have context as first field (comes as zero) // - events that don't have context at all (comes as non zero) // - context, err := c.codec.DecodeMsgContext(data, msg) + context, err := c.codec.DecodeMsgContext(data, msgType) if err != nil { - log.Errorf("decoding context failed: %v", err) + log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err) + return } chanID, isMulti, seqNum := unpackRequestContext(context) + if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "context": context, @@ -164,8 +236,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { "channel": chanID, "is_multi": isMulti, "seq_num": seqNum, - "msg_crc": msg.GetCrcString(), - }).Debugf("<== govpp recv: %s", msg.GetMessageName()) + "msg_crc": crc, + }).Debugf("<-- govpp RECEIVE: %s %+v", name) } if context == 0 || c.isNotificationMessage(msgID) { @@ -208,23 +280,36 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. func sendReply(ch *Channel, reply *vppReply) { + // first try to avoid creating timer + select { + case ch.replyChan <- reply: + return // reply sent ok + default: + // reply channel full + } + if ch.receiveReplyTimeout == 0 { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": reply.msgID, + "seq_num": reply.seqNum, + "err": reply.err, + }).Warn("Reply channel full, dropping reply.") + return + } select { case ch.replyChan <- reply: - // reply sent successfully - case <-time.After(ReplyChannelTimeout): + return // reply sent ok + case <-time.After(ch.receiveReplyTimeout): // receiver still not ready log.WithFields(logger.Fields{ - "channel": ch, + "channel": ch.id, "msg_id": reply.msgID, "seq_num": reply.seqNum, - }).Warn("Unable to send the reply, reciever end not ready.") + "err": reply.err, + }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout) } } -func sendReplyError(ch *Channel, req *vppRequest, err error) { - sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) -} - // isNotificationMessage returns true if someone has subscribed to provided message ID. func (c *Connection) isNotificationMessage(msgID uint16) bool { c.subscriptionsLock.RLock() @@ -255,7 +340,8 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) { "msg_name": sub.event.GetMessageName(), "msg_id": msgID, "msg_size": len(data), - }).Errorf("Unable to decode the notification message: %v", err) + "error": err, + }).Warnf("Unable to decode the notification message") continue } @@ -324,3 +410,13 @@ func compareSeqNumbers(seqNum1, seqNum2 uint16) int { } return 1 } + +// Returns message data based on the message ID not depending on the message path +func (c *Connection) getMessageDataByID(msgID uint16) (typ api.MessageType, name, crc string, err error) { + for _, msgs := range c.msgMapByPath { + if msg, ok := msgs[msgID]; ok { + return msg.GetMessageType(), msg.GetMessageName(), msg.GetCrcString(), nil + } + } + return typ, name, crc, fmt.Errorf("unknown message received, ID: %d", msgID) +}