When creating a new channel and the channel ID wraps around, make sure
to not re-use a channel ID that is still in use. Re-using the channel
ID usually means that the connection health check will stop working and
other things might break as well.
Also rename maxChannelID to nextChannelID and use a lock to guard access
instead of using an atomic. The lock does anyway need to be acquired
because to put the entry in the map.
This commit was inspired by the following PR on Github:
https://github.com/FDio/govpp/pull/14.
Change-Id: I8c1a4ca63a53d07a6482b6047a3005065168c0b4
Signed-off-by: Lukas Vogel <vogel@anapaya.net>
"fmt"
"reflect"
"strings"
"fmt"
"reflect"
"strings"
"time"
"github.com/sirupsen/logrus"
"time"
"github.com/sirupsen/logrus"
receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply
}
receiveReplyTimeout time.Duration // maximum time that we wait for receiver to consume reply
}
-func (c *Connection) newChannel(reqChanBufSize, replyChanBufSize int) *Channel {
+func (c *Connection) newChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
- chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
conn: c,
msgCodec: c.codec,
msgIdentifier: c,
conn: c,
msgCodec: c.codec,
msgIdentifier: c,
// store API channel within the client
c.channelsLock.Lock()
// store API channel within the client
c.channelsLock.Lock()
- c.channels[chID] = channel
+ if len(c.channels) >= 0x7fff {
+ return nil, errors.New("all channel IDs are used")
+ }
+ for {
+ c.nextChannelID++
+ chID := c.nextChannelID & 0x7fff
+ _, ok := c.channels[chID]
+ if !ok {
+ channel.id = chID
+ c.channels[chID] = channel
+ break
+ }
+ }
}
func (ch *Channel) GetID() uint16 {
}
func (ch *Channel) GetID() uint16 {
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path
- maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
- channelsLock sync.RWMutex // lock for the channels map
- channels map[uint16]*Channel // map of all API channels indexed by the channel ID
+ channelsLock sync.RWMutex // lock for the channels map and the channel ID
+ nextChannelID uint16 // next potential channel ID (the real limit is 2^15)
+ channels map[uint16]*Channel // map of all API channels indexed by the channel ID
subscriptionsLock sync.RWMutex // lock for the subscriptions map
subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
subscriptionsLock sync.RWMutex // lock for the subscriptions map
subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
return nil, errors.New("nil connection passed in")
}
return nil, errors.New("nil connection passed in")
}
- channel := c.newChannel(reqChanBufSize, replyChanBufSize)
+ channel, err := c.newChannel(reqChanBufSize, replyChanBufSize)
+ if err != nil {
+ return nil, err
+ }
// start watching on the request channel
go c.watchRequests(channel)
// start watching on the request channel
go c.watchRequests(channel)
- s.channel = c.newChannel(s.requestSize, s.replySize)
+ ch, err := c.newChannel(s.requestSize, s.replySize)
+ if err != nil {
+ return nil, err
+ }
+ s.channel = ch
s.channel.SetReplyTimeout(s.replyTimeout)
// Channel.watchRequests are not started here intentionally, because
s.channel.SetReplyTimeout(s.replyTimeout)
// Channel.watchRequests are not started here intentionally, because