make api.Channel as interface
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21         "time"
22
23         logger "github.com/sirupsen/logrus"
24
25         "git.fd.io/govpp.git/api"
26 )
27
28 var (
29         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
30         ErrProbeTimeout = errors.New("probe reply not received within timeout period")
31 )
32
33 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
34 func (c *Connection) watchRequests(ch *channel) {
35         for {
36                 select {
37                 case req, ok := <-ch.reqChan:
38                         // new request on the request channel
39                         if !ok {
40                                 // after closing the request channel, release API channel and return
41                                 c.releaseAPIChannel(ch)
42                                 return
43                         }
44                         c.processRequest(ch, req)
45
46                 case req := <-ch.notifSubsChan:
47                         // new request on the notification subscribe channel
48                         c.processNotifSubscribeRequest(ch, req)
49                 }
50         }
51 }
52
53 // processRequest processes a single request received on the request channel.
54 func (c *Connection) processRequest(ch *channel, req *api.VppRequest) error {
55         // check whether we are connected to VPP
56         if atomic.LoadUint32(&c.connected) == 0 {
57                 err := ErrNotConnected
58                 log.Error(err)
59                 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
60                 return err
61         }
62
63         // retrieve message ID
64         msgID, err := c.GetMessageID(req.Message)
65         if err != nil {
66                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
67                 log.WithFields(logger.Fields{
68                         "msg_name": req.Message.GetMessageName(),
69                         "msg_crc":  req.Message.GetCrcString(),
70                         "seq_num":  req.SeqNum,
71                 }).Error(err)
72                 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
73                 return err
74         }
75
76         // encode the message into binary
77         data, err := c.codec.EncodeMsg(req.Message, msgID)
78         if err != nil {
79                 err = fmt.Errorf("unable to encode the messge: %v", err)
80                 log.WithFields(logger.Fields{
81                         "channel": ch.id,
82                         "msg_id":  msgID,
83                         "seq_num": req.SeqNum,
84                 }).Error(err)
85                 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
86                 return err
87         }
88
89         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
90                 log.WithFields(logger.Fields{
91                         "channel":  ch.id,
92                         "msg_id":   msgID,
93                         "msg_size": len(data),
94                         "msg_name": req.Message.GetMessageName(),
95                         "seq_num":  req.SeqNum,
96                 }).Debug("Sending a message to VPP.")
97         }
98
99         // send the request to VPP
100         context := packRequestContext(ch.id, req.Multipart, req.SeqNum)
101         err = c.vpp.SendMsg(context, data)
102         if err != nil {
103                 err = fmt.Errorf("unable to send the message: %v", err)
104                 log.WithFields(logger.Fields{
105                         "context": context,
106                         "msg_id":  msgID,
107                         "seq_num": req.SeqNum,
108                 }).Error(err)
109                 sendReply(ch, &api.VppReply{SeqNum: req.SeqNum, Error: err})
110                 return err
111         }
112
113         if req.Multipart {
114                 // send a control ping to determine end of the multipart response
115                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
116
117                 log.WithFields(logger.Fields{
118                         "context":  context,
119                         "msg_id":   c.pingReqID,
120                         "msg_size": len(pingData),
121                         "seq_num":  req.SeqNum,
122                 }).Debug("Sending a control ping to VPP.")
123
124                 c.vpp.SendMsg(context, pingData)
125         }
126
127         return nil
128 }
129
130 // msgCallback is called whenever any binary API message comes from VPP.
131 func msgCallback(context uint32, msgID uint16, data []byte) {
132         connLock.RLock()
133         defer connLock.RUnlock()
134
135         if conn == nil {
136                 log.Warn("Already disconnected, ignoring the message.")
137                 return
138         }
139
140         chanID, isMultipart, seqNum := unpackRequestContext(context)
141         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
142                 log.WithFields(logger.Fields{
143                         "msg_id":       msgID,
144                         "msg_size":     len(data),
145                         "channel_id":   chanID,
146                         "is_multipart": isMultipart,
147                         "seq_num":      seqNum,
148                 }).Debug("Received a message from VPP.")
149         }
150
151         if context == 0 || conn.isNotificationMessage(msgID) {
152                 // process the message as a notification
153                 conn.sendNotifications(msgID, data)
154                 return
155         }
156
157         // match ch according to the context
158         conn.channelsLock.RLock()
159         ch, ok := conn.channels[chanID]
160         conn.channelsLock.RUnlock()
161
162         if !ok {
163                 log.WithFields(logger.Fields{
164                         "channel_id": chanID,
165                         "msg_id":     msgID,
166                 }).Error("Channel ID not known, ignoring the message.")
167                 return
168         }
169
170         lastReplyReceived := false
171         // if this is a control ping reply to a multipart request, treat this as a last part of the reply
172         if msgID == conn.pingReplyID && isMultipart {
173                 lastReplyReceived = true
174         }
175
176         // send the data to the channel
177         sendReply(ch, &api.VppReply{
178                 MessageID:         msgID,
179                 SeqNum:            seqNum,
180                 Data:              data,
181                 LastReplyReceived: lastReplyReceived,
182         })
183
184         // store actual time of this reply
185         conn.lastReplyLock.Lock()
186         conn.lastReply = time.Now()
187         conn.lastReplyLock.Unlock()
188 }
189
190 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
191 // it logs the error and do not send the message.
192 func sendReply(ch *channel, reply *api.VppReply) {
193         select {
194         case ch.replyChan <- reply:
195                 // reply sent successfully
196         case <-time.After(time.Millisecond * 100):
197                 // receiver still not ready
198                 log.WithFields(logger.Fields{
199                         "channel": ch,
200                         "msg_id":  reply.MessageID,
201                         "seq_num": reply.SeqNum,
202                 }).Warn("Unable to send the reply, reciever end not ready.")
203         }
204 }
205
206 // GetMessageID returns message identifier of given API message.
207 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
208         if c == nil {
209                 return 0, errors.New("nil connection passed in")
210         }
211         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
212 }
213
214 // messageNameToID returns message ID of a message identified by its name and CRC.
215 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
216         msgKey := msgName + "_" + msgCrc
217
218         // try to get the ID from the map
219         c.msgIDsLock.RLock()
220         id, ok := c.msgIDs[msgKey]
221         c.msgIDsLock.RUnlock()
222         if ok {
223                 return id, nil
224         }
225
226         // get the ID using VPP API
227         id, err := c.vpp.GetMsgID(msgName, msgCrc)
228         if err != nil {
229                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
230                 log.WithFields(logger.Fields{
231                         "msg_name": msgName,
232                         "msg_crc":  msgCrc,
233                 }).Error(err)
234                 return id, err
235         }
236
237         c.msgIDsLock.Lock()
238         c.msgIDs[msgKey] = id
239         c.msgIDsLock.Unlock()
240
241         return id, nil
242 }
243
244 // LookupByID looks up message name and crc by ID.
245 func (c *Connection) LookupByID(ID uint16) (string, error) {
246         if c == nil {
247                 return "", errors.New("nil connection passed in")
248         }
249
250         c.msgIDsLock.Lock()
251         defer c.msgIDsLock.Unlock()
252
253         for key, id := range c.msgIDs {
254                 if id == ID {
255                         return key, nil
256                 }
257         }
258
259         return "", fmt.Errorf("unknown message ID: %d", ID)
260 }
261
262 // +------------------+-------------------+-----------------------+
263 // | 15b = channel ID | 1b = is multipart | 16b = sequence number |
264 // +------------------+-------------------+-----------------------+
265 func packRequestContext(chanID uint16, isMultipart bool, seqNum uint16) uint32 {
266         context := uint32(chanID) << 17
267         if isMultipart {
268                 context |= 1 << 16
269         }
270         context |= uint32(seqNum)
271         return context
272 }
273
274 func unpackRequestContext(context uint32) (chanID uint16, isMulipart bool, seqNum uint16) {
275         chanID = uint16(context >> 17)
276         if ((context >> 16) & 0x1) != 0 {
277                 isMulipart = true
278         }
279         seqNum = uint16(context & 0xffff)
280         return
281 }