1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
17 //go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
26 logger "github.com/Sirupsen/logrus"
28 "git.fd.io/govpp.git/adapter"
29 "git.fd.io/govpp.git/api"
30 "git.fd.io/govpp.git/core/bin_api/vpe"
34 requestChannelBufSize = 100 // default size of the request channel buffers
35 replyChannelBufSize = 100 // default size of the reply channel buffers
38 // Connection represents a shared memory connection to VPP via vppAdapter.
39 type Connection struct {
40 vpp adapter.VppAdapter // VPP adapter
41 codec *MsgCodec // message codec
43 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
44 msgIDsLock sync.RWMutex // lock for the message IDs map
46 channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
47 channelsLock sync.RWMutex // lock for the channels map
49 notifSubscriptions map[uint16][]*api.NotifSubscription // map of all notification subscriptions indexed by message ID
50 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
52 maxChannelID uint32 // maximum used channel ID
53 pingReqID uint16 // ID of the ControlPing message
54 pingReplyID uint16 // ID of the ControlPingReply message
57 // channelMetadata contains core-local metadata of an API channel.
58 type channelMetadata struct {
59 id uint32 // channel ID
60 multipart uint32 // 1 if multipart request is being processed, 0 otherwise
64 log *logger.Logger // global logger
65 conn *Connection // global handle to the Connection (used in the message receive callback)
66 connLock sync.RWMutex // lock for the global connection
69 // init initializes global logger, which logs debug level messages to stdout.
73 log.Level = logger.DebugLevel
76 // SetLogger sets global logger to provided one.
77 func SetLogger(l *logger.Logger) {
81 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
82 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
84 defer connLock.Unlock()
87 return nil, errors.New("only one connection per process is supported")
90 conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
91 conn.channels = make(map[uint32]*api.Channel)
92 conn.msgIDs = make(map[string]uint16)
93 conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
95 conn.vpp.SetMsgCallback(msgCallback)
97 logger.Debug("Connecting to VPP...")
99 err := conn.vpp.Connect()
104 // store control ping IDs
105 conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
106 conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
108 logger.Debug("VPP connected.")
113 // Disconnect disconnects from VPP.
114 func (c *Connection) Disconnect() {
116 defer connLock.Unlock()
118 if c != nil && c.vpp != nil {
124 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
125 // It uses default buffer sizes for the request and reply Go channels.
126 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
127 return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
130 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
131 // It allows to specify custom buffer sizes for the request and reply Go channels.
132 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
133 chID := atomic.AddUint32(&c.maxChannelID, 1)
134 chMeta := &channelMetadata{id: chID}
136 ch := api.NewChannelInternal(chMeta)
137 ch.MsgDecoder = c.codec
140 // create the communication channels
141 ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
142 ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
143 ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
144 ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
146 // store API channel within the client
147 c.channelsLock.Lock()
148 c.channels[chID] = ch
149 c.channelsLock.Unlock()
151 // start watching on the request channel
152 go c.watchRequests(ch, chMeta)
157 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
158 func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
161 case req, ok := <-ch.ReqChan:
162 // new request on the request channel
164 // after closing the request channel, release API channel and return
165 c.releaseAPIChannel(ch, chMeta)
168 c.processRequest(ch, chMeta, req)
170 case req := <-ch.NotifSubsChan:
171 // new request on the notification subscribe channel
172 c.processNotifSubscribeRequest(ch, req)
177 // processRequest processes a single request received on the request channel.
178 func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
179 // retrieve message ID
180 msgID, err := c.GetMessageID(req.Message)
182 error := fmt.Errorf("unable to retrieve message ID: %v", err)
183 log.WithFields(logger.Fields{
184 "msg_name": req.Message.GetMessageName(),
185 "msg_crc": req.Message.GetCrcString(),
186 }).Errorf("unable to retrieve message ID: %v", err)
187 sendReply(ch, &api.VppReply{Error: error})
191 // encode the message into binary
192 data, err := c.codec.EncodeMsg(req.Message, msgID)
194 error := fmt.Errorf("unable to encode the messge: %v", err)
195 log.WithFields(logger.Fields{
196 "context": chMeta.id,
198 }).Errorf("%v", error)
199 sendReply(ch, &api.VppReply{Error: error})
204 log.WithFields(logger.Fields{
205 "context": chMeta.id,
207 "msg_size": len(data),
208 }).Debug("Sending a message to VPP.")
211 // expect multipart response
212 atomic.StoreUint32(&chMeta.multipart, 1)
215 // send the request to VPP
216 c.vpp.SendMsg(chMeta.id, data)
219 // send a control ping to determine end of the multipart response
220 ping := &vpe.ControlPing{}
221 pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
223 log.WithFields(logger.Fields{
224 "context": chMeta.id,
225 "msg_id": c.pingReqID,
226 "msg_size": len(pingData),
227 }).Debug("Sending a control ping to VPP.")
229 c.vpp.SendMsg(chMeta.id, pingData)
235 // releaseAPIChannel releases API channel that needs to be closed.
236 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
237 log.WithFields(logger.Fields{
238 "context": chMeta.id,
239 }).Debug("API channel closed.")
241 // delete the channel from channels map
242 c.channelsLock.Lock()
243 delete(c.channels, chMeta.id)
244 c.channelsLock.Unlock()
247 // msgCallback is called whenever any binary API message comes from VPP.
248 func msgCallback(context uint32, msgID uint16, data []byte) {
250 defer connLock.RUnlock()
253 log.Warn("Already disconnected, ignoring the message.")
257 log.WithFields(logger.Fields{
260 "msg_size": len(data),
261 }).Debug("Received a message from VPP.")
263 if context == 0 || conn.isNotificationMessage(msgID) {
264 // process the message as a notification
265 conn.sendNotifications(msgID, data)
269 // match ch according to the context
270 conn.channelsLock.RLock()
271 ch, ok := conn.channels[context]
272 conn.channelsLock.RUnlock()
275 log.WithFields(logger.Fields{
278 }).Error("Context ID not known, ignoring the message.")
282 chMeta := ch.Metadata().(*channelMetadata)
283 lastReplyReceived := false
284 // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
285 if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
286 lastReplyReceived = true
289 // send the data to the channel
290 sendReply(ch, &api.VppReply{
293 LastReplyReceived: lastReplyReceived,
297 // sendReply sends the reply into the Go channel if that can be done without blocking; otherwise
298 // it logs a warning and does not send the message.
299 func sendReply(ch *api.Channel, reply *api.VppReply) {
301 case ch.ReplyChan <- reply:
302 // reply sent successfully
304 // unable to write into the channel without blocking
305 log.WithFields(logger.Fields{
307 "msg_id": reply.MessageID,
308 }).Warn("Unable to send the reply, reciever end not ready.")
312 // GetMessageID returns message identifier of given API message.
313 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
314 return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
317 // messageNameToID returns message ID of a message identified by its name and CRC.
318 func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
319 // try to get the ID from the map
321 id, ok := c.msgIDs[msgName+msgCrc]
322 c.msgIDsLock.RUnlock()
327 // get the ID using VPP API
328 id, err := c.vpp.GetMsgID(msgName, msgCrc)
330 error := fmt.Errorf("unable to retrieve message ID: %v", err)
331 log.WithFields(logger.Fields{
334 }).Errorf("unable to retrieve message ID: %v", err)
339 c.msgIDs[msgName+msgCrc] = id
340 c.msgIDsLock.Unlock()