Merge "Fix imports for github.com/sirupsen/logrus"
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21
22         logger "github.com/sirupsen/logrus"
23
24         "git.fd.io/govpp.git/api"
25         "git.fd.io/govpp.git/core/bin_api/vpe"
26 )
27
28 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
29 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
30         for {
31                 select {
32                 case req, ok := <-ch.ReqChan:
33                         // new request on the request channel
34                         if !ok {
35                                 // after closing the request channel, release API channel and return
36                                 c.releaseAPIChannel(ch, chMeta)
37                                 return
38                         }
39                         c.processRequest(ch, chMeta, req)
40
41                 case req := <-ch.NotifSubsChan:
42                         // new request on the notification subscribe channel
43                         c.processNotifSubscribeRequest(ch, req)
44                 }
45         }
46 }
47
48 // processRequest processes a single request received on the request channel.
49 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
50         // check whether we are connected to VPP
51         if atomic.LoadUint32(&c.connected) == 0 {
52                 error := errors.New("not connected to VPP, ignoring the request")
53                 log.Error(error)
54                 sendReply(ch, &api.VppReply{Error: error})
55                 return error
56         }
57
58         // retrieve message ID
59         msgID, err := c.GetMessageID(req.Message)
60         if err != nil {
61                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
62                 log.WithFields(logger.Fields{
63                         "msg_name": req.Message.GetMessageName(),
64                         "msg_crc":  req.Message.GetCrcString(),
65                 }).Error(err)
66                 sendReply(ch, &api.VppReply{Error: error})
67                 return error
68         }
69
70         // encode the message into binary
71         data, err := c.codec.EncodeMsg(req.Message, msgID)
72         if err != nil {
73                 error := fmt.Errorf("unable to encode the messge: %v", err)
74                 log.WithFields(logger.Fields{
75                         "context": chMeta.id,
76                         "msg_id":  msgID,
77                 }).Error(error)
78                 sendReply(ch, &api.VppReply{Error: error})
79                 return error
80         }
81
82         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
83                 log.WithFields(logger.Fields{
84                         "context":  chMeta.id,
85                         "msg_id":   msgID,
86                         "msg_size": len(data),
87                 }).Debug("Sending a message to VPP.")
88         }
89
90         // send the message
91         if req.Multipart {
92                 // expect multipart response
93                 atomic.StoreUint32(&chMeta.multipart, 1)
94         }
95
96         // send the request to VPP
97         c.vpp.SendMsg(chMeta.id, data)
98
99         if req.Multipart {
100                 // send a control ping to determine end of the multipart response
101                 ping := &vpe.ControlPing{}
102                 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
103
104                 log.WithFields(logger.Fields{
105                         "context":  chMeta.id,
106                         "msg_id":   c.pingReqID,
107                         "msg_size": len(pingData),
108                 }).Debug("Sending a control ping to VPP.")
109
110                 c.vpp.SendMsg(chMeta.id, pingData)
111         }
112
113         return nil
114 }
115
116 // msgCallback is called whenever any binary API message comes from VPP.
117 func msgCallback(context uint32, msgID uint16, data []byte) {
118         connLock.RLock()
119         defer connLock.RUnlock()
120
121         if conn == nil {
122                 log.Warn("Already disconnected, ignoring the message.")
123                 return
124         }
125
126         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
127                 log.WithFields(logger.Fields{
128                         "context":  context,
129                         "msg_id":   msgID,
130                         "msg_size": len(data),
131                 }).Debug("Received a message from VPP.")
132         }
133
134         if context == 0 || conn.isNotificationMessage(msgID) {
135                 // process the message as a notification
136                 conn.sendNotifications(msgID, data)
137                 return
138         }
139
140         // match ch according to the context
141         conn.channelsLock.RLock()
142         ch, ok := conn.channels[context]
143         conn.channelsLock.RUnlock()
144
145         if !ok {
146                 log.WithFields(logger.Fields{
147                         "context": context,
148                         "msg_id":  msgID,
149                 }).Error("Context ID not known, ignoring the message.")
150                 return
151         }
152
153         chMeta := ch.Metadata().(*channelMetadata)
154         lastReplyReceived := false
155         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
156         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
157                 lastReplyReceived = true
158         }
159
160         // send the data to the channel
161         sendReply(ch, &api.VppReply{
162                 MessageID:         msgID,
163                 Data:              data,
164                 LastReplyReceived: lastReplyReceived,
165         })
166 }
167
168 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
169 // it logs the error and do not send the message.
170 func sendReply(ch *api.Channel, reply *api.VppReply) {
171         select {
172         case ch.ReplyChan <- reply:
173                 // reply sent successfully
174         default:
175                 // unable to write into the channel without blocking
176                 log.WithFields(logger.Fields{
177                         "channel": ch,
178                         "msg_id":  reply.MessageID,
179                 }).Warn("Unable to send the reply, reciever end not ready.")
180         }
181 }
182
183 // GetMessageID returns message identifier of given API message.
184 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
185         if c == nil {
186                 return 0, errors.New("nil connection passed in")
187         }
188         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
189 }
190
191 // messageNameToID returns message ID of a message identified by its name and CRC.
192 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
193         // try to get the ID from the map
194         c.msgIDsLock.RLock()
195         id, ok := c.msgIDs[msgName+msgCrc]
196         c.msgIDsLock.RUnlock()
197         if ok {
198                 return id, nil
199         }
200
201         // get the ID using VPP API
202         id, err := c.vpp.GetMsgID(msgName, msgCrc)
203         if err != nil {
204                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
205                 log.WithFields(logger.Fields{
206                         "msg_name": msgName,
207                         "msg_crc":  msgCrc,
208                 }).Errorf("unable to retrieve message ID: %v", err)
209                 return id, error
210         }
211
212         c.msgIDsLock.Lock()
213         c.msgIDs[msgName+msgCrc] = id
214         c.msgIDsLock.Unlock()
215
216         return id, nil
217 }