make api.Channel as interface
[govpp.git] / core / request_handler.go
index 8f793f5..8681963 100644 (file)
@@ -31,19 +31,19 @@ var (
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
+func (c *Connection) watchRequests(ch *channel) {
        for {
                select {
-               case req, ok := <-ch.ReqChan:
+               case req, ok := <-ch.reqChan:
                        // new request on the request channel
                        if !ok {
                                // after closing the request channel, release API channel and return
-                               c.releaseAPIChannel(ch, chMeta)
+                               c.releaseAPIChannel(ch)
                                return
                        }
-                       c.processRequest(ch, chMeta, req)
+                       c.processRequest(ch, req)
 
-               case req := <-ch.NotifSubsChan:
+               case req := <-ch.notifSubsChan:
                        // new request on the notification subscribe channel
                        c.processNotifSubscribeRequest(ch, req)
                }
@@ -51,12 +51,12 @@ func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
 }
 
 // processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
+func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error {
        // check whether we are connected to VPP
        if atomic.LoadUint32(&c.connected) == 0 {
                err := ErrNotConnected
                log.Error(err)
-               sendReply(ch, &api.VppReply{Error: err})
+               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
                return err
        }
 
@@ -67,8 +67,9 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
                log.WithFields(logger.Fields{
                        "msg_name": req.Message.GetMessageName(),
                        "msg_crc":  req.Message.GetCrcString(),
+                       "seq_num":  req.SeqNum,
                }).Error(err)
-               sendReply(ch, &api.VppReply{Error: err})
+               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
                return err
        }
 
@@ -77,37 +78,35 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
        if err != nil {
                err = fmt.Errorf("unable to encode the messge: %v", err)
                log.WithFields(logger.Fields{
-                       "context": chMeta.id,
+                       "channel": ch.id,
                        "msg_id":  msgID,
+                       "seq_num": req.SeqNum,
                }).Error(err)
-               sendReply(ch, &api.VppReply{Error: err})
+               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
                return err
        }
 
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
-                       "context":  chMeta.id,
+                       "channel":  ch.id,
                        "msg_id":   msgID,
                        "msg_size": len(data),
                        "msg_name": req.Message.GetMessageName(),
+                       "seq_num":  req.SeqNum,
                }).Debug("Sending a message to VPP.")
        }
 
-       // send the message
-       if req.Multipart {
-               // expect multipart response
-               atomic.StoreUint32(&chMeta.multipart, 1)
-       }
-
        // send the request to VPP
-       err = c.vpp.SendMsg(chMeta.id, data)
+       context := packRequestContext(ch.id, req.Multipart, req.SeqNum)
+       err = c.vpp.SendMsg(context, data)
        if err != nil {
-               err = fmt.Errorf("unable to send the messge: %v", err)
+               err = fmt.Errorf("unable to send the message: %v", err)
                log.WithFields(logger.Fields{
-                       "context": chMeta.id,
+                       "context": context,
                        "msg_id":  msgID,
+                       "seq_num": req.SeqNum,
                }).Error(err)
-               sendReply(ch, &api.VppReply{Error: err})
+               sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
                return err
        }
 
@@ -116,12 +115,13 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
                pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
 
                log.WithFields(logger.Fields{
-                       "context":  chMeta.id,
+                       "context":  context,
                        "msg_id":   c.pingReqID,
                        "msg_size": len(pingData),
+                       "seq_num":  req.SeqNum,
                }).Debug("Sending a control ping to VPP.")
 
-               c.vpp.SendMsg(chMeta.id, pingData)
+               c.vpp.SendMsg(context, pingData)
        }
 
        return nil
@@ -137,11 +137,14 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
                return
        }
 
+       chanID, isMultipart, seqNum := unpackRequestContext(context)
        if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
                log.WithFields(logger.Fields{
-                       "context":  context,
-                       "msg_id":   msgID,
-                       "msg_size": len(data),
+                       "msg_id":       msgID,
+                       "msg_size":     len(data),
+                       "channel_id":   chanID,
+                       "is_multipart": isMultipart,
+                       "seq_num":      seqNum,
                }).Debug("Received a message from VPP.")
        }
 
@@ -153,27 +156,27 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
 
        // match ch according to the context
        conn.channelsLock.RLock()
-       ch, ok := conn.channels[context]
+       ch, ok := conn.channels[chanID]
        conn.channelsLock.RUnlock()
 
        if !ok {
                log.WithFields(logger.Fields{
-                       "context": context,
-                       "msg_id":  msgID,
-               }).Error("Context ID not known, ignoring the message.")
+                       "channel_id": chanID,
+                       "msg_id":     msgID,
+               }).Error("Channel ID not known, ignoring the message.")
                return
        }
 
-       chMeta := ch.Metadata().(*channelMetadata)
        lastReplyReceived := false
-       // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
-       if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
+       // if this is a control ping reply to a multipart request, treat this as a last part of the reply
+       if msgID == conn.pingReplyID && isMultipart {
                lastReplyReceived = true
        }
 
        // send the data to the channel
        sendReply(ch, &api.VppReply{
                MessageID:         msgID,
+               SeqNum:            seqNum,
                Data:              data,
                LastReplyReceived: lastReplyReceived,
        })
@@ -186,15 +189,16 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
 // it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
+func sendReply(ch *channel, reply *api.VppReply) {
        select {
-       case ch.ReplyChan <- reply:
+       case ch.replyChan <- reply:
                // reply sent successfully
        case <-time.After(time.Millisecond * 100):
                // receiver still not ready
                log.WithFields(logger.Fields{
                        "channel": ch,
                        "msg_id":  reply.MessageID,
+                       "seq_num": reply.SeqNum,
                }).Warn("Unable to send the reply, reciever end not ready.")
        }
 }
@@ -254,3 +258,24 @@ func (c *Connection) LookupByID(ID uint16) (string, error) {
 
        return "", fmt.Errorf("unknown message ID: %d", ID)
 }
+
+// +------------------+-------------------+-----------------------+
+// | 15b = channel ID | 1b = is multipart | 16b = sequence number |
+// +------------------+-------------------+-----------------------+
+func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
+       context := uint32(chanID) << 17
+       if isMultipart {
+               context |= 1 << 16
+       }
+       context |= uint32(seqNum)
+       return context
+}
+
+func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
+       chanID = uint16(context >> 17)
+       if ((context >> 16) & 0x1) != 0 {
+               isMulipart = true
+       }
+       seqNum = uint16(context & 0xffff)
+       return
+}