Fix missing lock in LookupByID
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21
22         logger "github.com/sirupsen/logrus"
23
24         "git.fd.io/govpp.git/api"
25 )
26
27 var (
28         ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
29 )
30
31 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
32 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
33         for {
34                 select {
35                 case req, ok := <-ch.ReqChan:
36                         // new request on the request channel
37                         if !ok {
38                                 // after closing the request channel, release API channel and return
39                                 c.releaseAPIChannel(ch, chMeta)
40                                 return
41                         }
42                         c.processRequest(ch, chMeta, req)
43
44                 case req := <-ch.NotifSubsChan:
45                         // new request on the notification subscribe channel
46                         c.processNotifSubscribeRequest(ch, req)
47                 }
48         }
49 }
50
51 // processRequest processes a single request received on the request channel.
52 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
53         // check whether we are connected to VPP
54         if atomic.LoadUint32(&c.connected) == 0 {
55                 err := ErrNotConnected
56                 log.Error(err)
57                 sendReply(ch, &api.VppReply{Error: err})
58                 return err
59         }
60
61         // retrieve message ID
62         msgID, err := c.GetMessageID(req.Message)
63         if err != nil {
64                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
65                 log.WithFields(logger.Fields{
66                         "msg_name": req.Message.GetMessageName(),
67                         "msg_crc":  req.Message.GetCrcString(),
68                 }).Error(err)
69                 sendReply(ch, &api.VppReply{Error: err})
70                 return err
71         }
72
73         // encode the message into binary
74         data, err := c.codec.EncodeMsg(req.Message, msgID)
75         if err != nil {
76                 err = fmt.Errorf("unable to encode the messge: %v", err)
77                 log.WithFields(logger.Fields{
78                         "context": chMeta.id,
79                         "msg_id":  msgID,
80                 }).Error(err)
81                 sendReply(ch, &api.VppReply{Error: err})
82                 return err
83         }
84
85         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
86                 log.WithFields(logger.Fields{
87                         "context":  chMeta.id,
88                         "msg_id":   msgID,
89                         "msg_size": len(data),
90                         "msg_name": req.Message.GetMessageName(),
91                 }).Debug("Sending a message to VPP.")
92         }
93
94         // send the message
95         if req.Multipart {
96                 // expect multipart response
97                 atomic.StoreUint32(&chMeta.multipart, 1)
98         }
99
100         // send the request to VPP
101         c.vpp.SendMsg(chMeta.id, data)
102
103         if req.Multipart {
104                 // send a control ping to determine end of the multipart response
105                 pingData, _ := c.codec.EncodeMsg(msgControlPing, c.pingReqID)
106
107                 log.WithFields(logger.Fields{
108                         "context":  chMeta.id,
109                         "msg_id":   c.pingReqID,
110                         "msg_size": len(pingData),
111                 }).Debug("Sending a control ping to VPP.")
112
113                 c.vpp.SendMsg(chMeta.id, pingData)
114         }
115
116         return nil
117 }
118
119 // msgCallback is called whenever any binary API message comes from VPP.
120 func msgCallback(context uint32, msgID uint16, data []byte) {
121         connLock.RLock()
122         defer connLock.RUnlock()
123
124         if conn == nil {
125                 log.Warn("Already disconnected, ignoring the message.")
126                 return
127         }
128
129         if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
130                 log.WithFields(logger.Fields{
131                         "context":  context,
132                         "msg_id":   msgID,
133                         "msg_size": len(data),
134                 }).Debug("Received a message from VPP.")
135         }
136
137         if context == 0 || conn.isNotificationMessage(msgID) {
138                 // process the message as a notification
139                 conn.sendNotifications(msgID, data)
140                 return
141         }
142
143         // match ch according to the context
144         conn.channelsLock.RLock()
145         ch, ok := conn.channels[context]
146         conn.channelsLock.RUnlock()
147
148         if !ok {
149                 log.WithFields(logger.Fields{
150                         "context": context,
151                         "msg_id":  msgID,
152                 }).Error("Context ID not known, ignoring the message.")
153                 return
154         }
155
156         chMeta := ch.Metadata().(*channelMetadata)
157         lastReplyReceived := false
158         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
159         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
160                 lastReplyReceived = true
161         }
162
163         // send the data to the channel
164         sendReply(ch, &api.VppReply{
165                 MessageID:         msgID,
166                 Data:              data,
167                 LastReplyReceived: lastReplyReceived,
168         })
169 }
170
171 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
172 // it logs the error and do not send the message.
173 func sendReply(ch *api.Channel, reply *api.VppReply) {
174         select {
175         case ch.ReplyChan <- reply:
176                 // reply sent successfully
177         default:
178                 // unable to write into the channel without blocking
179                 log.WithFields(logger.Fields{
180                         "channel": ch,
181                         "msg_id":  reply.MessageID,
182                 }).Warn("Unable to send the reply, reciever end not ready.")
183         }
184 }
185
186 // GetMessageID returns message identifier of given API message.
187 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
188         if c == nil {
189                 return 0, errors.New("nil connection passed in")
190         }
191         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
192 }
193
194 // messageNameToID returns message ID of a message identified by its name and CRC.
195 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
196         msgKey := msgName + "_" + msgCrc
197
198         // try to get the ID from the map
199         c.msgIDsLock.RLock()
200         id, ok := c.msgIDs[msgKey]
201         c.msgIDsLock.RUnlock()
202         if ok {
203                 return id, nil
204         }
205
206         // get the ID using VPP API
207         id, err := c.vpp.GetMsgID(msgName, msgCrc)
208         if err != nil {
209                 err = fmt.Errorf("unable to retrieve message ID: %v", err)
210                 log.WithFields(logger.Fields{
211                         "msg_name": msgName,
212                         "msg_crc":  msgCrc,
213                 }).Error(err)
214                 return id, err
215         }
216
217         c.msgIDsLock.Lock()
218         c.msgIDs[msgKey] = id
219         c.msgIDsLock.Unlock()
220
221         return id, nil
222 }
223
224 // LookupByID looks up message name and crc by ID.
225 func (c *Connection) LookupByID(ID uint16) (string, error) {
226         if c == nil {
227                 return "", errors.New("nil connection passed in")
228         }
229
230         c.msgIDsLock.Lock()
231         defer c.msgIDsLock.Unlock()
232
233         for key, id := range c.msgIDs {
234                 if id == ID {
235                         return key, nil
236                 }
237         }
238
239         return "", fmt.Errorf("unknown message ID: %d", ID)
240 }