Introduce Stream - experimental API for low-level access to VPP API
[govpp.git] / core / request_handler.go
index e272c6f..fc704cb 100644 (file)
@@ -45,18 +45,72 @@ func (c *Connection) watchRequests(ch *Channel) {
                                return
                        }
                        if err := c.processRequest(ch, req); err != nil {
-                               sendReplyError(ch, req, err)
+                               sendReply(ch, &vppReply{
+                                       seqNum: req.seqNum,
+                                       err:    fmt.Errorf("unable to process request: %w", err),
+                               })
                        }
                }
        }
 }
 
+// processRequest processes a single request received on the request channel.
+func (c *Connection) sendMessage(context uint32, msg api.Message) error {
+       // check whether we are connected to VPP
+       if atomic.LoadUint32(&c.vppConnected) == 0 {
+               return ErrNotConnected
+       }
+
+       /*log := log.WithFields(logger.Fields{
+               "context":  context,
+               "msg_name": msg.GetMessageName(),
+               "msg_crc":  msg.GetCrcString(),
+       })*/
+
+       // retrieve message ID
+       msgID, err := c.GetMessageID(msg)
+       if err != nil {
+               //log.WithError(err).Debugf("unable to retrieve message ID: %#v", msg)
+               return err
+       }
+
+       //log = log.WithField("msg_id", msgID)
+
+       // encode the message
+       data, err := c.codec.EncodeMsg(msg, msgID)
+       if err != nil {
+               log.WithError(err).Debugf("unable to encode message: %#v", msg)
+               return err
+       }
+
+       //log = log.WithField("msg_length", len(data))
+
+       if log.Level >= logger.DebugLevel {
+               log.Debugf("--> SEND: MSG %T %+v", msg, msg)
+       }
+
+       // send message to VPP
+       err = c.vppClient.SendMsg(context, data)
+       if err != nil {
+               log.WithError(err).Debugf("unable to send message: %#v", msg)
+               return err
+       }
+
+       return nil
+}
+
 // processRequest processes a single request received on the request channel.
 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.vppConnected) == 0 {
                err := ErrNotConnected
-               log.Errorf("processing request failed: %v", err)
+               log.WithFields(logger.Fields{
+                       "channel":  ch.id,
+                       "seq_num":  req.seqNum,
+                       "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
+                       "error":    err,
+               }).Warnf("Unable to process request")
                return err
        }
 
@@ -64,12 +118,13 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        msgID, err := c.GetMessageID(req.msg)
        if err != nil {
                log.WithFields(logger.Fields{
+                       "channel":  ch.id,
                        "msg_name": req.msg.GetMessageName(),
                        "msg_crc":  req.msg.GetCrcString(),
                        "seq_num":  req.seqNum,
                        "error":    err,
-               }).Errorf("failed to retrieve message ID")
-               return fmt.Errorf("unable to retrieve message ID: %v", err)
+               }).Warnf("Unable to retrieve message ID")
+               return err
        }
 
        // encode the message into binary
@@ -79,35 +134,42 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
                        "channel":  ch.id,
                        "msg_id":   msgID,
                        "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
                        "seq_num":  req.seqNum,
                        "error":    err,
-               }).Errorf("failed to encode message: %#v", req.msg)
-               return fmt.Errorf("unable to encode the message: %v", err)
+               }).Warnf("Unable to encode message: %T %+v", req.msg, req.msg)
+               return err
        }
 
        context := packRequestContext(ch.id, req.multi, req.seqNum)
 
-       if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
+       if log.Level >= logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
-                       "context":  context,
-                       "is_multi": req.multi,
                        "msg_id":   msgID,
-                       "msg_size": len(data),
-                       "seq_num":  req.seqNum,
+                       "msg_name": req.msg.GetMessageName(),
                        "msg_crc":  req.msg.GetCrcString(),
-               }).Debugf("--> govpp SEND: %s %+v", req.msg.GetMessageName(), req.msg)
+                       "seq_num":  req.seqNum,
+                       "is_multi": req.multi,
+                       "context":  context,
+                       "data_len": len(data),
+               }).Debugf("--> SEND MSG: %T %+v", req.msg, req.msg)
        }
 
        // send the request to VPP
        err = c.vppClient.SendMsg(context, data)
        if err != nil {
-               err = fmt.Errorf("unable to send the message: %v", err)
                log.WithFields(logger.Fields{
-                       "context": context,
-                       "msg_id":  msgID,
-                       "seq_num": req.seqNum,
-               }).Error(err)
+                       "channel":  ch.id,
+                       "msg_id":   msgID,
+                       "msg_name": req.msg.GetMessageName(),
+                       "msg_crc":  req.msg.GetCrcString(),
+                       "seq_num":  req.seqNum,
+                       "is_multi": req.multi,
+                       "context":  context,
+                       "data_len": len(data),
+                       "error":    err,
+               }).Warnf("Unable to send message")
                return err
        }
 
@@ -115,20 +177,24 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
                // send a control ping to determine end of the multipart response
                pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
 
-               log.WithFields(logger.Fields{
-                       "channel":  ch.id,
-                       "context":  context,
-                       "msg_id":   c.pingReqID,
-                       "msg_size": len(pingData),
-                       "seq_num":  req.seqNum,
-               }).Debug(" -> sending control ping")
+               if log.Level >= logger.DebugLevel {
+                       log.WithFields(logger.Fields{
+                               "channel":  ch.id,
+                               "msg_id":   c.pingReqID,
+                               "msg_name": c.msgControlPing.GetMessageName(),
+                               "msg_crc":  c.msgControlPing.GetCrcString(),
+                               "seq_num":  req.seqNum,
+                               "context":  context,
+                               "data_len": len(pingData),
+                       }).Debugf(" -> SEND MSG: %T", c.msgControlPing)
+               }
 
                if err := c.vppClient.SendMsg(context, pingData); err != nil {
                        log.WithFields(logger.Fields{
                                "context": context,
-                               "msg_id":  msgID,
                                "seq_num": req.seqNum,
-                       }).Warnf("unable to send control ping: %v", err)
+                               "error":   err,
+                       }).Warnf("unable to send control ping")
                }
        }
 
@@ -138,7 +204,9 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
 // msgCallback is called whenever any binary API message comes from VPP.
 func (c *Connection) msgCallback(msgID uint16, data []byte) {
        if c == nil {
-               log.Warn("Already disconnected, ignoring the message.")
+               log.WithField(
+                       "msg_id", msgID,
+               ).Warn("Connection already disconnected, ignoring the message.")
                return
        }
 
@@ -155,7 +223,8 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
        //
        context, err := c.codec.DecodeMsgContext(data, msg)
        if err != nil {
-               log.Errorf("decoding context failed: %v", err)
+               log.WithField("msg_id", msgID).Warnf("Unable to decode message context: %v", err)
+               return
        }
 
        chanID, isMulti, seqNum := unpackRequestContext(context)
@@ -220,23 +289,36 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
 func sendReply(ch *Channel, reply *vppReply) {
+       // first try to avoid creating timer
+       select {
+       case ch.replyChan <- reply:
+               return // reply sent ok
+       default:
+               // reply channel full
+       }
+       if ch.receiveReplyTimeout == 0 {
+               log.WithFields(logger.Fields{
+                       "channel": ch.id,
+                       "msg_id":  reply.msgID,
+                       "seq_num": reply.seqNum,
+                       "err":     reply.err,
+               }).Warn("Reply channel full, dropping reply.")
+               return
+       }
        select {
        case ch.replyChan <- reply:
-       // reply sent successfully
-       case <-time.After(ReplyChannelTimeout):
+               return // reply sent ok
+       case <-time.After(ch.receiveReplyTimeout):
                // receiver still not ready
                log.WithFields(logger.Fields{
-                       "channel": ch,
+                       "channel": ch.id,
                        "msg_id":  reply.msgID,
                        "seq_num": reply.seqNum,
-               }).Warn("Unable to send the reply, reciever end not ready.")
+                       "err":     reply.err,
+               }).Warnf("Unable to send reply (reciever end not ready in %v).", ch.receiveReplyTimeout)
        }
 }
 
-func sendReplyError(ch *Channel, req *vppRequest, err error) {
-       sendReply(ch, &vppReply{seqNum: req.seqNum, err: err})
-}
-
 // isNotificationMessage returns true if someone has subscribed to provided message ID.
 func (c *Connection) isNotificationMessage(msgID uint16) bool {
        c.subscriptionsLock.RLock()
@@ -267,7 +349,8 @@ func (c *Connection) sendNotifications(msgID uint16, data []byte) {
                                "msg_name": sub.event.GetMessageName(),
                                "msg_id":   msgID,
                                "msg_size": len(data),
-                       }).Errorf("Unable to decode the notification message: %v", err)
+                               "error":    err,
+                       }).Warnf("Unable to decode the notification message")
                        continue
                }