"errors"
"fmt"
"reflect"
- "sync/atomic"
+ "sync"
"time"
"git.fd.io/govpp.git/api"
)
type Stream struct {
- id uint32
conn *Connection
ctx context.Context
channel *Channel
requestSize int
replySize int
replyTimeout time.Duration
+ // per-request context
+ pkgPath string
+ sync.Mutex
}
func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
for _, option := range options {
option(s)
}
- // create and store a new channel
- s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
- s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
- s.channel.SetReplyTimeout(s.replyTimeout)
- // store API channel within the client
- c.channelsLock.Lock()
- c.channels[uint16(s.id)] = s.channel
- c.channelsLock.Unlock()
+ ch, err := c.newChannel(s.requestSize, s.replySize)
+ if err != nil {
+ return nil, err
+ }
+ s.channel = ch
+ s.channel.SetReplyTimeout(s.replyTimeout)
// Channel.watchRequests are not started here intentionally, because
// requests are sent directly by SendMsg.
if err != nil {
return err
}
+ defer func() { _ = stream.Close() }()
if err := stream.SendMsg(req); err != nil {
return err
}
if err := s.conn.processRequest(s.channel, req); err != nil {
return err
}
+ s.Lock()
+ s.pkgPath = s.conn.GetMessagePath(msg)
+ s.Unlock()
return nil
}
return nil, err
}
// resolve message type
- msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+ s.Lock()
+ path := s.pkgPath
+ s.Unlock()
+ msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
if err != nil {
return nil, err
}