fix data races in proxy server
[govpp.git] / core / request_handler.go
index 14c095d..ddd5307 100644 (file)
@@ -23,6 +23,8 @@ import (
        logger "github.com/sirupsen/logrus"
 )
 
+var ReplyChannelTimeout = time.Millisecond * 100
+
 var (
        ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
        ErrProbeTimeout = errors.New("probe reply not received within timeout period")
@@ -39,7 +41,9 @@ func (c *Connection) watchRequests(ch *Channel) {
                                c.releaseAPIChannel(ch)
                                return
                        }
-                       c.processRequest(ch, req)
+                       if err := c.processRequest(ch, req); err != nil {
+                               sendReplyError(ch, req, err)
+                       }
                }
        }
 }
@@ -47,56 +51,53 @@ func (c *Connection) watchRequests(ch *Channel) {
 // processRequest processes a single request received on the request channel.
 func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
        // check whether we are connected to VPP
-       if atomic.LoadUint32(&c.connected) == 0 {
+       if atomic.LoadUint32(&c.vppConnected) == 0 {
                err := ErrNotConnected
                log.Errorf("processing request failed: %v", err)
-               sendReplyError(ch, req, err)
                return err
        }
 
        // retrieve message ID
        msgID, err := c.GetMessageID(req.msg)
        if err != nil {
-               err = fmt.Errorf("unable to retrieve message ID: %v", err)
                log.WithFields(logger.Fields{
                        "msg_name": req.msg.GetMessageName(),
                        "msg_crc":  req.msg.GetCrcString(),
                        "seq_num":  req.seqNum,
-               }).Error(err)
-               sendReplyError(ch, req, err)
-               return err
+                       "error":    err,
+               }).Errorf("failed to retrieve message ID")
+               return fmt.Errorf("unable to retrieve message ID: %v", err)
        }
 
        // encode the message into binary
        data, err := c.codec.EncodeMsg(req.msg, msgID)
        if err != nil {
-               err = fmt.Errorf("unable to encode the messge: %v", err)
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
                        "msg_id":   msgID,
                        "msg_name": req.msg.GetMessageName(),
                        "seq_num":  req.seqNum,
-               }).Error(err)
-               sendReplyError(ch, req, err)
-               return err
+                       "error":    err,
+               }).Errorf("failed to encode message: %#v", req.msg)
+               return fmt.Errorf("unable to encode the message: %v", err)
        }
 
-       // get context
        context := packRequestContext(ch.id, req.multi, req.seqNum)
+
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
                        "context":  context,
                        "is_multi": req.multi,
                        "msg_id":   msgID,
-                       "msg_name": req.msg.GetMessageName(),
                        "msg_size": len(data),
                        "seq_num":  req.seqNum,
-               }).Debug(" -> Sending a message to VPP.")
+                       "msg_crc":  req.msg.GetCrcString(),
+               }).Debugf("==> govpp send: %s: %+v", req.msg.GetMessageName(), req.msg)
        }
 
        // send the request to VPP
-       err = c.vpp.SendMsg(context, data)
+       err = c.vppClient.SendMsg(context, data)
        if err != nil {
                err = fmt.Errorf("unable to send the message: %v", err)
                log.WithFields(logger.Fields{
@@ -104,13 +105,12 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
                        "msg_id":  msgID,
                        "seq_num": req.seqNum,
                }).Error(err)
-               sendReplyError(ch, req, err)
                return err
        }
 
        if req.multi {
                // send a control ping to determine end of the multipart response
-               pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
+               pingData, _ := c.codec.EncodeMsg(c.msgControlPing, c.pingReqID)
 
                log.WithFields(logger.Fields{
                        "channel":  ch.id,
@@ -118,9 +118,9 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
                        "msg_id":   c.pingReqID,
                        "msg_size": len(pingData),
                        "seq_num":  req.seqNum,
-               }).Debug(" -> Sending a control ping to VPP.")
+               }).Debug("--> sending control ping")
 
-               if err := c.vpp.SendMsg(context, pingData); err != nil {
+               if err := c.vppClient.SendMsg(context, pingData); err != nil {
                        log.WithFields(logger.Fields{
                                "context": context,
                                "msg_id":  msgID,
@@ -134,9 +134,6 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
 
 // msgCallback is called whenever any binary API message comes from VPP.
 func (c *Connection) msgCallback(msgID uint16, data []byte) {
-       connLock.RLock()
-       defer connLock.RUnlock()
-
        if c == nil {
                log.Warn("Already disconnected, ignoring the message.")
                return
@@ -163,12 +160,12 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
                log.WithFields(logger.Fields{
                        "context":  context,
                        "msg_id":   msgID,
-                       "msg_name": msg.GetMessageName(),
                        "msg_size": len(data),
                        "channel":  chanID,
                        "is_multi": isMulti,
                        "seq_num":  seqNum,
-               }).Debug(" <- Received a message from VPP.")
+                       "msg_crc":  msg.GetCrcString(),
+               }).Debugf("<== govpp recv: %s", msg.GetMessageName())
        }
 
        if context == 0 || c.isNotificationMessage(msgID) {
@@ -193,11 +190,12 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {
        // treat this as a last part of the reply
        lastReplyReceived := isMulti && msgID == c.pingReplyID
 
-       // send the data to the channel
+       // send the data to the channel, it needs to be copied,
+       // because it will be freed after this function returns
        sendReply(ch, &vppReply{
                msgID:        msgID,
                seqNum:       seqNum,
-               data:         data,
+               data:         append([]byte(nil), data...),
                lastReceived: lastReplyReceived,
        })
 
@@ -213,7 +211,7 @@ func sendReply(ch *Channel, reply *vppReply) {
        select {
        case ch.replyChan <- reply:
                // reply sent successfully
-       case <-time.After(time.Millisecond * 100):
+       case <-time.After(ReplyChannelTimeout):
                // receiver still not ready
                log.WithFields(logger.Fields{
                        "channel": ch,