Simplify subscribing to events and fix events
[govpp.git] / core / connection.go
index c77358f..7d014ce 100644 (file)
@@ -29,42 +29,19 @@ import (
        "git.fd.io/govpp.git/codec"
 )
 
-const (
-       requestChannelBufSize      = 100 // default size of the request channel buffer
-       replyChannelBufSize        = 100 // default size of the reply channel buffer
-       notificationChannelBufSize = 100 // default size of the notification channel buffer
-
-       defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
+var (
+       RequestChanBufSize      = 100 // default size of the request channel buffer
+       ReplyChanBufSize        = 100 // default size of the reply channel buffer
+       NotificationChanBufSize = 100 // default size of the notification channel buffer
 )
 
 var (
-       healthCheckInterval     = time.Second * 1        // default health check interval
-       healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
-       healthCheckThreshold    = 1                      // number of failed health checks until the error is reported
+       HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
+       HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
+       DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
 )
 
-// SetHealthCheckProbeInterval sets health check probe interval.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckProbeInterval(interval time.Duration) {
-       healthCheckInterval = interval
-}
-
-// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
-// If reply arrives after the timeout, check is considered as failed.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckReplyTimeout(timeout time.Duration) {
-       healthCheckReplyTimeout = timeout
-}
-
-// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
-// Beware: Function is not thread-safe. It is recommended to setup this parameter
-// before connecting to vpp.
-func SetHealthCheckThreshold(threshold int) {
-       healthCheckThreshold = threshold
-}
-
 // ConnectionState represents the current state of the connection to VPP.
 type ConnectionState int
 
@@ -104,10 +81,10 @@ type Connection struct {
 
        maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
        channelsLock sync.RWMutex        // lock for the channels map
-       channels     map[uint16]*channel // map of all API channels indexed by the channel ID
+       channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
 
-       notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
-       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
+       subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
+       subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
 
        pingReqID   uint16 // ID if the ControlPing message
        pingReplyID uint16 // ID of the ControlPingReply message
@@ -116,18 +93,30 @@ type Connection struct {
        lastReply     time.Time  // time of the last received reply from VPP
 }
 
+func newConnection(vpp adapter.VppAdapter) *Connection {
+       c := &Connection{
+               vpp:           vpp,
+               codec:         &codec.MsgCodec{},
+               msgIDs:        make(map[string]uint16),
+               msgMap:        make(map[uint16]api.Message),
+               channels:      make(map[uint16]*Channel),
+               subscriptions: make(map[uint16][]*subscriptionCtx),
+       }
+       vpp.SetMsgCallback(c.msgCallback)
+       return c
+}
+
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
        // create new connection handle
-       c, err := newConnection(vppAdapter)
+       c, err := createConnection(vppAdapter)
        if err != nil {
                return nil, err
        }
 
        // blocking attempt to connect to VPP
-       err = c.connectVPP()
-       if err != nil {
+       if err := c.connectVPP(); err != nil {
                return nil, err
        }
 
@@ -140,13 +129,13 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
        // create new connection handle
-       c, err := newConnection(vppAdapter)
+       c, err := createConnection(vppAdapter)
        if err != nil {
                return nil, nil, err
        }
 
        // asynchronously attempt to connect to VPP
-       connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+       connChan := make(chan ConnectionEvent, NotificationChanBufSize)
        go c.connectLoop(connChan)
 
        return c, connChan, nil
@@ -168,7 +157,7 @@ func (c *Connection) Disconnect() {
 }
 
 // newConnection returns new connection handle.
-func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
+func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
        connLock.Lock()
        defer connLock.Unlock()
 
@@ -176,15 +165,7 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
                return nil, errors.New("only one connection per process is supported")
        }
 
-       conn = &Connection{
-               vpp:                vppAdapter,
-               codec:              &codec.MsgCodec{},
-               channels:           make(map[uint16]*channel),
-               msgIDs:             make(map[string]uint16),
-               msgMap:             make(map[uint16]api.Message),
-               notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
-       }
-       conn.vpp.SetMsgCallback(conn.msgCallback)
+       conn = newConnection(vppAdapter)
 
        return conn, nil
 }
@@ -211,8 +192,72 @@ func (c *Connection) connectVPP() error {
        return nil
 }
 
-func getMsgNameWithCrc(x api.Message) string {
-       return x.GetMessageName() + "_" + x.GetCrcString()
+func (c *Connection) NewAPIChannel() (api.Channel, error) {
+       return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
+}
+
+func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
+       return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
+}
+
+// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
+// It allows to specify custom buffer sizes for the request and reply Go channels.
+func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
+
+       // create new channel
+       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
+       channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
+
+       // store API channel within the client
+       c.channelsLock.Lock()
+       c.channels[chID] = channel
+       c.channelsLock.Unlock()
+
+       // start watching on the request channel
+       go c.watchRequests(channel)
+
+       return channel, nil
+}
+
+// releaseAPIChannel releases API channel that needs to be closed.
+func (c *Connection) releaseAPIChannel(ch *Channel) {
+       log.WithFields(logger.Fields{
+               "channel": ch.id,
+       }).Debug("API channel released")
+
+       // delete the channel from channels map
+       c.channelsLock.Lock()
+       delete(c.channels, ch.id)
+       c.channelsLock.Unlock()
+}
+
+// GetMessageID returns message identifier of given API message.
+func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
+       if c == nil {
+               return 0, errors.New("nil connection passed in")
+       }
+
+       if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+               return msgID, nil
+       }
+
+       return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
+}
+
+// LookupByID looks up message name and crc by ID.
+func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
+
+       if msg, ok := c.msgMap[msgID]; ok {
+               return msg, nil
+       }
+
+       return nil, fmt.Errorf("unknown message ID: %d", msgID)
 }
 
 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
@@ -268,32 +313,6 @@ func (c *Connection) retrieveMessageIDs() (err error) {
        return nil
 }
 
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
-       if c == nil {
-               return 0, errors.New("nil connection passed in")
-       }
-
-       if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
-               return msgID, nil
-       }
-
-       return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
-}
-
-// LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
-       if c == nil {
-               return nil, errors.New("nil connection passed in")
-       }
-
-       if msg, ok := c.msgMap[msgID]; ok {
-               return msg, nil
-       }
-
-       return nil, fmt.Errorf("unknown message ID: %d", msgID)
-}
-
 // disconnectVPP disconnects from VPP in case it is connected.
 func (c *Connection) disconnectVPP() {
        if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
@@ -341,7 +360,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // send health check probes until an error or timeout occurs
        for {
                // sleep until next health check probe period
-               time.Sleep(healthCheckInterval)
+               time.Sleep(HealthCheckProbeInterval)
 
                if atomic.LoadUint32(&c.connected) == 0 {
                        // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -365,7 +384,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        case vppReply := <-ch.replyChan:
                                err = vppReply.err
 
-                       case <-time.After(healthCheckReplyTimeout):
+                       case <-time.After(HealthCheckReplyTimeout):
                                err = ErrProbeTimeout
 
                                // check if time since last reply from any other
@@ -374,7 +393,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                                sinceLastReply = time.Since(c.lastReply)
                                c.lastReplyLock.Unlock()
 
-                               if sinceLastReply < healthCheckReplyTimeout {
+                               if sinceLastReply < HealthCheckReplyTimeout {
                                        log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
                                        continue
                                }
@@ -384,10 +403,10 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
 
                if err == ErrProbeTimeout {
                        failedChecks++
-                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
-                       if failedChecks > healthCheckThreshold {
+                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
+                       if failedChecks > HealthCheckThreshold {
                                // in case of exceeded failed check treshold, assume VPP disconnected
-                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
                                connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
                                break
                        }
@@ -411,52 +430,6 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        c.connectLoop(connChan)
 }
 
-func (c *Connection) NewAPIChannel() (api.Channel, error) {
-       return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
-}
-
-func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
-       return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
-}
-
-// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
-// It allows to specify custom buffer sizes for the request and reply Go channels.
-func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
-       if c == nil {
-               return nil, errors.New("nil connection passed in")
-       }
-
-       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
-       ch := &channel{
-               id:                 chID,
-               replyTimeout:       defaultReplyTimeout,
-               msgDecoder:         c.codec,
-               msgIdentifier:      c,
-               reqChan:            make(chan *vppRequest, reqChanBufSize),
-               replyChan:          make(chan *vppReply, replyChanBufSize),
-               notifSubsChan:      make(chan *subscriptionRequest, reqChanBufSize),
-               notifSubsReplyChan: make(chan error, replyChanBufSize),
-       }
-
-       // store API channel within the client
-       c.channelsLock.Lock()
-       c.channels[chID] = ch
-       c.channelsLock.Unlock()
-
-       // start watching on the request channel
-       go c.watchRequests(ch)
-
-       return ch, nil
-}
-
-// releaseAPIChannel releases API channel that needs to be closed.
-func (c *Connection) releaseAPIChannel(ch *channel) {
-       log.WithFields(logger.Fields{
-               "channel": ch.id,
-       }).Debug("API channel released")
-
-       // delete the channel from channels map
-       c.channelsLock.Lock()
-       delete(c.channels, ch.id)
-       c.channelsLock.Unlock()
+func getMsgNameWithCrc(x api.Message) string {
+       return x.GetMessageName() + "_" + x.GetCrcString()
 }