X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Frequest_handler.go;h=fc704cbe64492b9dfdfe7cda58d90ace8e931868;hb=df67791c6ffc96331f75aec7d3addfe2efca7739;hp=e272c6f8ec17531c6c7b9312a5236663b4074577;hpb=ceed73403bdb61387d04be8b47183e9c4a970749;p=govpp.git diff --git a/core/request_handler.go b/core/request_handler.go index e272c6f..fc704cb 100644 --- a/core/request_handler.go +++ b/core/request_handler.go @@ -45,18 +45,72 @@ func (c *Connection) watchRequests(ch *Channel) { return } if err := c.processRequest(ch, req); err != nil { - sendReplyError(ch, req, err) + sendReply(ch, &vppReply{ + seqNum: req.seqNum, + err: fmt.Errorf("unable to process request: %w", err), + }) } } } } +// processRequest processes a single request received on the request channel. +func (c *Connection) sendMessage(context uint32, msg api.Message) error { + // check whether we are connected to VPP + if atomic.LoadUint32(&c.vppConnected) == 0 { + return ErrNotConnected + } + + /*log := log.WithFields(logger.Fields{ + "context": context, + "msg_name": msg.GetMessageName(), + "msg_crc": msg.GetCrcString(), + })*/ + + // retrieve message ID + msgID, err := c.GetMessageID(msg) + if err != nil { + //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg) + return err + } + + //log = log.WithField("msg_id", msgID) + + // encode the message + data, err := c.codec.EncodeMsg(msg, msgID) + if err != nil { + log.WithError(err).Debugf("unable to encode message: %#v", msg) + return err + } + + //log = log.WithField("msg_length", len(data)) + + if log.Level >= logger.DebugLevel { + log.Debugf("--> SEND: MSG %T %+v", msg, msg) + } + + // send message to VPP + err = c.vppClient.SendMsg(context, data) + if err != nil { + log.WithError(err).Debugf("unable to send message: %#v", msg) + return err + } + + return nil +} + // processRequest processes a single request received on the request channel. func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // check whether we are connected to VPP if atomic.LoadUint32(&c.vppConnected) == 0 { err := ErrNotConnected - log.Errorf("processing request failed: %v", err) + log.WithFields(logger.Fields{ + "channel": ch.id, + "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "error": err, + }).Warnf("Unable to process request") return err } @@ -64,12 +118,13 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { msgID, err := c.GetMessageID(req.msg) if err != nil { log.WithFields(logger.Fields{ + "channel": ch.id, "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, "error": err, - }).Errorf("failed to retrieve message ID") - return fmt.Errorf("unable to retrieve message ID: %v", err) + }).Warnf("Unable to retrieve message ID") + return err } // encode the message into binary @@ -79,35 +134,42 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { "channel": ch.id, "msg_id": msgID, "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), "seq_num": req.seqNum, "error": err, - }).Errorf("failed to encode message: %#v", req.msg) - return fmt.Errorf("unable to encode the message: %v", err) + }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg) + return err } context := packRequestContext(ch.id, req.multi, req.seqNum) - if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled + if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled log.WithFields(logger.Fields{ "channel": ch.id, - "context": context, - "is_multi": req.multi, "msg_id": msgID, - "msg_size": len(data), - "seq_num": req.seqNum, + "msg_name": req.msg.GetMessageName(), "msg_crc": req.msg.GetCrcString(), - }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg) + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg) } // send the request to VPP err = c.vppClient.SendMsg(context, data) if err != nil { - err = fmt.Errorf("unable to send the message: %v", err) log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "seq_num": req.seqNum, - }).Error(err) + "channel": ch.id, + "msg_id": msgID, + "msg_name": req.msg.GetMessageName(), + "msg_crc": req.msg.GetCrcString(), + "seq_num": req.seqNum, + "is_multi": req.multi, + "context": context, + "data_len": len(data), + "error": err, + }).Warnf("Unable to send message") return err } @@ -115,20 +177,24 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // send a control ping to determine end of the multipart response pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID) - log.WithFields(logger.Fields{ - "channel": ch.id, - "context": context, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - "seq_num": req.seqNum, - }).Debug(" -> sending control ping") + if log.Level >= logger.DebugLevel { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": c.pingReqID, + "msg_name": c.msgControlPing.GetMessageName(), + "msg_crc": c.msgControlPing.GetCrcString(), + "seq_num": req.seqNum, + "context": context, + "data_len": len(pingData), + }).Debugf(" -> SEND MSG: %T", c.msgControlPing) + } if err := c.vppClient.SendMsg(context, pingData); err != nil { log.WithFields(logger.Fields{ "context": context, - "msg_id": msgID, "seq_num": req.seqNum, - }).Warnf("unable to send control ping: %v", err) + "error": err, + }).Warnf("unable to send control ping") } } @@ -138,7 +204,9 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error { // msgCallback is called whenever any binary API message comes from VPP. func (c *Connection) msgCallback(msgID uint16, data []byte) { if c == nil { - log.Warn("Already disconnected, ignoring the message.") + log.WithField( + "msg_id", msgID, + ).Warn("Connection already disconnected, ignoring the message.") return } @@ -155,7 +223,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // context, err := c.codec.DecodeMsgContext(data, msg) if err != nil { - log.Errorf("decoding context failed: %v", err) + log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err) + return } chanID, isMulti, seqNum := unpackRequestContext(context) @@ -220,23 +289,36 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) { // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise // it logs the error and do not send the message. func sendReply(ch *Channel, reply *vppReply) { + // first try to avoid creating timer + select { + case ch.replyChan <- reply: + return // reply sent ok + default: + // reply channel full + } + if ch.receiveReplyTimeout == 0 { + log.WithFields(logger.Fields{ + "channel": ch.id, + "msg_id": reply.msgID, + "seq_num": reply.seqNum, + "err": reply.err, + }).Warn("Reply channel full, dropping reply.") + return + } select { case ch.replyChan <- reply: - // reply sent successfully - case <-time.After(ReplyChannelTimeout): + return // reply sent ok + case <-time.After(ch.receiveReplyTimeout): // receiver still not ready log.WithFields(logger.Fields{ - "channel": ch, + "channel": ch.id, "msg_id": reply.msgID, "seq_num": reply.seqNum, - }).Warn("Unable to send the reply, reciever end not ready.") + "err": reply.err, + }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout) } } -func sendReplyError(ch *Channel, req *vppRequest, err error) { - sendReply(ch, &vppReply{seqNum: req.seqNum, err: err}) -} - // isNotificationMessage returns true if someone has subscribed to provided message ID. func (c *Connection) isNotificationMessage(msgID uint16) bool { c.subscriptionsLock.RLock() @@ -267,7 +349,8 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) { "msg_name": sub.event.GetMessageName(), "msg_id": msgID, "msg_size": len(data), - }).Errorf("Unable to decode the notification message: %v", err) + "error": err, + }).Warnf("Unable to decode the notification message") continue }