5b513e3beb72838376c6ba942202a6523b95a5c9
[govpp.git] / core / channel.go
1 // Copyright (c) 2018 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "strings"
22         "time"
23
24         "github.com/sirupsen/logrus"
25
26         "git.fd.io/govpp.git/api"
27 )
28
29 var (
30         ErrInvalidRequestCtx = errors.New("invalid request context")
31 )
32
33 // MessageCodec provides functionality for decoding binary data to generated API messages.
34 type MessageCodec interface {
35         // EncodeMsg encodes message into binary data.
36         EncodeMsg(msg api.Message, msgID uint16) ([]byte, error)
37         // DecodeMsg decodes binary-encoded data of a message into provided Message structure.
38         DecodeMsg(data []byte, msg api.Message) error
39 }
40
41 // MessageIdentifier provides identification of generated API messages.
42 type MessageIdentifier interface {
43         // GetMessageID returns message identifier of given API message.
44         GetMessageID(msg api.Message) (uint16, error)
45         // LookupByID looks up message name and crc by ID
46         LookupByID(msgID uint16) (api.Message, error)
47 }
48
49 // vppRequest is a request that will be sent to VPP.
50 type vppRequest struct {
51         seqNum uint16      // sequence number
52         msg    api.Message // binary API message to be send to VPP
53         multi  bool        // true if multipart response is expected
54 }
55
56 // vppReply is a reply received from VPP.
57 type vppReply struct {
58         seqNum       uint16 // sequence number
59         msgID        uint16 // ID of the message
60         data         []byte // encoded data with the message
61         lastReceived bool   // for multi request, true if the last reply has been already received
62         err          error  // in case of error, data is nil and this member contains error
63 }
64
65 // requestCtx is a context for request with single reply
66 type requestCtx struct {
67         ch     *Channel
68         seqNum uint16
69 }
70
71 // multiRequestCtx is a context for request with multiple responses
72 type multiRequestCtx struct {
73         ch     *Channel
74         seqNum uint16
75 }
76
77 // subscriptionCtx is a context of subscription for delivery of specific notification messages.
78 type subscriptionCtx struct {
79         ch         *Channel
80         notifChan  chan api.Message   // channel where notification messages will be delivered to
81         msgID      uint16             // message ID for the subscribed event message
82         event      api.Message        // event message that this subscription is for
83         msgFactory func() api.Message // function that returns a new instance of the specific message that is expected as a notification
84 }
85
86 // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
87 // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
88 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
89 // concurrently, otherwise the responses could mix! Use multiple channels instead.
90 type Channel struct {
91         id   uint16
92         conn *Connection
93
94         reqChan   chan *vppRequest // channel for sending the requests to VPP
95         replyChan chan *vppReply   // channel where VPP replies are delivered to
96
97         msgCodec      MessageCodec      // used to decode binary data to generated API messages
98         msgIdentifier MessageIdentifier // used to retrieve message ID of a message
99
100         lastSeqNum uint16 // sequence number of the last sent request
101
102         delayedReply *vppReply     // reply already taken from ReplyChan, buffered for later delivery
103         replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
104 }
105
106 func newChannel(id uint16, conn *Connection, codec MessageCodec, identifier MessageIdentifier, reqSize, replySize int) *Channel {
107         return &Channel{
108                 id:            id,
109                 conn:          conn,
110                 msgCodec:      codec,
111                 msgIdentifier: identifier,
112                 reqChan:       make(chan *vppRequest, reqSize),
113                 replyChan:     make(chan *vppReply, replySize),
114                 replyTimeout:  DefaultReplyTimeout,
115         }
116 }
117
118 func (ch *Channel) GetID() uint16 {
119         return ch.id
120 }
121
122 func (ch *Channel) nextSeqNum() uint16 {
123         ch.lastSeqNum++
124         return ch.lastSeqNum
125 }
126
127 func (ch *Channel) SendRequest(msg api.Message) api.RequestCtx {
128         seqNum := ch.nextSeqNum()
129         ch.reqChan <- &vppRequest{
130                 msg:    msg,
131                 seqNum: seqNum,
132         }
133         return &requestCtx{ch: ch, seqNum: seqNum}
134 }
135
136 func (ch *Channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
137         seqNum := ch.nextSeqNum()
138         ch.reqChan <- &vppRequest{
139                 msg:    msg,
140                 seqNum: seqNum,
141                 multi:  true,
142         }
143         return &multiRequestCtx{ch: ch, seqNum: seqNum}
144 }
145
146 func (ch *Channel) CheckCompatiblity(msgs ...api.Message) error {
147         for _, msg := range msgs {
148                 // TODO: collect all incompatible messages and return summarized error
149                 _, err := ch.msgIdentifier.GetMessageID(msg)
150                 if err != nil {
151                         return err
152                 }
153         }
154         return nil
155 }
156
157 func (ch *Channel) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
158         msgID, err := ch.msgIdentifier.GetMessageID(event)
159         if err != nil {
160                 log.WithFields(logrus.Fields{
161                         "msg_name": event.GetMessageName(),
162                         "msg_crc":  event.GetCrcString(),
163                 }).Errorf("unable to retrieve message ID: %v", err)
164                 return nil, fmt.Errorf("unable to retrieve event message ID: %v", err)
165         }
166
167         sub := &subscriptionCtx{
168                 ch:         ch,
169                 notifChan:  notifChan,
170                 msgID:      msgID,
171                 event:      event,
172                 msgFactory: getMsgFactory(event),
173         }
174
175         // add the subscription into map
176         ch.conn.subscriptionsLock.Lock()
177         defer ch.conn.subscriptionsLock.Unlock()
178
179         ch.conn.subscriptions[msgID] = append(ch.conn.subscriptions[msgID], sub)
180
181         return sub, nil
182 }
183
184 func (ch *Channel) SetReplyTimeout(timeout time.Duration) {
185         ch.replyTimeout = timeout
186 }
187
188 func (ch *Channel) Close() {
189         close(ch.reqChan)
190 }
191
192 func (req *requestCtx) ReceiveReply(msg api.Message) error {
193         if req == nil || req.ch == nil {
194                 return ErrInvalidRequestCtx
195         }
196
197         lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
198         if err != nil {
199                 return err
200         } else if lastReplyReceived {
201                 return errors.New("multipart reply recieved while a single reply expected")
202         }
203
204         return nil
205 }
206
207 func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
208         if req == nil || req.ch == nil {
209                 return false, ErrInvalidRequestCtx
210         }
211
212         return req.ch.receiveReplyInternal(msg, req.seqNum)
213 }
214
215 func (sub *subscriptionCtx) Unsubscribe() error {
216         log.WithFields(logrus.Fields{
217                 "msg_name": sub.event.GetMessageName(),
218                 "msg_id":   sub.msgID,
219         }).Debug("Removing notification subscription.")
220
221         // remove the subscription from the map
222         sub.ch.conn.subscriptionsLock.Lock()
223         defer sub.ch.conn.subscriptionsLock.Unlock()
224
225         for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
226                 if item == sub {
227                         // remove i-th item in the slice
228                         sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
229                         return nil
230                 }
231         }
232
233         return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())
234 }
235
236 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
237 func (ch *Channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
238         if msg == nil {
239                 return false, errors.New("nil message passed in")
240         }
241
242         var ignore bool
243
244         if vppReply := ch.delayedReply; vppReply != nil {
245                 // try the delayed reply
246                 ch.delayedReply = nil
247                 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
248                 if !ignore {
249                         return lastReplyReceived, err
250                 }
251         }
252
253         timer := time.NewTimer(ch.replyTimeout)
254         for {
255                 select {
256                 // blocks until a reply comes to ReplyChan or until timeout expires
257                 case vppReply := <-ch.replyChan:
258                         ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
259                         if ignore {
260                                 log.WithFields(logrus.Fields{
261                                         "expSeqNum": expSeqNum,
262                                         "channel":   ch.id,
263                                 }).Warnf("ignoring received reply: %+v (expecting: %s)", vppReply, msg.GetMessageName())
264                                 continue
265                         }
266                         return lastReplyReceived, err
267
268                 case <-timer.C:
269                         log.WithFields(logrus.Fields{
270                                 "expSeqNum": expSeqNum,
271                                 "channel":   ch.id,
272                         }).Debugf("timeout (%v) waiting for reply: %s", ch.replyTimeout, msg.GetMessageName())
273                         err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
274                         return false, err
275                 }
276         }
277 }
278
279 func (ch *Channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
280         // check the sequence number
281         cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum)
282         if cmpSeqNums == -1 {
283                 // reply received too late, ignore the message
284                 log.WithField("seqNum", reply.seqNum).
285                         Warn("Received reply to an already closed binary API request")
286                 ignore = true
287                 return
288         }
289         if cmpSeqNums == 1 {
290                 ch.delayedReply = reply
291                 err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
292                 return
293         }
294
295         if reply.err != nil {
296                 err = reply.err
297                 return
298         }
299         if reply.lastReceived {
300                 lastReplyReceived = true
301                 return
302         }
303
304         // message checks
305         var expMsgID uint16
306         expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
307         if err != nil {
308                 err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
309                         msg.GetMessageName(), msg.GetCrcString())
310                 return
311         }
312
313         if reply.msgID != expMsgID {
314                 var msgNameCrc string
315                 if replyMsg, err := ch.msgIdentifier.LookupByID(reply.msgID); err != nil {
316                         msgNameCrc = err.Error()
317                 } else {
318                         msgNameCrc = getMsgNameWithCrc(replyMsg)
319                 }
320
321                 err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+
322                         "(check if multiple goroutines are not sharing single GoVPP channel)",
323                         reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc)
324                 return
325         }
326
327         // decode the message
328         if err = ch.msgCodec.DecodeMsg(reply.data, msg); err != nil {
329                 return
330         }
331
332         // check Retval and convert it into VnetAPIError error
333         if strings.HasSuffix(msg.GetMessageName(), "_reply") {
334                 // TODO: use categories for messages to avoid checking message name
335                 if f := reflect.Indirect(reflect.ValueOf(msg)).FieldByName("Retval"); f.IsValid() {
336                         retval := int32(f.Int())
337                         err = api.RetvalToVPPApiError(retval)
338                 }
339         }
340
341         return
342 }