f4f5e92b70b7316cc8c87c6fa604ae3f03636947
[govpp.git] / core / request_handler.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "sync/atomic"
21
22         logger "github.com/Sirupsen/logrus"
23
24         "git.fd.io/govpp.git/api"
25         "git.fd.io/govpp.git/core/bin_api/vpe"
26 )
27
28 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
29 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
30         for {
31                 select {
32                 case req, ok := <-ch.ReqChan:
33                         // new request on the request channel
34                         if !ok {
35                                 // after closing the request channel, release API channel and return
36                                 c.releaseAPIChannel(ch, chMeta)
37                                 return
38                         }
39                         c.processRequest(ch, chMeta, req)
40
41                 case req := <-ch.NotifSubsChan:
42                         // new request on the notification subscribe channel
43                         c.processNotifSubscribeRequest(ch, req)
44                 }
45         }
46 }
47
48 // processRequest processes a single request received on the request channel.
49 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
50         // check whether we are connected to VPP
51         if atomic.LoadUint32(&c.connected) == 0 {
52                 error := errors.New("not connected to VPP, ignoring the request")
53                 log.Error(error)
54                 sendReply(ch, &api.VppReply{Error: error})
55                 return error
56         }
57
58         // retrieve message ID
59         msgID, err := c.GetMessageID(req.Message)
60         if err != nil {
61                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
62                 log.WithFields(logger.Fields{
63                         "msg_name": req.Message.GetMessageName(),
64                         "msg_crc":  req.Message.GetCrcString(),
65                 }).Error(err)
66                 sendReply(ch, &api.VppReply{Error: error})
67                 return error
68         }
69
70         // encode the message into binary
71         data, err := c.codec.EncodeMsg(req.Message, msgID)
72         if err != nil {
73                 error := fmt.Errorf("unable to encode the messge: %v", err)
74                 log.WithFields(logger.Fields{
75                         "context": chMeta.id,
76                         "msg_id":  msgID,
77                 }).Error(error)
78                 sendReply(ch, &api.VppReply{Error: error})
79                 return error
80         }
81
82         // send the message
83         log.WithFields(logger.Fields{
84                 "context":  chMeta.id,
85                 "msg_id":   msgID,
86                 "msg_size": len(data),
87         }).Debug("Sending a message to VPP.")
88
89         if req.Multipart {
90                 // expect multipart response
91                 atomic.StoreUint32(&chMeta.multipart, 1)
92         }
93
94         // send the request to VPP
95         c.vpp.SendMsg(chMeta.id, data)
96
97         if req.Multipart {
98                 // send a control ping to determine end of the multipart response
99                 ping := &vpe.ControlPing{}
100                 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
101
102                 log.WithFields(logger.Fields{
103                         "context":  chMeta.id,
104                         "msg_id":   c.pingReqID,
105                         "msg_size": len(pingData),
106                 }).Debug("Sending a control ping to VPP.")
107
108                 c.vpp.SendMsg(chMeta.id, pingData)
109         }
110
111         return nil
112 }
113
114 // msgCallback is called whenever any binary API message comes from VPP.
115 func msgCallback(context uint32, msgID uint16, data []byte) {
116         connLock.RLock()
117         defer connLock.RUnlock()
118
119         if conn == nil {
120                 log.Warn("Already disconnected, ignoring the message.")
121                 return
122         }
123
124         log.WithFields(logger.Fields{
125                 "context":  context,
126                 "msg_id":   msgID,
127                 "msg_size": len(data),
128         }).Debug("Received a message from VPP.")
129
130         if context == 0 || conn.isNotificationMessage(msgID) {
131                 // process the message as a notification
132                 conn.sendNotifications(msgID, data)
133                 return
134         }
135
136         // match ch according to the context
137         conn.channelsLock.RLock()
138         ch, ok := conn.channels[context]
139         conn.channelsLock.RUnlock()
140
141         if !ok {
142                 log.WithFields(logger.Fields{
143                         "context": context,
144                         "msg_id":  msgID,
145                 }).Error("Context ID not known, ignoring the message.")
146                 return
147         }
148
149         chMeta := ch.Metadata().(*channelMetadata)
150         lastReplyReceived := false
151         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
152         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
153                 lastReplyReceived = true
154         }
155
156         // send the data to the channel
157         sendReply(ch, &api.VppReply{
158                 MessageID:         msgID,
159                 Data:              data,
160                 LastReplyReceived: lastReplyReceived,
161         })
162 }
163
164 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
165 // it logs the error and do not send the message.
166 func sendReply(ch *api.Channel, reply *api.VppReply) {
167         select {
168         case ch.ReplyChan <- reply:
169                 // reply sent successfully
170         default:
171                 // unable to write into the channel without blocking
172                 log.WithFields(logger.Fields{
173                         "channel": ch,
174                         "msg_id":  reply.MessageID,
175                 }).Warn("Unable to send the reply, reciever end not ready.")
176         }
177 }
178
179 // GetMessageID returns message identifier of given API message.
180 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
181         if c == nil {
182                 return 0, errors.New("nil connection passed in")
183         }
184         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
185 }
186
187 // messageNameToID returns message ID of a message identified by its name and CRC.
188 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
189         // try to get the ID from the map
190         c.msgIDsLock.RLock()
191         id, ok := c.msgIDs[msgName+msgCrc]
192         c.msgIDsLock.RUnlock()
193         if ok {
194                 return id, nil
195         }
196
197         // get the ID using VPP API
198         id, err := c.vpp.GetMsgID(msgName, msgCrc)
199         if err != nil {
200                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
201                 log.WithFields(logger.Fields{
202                         "msg_name": msgName,
203                         "msg_crc":  msgCrc,
204                 }).Errorf("unable to retrieve message ID: %v", err)
205                 return id, error
206         }
207
208         c.msgIDsLock.Lock()
209         c.msgIDs[msgName+msgCrc] = id
210         c.msgIDsLock.Unlock()
211
212         return id, nil
213 }