1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 logger "github.com/sirupsen/logrus"
27 "git.fd.io/govpp.git/adapter"
28 "git.fd.io/govpp.git/api"
29 "git.fd.io/govpp.git/codec"
33 DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts
34 DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
38 RequestChanBufSize = 100 // default size of the request channel buffer
39 ReplyChanBufSize = 100 // default size of the reply channel buffer
40 NotificationChanBufSize = 100 // default size of the notification channel buffer
44 HealthCheckProbeInterval = time.Second // default health check probe interval
45 HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
46 HealthCheckThreshold = 1 // number of failed health checks until the error is reported
47 DefaultReplyTimeout = time.Second // default timeout for replies from VPP
50 // ConnectionState represents the current state of the connection to VPP.
51 type ConnectionState int
54 // Connected represents state in which the connection has been successfully established.
55 Connected ConnectionState = iota
57 // Disconnected represents state in which the connection has been dropped.
60 // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
64 func (s ConnectionState) String() string {
73 return fmt.Sprintf("UnknownState(%d)", s)
77 // ConnectionEvent is a notification about a change in the VPP connection state.
78 type ConnectionEvent struct {
79 // Timestamp holds the time when the event has been created.
82 // State holds the new state of the connection at the time when the event has been created.
85 // Error holds error if any encountered.
89 // Connection represents a connection to VPP established via the VPP binary API adapter (vppClient).
90 type Connection struct {
91 vppClient adapter.VppAPI // VPP binary API client
93 maxAttempts int // maximum number of reconnect attempts
94 recInterval time.Duration // interval between reconnect attempts
96 vppConnected uint32 // non-zero if the adapter is connected to VPP; accessed atomically
98 codec MessageCodec // message codec
99 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
100 msgMap map[uint16]api.Message // map of messages indexed by message ID
102 maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
103 channelsLock sync.RWMutex // lock for the channels map
104 channels map[uint16]*Channel // map of all API channels indexed by the channel ID
106 subscriptionsLock sync.RWMutex // lock for the subscriptions map
107 subscriptions map[uint16][]*subscriptionCtx // map of all notification subscriptions indexed by message ID
109 pingReqID uint16 // ID of the ControlPing message
110 pingReplyID uint16 // ID of the ControlPingReply message
112 lastReplyLock sync.Mutex // lock for the last reply
113 lastReply time.Time // time of the last received reply from VPP
115 msgControlPing api.Message // ControlPing message sent as the health check probe
116 msgControlPingReply api.Message // ControlPingReply message (counterpart of msgControlPing)
119 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
121 attempts = DefaultMaxReconnectAttempts
124 interval = DefaultReconnectInterval
129 maxAttempts: attempts,
130 recInterval: interval,
131 codec: &codec.MsgCodec{},
132 msgIDs: make(map[string]uint16),
133 msgMap: make(map[uint16]api.Message),
134 channels: make(map[uint16]*Channel),
135 subscriptions: make(map[uint16][]*subscriptionCtx),
136 msgControlPing: msgControlPing,
137 msgControlPingReply: msgControlPingReply,
139 binapi.SetMsgCallback(c.msgCallback)
143 // Connect connects to VPP API using specified adapter and returns a connection handle.
144 // This call blocks until it is either connected, or an error occurs.
145 // Only one connection attempt will be performed.
146 func Connect(binapi adapter.VppAPI) (*Connection, error) {
147 // create new connection handle
148 c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
150 // blocking attempt to connect to VPP
151 if err := c.connectVPP(); err != nil {
158 // AsyncConnect asynchronously connects to VPP using the specified VPP adapter and returns the connection handle
159 // and a ConnectionEvent channel. This call does not block until the connection is established; it
160 // returns immediately. The caller is supposed to watch the returned ConnectionEvent channel for
161 // Connected/Disconnected/Failed events. In case of disconnect, the library will asynchronously try to reconnect; attempts and interval set the maximum number of reconnect attempts and the delay between them, and a Failed event is sent once the attempts are exhausted.
162 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
163 // create new connection handle
164 c := newConnection(binapi, attempts, interval)
166 // asynchronously attempt to connect to VPP
167 connChan := make(chan ConnectionEvent, NotificationChanBufSize)
168 go c.connectLoop(connChan)
170 return c, connChan, nil
173 // connectVPP performs a blocking attempt to connect to VPP and retrieve message IDs, disconnecting again if the retrieval fails.
174 func (c *Connection) connectVPP() error {
175 log.Debug("Connecting to VPP..")
178 if err := c.vppClient.Connect(); err != nil {
181 log.Debugf("Connected to VPP")
183 if err := c.retrieveMessageIDs(); err != nil {
184 if err := c.vppClient.Disconnect(); err != nil {
185 log.Debugf("disconnecting vpp client failed: %v", err)
187 return fmt.Errorf("VPP is incompatible: %v", err)
190 // store connected state
191 atomic.StoreUint32(&c.vppConnected, 1)
196 // Disconnect disconnects from VPP API and releases all connection-related resources.
197 func (c *Connection) Disconnect() {
201 if c.vppClient != nil {
206 // disconnectVPP disconnects from VPP in case it is connected.
207 func (c *Connection) disconnectVPP() {
208 if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
209 log.Debug("Disconnecting from VPP..")
211 if err := c.vppClient.Disconnect(); err != nil {
212 log.Debugf("Disconnect from VPP failed: %v", err)
214 log.Debug("Disconnected from VPP")
218 func (c *Connection) NewAPIChannel() (api.Channel, error) {
219 return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
222 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
223 return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
226 // newAPIChannel returns a new API channel for communication with VPP via govpp core.
227 // It allows specifying custom buffer sizes for the request and reply Go channels.
228 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
230 return nil, errors.New("nil connection passed in")
233 // create new channel
234 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
235 channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
237 // store API channel within the client
238 c.channelsLock.Lock()
239 c.channels[chID] = channel
240 c.channelsLock.Unlock()
242 // start watching on the request channel
243 go c.watchRequests(channel)
248 // releaseAPIChannel releases an API channel that is being closed by removing it from the channels map.
249 func (c *Connection) releaseAPIChannel(ch *Channel) {
250 log.WithFields(logger.Fields{
252 }).Debug("API channel released")
254 // delete the channel from channels map
255 c.channelsLock.Lock()
256 delete(c.channels, ch.id)
257 c.channelsLock.Unlock()
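260 // connectLoop attempts to connect to VPP until it succeeds or the maximum number of reconnect
261 // attempts is exceeded, in which case a Failed event is sent. Once connected, it continues with healthCheckLoop.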
262 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
263 var reconnectAttempts int
265 // loop until connected
267 if err := c.vppClient.WaitReady(); err != nil {
268 log.Debugf("wait ready failed: %v", err)
270 if err := c.connectVPP(); err == nil {
271 // signal connected event
272 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
274 } else if reconnectAttempts < c.maxAttempts {
276 log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
277 time.Sleep(c.recInterval)
279 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
284 // we are now connected, continue with health check loop
285 c.healthCheckLoop(connChan)
288 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
289 // it continues with connectLoop and tries to reconnect.
290 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
291 // create a separate API channel for health check probes
292 ch, err := c.newAPIChannel(1, 1)
294 log.Error("Failed to create health check API channel, health check will be disabled:", err)
299 sinceLastReply time.Duration
303 // send health check probes until an error or timeout occurs
305 // sleep until next health check probe period
306 time.Sleep(HealthCheckProbeInterval)
308 if atomic.LoadUint32(&c.vppConnected) == 0 {
309 // Disconnect has been called in the meantime; exit the health check / reconnect loop
310 log.Debug("Disconnected on request, exiting health check loop.")
314 // try draining probe replies from previous request before sending next one
317 log.Debug("drained old probe reply from reply channel")
321 // send the control ping request
322 ch.reqChan <- &vppRequest{msg: c.msgControlPing}
325 // expect response within timeout period
327 case vppReply := <-ch.replyChan:
330 case <-time.After(HealthCheckReplyTimeout):
331 err = ErrProbeTimeout
333 // check if time since last reply from any other
334 // channel is less than health check reply timeout
335 c.lastReplyLock.Lock()
336 sinceLastReply = time.Since(c.lastReply)
337 c.lastReplyLock.Unlock()
339 if sinceLastReply < HealthCheckReplyTimeout {
340 log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
347 if err == ErrProbeTimeout {
349 log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
350 if failedChecks > HealthCheckThreshold {
351 // in case the failed check threshold is exceeded, assume VPP disconnected
352 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
353 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
356 } else if err != nil {
357 // in case of error, assume VPP disconnected
358 log.Errorf("VPP health check probe failed: %v", err)
359 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
361 } else if failedChecks > 0 {
362 // in case of success after failed checks, clear failed check counter
364 log.Infof("VPP health check probe OK")
372 // we are now disconnected, start connect loop
373 c.connectLoop(connChan)
376 func getMsgNameWithCrc(x api.Message) string {
377 return getMsgID(x.GetMessageName(), x.GetCrcString())
380 func getMsgID(name, crc string) string {
381 return name + "_" + crc
384 func getMsgFactory(msg api.Message) func() api.Message {
385 return func() api.Message {
386 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
390 // GetMessageID returns message identifier of given API message.
391 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
393 return 0, errors.New("nil connection passed in")
396 if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
400 msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
405 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
406 c.msgMap[msgID] = msg
411 // LookupByID returns the message stored for the given message ID, or an error if the ID is unknown.
412 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
414 return nil, errors.New("nil connection passed in")
417 if msg, ok := c.msgMap[msgID]; ok {
421 return nil, fmt.Errorf("unknown message ID: %d", msgID)
424 // retrieveMessageIDs retrieves IDs for all registered messages, stores them in msgIDs/msgMap and records the ControlPing/ControlPingReply message IDs.
425 func (c *Connection) retrieveMessageIDs() (err error) {
428 msgs := api.GetRegisteredMessages()
431 for name, msg := range msgs {
432 typ := reflect.TypeOf(msg).Elem()
433 path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name())
435 msgID, err := c.GetMessageID(msg)
438 log.Debugf("retrieving message ID for %s failed: %v", path, err)
444 if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
446 c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
447 } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
448 c.pingReplyID = msgID
449 c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
453 log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
456 log.WithField("took", time.Since(t)).
457 Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs))