Refactor GoVPP
[govpp.git] / core / channel.go
1 // Copyright (c) 2018 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "strings"
22         "time"
23
24         "git.fd.io/govpp.git/api"
25         "github.com/sirupsen/logrus"
26 )
27
28 var (
29         ErrInvalidRequestCtx = errors.New("invalid request context")
30 )
31
32 // requestCtx is a context for request with single reply
33 type requestCtx struct {
34         ch     *channel
35         seqNum uint16
36 }
37
38 // multiRequestCtx is a context for request with multiple responses
39 type multiRequestCtx struct {
40         ch     *channel
41         seqNum uint16
42 }
43
44 func (req *requestCtx) ReceiveReply(msg api.Message) error {
45         if req == nil || req.ch == nil {
46                 return ErrInvalidRequestCtx
47         }
48
49         lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
50         if err != nil {
51                 return err
52         }
53         if lastReplyReceived {
54                 return errors.New("multipart reply recieved while a single reply expected")
55         }
56
57         return nil
58 }
59
60 func (req *multiRequestCtx) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
61         if req == nil || req.ch == nil {
62                 return false, ErrInvalidRequestCtx
63         }
64
65         return req.ch.receiveReplyInternal(msg, req.seqNum)
66 }
67
68 // vppRequest is a request that will be sent to VPP.
69 type vppRequest struct {
70         seqNum uint16      // sequence number
71         msg    api.Message // binary API message to be send to VPP
72         multi  bool        // true if multipart response is expected
73 }
74
75 // vppReply is a reply received from VPP.
76 type vppReply struct {
77         seqNum       uint16 // sequence number
78         msgID        uint16 // ID of the message
79         data         []byte // encoded data with the message
80         lastReceived bool   // for multi request, true if the last reply has been already received
81         err          error  // in case of error, data is nil and this member contains error
82 }
83
84 // NotifSubscribeRequest is a request to subscribe for delivery of specific notification messages.
85 type subscriptionRequest struct {
86         sub       *api.NotifSubscription // subscription details
87         subscribe bool                   // true if this is a request to subscribe
88 }
89
90 // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
91 // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
92 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
93 // concurrently, otherwise the responses could mix! Use multiple channels instead.
94 type channel struct {
95         id uint16 // channel ID
96
97         reqChan   chan *vppRequest // channel for sending the requests to VPP
98         replyChan chan *vppReply   // channel where VPP replies are delivered to
99
100         notifSubsChan      chan *subscriptionRequest // channel for sending notification subscribe requests
101         notifSubsReplyChan chan error                // channel where replies to notification subscribe requests are delivered to
102
103         msgDecoder    api.MessageDecoder    // used to decode binary data to generated API messages
104         msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
105
106         lastSeqNum uint16 // sequence number of the last sent request
107
108         delayedReply *vppReply     // reply already taken from ReplyChan, buffered for later delivery
109         replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
110 }
111
112 func (ch *channel) GetID() uint16 {
113         return ch.id
114 }
115
116 func (ch *channel) nextSeqNum() uint16 {
117         ch.lastSeqNum++
118         return ch.lastSeqNum
119 }
120
121 func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
122         req := &vppRequest{
123                 msg:    msg,
124                 seqNum: ch.nextSeqNum(),
125         }
126         ch.reqChan <- req
127         return &requestCtx{ch: ch, seqNum: req.seqNum}
128 }
129
130 func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
131         req := &vppRequest{
132                 msg:    msg,
133                 seqNum: ch.nextSeqNum(),
134                 multi:  true,
135         }
136         ch.reqChan <- req
137         return &multiRequestCtx{ch: ch, seqNum: req.seqNum}
138 }
139
140 func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
141         sub := &api.NotifSubscription{
142                 NotifChan:  notifChan,
143                 MsgFactory: msgFactory,
144         }
145         // TODO: get rid of notifSubsChan and notfSubsReplyChan,
146         // it's no longer need because we know all message IDs and can store subscription right here
147         ch.notifSubsChan <- &subscriptionRequest{
148                 sub:       sub,
149                 subscribe: true,
150         }
151         return sub, <-ch.notifSubsReplyChan
152 }
153
154 func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
155         ch.notifSubsChan <- &subscriptionRequest{
156                 sub:       subscription,
157                 subscribe: false,
158         }
159         return <-ch.notifSubsReplyChan
160 }
161
162 func (ch *channel) SetReplyTimeout(timeout time.Duration) {
163         ch.replyTimeout = timeout
164 }
165
166 func (ch *channel) Close() {
167         if ch.reqChan != nil {
168                 close(ch.reqChan)
169         }
170 }
171
172 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
173 func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
174         var ignore bool
175         if msg == nil {
176                 return false, errors.New("nil message passed in")
177         }
178
179         if vppReply := ch.delayedReply; vppReply != nil {
180                 // try the delayed reply
181                 ch.delayedReply = nil
182                 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
183                 if !ignore {
184                         return lastReplyReceived, err
185                 }
186         }
187
188         timer := time.NewTimer(ch.replyTimeout)
189         for {
190                 select {
191                 // blocks until a reply comes to ReplyChan or until timeout expires
192                 case vppReply := <-ch.replyChan:
193                         ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
194                         if ignore {
195                                 continue
196                         }
197                         return lastReplyReceived, err
198
199                 case <-timer.C:
200                         err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
201                         return false, err
202                 }
203         }
204         return
205 }
206
207 func (ch *channel) processReply(reply *vppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
208         // check the sequence number
209         cmpSeqNums := compareSeqNumbers(reply.seqNum, expSeqNum)
210         if cmpSeqNums == -1 {
211                 // reply received too late, ignore the message
212                 logrus.WithField("sequence-number", reply.seqNum).Warn(
213                         "Received reply to an already closed binary API request")
214                 ignore = true
215                 return
216         }
217         if cmpSeqNums == 1 {
218                 ch.delayedReply = reply
219                 err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
220                 return
221         }
222
223         if reply.err != nil {
224                 err = reply.err
225                 return
226         }
227         if reply.lastReceived {
228                 lastReplyReceived = true
229                 return
230         }
231
232         // message checks
233         var expMsgID uint16
234         expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
235         if err != nil {
236                 err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
237                         msg.GetMessageName(), msg.GetCrcString())
238                 return
239         }
240
241         if reply.msgID != expMsgID {
242                 var msgNameCrc string
243                 if replyMsg, err := ch.msgIdentifier.LookupByID(reply.msgID); err != nil {
244                         msgNameCrc = err.Error()
245                 } else {
246                         msgNameCrc = getMsgNameWithCrc(replyMsg)
247                 }
248
249                 err = fmt.Errorf("received invalid message ID (seqNum=%d), expected %d (%s), but got %d (%s) "+
250                         "(check if multiple goroutines are not sharing single GoVPP channel)",
251                         reply.seqNum, expMsgID, msg.GetMessageName(), reply.msgID, msgNameCrc)
252                 return
253         }
254
255         // decode the message
256         if err = ch.msgDecoder.DecodeMsg(reply.data, msg); err != nil {
257                 return
258         }
259
260         // check Retval and convert it into VnetAPIError error
261         if strings.HasSuffix(msg.GetMessageName(), "_reply") {
262                 // TODO: use categories for messages to avoid checking message name
263                 if f := reflect.Indirect(reflect.ValueOf(msg)).FieldByName("Retval"); f.IsValid() {
264                         if retval := f.Int(); retval != 0 {
265                                 err = api.VPPApiError(retval)
266                         }
267                 }
268         }
269
270         return
271 }