Optimize socketclient adapter and add various code improvements
[govpp.git] / core / channel.go
1 // Copyright (c) 2018 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "strings"
22         "time"
23
24         "github.com/sirupsen/logrus"
25
26         "git.fd.io/govpp.git/adapter"
27         "git.fd.io/govpp.git/api"
28 )
29
30 var (
31         ErrInvalidRequestCtx = errors.New("invalid request context")
32 )
33
34 // MessageCodec provides functionality for decoding binary data to generated API messages.
35 type MessageCodec interface {
36         // EncodeMsg encodes message into binary data.
37         EncodeMsg(msg api.Message, msgID uint16) ([]byte, error)
38         // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
39         DecodeMsg(data []byte, msg api.Message) error
40         // DecodeMsgContext decodes context from message data.
41         DecodeMsgContext(data []byte, msg api.Message) (context uint32, err error)
42 }
43
44 // MessageIdentifier provides identification of generated API messages.
45 type MessageIdentifier interface {
46         // GetMessageID returns message identifier of given API message.
47         GetMessageID(msg api.Message) (uint16, error)
48         // LookupByID looks up message name and crc by ID
49         LookupByID(msgID uint16) (api.Message, error)
50 }
51
52 // vppRequest is a request that will be sent to VPP.
53 type vppRequest struct {
54         seqNum uint16      // sequence number
55         msg    api.Message // binary API message to be send to VPP
56         multi  bool        // true if multipart response is expected
57 }
58
59 // vppReply is a reply received from VPP.
60 type vppReply struct {
61         seqNum       uint16 // sequence number
62         msgID        uint16 // ID of the message
63         data         []byte // encoded data with the message
64         lastReceived bool   // for multi request, true if the last reply has been already received
65         err          error  // in case of error, data is nil and this member contains error
66 }
67
68 // requestCtx is a context for request with single reply
69 type requestCtx struct {
70         ch     *Channel
71         seqNum uint16
72 }
73
74 // multiRequestCtx is a context for request with multiple responses
75 type multiRequestCtx struct {
76         ch     *Channel
77         seqNum uint16
78 }
79
80 // subscriptionCtx is a context of subscription for delivery of specific notification messages.
81 type subscriptionCtx struct {
82         ch         *Channel
83         notifChan  chan api.Message   // channel where notification messages will be delivered to
84         msgID      uint16             // message ID for the subscribed event message
85         event      api.Message        // event message that this subscription is for
86         msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification
87 }
88
89 // Channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
90 // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
91 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
92 // concurrently, otherwise the responses could mix! Use multiple channels instead.
93 type Channel struct {
94         id   uint16
95         conn *Connection
96
97         reqChan   chan *vppRequest // channel for sending the requests to VPP
98         replyChan chan *vppReply   // channel where VPP replies are delivered to
99
100         msgCodec      MessageCodec      // used to decode binary data to generated API messages
101         msgIdentifier MessageIdentifier // used to retrieve message ID of a message
102
103         lastSeqNum uint16 // sequence number of the last sent request
104
105         delayedReply *vppReply     // reply already taken from ReplyChan, buffered for later delivery
106         replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
107 }
108
109 func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
110         return &Channel{
111                 id:            id,
112                 conn:          conn,
113                 msgCodec:      codec,
114                 msgIdentifier: identifier,
115                 reqChan:       make(chan *vppRequest, reqSize),
116                 replyChan:     make(chan *vppReply, replySize),
117                 replyTimeout:  DefaultReplyTimeout,
118         }
119 }
120
121 func (ch *Channel) GetID() uint16 {
122         return ch.id
123 }
124
125 func (ch *Channel) nextSeqNum() uint16 {
126         ch.lastSeqNum++
127         return ch.lastSeqNum
128 }
129
130 func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
131         seqNum := ch.nextSeqNum()
132         ch.reqChan <- &vppRequest{
133                 msg:    msg,
134                 seqNum: seqNum,
135         }
136         return &requestCtx{ch: ch, seqNum: seqNum}
137 }
138
139 func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
140         seqNum := ch.nextSeqNum()
141         ch.reqChan <- &vppRequest{
142                 msg:    msg,
143                 seqNum: seqNum,
144                 multi:  true,
145         }
146         return &multiRequestCtx{ch: ch, seqNum: seqNum}
147 }
148
149 func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
150         var comperr api.CompatibilityError
151         for _, msg := range msgs {
152                 _, err := ch.msgIdentifier.GetMessageID(msg)
153                 if err != nil {
154                         if uerr, ok := err.(*adapter.UnknownMsgError); ok {
155                                 comperr.IncompatibleMessages = append(comperr.IncompatibleMessages, getMsgID(uerr.MsgName, uerr.MsgCrc))
156                                 continue
157                         }
158                         // other errors return immediatelly
159                         return err
160                 }
161                 comperr.CompatibleMessages = append(comperr.CompatibleMessages, getMsgNameWithCrc(msg))
162         }
163         if len(comperr.IncompatibleMessages) == 0 {
164                 return nil
165         }
166         return &comperr
167 }
168
169 func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
170         msgID, err := ch.msgIdentifier.GetMessageID(event)
171         if err != nil {
172                 log.WithFields(logrus.Fields{
173                         "msg_name": event.GetMessageName(),
174                         "msg_crc":  event.GetCrcString(),
175                 }).Errorf("unable to retrieve message ID: %v", err)
176                 return nil, fmt.Errorf("unable to retrieve event message ID: %v", err)
177         }
178
179         sub := &subscriptionCtx{
180                 ch:         ch,
181                 notifChan:  notifChan,
182                 msgID:      msgID,
183                 event:      event,
184                 msgFactory: getMsgFactory(event),
185         }
186
187         // add the subscription into map
188         ch.conn.subscriptionsLock.Lock()
189         defer ch.conn.subscriptionsLock.Unlock()
190
191         ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub)
192
193         return sub, nil
194 }
195
196 func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
197         ch.replyTimeout = timeout
198 }
199
200 func (ch *Channel) Close() {
201         close(ch.reqChan)
202 }
203
204 func (req *requestCtx) ReceiveReply(msg api.Message) error {
205         if req == nil || req.ch == nil {
206                 return ErrInvalidRequestCtx
207         }
208
209         lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
210         if err != nil {
211                 return err
212         } else if lastReplyReceived {
213                 return errors.New("multipart reply recieved while a single reply expected")
214         }
215
216         return nil
217 }
218
219 func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
220         if req == nil || req.ch == nil {
221                 return false, ErrInvalidRequestCtx
222         }
223
224         return req.ch.receiveReplyInternal(msg, req.seqNum)
225 }
226
227 func (sub *subscriptionCtx) Unsubscribe() error {
228         log.WithFields(logrus.Fields{
229                 "msg_name": sub.event.GetMessageName(),
230                 "msg_id":   sub.msgID,
231         }).Debug("Removing notification subscription.")
232
233         // remove the subscription from the map
234         sub.ch.conn.subscriptionsLock.Lock()
235         defer sub.ch.conn.subscriptionsLock.Unlock()
236
237         for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
238                 if item == sub {
239                         // close notification channel
240                         close(sub.ch.conn.subscriptions[sub.msgID][i].notifChan)
241                         // remove i-th item in the slice
242                         sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
243                         return nil
244                 }
245         }
246
247         return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
248 }
249
250 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
251 func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
252         if msg == nil {
253                 return false, errors.New("nil message passed in")
254         }
255
256         var ignore bool
257
258         if vppReply := ch.delayedReply; vppReply != nil {
259                 // try the delayed reply
260                 ch.delayedReply = nil
261                 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
262                 if !ignore {
263                         return lastReplyReceived, err
264                 }
265         }
266
267         timer := time.NewTimer(ch.replyTimeout)
268         for {
269                 select {
270                 // blocks until a reply comes to ReplyChan or until timeout expires
271                 case vppReply := <-ch.replyChan:
272                         ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
273                         if ignore {
274                                 log.WithFields(logrus.Fields{
275                                         "expSeqNum": expSeqNum,
276                                         "channel":   ch.id,
277                                 }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName())
278                                 continue
279                         }
280                         return lastReplyReceived, err
281
282                 case <-timer.C:
283                         log.WithFields(logrus.Fields{
284                                 "expSeqNum": expSeqNum,
285                                 "channel":   ch.id,
286                         }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
287                         err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
288                         return false, err
289                 }
290         }
291 }
292
293 func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
294         // check the sequence number
295         cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum)
296         if cmpSeqNums == -1 {
297                 // reply received too late, ignore the message
298                 log.WithField("seqNum", reply.seqNum).
299                         Warn("Received reply to an already closed binary API request")
300                 ignore = true
301                 return
302         }
303         if cmpSeqNums == 1 {
304                 ch.delayedReply = reply
305                 err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
306                 return
307         }
308
309         if reply.err != nil {
310                 err = reply.err
311                 return
312         }
313         if reply.lastReceived {
314                 lastReplyReceived = true
315                 return
316         }
317
318         // message checks
319         var expMsgID uint16
320         expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
321         if err != nil {
322                 err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
323                         msg.GetMessageName(), msg.GetCrcString())
324                 return
325         }
326
327         if reply.msgID != expMsgID {
328                 var msgNameCrc string
329                 if replyMsg, err := ch.msgIdentifier.LookupByID(reply.msgID); err != nil {
330                         msgNameCrc = err.Error()
331                 } else {
332                         msgNameCrc = getMsgNameWithCrc(replyMsg)
333                 }
334
335                 err = fmt.Errorf("received unexpected message (seqNum=%d), expected %s (ID %d), but got %s (ID %d) "+
336                         "(check if multiple goroutines are not sharing single GoVPP channel)",
337                         reply.seqNum, msg.GetMessageName(), expMsgID, msgNameCrc, reply.msgID)
338                 return
339         }
340
341         // decode the message
342         if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil {
343                 return
344         }
345
346         // check Retval and convert it into VnetAPIError error
347         if strings.HasSuffix(msg.GetMessageName(), "_reply") {
348                 // TODO: use categories for messages to avoid checking message name
349                 if f := reflect.Indirect(reflect.ValueOf(msg)).FieldByName("Retval"); f.IsValid() {
350                         retval := int32(f.Int())
351                         err = api.RetvalToVPPApiError(retval)
352                 }
353         }
354
355         return
356 }