X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=f796f3daa55fb5487e9094736e88527357f207ef;hb=a4112fac7b86fe09650d2bb57969fe46404edd7d;hp=a44d0c4d80597996d30f70ce34c34359a8159599;hpb=f1bef4a3c66f4408afdeb64cda62ccd8562d0fc6;p=govpp.git diff --git a/core/connection.go b/core/connection.go index a44d0c4..f796f3d 100644 --- a/core/connection.go +++ b/core/connection.go @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api - package core import ( "errors" - "os" + "fmt" + "path" + "reflect" "sync" "sync/atomic" "time" @@ -28,376 +28,487 @@ import ( "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" "git.fd.io/govpp.git/codec" - "git.fd.io/govpp.git/core/bin_api/vpe" ) -var ( - msgControlPing api.Message = &vpe.ControlPing{} - msgControlPingReply api.Message = &vpe.ControlPingReply{} +const ( + DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts + DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts ) -const ( - requestChannelBufSize = 100 // default size of the request channel buffers - replyChannelBufSize = 100 // default size of the reply channel buffers - notificationChannelBufSize = 100 // default size of the notification channel buffers +var ( + RequestChanBufSize = 100 // default size of the request channel buffer + ReplyChanBufSize = 100 // default size of the reply channel buffer + NotificationChanBufSize = 100 // default size of the notification channel buffer ) var ( - healthCheckProbeInterval = time.Second * 1 // default health check probe interval - healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe - healthCheckThreshold = 1 // number of failed healthProbe until the error is reported + HealthCheckProbeInterval = time.Second // default health check probe interval + HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe + HealthCheckThreshold = 2 // number of failed health checks until the error is reported + DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) -// ConnectionState holds the current state of the connection to VPP. +// ConnectionState represents the current state of the connection to VPP. type ConnectionState int const ( - // Connected connection state means that the connection to VPP has been successfully established. + // Connected represents state in which the connection has been successfully established. Connected ConnectionState = iota - // Disconnected connection state means that the connection to VPP has been lost. + // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay, + // or not at all. GoVPP treats this state internally the same as disconnected. + NotResponding + + // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped. Disconnected + + // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts. + Failed ) +func (s ConnectionState) String() string { + switch s { + case Connected: + return "Connected" + case NotResponding: + return "NotResponding" + case Disconnected: + return "Disconnected" + case Failed: + return "Failed" + default: + return fmt.Sprintf("UnknownState(%d)", s) + } +} + // ConnectionEvent is a notification about change in the VPP connection state. type ConnectionEvent struct { - // Timestamp holds the time when the event has been generated. + // Timestamp holds the time when the event has been created. Timestamp time.Time - // State holds the new state of the connection to VPP at the time when the event has been generated. + // State holds the new state of the connection at the time when the event has been created. State ConnectionState + + // Error holds error if any encountered. + Error error } // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vpp adapter.VppAdapter // VPP adapter - connected uint32 // non-zero if the adapter is connected to VPP - codec *codec.MsgCodec // message codec + vppClient adapter.VppAPI // VPP binary API client - msgIDsLock sync.RWMutex // lock for the message IDs map - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts - channelsLock sync.RWMutex // lock for the channels map - channels map[uint16]*channel // map of all API channels indexed by the channel ID + vppConnected uint32 // non-zero if the adapter is connected to VPP - notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map - notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID + connChan chan ConnectionEvent // connection status events are sent to this channel + healthCheckDone chan struct{} // used to terminate health check loop - maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) - pingReqID uint16 // ID if the ControlPing message - pingReplyID uint16 // ID of the ControlPingReply message + codec MessageCodec // message codec + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path - lastReplyLock sync.Mutex // lock for the last reply - lastReply time.Time // time of the last received reply from VPP -} + channelsLock sync.RWMutex // lock for the channels map and the channel ID + nextChannelID uint16 // next potential channel ID (the real limit is 2^15) + channels map[uint16]*Channel // map of all API channels indexed by the channel ID -var ( - log *logger.Logger // global logger - conn *Connection // global handle to the Connection (used in the message receive callback) - connLock sync.RWMutex // lock for the global connection -) + subscriptionsLock sync.RWMutex // lock for the subscriptions map + subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID -// init initializes global logger, which logs debug level messages to stdout. -func init() { - log = logger.New() - log.Out = os.Stdout - log.Level = logger.DebugLevel -} + pingReqID uint16 // ID if the ControlPing message + pingReplyID uint16 // ID of the ControlPingReply message -// SetLogger sets global logger to provided one. -func SetLogger(l *logger.Logger) { - log = l -} + lastReplyLock sync.Mutex // lock for the last reply + lastReply time.Time // time of the last received reply from VPP -// SetHealthCheckProbeInterval sets health check probe interval. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckProbeInterval(interval time.Duration) { - healthCheckProbeInterval = interval -} + msgControlPing api.Message + msgControlPingReply api.Message -// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. -// If reply arrives after the timeout, check is considered as failed. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckReplyTimeout(timeout time.Duration) { - healthCheckReplyTimeout = timeout + apiTrace *trace // API tracer (disabled by default) } -// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. -// Beware: Function is not thread-safe. It is recommended to setup this parameter -// before connecting to vpp. -func SetHealthCheckThreshold(threshold int) { - healthCheckThreshold = threshold -} +func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } -// SetControlPingMessages sets the messages for ControlPing and ControlPingReply -func SetControlPingMessages(controPing, controlPingReply api.Message) { - msgControlPing = controPing - msgControlPingReply = controlPingReply + c := &Connection{ + vppClient: binapi, + maxAttempts: attempts, + recInterval: interval, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + healthCheckDone: make(chan struct{}), + codec: codec.DefaultCodec, + msgIDs: make(map[string]uint16), + msgMapByPath: make(map[string]map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + msgControlPing: msgControlPing, + msgControlPingReply: msgControlPingReply, + apiTrace: &trace{ + list: make([]*api.Record, 0), + mux: &sync.Mutex{}, + }, + } + binapi.SetMsgCallback(c.msgCallback) + return c } -// Connect connects to VPP using specified VPP adapter and returns the connection handle. -// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. -func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { +// Connect connects to VPP API using specified adapter and returns a connection handle. +// This call blocks until it is either connected, or an error occurs. +// Only one connection attempt will be performed. +func Connect(binapi adapter.VppAPI) (*Connection, error) { // create new connection handle - c, err := newConnection(vppAdapter) - if err != nil { - return nil, err - } + c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval) // blocking attempt to connect to VPP - err = c.connectVPP() - if err != nil { + if err := c.connectVPP(); err != nil { return nil, err } - return conn, nil + return c, nil } // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle // and ConnectionState channel. This call does not block until connection is established, it // returns immediately. The caller is supposed to watch the returned ConnectionState channel for // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) { +func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c, err := newConnection(vppAdapter) - if err != nil { - return nil, nil, err - } + c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, notificationChannelBufSize) - go c.connectLoop(connChan) + go c.connectLoop() + + return c, c.connChan, nil +} + +// connectVPP performs blocking attempt to connect to VPP. +func (c *Connection) connectVPP() error { + log.Debug("Connecting to VPP..") - return conn, connChan, nil + // blocking connect + if err := c.vppClient.Connect(); err != nil { + return err + } + log.Debugf("Connected to VPP") + + if err := c.retrieveMessageIDs(); err != nil { + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("disconnecting vpp client failed: %v", err) + } + return fmt.Errorf("VPP is incompatible: %v", err) + } + + // store connected state + atomic.StoreUint32(&c.vppConnected, 1) + + return nil } -// Disconnect disconnects from VPP and releases all connection-related resources. +// Disconnect disconnects from VPP API and releases all connection-related resources. func (c *Connection) Disconnect() { if c == nil { return } - connLock.Lock() - defer connLock.Unlock() - - if c != nil && c.vpp != nil { - c.disconnectVPP() + if c.vppClient != nil { + c.disconnectVPP(true) } - conn = nil } -// newConnection returns new connection handle. -func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { - connLock.Lock() - defer connLock.Unlock() - - if conn != nil { - return nil, errors.New("only one connection per process is supported") - } +// disconnectVPP disconnects from VPP in case it is connected. terminate tells +// that disconnectVPP() was called from Close(), so healthCheckLoop() can be +// terminated. +func (c *Connection) disconnectVPP(terminate bool) { + if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) { + if terminate { + close(c.healthCheckDone) + } + log.Debug("Disconnecting from VPP..") - conn = &Connection{ - vpp: vppAdapter, - codec: &codec.MsgCodec{}, - channels: make(map[uint16]*channel), - msgIDs: make(map[string]uint16), - notifSubscriptions: make(map[uint16][]*api.NotifSubscription), + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("Disconnect from VPP failed: %v", err) + } + log.Debug("Disconnected from VPP") } +} - conn.vpp.SetMsgCallback(msgCallback) - return conn, nil +func (c *Connection) NewAPIChannel() (api.Channel, error) { + return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize) } -// connectVPP performs one blocking attempt to connect to VPP. -func (c *Connection) connectVPP() error { - log.Debug("Connecting to VPP...") +func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { + return c.newAPIChannel(reqChanBufSize, replyChanBufSize) +} - // blocking connect - err := c.vpp.Connect() - if err != nil { - log.Warn(err) - return err +// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. +// It allows to specify custom buffer sizes for the request and reply Go channels. +func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) { + if c == nil { + return nil, errors.New("nil connection passed in") } - // store control ping IDs - if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil { - c.vpp.Disconnect() - return err - } - if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil { - c.vpp.Disconnect() - return err + channel, err := c.newChannel(reqChanBufSize, replyChanBufSize) + if err != nil { + return nil, err } - // store connected state - atomic.StoreUint32(&c.connected, 1) + // start watching on the request channel + go c.watchRequests(channel) - log.Info("Connected to VPP.") - return nil + return channel, nil } -// disconnectVPP disconnects from VPP in case it is connected. -func (c *Connection) disconnectVPP() { - if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { - c.vpp.Disconnect() - } +// releaseAPIChannel releases API channel that needs to be closed. +func (c *Connection) releaseAPIChannel(ch *Channel) { + log.WithFields(logger.Fields{ + "channel": ch.id, + }).Debug("API channel released") + + // delete the channel from channels map + c.channelsLock.Lock() + delete(c.channels, ch.id) + c.channelsLock.Unlock() } // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. -func (c *Connection) connectLoop(connChan chan ConnectionEvent) { +func (c *Connection) connectLoop() { + var reconnectAttempts int + // loop until connected for { - if err := c.vpp.WaitReady(); err != nil { - log.Warnf("wait ready failed: %v", err) + if err := c.vppClient.WaitReady(); err != nil { + log.Debugf("wait ready failed: %v", err) } if err := c.connectVPP(); err == nil { // signal connected event - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) break + } else if reconnectAttempts < c.maxAttempts { + reconnectAttempts++ + log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) } else { - log.Errorf("connecting to VPP failed: %v", err) - time.Sleep(time.Second) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) + return } } // we are now connected, continue with health check loop - c.healthCheckLoop(connChan) + c.healthCheckLoop() } // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, // it continues with connectLoop and tries to reconnect. -func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { +func (c *Connection) healthCheckLoop() { // create a separate API channel for health check probes - ch, err := conn.newAPIChannelBuffered(1, 1) + ch, err := c.newAPIChannel(1, 1) if err != nil { log.Error("Failed to create health check API channel, health check will be disabled:", err) return } + defer ch.Close() - var sinceLastReply time.Duration - var failedChecks int + var ( + sinceLastReply time.Duration + failedChecks int + ) // send health check probes until an error or timeout occurs - for { - // sleep until next health check probe period - time.Sleep(healthCheckProbeInterval) + probeInterval := time.NewTicker(HealthCheckProbeInterval) + defer probeInterval.Stop() - if atomic.LoadUint32(&c.connected) == 0 { - // Disconnect has been called in the meantime, return the healthcheck - reconnect loop +HealthCheck: + for { + select { + case <-c.healthCheckDone: + // Terminate the health check loop on connection disconnect log.Debug("Disconnected on request, exiting health check loop.") return - } - - // try draining probe replies from previous request before sending next one - select { - case <-ch.replyChan: - log.Debug("drained old probe reply from reply channel") - default: - } + case <-probeInterval.C: + // try draining probe replies from previous request before sending next one + select { + case <-ch.replyChan: + log.Debug("drained old probe reply from reply channel") + default: + } - // send the control ping request - ch.reqChan <- &api.VppRequest{Message: msgControlPing} + // send the control ping request + ch.reqChan <- &vppRequest{msg: c.msgControlPing} - for { - // expect response within timeout period - select { - case vppReply := <-ch.replyChan: - err = vppReply.Error + for { + // expect response within timeout period + select { + case vppReply := <-ch.replyChan: + err = vppReply.err - case <-time.After(healthCheckReplyTimeout): - err = ErrProbeTimeout + case <-time.After(HealthCheckReplyTimeout): + err = ErrProbeTimeout - // check if time since last reply from any other - // channel is less than health check reply timeout - conn.lastReplyLock.Lock() - sinceLastReply = time.Since(c.lastReply) - conn.lastReplyLock.Unlock() + // check if time since last reply from any other + // channel is less than health check reply timeout + c.lastReplyLock.Lock() + sinceLastReply = time.Since(c.lastReply) + c.lastReplyLock.Unlock() - if sinceLastReply < healthCheckReplyTimeout { - log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) - continue + if sinceLastReply < HealthCheckReplyTimeout { + log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply) + continue + } } + break } - break - } - if err == ErrProbeTimeout { - failedChecks++ - log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks) - if failedChecks > healthCheckThreshold { - // in case of exceeded treshold disconnect - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} - break + if err == ErrProbeTimeout { + failedChecks++ + log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) + if failedChecks > HealthCheckThreshold { + // in case of exceeded failed check threshold, assume VPP unresponsive + log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding}) + break HealthCheck + } + } else if err != nil { + // in case of error, assume VPP disconnected + log.Errorf("VPP health check probe failed: %v", err) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}) + break HealthCheck + } else if failedChecks > 0 { + // in case of success after failed checks, clear failed check counter + failedChecks = 0 + log.Infof("VPP health check probe OK") } - } else if err != nil { - // in case of error disconnect - log.Errorf("VPP health check probe failed: %v", err) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} - break - } else if failedChecks > 0 { - failedChecks = 0 - log.Infof("VPP health check probe OK") } } // cleanup - ch.Close() - c.disconnectVPP() + c.disconnectVPP(false) // we are now disconnected, start connect loop - c.connectLoop(connChan) + c.connectLoop() } -func (c *Connection) NewAPIChannel() (api.Channel, error) { - return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize) +func getMsgNameWithCrc(x api.Message) string { + return getMsgID(x.GetMessageName(), x.GetCrcString()) } -func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) { - return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize) +func getMsgID(name, crc string) string { + return name + "_" + crc } -// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core. -// It allows to specify custom buffer sizes for the request and reply Go channels. -func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) { +func getMsgFactory(msg api.Message) func() api.Message { + return func() api.Message { + return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { if c == nil { - return nil, errors.New("nil connection passed in") + return 0, errors.New("nil connection passed in") } + pkgPath := c.GetMessagePath(msg) + msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) + if err != nil { + return 0, err + } + if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk { + c.msgMapByPath[pkgPath] = make(map[uint16]api.Message) + c.msgMapByPath[pkgPath][msgID] = msg + } else if _, msgOk := pathMsgs[msgID]; !msgOk { + c.msgMapByPath[pkgPath][msgID] = msg + } + if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + c.msgIDs[getMsgNameWithCrc(msg)] = msgID + return msgID, nil +} - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - ch := &channel{ - id: chID, - replyTimeout: defaultReplyTimeout, +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + if msg, ok := c.msgMapByPath[path][msgID]; ok { + return msg, nil } - ch.msgDecoder = c.codec - ch.msgIdentifier = c + return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path) +} - // create the communication channels - ch.reqChan = make(chan *api.VppRequest, reqChanBufSize) - ch.replyChan = make(chan *api.VppReply, replyChanBufSize) - ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize) - ch.notifSubsReplyChan = make(chan error, replyChanBufSize) +// GetMessagePath returns path for the given message +func (c *Connection) GetMessagePath(msg api.Message) string { + return path.Dir(reflect.TypeOf(msg).Elem().PkgPath()) +} - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = ch - c.channelsLock.Unlock() +// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map +func (c *Connection) retrieveMessageIDs() (err error) { + t := time.Now() - // start watching on the request channel - go c.watchRequests(ch) + msgsByPath := api.GetRegisteredMessages() + + var n int + for pkgPath, msgs := range msgsByPath { + for _, msg := range msgs { + msgID, err := c.GetMessageID(msg) + if err != nil { + if debugMsgIDs { + log.Debugf("retrieving message ID for %s.%s failed: %v", + pkgPath, msg.GetMessageName(), err) + } + continue + } + n++ + + if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { + c.pingReqID = msgID + c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { + c.pingReplyID = msgID + c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } + + if debugMsgIDs { + log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID) + } + } + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath) + } - return ch, nil + return nil } -// releaseAPIChannel releases API channel that needs to be closed. -func (c *Connection) releaseAPIChannel(ch *channel) { - log.WithFields(logger.Fields{ - "ID": ch.id, - }).Debug("API channel closed.") +func (c *Connection) sendConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Connection state channel is full, discarding value.") + } +} - // delete the channel from channels map - c.channelsLock.Lock() - delete(c.channels, ch.id) - c.channelsLock.Unlock() +// Trace gives access to the API trace interface +func (c *Connection) Trace() api.Trace { + return c.apiTrace +} + +// trace records api message +func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) { + if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 { + return + } + entry := &api.Record{ + Message: msg, + Timestamp: t, + IsReceived: isReceived, + ChannelID: chId, + } + c.apiTrace.mux.Lock() + c.apiTrace.list = append(c.apiTrace.list, entry) + c.apiTrace.mux.Unlock() }