"errors"
"fmt"
"reflect"
+ "sync"
"sync/atomic"
"time"
requestSize int
replySize int
replyTimeout time.Duration
+ // per-request context
+ pkgPath string
+ sync.Mutex
}
func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
if err := s.conn.processRequest(s.channel, req); err != nil {
return err
}
+ s.Lock()
+ s.pkgPath = s.conn.GetMessagePath(msg)
+ s.Unlock()
return nil
}
return nil, err
}
// resolve message type
- msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
+ s.Lock()
+ path := s.pkgPath
+ s.Unlock()
+ msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
if err != nil {
return nil, err
}