Improve handling of probes on timeouts
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21         "time"
22
23         logger "github.com/sirupsen/logrus"
24
25         "git.fd.io/govpp.git/api"
26 )
27
28 var (
29         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
30         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
31 )
32
33 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
34 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
35         for {
36                 select {
37                 case req, ok := <-ch.ReqChan:
38                         // new request on the request channel
39                         if !ok {
40                                 // after closing the request channel, release API channel and return
41                                 c.releaseAPIChannel(ch, chMeta)
42                                 return
43                         }
44                         c.processRequest(ch, chMeta, req)
45
46                 case req := <-ch.NotifSubsChan:
47                         // new request on the notification subscribe channel
48                         c.processNotifSubscribeRequest(ch, req)
49                 }
50         }
51 }
52
53 // processRequest processes a single request received on the request channel.
54 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
55         // check whether we are connected to VPP
56         if atomic.LoadUint32(&c.connected) == 0 {
57                 err := ErrNotConnected
58                 log.Error(err)
59                 sendReply(ch, &api.VppReply{Error: err})
60                 return err
61         }
62
63         // retrieve message ID
64         msgID, err := c.GetMessageID(req.Message)
65         if err != nil {
66                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
67                 log.WithFields(logger.Fields{
68                         "msg_name": req.Message.GetMessageName(),
69                         "msg_crc":  req.Message.GetCrcString(),
70                 }).Error(err)
71                 sendReply(ch, &api.VppReply{Error: err})
72                 return err
73         }
74
75         // encode the message into binary
76         data, err := c.codec.EncodeMsg(req.Message, msgID)
77         if err != nil {
78                 err = fmt.Errorf("unable to encode the messge: %v", err)
79                 log.WithFields(logger.Fields{
80                         "context": chMeta.id,
81                         "msg_id":  msgID,
82                 }).Error(err)
83                 sendReply(ch, &api.VppReply{Error: err})
84                 return err
85         }
86
87         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
88                 log.WithFields(logger.Fields{
89                         "context":  chMeta.id,
90                         "msg_id":   msgID,
91                         "msg_size": len(data),
92                         "msg_name": req.Message.GetMessageName(),
93                 }).Debug("Sending a message to VPP.")
94         }
95
96         // send the message
97         if req.Multipart {
98                 // expect multipart response
99                 atomic.StoreUint32(&chMeta.multipart, 1)
100         }
101
102         // send the request to VPP
103         err = c.vpp.SendMsg(chMeta.id, data)
104         if err != nil {
105                 err = fmt.Errorf("unable to send the messge: %v", err)
106                 log.WithFields(logger.Fields{
107                         "context": chMeta.id,
108                         "msg_id":  msgID,
109                 }).Error(err)
110                 sendReply(ch, &api.VppReply{Error: err})
111                 return err
112         }
113
114         if req.Multipart {
115                 // send a control ping to determine end of the multipart response
116                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
117
118                 log.WithFields(logger.Fields{
119                         "context":  chMeta.id,
120                         "msg_id":   c.pingReqID,
121                         "msg_size": len(pingData),
122                 }).Debug("Sending a control ping to VPP.")
123
124                 c.vpp.SendMsg(chMeta.id, pingData)
125         }
126
127         return nil
128 }
129
130 // msgCallback is called whenever any binary API message comes from VPP.
131 func msgCallback(context uint32, msgID uint16, data []byte) {
132         connLock.RLock()
133         defer connLock.RUnlock()
134
135         if conn == nil {
136                 log.Warn("Already disconnected, ignoring the message.")
137                 return
138         }
139
140         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
141                 log.WithFields(logger.Fields{
142                         "context":  context,
143                         "msg_id":   msgID,
144                         "msg_size": len(data),
145                 }).Debug("Received a message from VPP.")
146         }
147
148         if context == 0 || conn.isNotificationMessage(msgID) {
149                 // process the message as a notification
150                 conn.sendNotifications(msgID, data)
151                 return
152         }
153
154         // match ch according to the context
155         conn.channelsLock.RLock()
156         ch, ok := conn.channels[context]
157         conn.channelsLock.RUnlock()
158
159         if !ok {
160                 log.WithFields(logger.Fields{
161                         "context": context,
162                         "msg_id":  msgID,
163                 }).Error("Context ID not known, ignoring the message.")
164                 return
165         }
166
167         chMeta := ch.Metadata().(*channelMetadata)
168         lastReplyReceived := false
169         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
170         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
171                 lastReplyReceived = true
172         }
173
174         // send the data to the channel
175         sendReply(ch, &api.VppReply{
176                 MessageID:         msgID,
177                 Data:              data,
178                 LastReplyReceived: lastReplyReceived,
179         })
180
181         // store actual time of this reply
182         conn.lastReplyLock.Lock()
183         conn.lastReply = time.Now()
184         conn.lastReplyLock.Unlock()
185 }
186
187 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
188 // it logs the error and do not send the message.
189 func sendReply(ch *api.Channel, reply *api.VppReply) {
190         select {
191         case ch.ReplyChan <- reply:
192                 // reply sent successfully
193         case <-time.After(time.Millisecond * 100):
194                 // receiver still not ready
195                 log.WithFields(logger.Fields{
196                         "channel": ch,
197                         "msg_id":  reply.MessageID,
198                 }).Warn("Unable to send the reply, reciever end not ready.")
199         }
200 }
201
202 // GetMessageID returns message identifier of given API message.
203 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
204         if c == nil {
205                 return 0, errors.New("nil connection passed in")
206         }
207         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
208 }
209
210 // messageNameToID returns message ID of a message identified by its name and CRC.
211 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
212         msgKey := msgName + "_" + msgCrc
213
214         // try to get the ID from the map
215         c.msgIDsLock.RLock()
216         id, ok := c.msgIDs[msgKey]
217         c.msgIDsLock.RUnlock()
218         if ok {
219                 return id, nil
220         }
221
222         // get the ID using VPP API
223         id, err := c.vpp.GetMsgID(msgName, msgCrc)
224         if err != nil {
225                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
226                 log.WithFields(logger.Fields{
227                         "msg_name": msgName,
228                         "msg_crc":  msgCrc,
229                 }).Error(err)
230                 return id, err
231         }
232
233         c.msgIDsLock.Lock()
234         c.msgIDs[msgKey] = id
235         c.msgIDsLock.Unlock()
236
237         return id, nil
238 }
239
240 // LookupByID looks up message name and crc by ID.
241 func (c *Connection) LookupByID(ID uint16) (string, error) {
242         if c == nil {
243                 return "", errors.New("nil connection passed in")
244         }
245
246         c.msgIDsLock.Lock()
247         defer c.msgIDsLock.Unlock()
248
249         for key, id := range c.msgIDs {
250                 if id == ID {
251                         return key, nil
252                 }
253         }
254
255         return "", fmt.Errorf("unknown message ID: %d", ID)
256 }