make api.Channel as interface
[govpp.git] / core / channel.go
1 // Copyright (c) 2018 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "fmt"
19         "time"
20
21         "errors"
22
23         "git.fd.io/govpp.git/api"
24         "github.com/sirupsen/logrus"
25 )
26
27 const defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
28
29 // requestCtxData is a context of a ongoing request (simple one - only one response is expected).
30 type requestCtxData struct {
31         ch     *channel
32         seqNum uint16
33 }
34
35 // multiRequestCtxData is a context of a ongoing multipart request (multiple responses are expected).
36 type multiRequestCtxData struct {
37         ch     *channel
38         seqNum uint16
39 }
40
41 func (req *requestCtxData) ReceiveReply(msg api.Message) error {
42         if req == nil || req.ch == nil {
43                 return errors.New("invalid request context")
44         }
45
46         lastReplyReceived, err := req.ch.receiveReplyInternal(msg, req.seqNum)
47
48         if lastReplyReceived {
49                 err = errors.New("multipart reply recieved while a simple reply expected")
50         }
51         return err
52 }
53
54 func (req *multiRequestCtxData) ReceiveReply(msg api.Message) (lastReplyReceived bool, err error) {
55         if req == nil || req.ch == nil {
56                 return false, errors.New("invalid request context")
57         }
58
59         return req.ch.receiveReplyInternal(msg, req.seqNum)
60 }
61
62 // channel is the main communication interface with govpp core. It contains four Go channels, one for sending the requests
63 // to VPP, one for receiving the replies from it and the same set for notifications. The user can access the Go channels
64 // via methods provided by Channel interface in this package. Do not use the same channel from multiple goroutines
65 // concurrently, otherwise the responses could mix! Use multiple channels instead.
66 type channel struct {
67         id uint16 // channel ID
68
69         reqChan   chan *api.VppRequest // channel for sending the requests to VPP, closing this channel releases all resources in the ChannelProvider
70         replyChan chan *api.VppReply   // channel where VPP replies are delivered to
71
72         notifSubsChan      chan *api.NotifSubscribeRequest // channel for sending notification subscribe requests
73         notifSubsReplyChan chan error                      // channel where replies to notification subscribe requests are delivered to
74
75         msgDecoder    api.MessageDecoder    // used to decode binary data to generated API messages
76         msgIdentifier api.MessageIdentifier // used to retrieve message ID of a message
77
78         lastSeqNum uint16 // sequence number of the last sent request
79
80         delayedReply *api.VppReply // reply already taken from ReplyChan, buffered for later delivery
81         replyTimeout time.Duration // maximum time that the API waits for a reply from VPP before returning an error, can be set with SetReplyTimeout
82 }
83
84 func (ch *channel) SendRequest(msg api.Message) api.RequestCtx {
85         ch.lastSeqNum++
86         ch.reqChan <- &api.VppRequest{
87                 Message: msg,
88                 SeqNum:  ch.lastSeqNum,
89         }
90         return &requestCtxData{ch: ch, seqNum: ch.lastSeqNum}
91 }
92
93 func (ch *channel) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
94         ch.lastSeqNum++
95         ch.reqChan <- &api.VppRequest{
96                 Message:   msg,
97                 Multipart: true,
98                 SeqNum:    ch.lastSeqNum,
99         }
100         return &multiRequestCtxData{ch: ch, seqNum: ch.lastSeqNum}
101 }
102
103 func (ch *channel) SubscribeNotification(notifChan chan api.Message, msgFactory func() api.Message) (*api.NotifSubscription, error) {
104         subscription := &api.NotifSubscription{
105                 NotifChan:  notifChan,
106                 MsgFactory: msgFactory,
107         }
108         ch.notifSubsChan <- &api.NotifSubscribeRequest{
109                 Subscription: subscription,
110                 Subscribe:    true,
111         }
112         return subscription, <-ch.notifSubsReplyChan
113 }
114
115 func (ch *channel) UnsubscribeNotification(subscription *api.NotifSubscription) error {
116         ch.notifSubsChan <- &api.NotifSubscribeRequest{
117                 Subscription: subscription,
118                 Subscribe:    false,
119         }
120         return <-ch.notifSubsReplyChan
121 }
122
123 func (ch *channel) CheckMessageCompatibility(messages ...api.Message) error {
124         for _, msg := range messages {
125                 _, err := ch.msgIdentifier.GetMessageID(msg)
126                 if err != nil {
127                         return fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
128                                 msg.GetMessageName(), msg.GetCrcString())
129                 }
130         }
131         return nil
132 }
133
134 func (ch *channel) SetReplyTimeout(timeout time.Duration) {
135         ch.replyTimeout = timeout
136 }
137
138 func (ch *channel) GetRequestChannel() chan<- *api.VppRequest {
139         return ch.reqChan
140 }
141
142 func (ch *channel) GetReplyChannel() <-chan *api.VppReply {
143         return ch.replyChan
144 }
145
146 func (ch *channel) GetNotificationChannel() chan<- *api.NotifSubscribeRequest {
147         return ch.notifSubsChan
148 }
149
150 func (ch *channel) GetNotificationReplyChannel() <-chan error {
151         return ch.notifSubsReplyChan
152 }
153
154 func (ch *channel) GetMessageDecoder() api.MessageDecoder {
155         return ch.msgDecoder
156 }
157
158 func (ch *channel) GetID() uint16 {
159         return ch.id
160 }
161
162 func (ch *channel) Close() {
163         if ch.reqChan != nil {
164                 close(ch.reqChan)
165         }
166 }
167
168 // receiveReplyInternal receives a reply from the reply channel into the provided msg structure.
169 func (ch *channel) receiveReplyInternal(msg api.Message, expSeqNum uint16) (lastReplyReceived bool, err error) {
170         var ignore bool
171         if msg == nil {
172                 return false, errors.New("nil message passed in")
173         }
174
175         if ch.delayedReply != nil {
176                 // try the delayed reply
177                 vppReply := ch.delayedReply
178                 ch.delayedReply = nil
179                 ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
180                 if !ignore {
181                         return lastReplyReceived, err
182                 }
183         }
184
185         timer := time.NewTimer(ch.replyTimeout)
186         for {
187                 select {
188                 // blocks until a reply comes to ReplyChan or until timeout expires
189                 case vppReply := <-ch.replyChan:
190                         ignore, lastReplyReceived, err = ch.processReply(vppReply, expSeqNum, msg)
191                         if ignore {
192                                 continue
193                         }
194                         return lastReplyReceived, err
195
196                 case <-timer.C:
197                         err = fmt.Errorf("no reply received within the timeout period %s", ch.replyTimeout)
198                         return false, err
199                 }
200         }
201         return
202 }
203
204 func (ch *channel) processReply(reply *api.VppReply, expSeqNum uint16, msg api.Message) (ignore bool, lastReplyReceived bool, err error) {
205         // check the sequence number
206         cmpSeqNums := compareSeqNumbers(reply.SeqNum, expSeqNum)
207         if cmpSeqNums == -1 {
208                 // reply received too late, ignore the message
209                 logrus.WithField("sequence-number", reply.SeqNum).Warn(
210                         "Received reply to an already closed binary API request")
211                 ignore = true
212                 return
213         }
214         if cmpSeqNums == 1 {
215                 ch.delayedReply = reply
216                 err = fmt.Errorf("missing binary API reply with sequence number: %d", expSeqNum)
217                 return
218         }
219
220         if reply.Error != nil {
221                 err = reply.Error
222                 return
223         }
224         if reply.LastReplyReceived {
225                 lastReplyReceived = true
226                 return
227         }
228
229         // message checks
230         var expMsgID uint16
231         expMsgID, err = ch.msgIdentifier.GetMessageID(msg)
232         if err != nil {
233                 err = fmt.Errorf("message %s with CRC %s is not compatible with the VPP we are connected to",
234                         msg.GetMessageName(), msg.GetCrcString())
235                 return
236         }
237
238         if reply.MessageID != expMsgID {
239                 var msgNameCrc string
240                 if nameCrc, err := ch.msgIdentifier.LookupByID(reply.MessageID); err != nil {
241                         msgNameCrc = err.Error()
242                 } else {
243                         msgNameCrc = nameCrc
244                 }
245
246                 err = fmt.Errorf("received invalid message ID (seq-num=%d), expected %d (%s), but got %d (%s) "+
247                         "(check if multiple goroutines are not sharing single GoVPP channel)",
248                         reply.SeqNum, expMsgID, msg.GetMessageName(), reply.MessageID, msgNameCrc)
249                 return
250         }
251
252         // decode the message
253         err = ch.msgDecoder.DecodeMsg(reply.Data, msg)
254         return
255 }
256
257 // compareSeqNumbers returns -1, 0, 1 if sequence number <seqNum1> precedes, equals to,
258 // or succeeds seq. number <seqNum2>.
259 // Since sequence numbers cycle in the finite set of size 2^16, the function
260 // must assume that the distance between compared sequence numbers is less than
261 // (2^16)/2 to determine the order.
262 func compareSeqNumbers(seqNum1, seqNum2 uint16) int {
263         // calculate distance from seqNum1 to seqNum2
264         var dist uint16
265         if seqNum1 <= seqNum2 {
266                 dist = seqNum2 - seqNum1
267         } else {
268                 dist = 0xffff - (seqNum1 - seqNum2 - 1)
269         }
270         if dist == 0 {
271                 return 0
272         } else if dist <= 0x8000 {
273                 return -1
274         }
275         return 1
276 }