X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=f796f3daa55fb5487e9094736e88527357f207ef;hb=a4112fac7b86fe09650d2bb57969fe46404edd7d;hp=ee5a06b82e04fe76c96ec2a7a132b2c45fb325a4;hpb=57de49f7583b8174c7f3d8e21956d4eaac64ac28;p=govpp.git diff --git a/core/connection.go b/core/connection.go index ee5a06b..f796f3d 100644 --- a/core/connection.go +++ b/core/connection.go @@ -102,15 +102,16 @@ type Connection struct { vppConnected uint32 // non-zero if the adapter is connected to VPP - connChan chan ConnectionEvent // connection status events are sent to this channel + connChan chan ConnectionEvent // connection status events are sent to this channel + healthCheckDone chan struct{} // used to terminate health check loop codec MessageCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path - maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) - channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*Channel // map of all API channels indexed by the channel ID + channelsLock sync.RWMutex // lock for the channels map and the channel ID + nextChannelID uint16 // next potential channel ID (the real limit is 2^15) + channels map[uint16]*Channel // map of all API channels indexed by the channel ID subscriptionsLock sync.RWMutex // lock for the subscriptions map subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID @@ -140,6 +141,7 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) maxAttempts: attempts, recInterval: interval, connChan: make(chan ConnectionEvent, NotificationChanBufSize), + healthCheckDone: make(chan struct{}), codec: codec.DefaultCodec, msgIDs: make(map[string]uint16), msgMapByPath: make(map[string]map[uint16]api.Message), @@ -214,13 +216,18 @@ func (c *Connection) Disconnect() { return } if c.vppClient != nil { - c.disconnectVPP() + c.disconnectVPP(true) } } -// disconnectVPP disconnects from VPP in case it is connected. -func (c *Connection) disconnectVPP() { +// disconnectVPP disconnects from VPP in case it is connected. terminate tells +// that disconnectVPP() was called from Close(), so healthCheckLoop() can be +// terminated. +func (c *Connection) disconnectVPP(terminate bool) { if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) { + if terminate { + close(c.healthCheckDone) + } log.Debug("Disconnecting from VPP..") if err := c.vppClient.Disconnect(); err != nil { @@ -245,14 +252,10 @@ func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Chann return nil, errors.New("nil connection passed in") } - // create new channel - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize) - - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = channel - c.channelsLock.Unlock() + channel, err := c.newChannel(reqChanBufSize, replyChanBufSize) + if err != nil { + return nil, err + } // start watching on the request channel go c.watchRequests(channel) @@ -309,6 +312,7 @@ func (c *Connection) healthCheckLoop() { log.Error("Failed to create health check API channel, health check will be disabled:", err) return } + defer ch.Close() var ( sinceLastReply time.Duration @@ -316,73 +320,74 @@ func (c *Connection) healthCheckLoop() { ) // send health check probes until an error or timeout occurs - for { - // sleep until next health check probe period - time.Sleep(HealthCheckProbeInterval) + probeInterval := time.NewTicker(HealthCheckProbeInterval) + defer probeInterval.Stop() - if atomic.LoadUint32(&c.vppConnected) == 0 { - // Disconnect has been called in the meantime, return the healthcheck - reconnect loop +HealthCheck: + for { + select { + case <-c.healthCheckDone: + // Terminate the health check loop on connection disconnect log.Debug("Disconnected on request, exiting health check loop.") return - } - - // try draining probe replies from previous request before sending next one - select { - case <-ch.replyChan: - log.Debug("drained old probe reply from reply channel") - default: - } + case <-probeInterval.C: + // try draining probe replies from previous request before sending next one + select { + case <-ch.replyChan: + log.Debug("drained old probe reply from reply channel") + default: + } - // send the control ping request - ch.reqChan <- &vppRequest{msg: c.msgControlPing} + // send the control ping request + ch.reqChan <- &vppRequest{msg: c.msgControlPing} - for { - // expect response within timeout period - select { - case vppReply := <-ch.replyChan: - err = vppReply.err + for { + // expect response within timeout period + select { + case vppReply := <-ch.replyChan: + err = vppReply.err - case <-time.After(HealthCheckReplyTimeout): - err = ErrProbeTimeout + case <-time.After(HealthCheckReplyTimeout): + err = ErrProbeTimeout - // check if time since last reply from any other - // channel is less than health check reply timeout - c.lastReplyLock.Lock() - sinceLastReply = time.Since(c.lastReply) - c.lastReplyLock.Unlock() + // check if time since last reply from any other + // channel is less than health check reply timeout + c.lastReplyLock.Lock() + sinceLastReply = time.Since(c.lastReply) + c.lastReplyLock.Unlock() - if sinceLastReply < HealthCheckReplyTimeout { - log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) - continue + if sinceLastReply < HealthCheckReplyTimeout { + log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) + continue + } } + break } - break - } - if err == ErrProbeTimeout { - failedChecks++ - log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) - if failedChecks > HealthCheckThreshold { - // in case of exceeded failed check threshold, assume VPP unresponsive - log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold) - c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding}) - break + if err == ErrProbeTimeout { + failedChecks++ + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) + if failedChecks > HealthCheckThreshold { + // in case of exceeded failed check threshold, assume VPP unresponsive + log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding}) + break HealthCheck + } + } else if err != nil { + // in case of error, assume VPP disconnected + log.Errorf("VPP health check probe failed: %v", err) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}) + break HealthCheck + } else if failedChecks > 0 { + // in case of success after failed checks, clear failed check counter + failedChecks = 0 + log.Infof("VPP health check probe OK") } - } else if err != nil { - // in case of error, assume VPP disconnected - log.Errorf("VPP health check probe failed: %v", err) - c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}) - break - } else if failedChecks > 0 { - // in case of success after failed checks, clear failed check counter - failedChecks = 0 - log.Infof("VPP health check probe OK") } } // cleanup - ch.Close() - c.disconnectVPP() + c.disconnectVPP(false) // we are now disconnected, start connect loop c.connectLoop()