e11a30f43d91bfea71317d53232db54f0a677ba2
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 //go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
18
19 import (
20         "errors"
21         "fmt"
22         "os"
23         "sync"
24         "sync/atomic"
25
26         logger "github.com/Sirupsen/logrus"
27
28         "gerrit.fd.io/r/govpp/adapter"
29         "gerrit.fd.io/r/govpp/api"
30         "gerrit.fd.io/r/govpp/core/bin_api/vpe"
31 )
32
33 const (
34         requestChannelBufSize = 100 // default size of the request channel buffers
35         replyChannelBufSize   = 100 // default size of the reply channel buffers
36 )
37
38 // Connection represents a shared memory connection to VPP via vppAdapter.
39 type Connection struct {
40         vpp   adapter.VppAdapter // VPP adapter
41         codec *MsgCodec          // message codec
42
43         msgIDs     map[string]uint16 // map os message IDs indexed by message name + CRC
44         msgIDsLock sync.RWMutex      // lock for the message IDs map
45
46         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
47         channelsLock sync.RWMutex            // lock for the channels map
48
49         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
50         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
51
52         maxChannelID uint32 // maximum used client ID
53         pingReqID    uint16 // ID if the ControlPing message
54         pingReplyID  uint16 // ID of the ControlPingReply message
55 }
56
57 // channelMetadata contains core-local metadata of an API channel.
58 type channelMetadata struct {
59         id        uint32 // channel ID
60         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
61 }
62
63 var (
64         log      *logger.Logger // global logger
65         conn     *Connection    // global handle to the Connection (used in the message receive callback)
66         connLock sync.RWMutex   // lock for the global connection
67 )
68
69 // init initializes global logger, which logs debug level messages to stdout.
70 func init() {
71         log = logger.New()
72         log.Out = os.Stdout
73         log.Level = logger.DebugLevel
74 }
75
76 // SetLogger sets global logger to provided one.
77 func SetLogger(l *logger.Logger) {
78         log = l
79 }
80
81 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
82 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
83         connLock.Lock()
84         defer connLock.Unlock()
85
86         if conn != nil {
87                 return nil, errors.New("only one connection per process is supported")
88         }
89
90         conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
91         conn.channels = make(map[uint32]*api.Channel)
92         conn.msgIDs = make(map[string]uint16)
93         conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
94
95         conn.vpp.SetMsgCallback(msgCallback)
96
97         logger.Debug("Connecting to VPP...")
98
99         err := conn.vpp.Connect()
100         if err != nil {
101                 return nil, err
102         }
103
104         // store control ping IDs
105         conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
106         conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
107
108         logger.Debug("VPP connected.")
109
110         return conn, nil
111 }
112
113 // Disconnect disconnects from VPP.
114 func (c *Connection) Disconnect() {
115         connLock.Lock()
116         defer connLock.Unlock()
117
118         if c != nil && c.vpp != nil {
119                 c.vpp.Disconnect()
120         }
121         conn = nil
122 }
123
124 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
125 // It uses default buffer sizes for the request and reply Go channels.
126 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
127         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
128 }
129
130 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
131 // It allows to specify custom buffer sizes for the request and reply Go channels.
132 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
133         chID := atomic.AddUint32(&c.maxChannelID, 1)
134         chMeta := &channelMetadata{id: chID}
135
136         ch := api.NewChannelInternal(chMeta)
137         ch.MsgDecoder = c.codec
138         ch.MsgIdentifier = c
139
140         // create the communication channels
141         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
142         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
143         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
144         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
145
146         // store API channel within the client
147         c.channelsLock.Lock()
148         c.channels[chID] = ch
149         c.channelsLock.Unlock()
150
151         // start watching on the request channel
152         go c.watchRequests(ch, chMeta)
153
154         return ch, nil
155 }
156
157 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
158 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
159         for {
160                 select {
161                 case req, ok := <-ch.ReqChan:
162                         // new request on the request channel
163                         if !ok {
164                                 // after closing the request channel, release API channel and return
165                                 c.releaseAPIChannel(ch, chMeta)
166                                 return
167                         }
168                         c.processRequest(ch, chMeta, req)
169
170                 case req := <-ch.NotifSubsChan:
171                         // new request on the notification subscribe channel
172                         c.processNotifSubscribeRequest(ch, req)
173                 }
174         }
175 }
176
177 // processRequest processes a single request received on the request channel.
178 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
179         // retrieve message ID
180         msgID, err := c.GetMessageID(req.Message)
181         if err != nil {
182                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
183                 log.WithFields(logger.Fields{
184                         "msg_name": req.Message.GetMessageName(),
185                         "msg_crc":  req.Message.GetCrcString(),
186                 }).Errorf("unable to retrieve message ID: %v", err)
187                 sendReply(ch, &api.VppReply{Error: error})
188                 return error
189         }
190
191         // encode the message into binary
192         data, err := c.codec.EncodeMsg(req.Message, msgID)
193         if err != nil {
194                 error := fmt.Errorf("unable to encode the messge: %v", err)
195                 log.WithFields(logger.Fields{
196                         "context": chMeta.id,
197                         "msg_id":  msgID,
198                 }).Errorf("%v", error)
199                 sendReply(ch, &api.VppReply{Error: error})
200                 return error
201         }
202
203         // send the message
204         log.WithFields(logger.Fields{
205                 "context":  chMeta.id,
206                 "msg_id":   msgID,
207                 "msg_size": len(data),
208         }).Debug("Sending a message to VPP.")
209
210         c.vpp.SendMsg(chMeta.id, data)
211
212         if req.Multipart {
213                 // multipart request
214                 atomic.StoreUint32(&chMeta.multipart, 1)
215
216                 // send a control ping
217                 ping := &vpe.ControlPing{}
218                 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
219
220                 log.WithFields(logger.Fields{
221                         "context":  chMeta.id,
222                         "msg_id":   c.pingReqID,
223                         "msg_size": len(pingData),
224                 }).Debug("Sending a control ping to VPP.")
225
226                 c.vpp.SendMsg(chMeta.id, pingData)
227         }
228
229         return nil
230 }
231
232 // releaseAPIChannel releases API channel that needs to be closed.
233 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
234         log.WithFields(logger.Fields{
235                 "context": chMeta.id,
236         }).Debug("API channel closed.")
237
238         // delete the channel from channels map
239         c.channelsLock.Lock()
240         delete(c.channels, chMeta.id)
241         c.channelsLock.Unlock()
242 }
243
244 // msgCallback is called whenever any binary API message comes from VPP.
245 func msgCallback(context uint32, msgID uint16, data []byte) {
246         connLock.RLock()
247         defer connLock.RUnlock()
248
249         if conn == nil {
250                 log.Warn("Already disconnected, ignoring the message.")
251                 return
252         }
253
254         log.WithFields(logger.Fields{
255                 "context":  context,
256                 "msg_id":   msgID,
257                 "msg_size": len(data),
258         }).Debug("Received a message from VPP.")
259
260         if context == 0 || conn.isNotificationMessage(msgID) {
261                 // process the message as a notification
262                 conn.sendNotifications(msgID, data)
263                 return
264         }
265
266         // match ch according to the context
267         conn.channelsLock.RLock()
268         ch, ok := conn.channels[context]
269         conn.channelsLock.RUnlock()
270
271         if !ok {
272                 log.WithFields(logger.Fields{
273                         "context": context,
274                         "msg_id":  msgID,
275                 }).Error("Context ID not known, ignoring the message.")
276                 return
277         }
278
279         chMeta := ch.Metadata().(*channelMetadata)
280         lastReplyReceived := false
281         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
282         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
283                 lastReplyReceived = true
284         }
285
286         // send the data to the channel
287         sendReply(ch, &api.VppReply{
288                 MessageID:         msgID,
289                 Data:              data,
290                 LastReplyReceived: lastReplyReceived,
291         })
292 }
293
294 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
295 // it logs the error and do not send the message.
296 func sendReply(ch *api.Channel, reply *api.VppReply) {
297         select {
298         case ch.ReplyChan <- reply:
299                 // reply sent successfully
300         default:
301                 // unable to write into the channel without blocking
302                 log.WithFields(logger.Fields{
303                         "channel": ch,
304                         "msg_id":  reply.MessageID,
305                 }).Warn("Unable to send the reply, reciever end not ready.")
306         }
307 }
308
309 // GetMessageID returns message identifier of given API message.
310 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
311         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
312 }
313
314 // messageNameToID returns message ID of a message identified by its name and CRC.
315 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
316         // try to get the ID from the map
317         c.msgIDsLock.RLock()
318         id, ok := c.msgIDs[msgName+msgCrc]
319         c.msgIDsLock.RUnlock()
320         if ok {
321                 return id, nil
322         }
323
324         // get the ID using VPP API
325         id, err := c.vpp.GetMsgID(msgName, msgCrc)
326         if err != nil {
327                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
328                 log.WithFields(logger.Fields{
329                         "msg_name": msgName,
330                         "msg_crc":  msgCrc,
331                 }).Errorf("unable to retrieve message ID: %v", err)
332                 return id, error
333         }
334
335         c.msgIDsLock.Lock()
336         c.msgIDs[msgName+msgCrc] = id
337         c.msgIDsLock.Unlock()
338
339         return id, nil
340 }