1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
17 //go:generate binapi-generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
26 logger "github.com/Sirupsen/logrus"
28 "git.fd.io/govpp.git/adapter"
29 "git.fd.io/govpp.git/api"
30 "git.fd.io/govpp.git/core/bin_api/vpe"
34 requestChannelBufSize = 100 // default size of the request channel buffers (passed by NewAPIChannel to NewAPIChannelBuffered)
35 replyChannelBufSize = 100 // default size of the reply channel buffers (passed by NewAPIChannel to NewAPIChannelBuffered)
38 // Connection represents a shared memory connection to VPP via vppAdapter.
39 type Connection struct {
40 vpp adapter.VppAdapter // VPP adapter
41 codec *MsgCodec // message codec; encodes outgoing requests and is set as the decoder of each API channel
43 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
44 msgIDsLock sync.RWMutex // lock for the message IDs map
46 channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
47 channelsLock sync.RWMutex // lock for the channels map
49 notifSubscriptions map[uint16][]*api.NotifSubscription // map of all notification subscriptions indexed by message ID
50 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
52 maxChannelID uint32 // maximum used channel ID; incremented atomically when a new API channel is created
53 pingReqID uint16 // ID of the ControlPing message
54 pingReplyID uint16 // ID of the ControlPingReply message
57 // channelMetadata contains core-local metadata of an API channel.
58 type channelMetadata struct {
59 id uint32 // channel ID; key of the channel in Connection.channels and first argument of vpp.SendMsg
60 multipart uint32 // 1 if multipart request is being processed, 0 otherwise; read and written via sync/atomic
64 log *logger.Logger // global logger used by this package (see SetLogger)
65 conn *Connection // global handle to the Connection (used in the message receive callback)
66 connLock sync.RWMutex // lock for the global connection handle
69 // init initializes the global logger and sets its level to debug.
// NOTE(review): the previous comment said messages go to stdout; a logrus logger
// created with New() writes to stderr unless its output is changed - confirm.
73 log.Level = logger.DebugLevel
76 // SetLogger sets the global logger used by this package to the provided one.
77 func SetLogger(l *logger.Logger) {
81 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
// Only one connection per process is supported. The connection is stored in the
// package-level conn variable and msgCallback is registered with the adapter
// as the handler for incoming messages.
82 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
84 defer connLock.Unlock()
87 return nil, errors.New("only one connection per process is supported")
// create the global connection and initialize its lookup maps
90 conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
91 conn.channels = make(map[uint32]*api.Channel)
92 conn.msgIDs = make(map[string]uint16)
93 conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
95 conn.vpp.SetMsgCallback(msgCallback)
// NOTE(review): this calls the logrus package-level logger, not the package's
// `log` variable used elsewhere in this file - confirm this is intended.
97 logger.Debug("Connecting to VPP...")
99 err := conn.vpp.Connect()
104 // store control ping IDs
// NOTE(review): errors returned by GetMessageID are discarded here.
105 conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
106 conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
// NOTE(review): same logger inconsistency as above.
108 logger.Debug("VPP connected.")
113 // Disconnect disconnects from VPP.
114 func (c *Connection) Disconnect() {
119 defer connLock.Unlock()
// the adapter is only touched when both the connection and its adapter are set
121 if c != nil && c.vpp != nil {
127 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
128 // It uses default buffer sizes for the request and reply Go channels
// (requestChannelBufSize and replyChannelBufSize).
129 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
131 return nil, errors.New("nil connection passed in")
133 return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
136 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
137 // It allows to specify custom buffer sizes for the request and reply Go channels.
// reqChanBufSize is also used for the notification subscribe channel and
// replyChanBufSize for the notification subscribe reply channel.
138 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
140 return nil, errors.New("nil connection passed in")
// allocate a new channel ID atomically; the ID is kept in the channel metadata
142 chID := atomic.AddUint32(&c.maxChannelID, 1)
143 chMeta := &channelMetadata{id: chID}
145 ch := api.NewChannelInternal(chMeta)
146 ch.MsgDecoder = c.codec
149 // create the communication channels
150 ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
151 ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
152 ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
153 ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
155 // store API channel within the client
156 c.channelsLock.Lock()
157 c.channels[chID] = ch
158 c.channelsLock.Unlock()
160 // start watching on the request channel
// (watchRequests runs in its own goroutine)
161 go c.watchRequests(ch, chMeta)
166 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
// It also receives from the notification subscribe channel and passes those
// requests to processNotifSubscribeRequest.
167 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
170 case req, ok := <-ch.ReqChan:
171 // new request on the request channel
173 // after closing the request channel, release API channel and return
174 c.releaseAPIChannel(ch, chMeta)
177 c.processRequest(ch, chMeta, req)
179 case req := <-ch.NotifSubsChan:
180 // new request on the notification subscribe channel
181 c.processNotifSubscribeRequest(ch, req)
186 // processRequest processes a single request received on the request channel.
// It looks up the message ID, encodes the message and sends it to VPP using the
// channel ID as the first argument of SendMsg. Replies carrying lookup or
// encoding errors are handed to sendReply.
187 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
188 // retrieve message ID
189 msgID, err := c.GetMessageID(req.Message)
// NOTE(review): the local variable named `error` shadows the builtin error type in its scope.
191 error := fmt.Errorf("unable to retrieve message ID: %v", err)
192 log.WithFields(logger.Fields{
193 "msg_name": req.Message.GetMessageName(),
194 "msg_crc": req.Message.GetCrcString(),
195 }).Errorf("unable to retrieve message ID: %v", err)
196 sendReply(ch, &api.VppReply{Error: error})
200 // encode the message into binary
201 data, err := c.codec.EncodeMsg(req.Message, msgID)
// NOTE(review): "messge" in the error text below is a typo in a runtime string.
203 error := fmt.Errorf("unable to encode the messge: %v", err)
204 log.WithFields(logger.Fields{
205 "context": chMeta.id,
207 }).Errorf("%v", error)
208 sendReply(ch, &api.VppReply{Error: error})
213 log.WithFields(logger.Fields{
214 "context": chMeta.id,
216 "msg_size": len(data),
217 }).Debug("Sending a message to VPP.")
220 // expect multipart response
// (msgCallback resets this flag when the control ping reply arrives)
221 atomic.StoreUint32(&chMeta.multipart, 1)
224 // send the request to VPP
// NOTE(review): no result of SendMsg is captured here - confirm whether the adapter can report send failures.
225 c.vpp.SendMsg(chMeta.id, data)
228 // send a control ping to determine end of the multipart response
229 ping := &vpe.ControlPing{}
// NOTE(review): the encoding error for the ping message is discarded.
230 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
232 log.WithFields(logger.Fields{
233 "context": chMeta.id,
234 "msg_id": c.pingReqID,
235 "msg_size": len(pingData),
236 }).Debug("Sending a control ping to VPP.")
238 c.vpp.SendMsg(chMeta.id, pingData)
244 // releaseAPIChannel releases API channel that needs to be closed.
// It logs the closure and removes the channel from c.channels under channelsLock.
245 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
246 log.WithFields(logger.Fields{
247 "context": chMeta.id,
248 }).Debug("API channel closed.")
250 // delete the channel from channels map
251 c.channelsLock.Lock()
252 delete(c.channels, chMeta.id)
253 c.channelsLock.Unlock()
256 // msgCallback is called whenever any binary API message comes from VPP.
// context is used to look up the target API channel in conn.channels. Messages
// with context 0, or whose ID is a notification message ID, are dispatched
// through sendNotifications instead.
257 func msgCallback(context uint32, msgID uint16, data []byte) {
259 defer connLock.RUnlock()
262 log.Warn("Already disconnected, ignoring the message.")
266 log.WithFields(logger.Fields{
269 "msg_size": len(data),
270 }).Debug("Received a message from VPP.")
272 if context == 0 || conn.isNotificationMessage(msgID) {
273 // process the message as a notification
274 conn.sendNotifications(msgID, data)
278 // match ch according to the context
279 conn.channelsLock.RLock()
280 ch, ok := conn.channels[context]
281 conn.channelsLock.RUnlock()
284 log.WithFields(logger.Fields{
287 }).Error("Context ID not known, ignoring the message.")
291 chMeta := ch.Metadata().(*channelMetadata)
292 lastReplyReceived := false
293 // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
// (the compare-and-swap also resets the multipart flag to 0)
294 if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
295 lastReplyReceived = true
298 // send the data to the channel
299 sendReply(ch, &api.VppReply{
302 LastReplyReceived: lastReplyReceived,
306 // sendReply sends the reply into the go channel if it can be completed without blocking; otherwise
307 // it logs a warning and does not send the message.
308 func sendReply(ch *api.Channel, reply *api.VppReply) {
310 case ch.ReplyChan <- reply:
311 // reply sent successfully
313 // unable to write into the channel without blocking
314 log.WithFields(logger.Fields{
316 "msg_id": reply.MessageID,
// NOTE(review): "reciever" in the log text below is a typo in a runtime string.
317 }).Warn("Unable to send the reply, reciever end not ready.")
321 // GetMessageID returns message identifier of given API message.
// The lookup is delegated to messageNameToID using the message's name and CRC string.
322 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
324 return 0, errors.New("nil connection passed in")
326 return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
329 // messageNameToID returns message ID of a message identified by its name and CRC.
// IDs are kept in c.msgIDs under the key msgName+msgCrc; the VPP adapter is asked
// via GetMsgID and the returned ID is stored in the map.
330 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
331 // try to get the ID from the map
333 id, ok := c.msgIDs[msgName+msgCrc]
334 c.msgIDsLock.RUnlock()
339 // get the ID using VPP API
340 id, err := c.vpp.GetMsgID(msgName, msgCrc)
// NOTE(review): the local variable named `error` shadows the builtin error type in its scope.
342 error := fmt.Errorf("unable to retrieve message ID: %v", err)
343 log.WithFields(logger.Fields{
346 }).Errorf("unable to retrieve message ID: %v", err)
// remember the ID for subsequent lookups
351 c.msgIDs[msgName+msgCrc] = id
352 c.msgIDsLock.Unlock()