X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fcore.go;h=c2542146b7af16b8694935b4ee2250c532a8ba34;hb=67482a4f8c3a3a7cf17b4edf82e61c28b318ffd9;hp=7dc45de1a4be955553486aa188b27183971e1193;hpb=cf7fbee5c540d16a162236b476d438c66f513ad9;p=govpp.git diff --git a/core/core.go b/core/core.go index 7dc45de..c254214 100644 --- a/core/core.go +++ b/core/core.go @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api -//go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api +package core import ( "errors" - "fmt" "os" "sync" "sync/atomic" + "time" logger "github.com/Sirupsen/logrus" @@ -31,16 +31,44 @@ import ( ) const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers + requestChannelBufSize = 100 // default size of the request channel buffers + replyChannelBufSize = 100 // default size of the reply channel buffers + notificationChannelBufSize = 100 // default size of the notification channel buffers +) + +var ( + healthCheckProbeInterval = time.Second * 1 // default health check probe interval + healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe + healthCheckThreshold = 1 // number of failed healthProbe until the error is reported +) + +// ConnectionState holds the current state of the connection to VPP. +type ConnectionState int + +const ( + // Connected connection state means that the connection to VPP has been successfully established. + Connected ConnectionState = iota + + // Disconnected connection state means that the connection to VPP has been lost. + Disconnected = iota ) +// ConnectionEvent is a notification about change in the VPP connection state. +type ConnectionEvent struct { + // Timestamp holds the time when the event has been generated. + Timestamp time.Time + + // State holds the new state of the connection to VPP at the time when the event has been generated. + State ConnectionState +} + // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vpp adapter.VppAdapter // VPP adapter - codec *MsgCodec // message codec + vpp adapter.VppAdapter // VPP adapter + connected uint32 // non-zero if the adapter is connected to VPP + codec *MsgCodec // message codec - msgIDs map[string]uint16 // map os message IDs indexed by message name + CRC + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgIDsLock sync.RWMutex // lock for the message IDs map channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID @@ -78,8 +106,80 @@ func SetLogger(l *logger.Logger) { log = l } +// SetHealthCheckProbeInterval sets health check probe interval. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckProbeInterval(interval time.Duration) { + healthCheckProbeInterval = interval +} + +// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. +// If reply arrives after the timeout, check is considered as failed. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckReplyTimeout(timeout time.Duration) { + healthCheckReplyTimeout = timeout +} + +// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckThreshold(threshold int) { + healthCheckThreshold = threshold +} + // Connect connects to VPP using specified VPP adapter and returns the connection handle. +// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, err + } + + // blocking attempt to connect to VPP + err = c.connectVPP() + if err != nil { + return nil, err + } + + return conn, nil +} + +// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle +// and ConnectionState channel. This call does not block until connection is established, it +// returns immediately. The caller is supposed to watch the returned ConnectionState channel for +// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. +func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { + // create new connection handle + c, err := newConnection(vppAdapter) + if err != nil { + return nil, nil, err + } + + // asynchronously attempt to connect to VPP + connChan := make(chan ConnectionEvent, notificationChannelBufSize) + go c.connectLoop(connChan) + + return conn, connChan, nil +} + +// Disconnect disconnects from VPP and releases all connection-related resources. +func (c *Connection) Disconnect() { + if c == nil { + return + } + connLock.Lock() + defer connLock.Unlock() + + if c != nil && c.vpp != nil { + c.disconnectVPP() + } + conn = nil +} + +// newConnection returns new connection handle. +func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { connLock.Lock() defer connLock.Unlock() @@ -87,49 +187,136 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { return nil, errors.New("only one connection per process is supported") } - conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}} - conn.channels = make(map[uint32]*api.Channel) - conn.msgIDs = make(map[string]uint16) - conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription) + conn = &Connection{ + vpp: vppAdapter, + codec: &MsgCodec{}, + channels: make(map[uint32]*api.Channel), + msgIDs: make(map[string]uint16), + notifSubscriptions: make(map[uint16][]*api.NotifSubscription), + } conn.vpp.SetMsgCallback(msgCallback) + return conn, nil +} - logger.Debug("Connecting to VPP...") +// connectVPP performs one blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP...") - err := conn.vpp.Connect() + // blocking connect + err := c.vpp.Connect() if err != nil { - return nil, err + log.Warn(err) + return err } + // store connected state + atomic.StoreUint32(&c.connected, 1) + // store control ping IDs - conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{}) - conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{}) + c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{}) + c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{}) - logger.Debug("VPP connected.") + log.Info("Connected to VPP.") + return nil +} - return conn, nil +// disconnectVPP disconnects from VPP in case it is connected. +func (c *Connection) disconnectVPP() { + if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { + c.vpp.Disconnect() + } } -// Disconnect disconnects from VPP. -func (c *Connection) Disconnect() { - connLock.Lock() - defer connLock.Unlock() +// connectLoop attempts to connect to VPP until it succeeds. +// Then it continues with healthCheckLoop. +func (c *Connection) connectLoop(connChan chan ConnectionEvent) { + // loop until connected + waitForVpp := c.vpp.WaitReady() + for { + waitForVpp() + if err := c.connectVPP(); err == nil { + // signal connected event + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + break + } + } - if c != nil && c.vpp != nil { - c.vpp.Disconnect() + // we are now connected, continue with health check loop + c.healthCheckLoop(connChan) +} + +// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, +// it continues with connectLoop and tries to reconnect. +func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { + // create a separate API channel for health check probes + ch, err := conn.NewAPIChannel() + if err != nil { + log.Error("Error by creating health check API channel, health check will be disabled:", err) + return } - conn = nil + + failedChecks := 0 + // send health check probes until an error occurs + for { + // wait for healthCheckProbeInterval + <-time.After(healthCheckProbeInterval) + + if atomic.LoadUint32(&c.connected) == 0 { + // Disconnect has been called in the meantime, return the healthcheck - reconnect loop + log.Debug("Disconnected on request, exiting health check loop.") + return + } + + // send the control ping + ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + + // expect response within timeout period + select { + case vppReply := <-ch.ReplyChan: + err = vppReply.Error + case <-time.After(healthCheckReplyTimeout): + err = errors.New("probe reply not received within the timeout period") + } + + if err != nil { + failedChecks++ + } else { + failedChecks = 0 + } + + if failedChecks >= healthCheckThreshold { + // in case of error, break & disconnect + log.Errorf("VPP health check failed: %v", err) + // signal disconnected event via channel + connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + break + } + } + + // cleanup + ch.Close() + c.disconnectVPP() + + // we are now disconnected, start connect loop + c.connectLoop(connChan) } // NewAPIChannel returns a new API channel for communication with VPP via govpp core. // It uses default buffer sizes for the request and reply Go channels. func (c *Connection) NewAPIChannel() (*api.Channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) } // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. // It allows to specify custom buffer sizes for the request and reply Go channels. func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } chID := atomic.AddUint32(&c.maxChannelID, 1) chMeta := &channelMetadata{id: chID} @@ -154,81 +341,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) return ch, nil } -// watchRequests watches for requests on the request API channel and forwards them as messages to VPP. -func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) { - for { - select { - case req, ok := <-ch.ReqChan: - // new request on the request channel - if !ok { - // after closing the request channel, release API channel and return - c.releaseAPIChannel(ch, chMeta) - return - } - c.processRequest(ch, chMeta, req) - - case req := <-ch.NotifSubsChan: - // new request on the notification subscribe channel - c.processNotifSubscribeRequest(ch, req) - } - } -} - -// processRequest processes a single request received on the request channel. -func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error { - // retrieve message ID - msgID, err := c.GetMessageID(req.Message) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": req.Message.GetMessageName(), - "msg_crc": req.Message.GetCrcString(), - }).Errorf("unable to retrieve message ID: %v", err) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // encode the message into binary - data, err := c.codec.EncodeMsg(req.Message, msgID) - if err != nil { - error := fmt.Errorf("unable to encode the messge: %v", err) - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - }).Errorf("%v", error) - sendReply(ch, &api.VppReply{Error: error}) - return error - } - - // send the message - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Sending a message to VPP.") - - c.vpp.SendMsg(chMeta.id, data) - - if req.Multipart { - // multipart request - atomic.StoreUint32(&chMeta.multipart, 1) - - // send a control ping - ping := &vpe.ControlPing{} - pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID) - - log.WithFields(logger.Fields{ - "context": chMeta.id, - "msg_id": c.pingReqID, - "msg_size": len(pingData), - }).Debug("Sending a control ping to VPP.") - - c.vpp.SendMsg(chMeta.id, pingData) - } - - return nil -} - // releaseAPIChannel releases API channel that needs to be closed. func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) { log.WithFields(logger.Fields{ @@ -240,101 +352,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) delete(c.channels, chMeta.id) c.channelsLock.Unlock() } - -// msgCallback is called whenever any binary API message comes from VPP. -func msgCallback(context uint32, msgID uint16, data []byte) { - connLock.RLock() - defer connLock.RUnlock() - - if conn == nil { - log.Warn("Already disconnected, ignoring the message.") - return - } - - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - "msg_size": len(data), - }).Debug("Received a message from VPP.") - - if context == 0 || conn.isNotificationMessage(msgID) { - // process the message as a notification - conn.sendNotifications(msgID, data) - return - } - - // match ch according to the context - conn.channelsLock.RLock() - ch, ok := conn.channels[context] - conn.channelsLock.RUnlock() - - if !ok { - log.WithFields(logger.Fields{ - "context": context, - "msg_id": msgID, - }).Error("Context ID not known, ignoring the message.") - return - } - - chMeta := ch.Metadata().(*channelMetadata) - lastReplyReceived := false - // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply - if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) { - lastReplyReceived = true - } - - // send the data to the channel - sendReply(ch, &api.VppReply{ - MessageID: msgID, - Data: data, - LastReplyReceived: lastReplyReceived, - }) -} - -// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise -// it logs the error and do not send the message. -func sendReply(ch *api.Channel, reply *api.VppReply) { - select { - case ch.ReplyChan <- reply: - // reply sent successfully - default: - // unable to write into the channel without blocking - log.WithFields(logger.Fields{ - "channel": ch, - "msg_id": reply.MessageID, - }).Warn("Unable to send the reply, reciever end not ready.") - } -} - -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString()) -} - -// messageNameToID returns message ID of a message identified by its name and CRC. -func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) { - // try to get the ID from the map - c.msgIDsLock.RLock() - id, ok := c.msgIDs[msgName+msgCrc] - c.msgIDsLock.RUnlock() - if ok { - return id, nil - } - - // get the ID using VPP API - id, err := c.vpp.GetMsgID(msgName, msgCrc) - if err != nil { - error := fmt.Errorf("unable to retrieve message ID: %v", err) - log.WithFields(logger.Fields{ - "msg_name": msgName, - "msg_crc": msgCrc, - }).Errorf("unable to retrieve message ID: %v", err) - return id, error - } - - c.msgIDsLock.Lock() - c.msgIDs[msgName+msgCrc] = id - c.msgIDsLock.Unlock() - - return id, nil -}