Refactor GoVPP
[govpp.git] / core / connection.go
index a44d0c4..c77358f 100644 (file)
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
-
 package core
 
 import (
        "errors"
-       "os"
+       "fmt"
+       "reflect"
        "sync"
        "sync/atomic"
        "time"
@@ -28,115 +27,95 @@ import (
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/api"
        "git.fd.io/govpp.git/codec"
-       "git.fd.io/govpp.git/core/bin_api/vpe"
-)
-
-var (
-       msgControlPing      api.Message = &vpe.ControlPing{}
-       msgControlPingReply api.Message = &vpe.ControlPingReply{}
 )
 
 const (
-       requestChannelBufSize      = 100 // default size of the request channel buffers
-       replyChannelBufSize        = 100 // default size of the reply channel buffers
-       notificationChannelBufSize = 100 // default size of the notification channel buffers
+       requestChannelBufSize      = 100 // default size of the request channel buffer
+       replyChannelBufSize        = 100 // default size of the reply channel buffer
+       notificationChannelBufSize = 100 // default size of the notification channel buffer
+
+       defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
 )
 
 var (
-       healthCheckProbeInterval = time.Second * 1        // default health check probe interval
-       healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
-       healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
+       healthCheckInterval     = time.Second * 1        // default health check interval
+       healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
+       healthCheckThreshold    = 1                      // number of failed health checks until the error is reported
 )
 
-// ConnectionState holds the current state of the connection to VPP.
+// SetHealthCheckProbeInterval sets health check probe interval.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckProbeInterval(interval time.Duration) {
+       healthCheckInterval = interval
+}
+
+// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
+// If reply arrives after the timeout, check is considered as failed.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckReplyTimeout(timeout time.Duration) {
+       healthCheckReplyTimeout = timeout
+}
+
+// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckThreshold(threshold int) {
+       healthCheckThreshold = threshold
+}
+
+// ConnectionState represents the current state of the connection to VPP.
 type ConnectionState int
 
 const (
-       // Connected connection state means that the connection to VPP has been successfully established.
+       // Connected represents state in which the connection has been successfully established.
        Connected ConnectionState = iota
 
-       // Disconnected connection state means that the connection to VPP has been lost.
+       // Disconnected represents state in which the connection has been dropped.
        Disconnected
 )
 
 // ConnectionEvent is a notification about change in the VPP connection state.
 type ConnectionEvent struct {
-       // Timestamp holds the time when the event has been generated.
+       // Timestamp holds the time when the event has been created.
        Timestamp time.Time
 
-       // State holds the new state of the connection to VPP at the time when the event has been generated.
+       // State holds the new state of the connection at the time when the event has been created.
        State ConnectionState
+
+       // Error holds error if any encountered.
+       Error error
 }
 
+var (
+       connLock sync.RWMutex // lock for the global connection
+       conn     *Connection  // global handle to the Connection (used in the message receive callback)
+)
+
 // Connection represents a shared memory connection to VPP via vppAdapter.
 type Connection struct {
        vpp       adapter.VppAdapter // VPP adapter
        connected uint32             // non-zero if the adapter is connected to VPP
-       codec     *codec.MsgCodec    // message codec
 
-       msgIDsLock sync.RWMutex      // lock for the message IDs map
-       msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
+       codec  *codec.MsgCodec        // message codec
+       msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
+       msgMap map[uint16]api.Message // map of messages indexed by message ID
 
+       maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
        channelsLock sync.RWMutex        // lock for the channels map
        channels     map[uint16]*channel // map of all API channels indexed by the channel ID
 
        notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
        notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
 
-       maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
-       pingReqID    uint16 // ID if the ControlPing message
-       pingReplyID  uint16 // ID of the ControlPingReply message
+       pingReqID   uint16 // ID if the ControlPing message
+       pingReplyID uint16 // ID of the ControlPingReply message
 
        lastReplyLock sync.Mutex // lock for the last reply
        lastReply     time.Time  // time of the last received reply from VPP
 }
 
-var (
-       log      *logger.Logger // global logger
-       conn     *Connection    // global handle to the Connection (used in the message receive callback)
-       connLock sync.RWMutex   // lock for the global connection
-)
-
-// init initializes global logger, which logs debug level messages to stdout.
-func init() {
-       log = logger.New()
-       log.Out = os.Stdout
-       log.Level = logger.DebugLevel
-}
-
-// SetLogger sets global logger to provided one.
-func SetLogger(l *logger.Logger) {
-       log = l
-}
-
-// SetHealthCheckProbeInterval sets health check probe interval.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckProbeInterval(interval time.Duration) {
-       healthCheckProbeInterval = interval
-}
-
-// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
-// If reply arrives after the timeout, check is considered as failed.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckReplyTimeout(timeout time.Duration) {
-       healthCheckReplyTimeout = timeout
-}
-
-// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckThreshold(threshold int) {
-       healthCheckThreshold = threshold
-}
-
-// SetControlPingMessages sets the messages for ControlPing and ControlPingReply
-func SetControlPingMessages(controPing, controlPingReply api.Message) {
-       msgControlPing = controPing
-       msgControlPingReply = controlPingReply
-}
-
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
@@ -152,7 +131,7 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
                return nil, err
        }
 
-       return conn, nil
+       return c, nil
 }
 
 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
@@ -170,7 +149,7 @@ func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEv
        connChan := make(chan ConnectionEvent, notificationChannelBufSize)
        go c.connectLoop(connChan)
 
-       return conn, connChan, nil
+       return c, connChan, nil
 }
 
 // Disconnect disconnects from VPP and releases all connection-related resources.
@@ -178,10 +157,11 @@ func (c *Connection) Disconnect() {
        if c == nil {
                return
        }
+
        connLock.Lock()
        defer connLock.Unlock()
 
-       if c != nil && c.vpp != nil {
+       if c.vpp != nil {
                c.disconnectVPP()
        }
        conn = nil
@@ -201,41 +181,119 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
                codec:              &codec.MsgCodec{},
                channels:           make(map[uint16]*channel),
                msgIDs:             make(map[string]uint16),
+               msgMap:             make(map[uint16]api.Message),
                notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
        }
+       conn.vpp.SetMsgCallback(conn.msgCallback)
 
-       conn.vpp.SetMsgCallback(msgCallback)
        return conn, nil
 }
 
-// connectVPP performs one blocking attempt to connect to VPP.
+// connectVPP performs blocking attempt to connect to VPP.
 func (c *Connection) connectVPP() error {
-       log.Debug("Connecting to VPP...")
+       log.Debug("Connecting to VPP..")
 
        // blocking connect
-       err := c.vpp.Connect()
-       if err != nil {
-               log.Warn(err)
+       if err := c.vpp.Connect(); err != nil {
                return err
        }
 
-       // store control ping IDs
-       if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
-               c.vpp.Disconnect()
-               return err
-       }
-       if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+       log.Debugf("Connected to VPP.")
+
+       if err := c.retrieveMessageIDs(); err != nil {
                c.vpp.Disconnect()
-               return err
+               return fmt.Errorf("VPP is incompatible: %v", err)
        }
 
        // store connected state
        atomic.StoreUint32(&c.connected, 1)
 
-       log.Info("Connected to VPP.")
        return nil
 }
 
+func getMsgNameWithCrc(x api.Message) string {
+       return x.GetMessageName() + "_" + x.GetCrcString()
+}
+
+// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
+func (c *Connection) retrieveMessageIDs() (err error) {
+       t := time.Now()
+
+       var addMsg = func(msgID uint16, msg api.Message) {
+               c.msgIDs[getMsgNameWithCrc(msg)] = msgID
+               c.msgMap[msgID] = msg
+       }
+
+       msgs := api.GetAllMessages()
+
+       for name, msg := range msgs {
+               msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
+               if err != nil {
+                       return err
+               }
+
+               addMsg(msgID, msg)
+
+               if msg.GetMessageName() == msgControlPing.GetMessageName() {
+                       c.pingReqID = msgID
+                       msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+               } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
+                       c.pingReplyID = msgID
+                       msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+               }
+
+               if debugMsgIDs {
+                       log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
+               }
+       }
+
+       log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
+
+       // fallback for control ping when vpe package is not imported
+       if c.pingReqID == 0 {
+               c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
+               if err != nil {
+                       return err
+               }
+               addMsg(c.pingReqID, msgControlPing)
+       }
+       if c.pingReplyID == 0 {
+               c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
+               if err != nil {
+                       return err
+               }
+               addMsg(c.pingReplyID, msgControlPingReply)
+       }
+
+       return nil
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+       if c == nil {
+               return 0, errors.New("nil connection passed in")
+       }
+
+       if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+               return msgID, nil
+       }
+
+       return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
+
+       if msg, ok := c.msgMap[msgID]; ok {
+               return msg, nil
+       }
+
+       return nil, fmt.Errorf("unknown message ID: %d", msgID)
+}
+
 // disconnectVPP disconnects from VPP in case it is connected.
 func (c *Connection) disconnectVPP() {
        if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
@@ -269,19 +327,21 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
 // it continues with connectLoop and tries to reconnect.
 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // create a separate API channel for health check probes
-       ch, err := conn.newAPIChannelBuffered(1, 1)
+       ch, err := c.newAPIChannel(1, 1)
        if err != nil {
                log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
        }
 
-       var sinceLastReply time.Duration
-       var failedChecks int
+       var (
+               sinceLastReply time.Duration
+               failedChecks   int
+       )
 
        // send health check probes until an error or timeout occurs
        for {
                // sleep until next health check probe period
-               time.Sleep(healthCheckProbeInterval)
+               time.Sleep(healthCheckInterval)
 
                if atomic.LoadUint32(&c.connected) == 0 {
                        // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -297,22 +357,22 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                }
 
                // send the control ping request
-               ch.reqChan <- &api.VppRequest{Message: msgControlPing}
+               ch.reqChan <- &vppRequest{msg: msgControlPing}
 
                for {
                        // expect response within timeout period
                        select {
                        case vppReply := <-ch.replyChan:
-                               err = vppReply.Error
+                               err = vppReply.err
 
                        case <-time.After(healthCheckReplyTimeout):
                                err = ErrProbeTimeout
 
                                // check if time since last reply from any other
                                // channel is less than health check reply timeout
-                               conn.lastReplyLock.Lock()
+                               c.lastReplyLock.Lock()
                                sinceLastReply = time.Since(c.lastReply)
-                               conn.lastReplyLock.Unlock()
+                               c.lastReplyLock.Unlock()
 
                                if sinceLastReply < healthCheckReplyTimeout {
                                        log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
@@ -326,17 +386,18 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        failedChecks++
                        log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
                        if failedChecks > healthCheckThreshold {
-                               // in case of exceeded treshold disconnect
+                               // in case of exceeded failed check treshold, assume VPP disconnected
                                log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
                                connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
                                break
                        }
                } else if err != nil {
-                       // in case of error disconnect
+                       // in case of error, assume VPP disconnected
                        log.Errorf("VPP health check probe failed: %v", err)
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
                        break
                } else if failedChecks > 0 {
+                       // in case of success after failed checks, clear failed check counter
                        failedChecks = 0
                        log.Infof("VPP health check probe OK")
                }
@@ -351,33 +412,31 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
 }
 
 func (c *Connection) NewAPIChannel() (api.Channel, error) {
-       return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
+       return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
 }
 
 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
-       return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize)
+       return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
 }
 
 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
 // It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) {
+func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
        if c == nil {
                return nil, errors.New("nil connection passed in")
        }
 
        chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
        ch := &channel{
-               id:           chID,
-               replyTimeout: defaultReplyTimeout,
+               id:                 chID,
+               replyTimeout:       defaultReplyTimeout,
+               msgDecoder:         c.codec,
+               msgIdentifier:      c,
+               reqChan:            make(chan *vppRequest, reqChanBufSize),
+               replyChan:          make(chan *vppReply, replyChanBufSize),
+               notifSubsChan:      make(chan *subscriptionRequest, reqChanBufSize),
+               notifSubsReplyChan: make(chan error, replyChanBufSize),
        }
-       ch.msgDecoder = c.codec
-       ch.msgIdentifier = c
-
-       // create the communication channels
-       ch.reqChan = make(chan *api.VppRequest, reqChanBufSize)
-       ch.replyChan = make(chan *api.VppReply, replyChanBufSize)
-       ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
-       ch.notifSubsReplyChan = make(chan error, replyChanBufSize)
 
        // store API channel within the client
        c.channelsLock.Lock()
@@ -393,8 +452,8 @@ func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
 // releaseAPIChannel releases API channel that needs to be closed.
 func (c *Connection) releaseAPIChannel(ch *channel) {
        log.WithFields(logger.Fields{
-               "ID": ch.id,
-       }).Debug("API channel closed.")
+               "channel": ch.id,
+       }).Debug("API channel released")
 
        // delete the channel from channels map
        c.channelsLock.Lock()