1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
17 //go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
26 logger "github.com/Sirupsen/logrus"
28 "gerrit.fd.io/r/govpp/adapter"
29 "gerrit.fd.io/r/govpp/api"
30 "gerrit.fd.io/r/govpp/core/bin_api/vpe"
34 requestChannelBufSize = 100 // default size of the request channel buffers
35 replyChannelBufSize = 100 // default size of the reply channel buffers
38 // Connection represents a shared memory connection to VPP via vppAdapter.
39 type Connection struct {
40 vpp adapter.VppAdapter // VPP adapter
41 codec *MsgCodec // message codec
43 msgIDs map[string]uint16 // map os message IDs indexed by message name + CRC
44 msgIDsLock sync.RWMutex // lock for the message IDs map
46 channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
47 channelsLock sync.RWMutex // lock for the channels map
49 notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
50 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
52 maxChannelID uint32 // maximum used client ID
53 pingReqID uint16 // ID if the ControlPing message
54 pingReplyID uint16 // ID of the ControlPingReply message
57 // channelMetadata contains core-local metadata of an API channel.
58 type channelMetadata struct {
59 id uint32 // channel ID
60 multipart uint32 // 1 if multipart request is being processed, 0 otherwise
64 log *logger.Logger // global logger
65 conn *Connection // global handle to the Connection (used in the message receive callback)
66 connLock sync.RWMutex // lock for the global connection
69 // init initializes global logger, which logs debug level messages to stdout.
73 log.Level = logger.DebugLevel
76 // SetLogger sets global logger to provided one.
77 func SetLogger(l *logger.Logger) {
81 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
82 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
84 defer connLock.Unlock()
87 return nil, errors.New("only one connection per process is supported")
90 conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
91 conn.channels = make(map[uint32]*api.Channel)
92 conn.msgIDs = make(map[string]uint16)
93 conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
95 conn.vpp.SetMsgCallback(msgCallback)
97 logger.Debug("Connecting to VPP...")
99 err := conn.vpp.Connect()
104 // store control ping IDs
105 conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
106 conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
108 logger.Debug("VPP connected.")
113 // Disconnect disconnects from VPP.
114 func (c *Connection) Disconnect() {
116 defer connLock.Unlock()
118 if c != nil && c.vpp != nil {
124 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
125 // It uses default buffer sizes for the request and reply Go channels.
126 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
127 return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
130 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
131 // It allows to specify custom buffer sizes for the request and reply Go channels.
132 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
133 chID := atomic.AddUint32(&c.maxChannelID, 1)
134 chMeta := &channelMetadata{id: chID}
136 ch := api.NewChannelInternal(chMeta)
137 ch.MsgDecoder = c.codec
140 // create the communication channels
141 ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
142 ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
143 ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
144 ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
146 // store API channel within the client
147 c.channelsLock.Lock()
148 c.channels[chID] = ch
149 c.channelsLock.Unlock()
151 // start watching on the request channel
152 go c.watchRequests(ch, chMeta)
157 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
158 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
161 case req, ok := <-ch.ReqChan:
162 // new request on the request channel
164 // after closing the request channel, release API channel and return
165 c.releaseAPIChannel(ch, chMeta)
168 c.processRequest(ch, chMeta, req)
170 case req := <-ch.NotifSubsChan:
171 // new request on the notification subscribe channel
172 c.processNotifSubscribeRequest(ch, req)
177 // processRequest processes a single request received on the request channel.
178 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
179 // retrieve message ID
180 msgID, err := c.GetMessageID(req.Message)
182 error := fmt.Errorf("unable to retrieve message ID: %v", err)
183 log.WithFields(logger.Fields{
184 "msg_name": req.Message.GetMessageName(),
185 "msg_crc": req.Message.GetCrcString(),
186 }).Errorf("unable to retrieve message ID: %v", err)
187 sendReply(ch, &api.VppReply{Error: error})
191 // encode the message into binary
192 data, err := c.codec.EncodeMsg(req.Message, msgID)
194 error := fmt.Errorf("unable to encode the messge: %v", err)
195 log.WithFields(logger.Fields{
196 "context": chMeta.id,
198 }).Errorf("%v", error)
199 sendReply(ch, &api.VppReply{Error: error})
204 log.WithFields(logger.Fields{
205 "context": chMeta.id,
207 "msg_size": len(data),
208 }).Debug("Sending a message to VPP.")
210 c.vpp.SendMsg(chMeta.id, data)
214 atomic.StoreUint32(&chMeta.multipart, 1)
216 // send a control ping
217 ping := &vpe.ControlPing{}
218 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
220 log.WithFields(logger.Fields{
221 "context": chMeta.id,
222 "msg_id": c.pingReqID,
223 "msg_size": len(pingData),
224 }).Debug("Sending a control ping to VPP.")
226 c.vpp.SendMsg(chMeta.id, pingData)
232 // releaseAPIChannel releases API channel that needs to be closed.
233 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
234 log.WithFields(logger.Fields{
235 "context": chMeta.id,
236 }).Debug("API channel closed.")
238 // delete the channel from channels map
239 c.channelsLock.Lock()
240 delete(c.channels, chMeta.id)
241 c.channelsLock.Unlock()
244 // msgCallback is called whenever any binary API message comes from VPP.
245 func msgCallback(context uint32, msgID uint16, data []byte) {
247 defer connLock.RUnlock()
250 log.Warn("Already disconnected, ignoring the message.")
254 log.WithFields(logger.Fields{
257 "msg_size": len(data),
258 }).Debug("Received a message from VPP.")
260 if context == 0 || conn.isNotificationMessage(msgID) {
261 // process the message as a notification
262 conn.sendNotifications(msgID, data)
266 // match ch according to the context
267 conn.channelsLock.RLock()
268 ch, ok := conn.channels[context]
269 conn.channelsLock.RUnlock()
272 log.WithFields(logger.Fields{
275 }).Error("Context ID not known, ignoring the message.")
279 chMeta := ch.Metadata().(*channelMetadata)
280 lastReplyReceived := false
281 // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
282 if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
283 lastReplyReceived = true
286 // send the data to the channel
287 sendReply(ch, &api.VppReply{
290 LastReplyReceived: lastReplyReceived,
294 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
295 // it logs the error and do not send the message.
296 func sendReply(ch *api.Channel, reply *api.VppReply) {
298 case ch.ReplyChan <- reply:
299 // reply sent successfully
301 // unable to write into the channel without blocking
302 log.WithFields(logger.Fields{
304 "msg_id": reply.MessageID,
305 }).Warn("Unable to send the reply, reciever end not ready.")
309 // GetMessageID returns message identifier of given API message.
310 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
311 return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
314 // messageNameToID returns message ID of a message identified by its name and CRC.
315 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
316 // try to get the ID from the map
318 id, ok := c.msgIDs[msgName+msgCrc]
319 c.msgIDsLock.RUnlock()
324 // get the ID using VPP API
325 id, err := c.vpp.GetMsgID(msgName, msgCrc)
327 error := fmt.Errorf("unable to retrieve message ID: %v", err)
328 log.WithFields(logger.Fields{
331 }).Errorf("unable to retrieve message ID: %v", err)
336 c.msgIDs[msgName+msgCrc] = id
337 c.msgIDsLock.Unlock()