binapi-generator renamed & moved, finished documentation
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 //go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
18
19 import (
20         "errors"
21         "fmt"
22         "os"
23         "sync"
24         "sync/atomic"
25
26         logger "github.com/Sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31 )
32
33 const (
34         requestChannelBufSize = 100 // default size of the request channel buffers
35         replyChannelBufSize   = 100 // default size of the reply channel buffers
36 )
37
38 // Connection represents a shared memory connection to VPP via vppAdapter.
39 type Connection struct {
40         vpp   adapter.VppAdapter // VPP adapter
41         codec *MsgCodec          // message codec
42
43         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
44         msgIDsLock sync.RWMutex      // lock for the message IDs map
45
46         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
47         channelsLock sync.RWMutex            // lock for the channels map
48
49         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
50         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
51
52         maxChannelID uint32 // maximum used client ID
53         pingReqID    uint16 // ID if the ControlPing message
54         pingReplyID  uint16 // ID of the ControlPingReply message
55 }
56
57 // channelMetadata contains core-local metadata of an API channel.
58 type channelMetadata struct {
59         id        uint32 // channel ID
60         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
61 }
62
63 var (
64         log      *logger.Logger // global logger
65         conn     *Connection    // global handle to the Connection (used in the message receive callback)
66         connLock sync.RWMutex   // lock for the global connection
67 )
68
69 // init initializes global logger, which logs debug level messages to stdout.
70 func init() {
71         log = logger.New()
72         log.Out = os.Stdout
73         log.Level = logger.DebugLevel
74 }
75
76 // SetLogger sets global logger to provided one.
77 func SetLogger(l *logger.Logger) {
78         log = l
79 }
80
81 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
82 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
83         connLock.Lock()
84         defer connLock.Unlock()
85
86         if conn != nil {
87                 return nil, errors.New("only one connection per process is supported")
88         }
89
90         conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
91         conn.channels = make(map[uint32]*api.Channel)
92         conn.msgIDs = make(map[string]uint16)
93         conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
94
95         conn.vpp.SetMsgCallback(msgCallback)
96
97         logger.Debug("Connecting to VPP...")
98
99         err := conn.vpp.Connect()
100         if err != nil {
101                 return nil, err
102         }
103
104         // store control ping IDs
105         conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
106         conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
107
108         logger.Debug("VPP connected.")
109
110         return conn, nil
111 }
112
113 // Disconnect disconnects from VPP.
114 func (c *Connection) Disconnect() {
115         if c == nil {
116                 return
117         }
118         connLock.Lock()
119         defer connLock.Unlock()
120
121         if c != nil && c.vpp != nil {
122                 c.vpp.Disconnect()
123         }
124         conn = nil
125 }
126
127 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
128 // It uses default buffer sizes for the request and reply Go channels.
129 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
130         if c == nil {
131                 return nil, errors.New("nil connection passed in")
132         }
133         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
134 }
135
136 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
137 // It allows to specify custom buffer sizes for the request and reply Go channels.
138 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
139         if c == nil {
140                 return nil, errors.New("nil connection passed in")
141         }
142         chID := atomic.AddUint32(&c.maxChannelID, 1)
143         chMeta := &channelMetadata{id: chID}
144
145         ch := api.NewChannelInternal(chMeta)
146         ch.MsgDecoder = c.codec
147         ch.MsgIdentifier = c
148
149         // create the communication channels
150         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
151         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
152         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
153         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
154
155         // store API channel within the client
156         c.channelsLock.Lock()
157         c.channels[chID] = ch
158         c.channelsLock.Unlock()
159
160         // start watching on the request channel
161         go c.watchRequests(ch, chMeta)
162
163         return ch, nil
164 }
165
166 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
167 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
168         for {
169                 select {
170                 case req, ok := <-ch.ReqChan:
171                         // new request on the request channel
172                         if !ok {
173                                 // after closing the request channel, release API channel and return
174                                 c.releaseAPIChannel(ch, chMeta)
175                                 return
176                         }
177                         c.processRequest(ch, chMeta, req)
178
179                 case req := <-ch.NotifSubsChan:
180                         // new request on the notification subscribe channel
181                         c.processNotifSubscribeRequest(ch, req)
182                 }
183         }
184 }
185
186 // processRequest processes a single request received on the request channel.
187 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
188         // retrieve message ID
189         msgID, err := c.GetMessageID(req.Message)
190         if err != nil {
191                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
192                 log.WithFields(logger.Fields{
193                         "msg_name": req.Message.GetMessageName(),
194                         "msg_crc":  req.Message.GetCrcString(),
195                 }).Errorf("unable to retrieve message ID: %v", err)
196                 sendReply(ch, &api.VppReply{Error: error})
197                 return error
198         }
199
200         // encode the message into binary
201         data, err := c.codec.EncodeMsg(req.Message, msgID)
202         if err != nil {
203                 error := fmt.Errorf("unable to encode the messge: %v", err)
204                 log.WithFields(logger.Fields{
205                         "context": chMeta.id,
206                         "msg_id":  msgID,
207                 }).Errorf("%v", error)
208                 sendReply(ch, &api.VppReply{Error: error})
209                 return error
210         }
211
212         // send the message
213         log.WithFields(logger.Fields{
214                 "context":  chMeta.id,
215                 "msg_id":   msgID,
216                 "msg_size": len(data),
217         }).Debug("Sending a message to VPP.")
218
219         if req.Multipart {
220                 // expect multipart response
221                 atomic.StoreUint32(&chMeta.multipart, 1)
222         }
223
224         // send the request to VPP
225         c.vpp.SendMsg(chMeta.id, data)
226
227         if req.Multipart {
228                 // send a control ping to determine end of the multipart response
229                 ping := &vpe.ControlPing{}
230                 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
231
232                 log.WithFields(logger.Fields{
233                         "context":  chMeta.id,
234                         "msg_id":   c.pingReqID,
235                         "msg_size": len(pingData),
236                 }).Debug("Sending a control ping to VPP.")
237
238                 c.vpp.SendMsg(chMeta.id, pingData)
239         }
240
241         return nil
242 }
243
244 // releaseAPIChannel releases API channel that needs to be closed.
245 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
246         log.WithFields(logger.Fields{
247                 "context": chMeta.id,
248         }).Debug("API channel closed.")
249
250         // delete the channel from channels map
251         c.channelsLock.Lock()
252         delete(c.channels, chMeta.id)
253         c.channelsLock.Unlock()
254 }
255
256 // msgCallback is called whenever any binary API message comes from VPP.
257 func msgCallback(context uint32, msgID uint16, data []byte) {
258         connLock.RLock()
259         defer connLock.RUnlock()
260
261         if conn == nil {
262                 log.Warn("Already disconnected, ignoring the message.")
263                 return
264         }
265
266         log.WithFields(logger.Fields{
267                 "context":  context,
268                 "msg_id":   msgID,
269                 "msg_size": len(data),
270         }).Debug("Received a message from VPP.")
271
272         if context == 0 || conn.isNotificationMessage(msgID) {
273                 // process the message as a notification
274                 conn.sendNotifications(msgID, data)
275                 return
276         }
277
278         // match ch according to the context
279         conn.channelsLock.RLock()
280         ch, ok := conn.channels[context]
281         conn.channelsLock.RUnlock()
282
283         if !ok {
284                 log.WithFields(logger.Fields{
285                         "context": context,
286                         "msg_id":  msgID,
287                 }).Error("Context ID not known, ignoring the message.")
288                 return
289         }
290
291         chMeta := ch.Metadata().(*channelMetadata)
292         lastReplyReceived := false
293         // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
294         if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
295                 lastReplyReceived = true
296         }
297
298         // send the data to the channel
299         sendReply(ch, &api.VppReply{
300                 MessageID:         msgID,
301                 Data:              data,
302                 LastReplyReceived: lastReplyReceived,
303         })
304 }
305
306 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
307 // it logs the error and do not send the message.
308 func sendReply(ch *api.Channel, reply *api.VppReply) {
309         select {
310         case ch.ReplyChan <- reply:
311                 // reply sent successfully
312         default:
313                 // unable to write into the channel without blocking
314                 log.WithFields(logger.Fields{
315                         "channel": ch,
316                         "msg_id":  reply.MessageID,
317                 }).Warn("Unable to send the reply, reciever end not ready.")
318         }
319 }
320
321 // GetMessageID returns message identifier of given API message.
322 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
323         if c == nil {
324                 return 0, errors.New("nil connection passed in")
325         }
326         return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
327 }
328
329 // messageNameToID returns message ID of a message identified by its name and CRC.
330 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
331         // try to get the ID from the map
332         c.msgIDsLock.RLock()
333         id, ok := c.msgIDs[msgName+msgCrc]
334         c.msgIDsLock.RUnlock()
335         if ok {
336                 return id, nil
337         }
338
339         // get the ID using VPP API
340         id, err := c.vpp.GetMsgID(msgName, msgCrc)
341         if err != nil {
342                 error := fmt.Errorf("unable to retrieve message ID: %v", err)
343                 log.WithFields(logger.Fields{
344                         "msg_name": msgName,
345                         "msg_crc":  msgCrc,
346                 }).Errorf("unable to retrieve message ID: %v", err)
347                 return id, error
348         }
349
350         c.msgIDsLock.Lock()
351         c.msgIDs[msgName+msgCrc] = id
352         c.msgIDsLock.Unlock()
353
354         return id, nil
355 }