1 // Copyright (c) 2021 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
26 "git.fd.io/govpp.git/adapter"
27 "git.fd.io/govpp.git/api"
28 "git.fd.io/govpp.git/core"
33 ------------------------------------------------------------
34 received binapi request while VPP connection is down!
36 - have you called Connect on the binapi RPC ?
37 ------------------------------------------------------------
40 ------------------------------------------------------------
41 received stats request while stats connection is down!
43 - is the correct socket name configured ?
44 - have you called Connect on the stats RPC ?
45 ------------------------------------------------------------
// StatsRequest is the request argument of StatsRPC.GetStats. GetStats reads
// its StatsType field to decide which statistics to collect.
49 type StatsRequest struct {
// StatsResponse is the reply filled in by StatsRPC.GetStats. GetStats
// allocates and populates only the field that matches the requested stats
// type; it does not assign the remaining pointers.
53 type StatsResponse struct {
54 SysStats *api.SystemStats
55 NodeStats *api.NodeStats
56 IfaceStats *api.InterfaceStats
57 ErrStats *api.ErrorStats
58 BufStats *api.BufferStats
59 MemStats *api.MemoryStats
62 // StatsRPC is an RPC server for proxying client requests to api.StatsProvider.
63 type StatsRPC struct {
// statsConn is the connection obtained from core.ConnectStats; all stats
// queries are issued through it.
64 statsConn *core.StatsConnection
// stats is the adapter handed to core.ConnectStats when connecting and
// reconnecting.
65 stats adapter.StatsAPI
68 // non-zero if the RPC service is available
70 // non-zero if connected to stats file.
72 // synchronizes access to statsConn.
76 // NewStatsRPC returns a new StatsRPC to be used as an RPC server
77 // proxying requests to the given api.StatsProvider.
//
// The new server is connected right away by calling connect with stats.
78 func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
80 if err := rpc.connect(stats); err != nil {
// watchConnection supervises the stats connection; connect starts it as a
// goroutine. It marks the service available, takes an initial system-stats
// reading and then, on every heartbeat tick (10 seconds), reads the system
// stats again and compares the Heartbeat counter with the previous reading.
// A counter that has not advanced suggests VPP may have crashed or been
// reset; on that path the service is marked unavailable, the stats connection
// is dropped and re-created with core.ConnectStats, and the service is
// re-enabled afterwards. Any error returned by GetSystemStats also marks the
// service unavailable.
86 func (s *StatsRPC) watchConnection() {
// NOTE(review): only the ticker's channel is retained here, so the
// underlying time.Ticker can never be stopped. Consider keeping the
// *time.Ticker and stopping it when this function exits.
87 heartbeatTicker := time.NewTicker(10 * time.Second).C
88 atomic.StoreUint32(&s.available, 1)
89 log.Debugln("enabling statsRPC service")
// prev holds the last reading that later heartbeats are compared against.
92 prev := new(api.SystemStats)
95 if err := s.statsConn.GetSystemStats(prev); err != nil {
96 atomic.StoreUint32(&s.available, 0)
97 log.Warnf("disabling statsRPC service, reason: %v", err)
103 case <-heartbeatTicker:
104 // If disconnect was called exit.
105 if atomic.LoadUint32(&s.isConnected) == 0 {
106 atomic.StoreUint32(&s.available, 0)
110 curr := new(api.SystemStats)
113 if err := s.statsConn.GetSystemStats(curr); err != nil {
114 atomic.StoreUint32(&s.available, 0)
115 log.Warnf("disabling statsRPC service, reason: %v", err)
// The heartbeat counter did not advance since the previous reading.
119 if curr.Heartbeat <= prev.Heartbeat {
121 // vpp might have crashed/reset... try reconnecting
124 atomic.StoreUint32(&s.available, 0)
125 log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
126 s.statsConn.Disconnect()
129 s.statsConn, err = core.ConnectStats(s.stats)
131 atomic.StoreUint32(&s.available, 1)
132 log.Debugln("enabling statsRPC service")
// Pause before the next step; presumably this spaces out failed reconnect
// attempts (TODO confirm).
135 time.Sleep(5 * time.Second)
// connect establishes the stats connection via core.ConnectStats and starts
// the watchConnection goroutine. It returns a "connection already exists"
// error when isConnected is already 1. On the success path it creates the
// done channel and sets isConnected to 1 before launching the watcher.
//
// NOTE(review): the connection is made with s.stats rather than the stats
// parameter; presumably s.stats is assigned from the parameter beforehand
// (TODO confirm).
149 func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
150 if atomic.LoadUint32(&s.isConnected) == 1 {
151 return errors.New("connection already exists")
155 s.statsConn, err = core.ConnectStats(s.stats)
159 s.done = make(chan struct{})
160 atomic.StoreUint32(&s.isConnected, 1)
162 go s.watchConnection()
// disconnect tears down the stats connection: when isConnected is 1 it resets
// the flag to 0 and calls Disconnect on statsConn. watchConnection observes
// the cleared flag on its next heartbeat tick and marks the service
// unavailable.
166 func (s *StatsRPC) disconnect() {
167 if atomic.LoadUint32(&s.isConnected) == 1 {
168 atomic.StoreUint32(&s.isConnected, 0)
170 s.statsConn.Disconnect()
// serviceAvailable reports whether the available flag, read atomically,
// equals 1.
175 func (s *StatsRPC) serviceAvailable() bool {
176 return atomic.LoadUint32(&s.available) == 1
// GetStats is the RPC method that serves stats requests.
//
// While the service is unavailable it prints statsErrorMsg and returns an
// error asking the client to retry later. Otherwise it switches on
// req.StatsType, allocates the matching field of resp, fills it through
// statsConn and returns the error of that call. A StatsType that matches no
// case yields an "unknown stats type" error.
179 func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
180 if !s.serviceAvailable() {
181 log.Print(statsErrorMsg)
182 return errors.New("server does not support 'get stats' at this time, try again later")
184 log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
// Each case allocates only the response field it is about to populate.
189 switch req.StatsType {
191 resp.SysStats = new(api.SystemStats)
192 return s.statsConn.GetSystemStats(resp.SysStats)
194 resp.NodeStats = new(api.NodeStats)
195 return s.statsConn.GetNodeStats(resp.NodeStats)
197 resp.IfaceStats = new(api.InterfaceStats)
198 return s.statsConn.GetInterfaceStats(resp.IfaceStats)
200 resp.ErrStats = new(api.ErrorStats)
201 return s.statsConn.GetErrorStats(resp.ErrStats)
203 resp.BufStats = new(api.BufferStats)
204 return s.statsConn.GetBufferStats(resp.BufStats)
206 resp.MemStats = new(api.MemoryStats)
207 return s.statsConn.GetMemoryStats(resp.MemStats)
209 return fmt.Errorf("unknown stats type: %s", req.StatsType)
// BinapiRequest is the request argument of BinapiRPC.Invoke. Invoke sends its
// Msg, instantiates replies from the type of its ReplyMsg and passes its
// Timeout to SetReplyTimeout on the channel it uses.
213 type BinapiRequest struct {
217 Timeout time.Duration
// BinapiResponse is the reply filled in by BinapiRPC.Invoke: Msg receives the
// reply of a single request, Msgs accumulates the replies of a multi request.
220 type BinapiResponse struct {
// BinapiCompatibilityRequest is the request argument of
// BinapiRPC.Compatibility. Its MsgNameCrcs entries are the keys looked up in
// the registered messages of each path.
225 type BinapiCompatibilityRequest struct {
// BinapiCompatibilityResponse is the reply filled in by
// BinapiRPC.Compatibility. Both maps are keyed by the path returned from
// api.GetRegisteredMessages and hold the requested message identifiers that
// were found compatible or incompatible for that path.
229 type BinapiCompatibilityResponse struct {
230 CompatibleMsgs map[string][]string
231 IncompatibleMsgs map[string][]string
234 // BinapiRPC is an RPC server for proxying client requests to api.Channel
236 type BinapiRPC struct {
// binapiConn is the connection obtained from core.AsyncConnect; streams and
// API channels are created from it.
237 binapiConn *core.Connection
// binapi is the adapter handed to core.AsyncConnect when reconnecting.
238 binapi adapter.VppAPI
// streamsLock guards the streams map.
240 streamsLock sync.Mutex
241 // local ID, different from api.Stream ID
// streams maps the local stream ID handed to clients to the open stream.
243 streams map[uint32]api.Stream
// events delivers connection state changes from core.AsyncConnect; it is
// consumed by watchConnection.
245 events chan core.ConnectionEvent
247 // non-zero if the RPC service is available
249 // non-zero if connected to vpp.
253 // NewBinapiRPC returns a new BinapiRPC to be used as an RPC server
254 // proxying requests to the given api.Channel.
//
// The new server is connected right away by calling connect with binapi.
255 func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) {
256 rpc := new(BinapiRPC)
257 if err := rpc.connect(binapi); err != nil {
// watchConnection consumes connection events from s.events; connect starts it
// as a goroutine. It keeps the available flag in step with the connection
// state: the flag is raised when the service was not yet available, and
// cleared (with a warning carrying e.Error) once the connection is reported
// as core.Disconnected. On the last branch it additionally drops the
// connection and starts a new asynchronous connect, replacing both
// binapiConn and events. If isConnected has been cleared by disconnect, the
// flag is cleared as well.
263 func (s *BinapiRPC) watchConnection() {
266 case e := <-s.events:
267 // If disconnect was called exit.
268 if atomic.LoadUint32(&s.isConnected) == 0 {
269 atomic.StoreUint32(&s.available, 0)
// Only flip the flag (and log) on an actual state change.
275 if !s.serviceAvailable() {
276 atomic.StoreUint32(&s.available, 1)
277 log.Debugln("enabling binapiRPC service")
279 case core.Disconnected:
280 if s.serviceAvailable() {
281 atomic.StoreUint32(&s.available, 0)
282 log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
285 if s.serviceAvailable() {
286 atomic.StoreUint32(&s.available, 0)
287 log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
289 // vpp might have crashed/reset... reconnect
290 s.binapiConn.Disconnect()
// Arguments 3 and 5*time.Second are presumably the attempt count and the
// retry interval (TODO confirm against core.AsyncConnect).
293 s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second)
// connect starts an asynchronous connection to VPP through core.AsyncConnect
// and launches the watchConnection goroutine. It returns a "connection
// already exists" error when isConnected is already 1. On the success path it
// stores the returned connection and event channel, creates the done channel
// and sets isConnected to 1 before launching the watcher.
304 func (s *BinapiRPC) connect(binapi adapter.VppAPI) error {
305 if atomic.LoadUint32(&s.isConnected) == 1 {
306 return errors.New("connection already exists")
// Arguments 3 and time.Second are presumably the attempt count and the retry
// interval (TODO confirm against core.AsyncConnect).
310 s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second)
314 s.done = make(chan struct{})
315 atomic.StoreUint32(&s.isConnected, 1)
317 go s.watchConnection()
// disconnect tears down the VPP connection: when isConnected is 1 it resets
// the flag to 0 and calls Disconnect on binapiConn. watchConnection checks
// the cleared flag when it handles its next event.
321 func (s *BinapiRPC) disconnect() {
322 if atomic.LoadUint32(&s.isConnected) == 1 {
323 atomic.StoreUint32(&s.isConnected, 0)
325 s.binapiConn.Disconnect()
// serviceAvailable reports whether the available flag, read atomically,
// equals 1.
330 func (s *BinapiRPC) serviceAvailable() bool {
331 return atomic.LoadUint32(&s.available) == 1
// RPCStreamReqResp is used both as request and as reply by the stream RPC
// methods (NewAPIStream, SendMessage, ReceiveMessage, CloseStream). ID is the
// server-side stream identifier and Msg carries the message being sent or
// received.
334 type RPCStreamReqResp struct {
// NewAPIStream is the RPC method that opens a new stream on the VPP
// connection. While the service is unavailable it prints binapiErrorMsg and
// returns an error asking the client to retry later. Otherwise it creates a
// stream with a background context, lazily allocates the streams map, stores
// the stream under s.maxStreamID and reports that ID back in resp.ID.
339 func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
340 if !s.serviceAvailable() {
341 log.Print(binapiErrorMsg)
342 return errors.New("server does not support RPC calls at this time, try again later")
344 log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req)
346 stream, err := s.binapiConn.NewStream(context.Background())
// The map is created on first use.
351 if s.streams == nil {
352 s.streams = make(map[uint32]api.Stream)
357 s.streams[s.maxStreamID] = stream
358 resp.ID = s.maxStreamID
359 s.streamsLock.Unlock()
// SendMessage is the RPC method that sends req.Msg on the stream identified
// by req.ID. While the service is unavailable it prints binapiErrorMsg and
// returns an error asking the client to retry later. The stream is looked up
// with getStream and the result of its SendMsg call is returned.
364 func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
365 if !s.serviceAvailable() {
366 log.Print(binapiErrorMsg)
367 return errors.New("server does not support RPC calls at this time, try again later")
369 log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req)
371 stream, err := s.getStream(req.ID)
376 return stream.SendMsg(req.Msg)
// ReceiveMessage is the RPC method that receives the next message from the
// stream identified by req.ID and stores it in resp.Msg. While the service is
// unavailable it prints binapiErrorMsg and returns an error asking the client
// to retry later. The stream is looked up with getStream.
379 func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
380 if !s.serviceAvailable() {
381 log.Print(binapiErrorMsg)
382 return errors.New("server does not support RPC calls at this time, try again later")
384 log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req)
386 stream, err := s.getStream(req.ID)
391 resp.Msg, err = stream.RecvMsg()
// CloseStream is the RPC method that closes the stream identified by req.ID.
// While the service is unavailable it prints binapiErrorMsg and returns an
// error asking the client to retry later. The stream is looked up with
// getStream, removed from the streams map and then closed; the result of
// Close is returned.
395 func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
396 if !s.serviceAvailable() {
397 log.Print(binapiErrorMsg)
398 return errors.New("server does not support RPC calls at this time, try again later")
400 log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req)
402 stream, err := s.getStream(req.ID)
// Forget the stream before closing it so later lookups no longer find it.
408 delete(s.streams, req.ID)
409 s.streamsLock.Unlock()
411 return stream.Close()
// getStream returns the stream registered under id. If the map holds no
// stream for id, or the stored value is nil, the entry is removed from the
// map and a "BinapiRPC stream closed" error is returned.
414 func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) {
416 stream := s.streams[id]
417 s.streamsLock.Unlock()
// The reflect check additionally catches a non-nil interface value that
// wraps a nil pointer, which a plain == nil comparison would miss.
419 if stream == nil || reflect.ValueOf(stream).IsNil() {
421 // delete the stream in case it is still in the map
422 delete(s.streams, id)
423 s.streamsLock.Unlock()
424 return nil, errors.New("BinapiRPC stream closed")
// Invoke is the RPC method that sends req.Msg to VPP over a fresh API channel
// and collects the reply or replies into resp.
//
// While the service is unavailable it prints binapiErrorMsg and returns an
// error asking the client to retry later. Two paths are visible: a multi
// request, whose replies are received one by one and appended to resp.Msgs,
// and a single request, whose reply is received into resp.Msg. In both cases
// the reply value is created by reflection from the element type of
// req.ReplyMsg, so req.ReplyMsg is expected to be a pointer to a message type.
429 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
430 if !s.serviceAvailable() {
431 log.Print(binapiErrorMsg)
432 return errors.New("server does not support 'invoke' at this time, try again later")
434 log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
436 ch, err := s.binapiConn.NewAPIChannel()
441 ch.SetReplyTimeout(req.Timeout)
444 multi := ch.SendMultiRequest(req.Msg)
446 // create new message in response of type ReplyMsg
447 msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
// stop presumably signals that the last reply has been received (TODO
// confirm against the multi-request API).
449 stop, err := multi.ReceiveReply(msg)
456 resp.Msgs = append(resp.Msgs, msg)
459 // create new message in response of type ReplyMsg
460 resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
462 err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg)
// Compatibility is the RPC method that checks the messages named in
// req.MsgNameCrcs against the running VPP.
//
// While the service is unavailable it prints binapiErrorMsg and returns an
// error asking the client to retry later. Otherwise, for every path returned
// by api.GetRegisteredMessages, each requested entry is looked up among that
// path's messages and checked through the API channel; entries are recorded
// in resp.CompatibleMsgs or resp.IncompatibleMsgs under that path. Afterwards
// the incompatible lists are inspected per path; presumably the call succeeds
// as soon as one path has no incompatible entries (TODO confirm), otherwise
// it returns a "compatibility check failed" error.
471 func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
472 if !s.serviceAvailable() {
473 log.Print(binapiErrorMsg)
474 return errors.New("server does not support 'compatibility check' at this time, try again later")
476 log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
478 ch, err := s.binapiConn.NewAPIChannel()
484 resp.CompatibleMsgs = make(map[string][]string)
485 resp.IncompatibleMsgs = make(map[string][]string)
487 for path, messages := range api.GetRegisteredMessages() {
// Both lists are pre-sized for the worst case of every requested message
// landing in the same list.
488 resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
489 resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
491 for _, msg := range req.MsgNameCrcs {
492 val, ok := messages[msg]
494 resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
497 if err = ch.CheckCompatiblity(val); err != nil {
498 resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
500 resp.CompatibleMsgs[path] = append(resp.CompatibleMsgs[path], msg)
506 for path, incompatibleMsgs := range resp.IncompatibleMsgs {
507 if len(incompatibleMsgs) == 0 {
510 log.Debugf("messages are incompatible for path %s", path)
514 return errors.New("compatibility check failed")