Support mocking reply for more multi requests at once
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21
22         logger "github.com/sirupsen/logrus"
23
24         "git.fd.io/govpp.git/api"
25 )
26
27 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
28 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
29         for {
30                 select {
31                 case req, ok := <-ch.ReqChan:
32                         // new request on the request channel
33                         if !ok {
34                                 // after closing the request channel, release API channel and return
35                                 c.releaseAPIChannel(ch, chMeta)
36                                 return
37                         }
38                         c.processRequest(ch, chMeta, req)
39
40                 case req := <-ch.NotifSubsChan:
41                         // new request on the notification subscribe channel
42                         c.processNotifSubscribeRequest(ch, req)
43                 }
44         }
45 }
46
47 // processRequest processes a single request received on the request channel.
48 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
49         // check whether we are connected to VPP
50         if atomic.LoadUint32(&c.connected) == 0 {
51                 err := errors.New("not connected to VPP, ignoring the request")
52                 log.Error(err)
53                 sendReply(ch, &api.VppReply{Error: err})
54                 return err
55         }
56
57         // retrieve message ID
58         msgID, err := c.GetMessageID(req.Message)
59         if err != nil {
60                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
61                 log.WithFields(logger.Fields{
62                         "msg_name": req.Message.GetMessageName(),
63                         "msg_crc":  req.Message.GetCrcString(),
64                 }).Error(err)
65                 sendReply(ch, &api.VppReply{Error: err})
66                 return err
67         }
68
69         // encode the message into binary
70         data, err := c.codec.EncodeMsg(req.Message, msgID)
71         if err != nil {
72                 err = fmt.Errorf("unable to encode the messge: %v", err)
73                 log.WithFields(logger.Fields{
74                         "context": chMeta.id,
75                         "msg_id":  msgID,
76                 }).Error(err)
77                 sendReply(ch, &api.VppReply{Error: err})
78                 return err
79         }
80
81         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
82                 log.WithFields(logger.Fields{
83                         "context":  chMeta.id,
84                         "msg_id":   msgID,
85                         "msg_size": len(data),
86                         "msg_name": req.Message.GetMessageName(),
87                 }).Debug("Sending a message to VPP.")
88         }
89
90         // send the message
91         if req.Multipart {
92                 // expect multipart response
93                 atomic.StoreUint32(&chMeta.multipart, 1)
94         }
95
96         // send the request to VPP
97         c.vpp.SendMsg(chMeta.id, data)
98
99         if req.Multipart {
100                 // send a control ping to determine end of the multipart response
101                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
102
103                 log.WithFields(logger.Fields{
104                         "context":  chMeta.id,
105                         "msg_id":   c.pingReqID,
106                         "msg_size": len(pingData),
107                 }).Debug("Sending a control ping to VPP.")
108
109                 c.vpp.SendMsg(chMeta.id, pingData)
110         }
111
112         return nil
113 }
114
115 // msgCallback is called whenever any binary API message comes from VPP.
116 func msgCallback(context uint32, msgID uint16, data []byte) {
117         connLock.RLock()
118         defer connLock.RUnlock()
119
120         if conn == nil {
121                 log.Warn("Already disconnected, ignoring the message.")
122                 return
123         }
124
125         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
126                 log.WithFields(logger.Fields{
127                         "context":  context,
128                         "msg_id":   msgID,
129                         "msg_size": len(data),
130                 }).Debug("Received a message from VPP.")
131         }
132
133         if context == 0 || conn.isNotificationMessage(msgID) {
134                 // process the message as a notification
135                 conn.sendNotifications(msgID, data)
136                 return
137         }
138
139         // match ch according to the context
140         conn.channelsLock.RLock()
141         ch, ok := conn.channels[context]
142         conn.channelsLock.RUnlock()
143
144         if !ok {
145                 log.WithFields(logger.Fields{
146                         "context": context,
147                         "msg_id":  msgID,
148                 }).Error("Context ID not known, ignoring the message.")
149                 return
150         }
151
152         chMeta := ch.Metadata().(*channelMetadata)
153         lastReplyReceived := false
154         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
155         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
156                 lastReplyReceived = true
157         }
158
159         // send the data to the channel
160         sendReply(ch, &api.VppReply{
161                 MessageID:         msgID,
162                 Data:              data,
163                 LastReplyReceived: lastReplyReceived,
164         })
165 }
166
167 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
168 // it logs the error and do not send the message.
169 func sendReply(ch *api.Channel, reply *api.VppReply) {
170         select {
171         case ch.ReplyChan <- reply:
172                 // reply sent successfully
173         default:
174                 // unable to write into the channel without blocking
175                 log.WithFields(logger.Fields{
176                         "channel": ch,
177                         "msg_id":  reply.MessageID,
178                 }).Warn("Unable to send the reply, reciever end not ready.")
179         }
180 }
181
182 // GetMessageID returns message identifier of given API message.
183 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
184         if c == nil {
185                 return 0, errors.New("nil connection passed in")
186         }
187         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
188 }
189
190 // messageNameToID returns message ID of a message identified by its name and CRC.
191 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
192         // try to get the ID from the map
193         c.msgIDsLock.RLock()
194         id, ok := c.msgIDs[msgName+msgCrc]
195         c.msgIDsLock.RUnlock()
196         if ok {
197                 return id, nil
198         }
199
200         // get the ID using VPP API
201         id, err := c.vpp.GetMsgID(msgName, msgCrc)
202         if err != nil {
203                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
204                 log.WithFields(logger.Fields{
205                         "msg_name": msgName,
206                         "msg_crc":  msgCrc,
207                 }).Error(err)
208                 return id, err
209         }
210
211         c.msgIDsLock.Lock()
212         c.msgIDs[msgName+msgCrc] = id
213         c.msgIDsLock.Unlock()
214
215         return id, nil
216 }