1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 "git.fd.io/govpp.git/adapter"
26 "git.fd.io/govpp.git/api"
27 "git.fd.io/govpp.git/core"
32 ------------------------------------------------------------
33 received binapi request while VPP connection is down!
35 - have you called Connect on the binapi RPC ?
36 ------------------------------------------------------------
39 ------------------------------------------------------------
40 received stats request while stats connection is down!
42 - is the correct socket name configured ?
43 - have you called Connect on the stats RPC ?
44 ------------------------------------------------------------
48 type StatsRequest struct {
52 type StatsResponse struct {
53 SysStats *api.SystemStats
54 NodeStats *api.NodeStats
55 IfaceStats *api.InterfaceStats
56 ErrStats *api.ErrorStats
57 BufStats *api.BufferStats
58 MemStats *api.MemoryStats
61 // StatsRPC is an RPC server for proxying client requests to api.StatsProvider.
62 type StatsRPC struct {
63 statsConn *core.StatsConnection
64 stats adapter.StatsAPI
67 // non-zero if the RPC service is available
69 // non-zero if connected to stats file.
71 // synchronizes access to statsConn.
75 // NewStatsRPC returns a new StatsRPC to be used as an RPC server
76 // proxying requests to a stats connection opened over the given adapter.StatsAPI.
77 func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
79 if err := rpc.connect(stats); err != nil {
85 func (s *StatsRPC) watchConnection() {
86 heartbeatTicker := time.NewTicker(10 * time.Second).C
87 atomic.StoreUint32(&s.available, 1)
88 log.Debugln("enabling statsRPC service")
91 prev := new(api.SystemStats)
94 if err := s.statsConn.GetSystemStats(prev); err != nil {
95 atomic.StoreUint32(&s.available, 0)
96 log.Warnf("disabling statsRPC service, reason: %v", err)
102 case <-heartbeatTicker:
103 // If disconnect was called, exit.
104 if atomic.LoadUint32(&s.isConnected) == 0 {
105 atomic.StoreUint32(&s.available, 0)
109 curr := new(api.SystemStats)
112 if err := s.statsConn.GetSystemStats(curr); err != nil {
113 atomic.StoreUint32(&s.available, 0)
114 log.Warnf("disabling statsRPC service, reason: %v", err)
118 if curr.Heartbeat <= prev.Heartbeat {
120 // vpp might have crashed/reset... try reconnecting
123 atomic.StoreUint32(&s.available, 0)
124 log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
125 s.statsConn.Disconnect()
128 s.statsConn, err = core.ConnectStats(s.stats)
130 atomic.StoreUint32(&s.available, 1)
131 log.Debugln("enabling statsRPC service")
134 time.Sleep(5 * time.Second)
148 func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
149 if atomic.LoadUint32(&s.isConnected) == 1 {
150 return errors.New("connection already exists")
154 s.statsConn, err = core.ConnectStats(s.stats)
158 s.done = make(chan struct{})
159 atomic.StoreUint32(&s.isConnected, 1)
161 go s.watchConnection()
165 func (s *StatsRPC) disconnect() {
166 if atomic.LoadUint32(&s.isConnected) == 1 {
167 atomic.StoreUint32(&s.isConnected, 0)
169 s.statsConn.Disconnect()
174 func (s *StatsRPC) serviceAvailable() bool {
175 return atomic.LoadUint32(&s.available) == 1
178 func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
179 if !s.serviceAvailable() {
180 log.Println(statsErrorMsg)
181 return errors.New("server does not support 'get stats' at this time, try again later")
183 log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
188 switch req.StatsType {
190 resp.SysStats = new(api.SystemStats)
191 return s.statsConn.GetSystemStats(resp.SysStats)
193 resp.NodeStats = new(api.NodeStats)
194 return s.statsConn.GetNodeStats(resp.NodeStats)
196 resp.IfaceStats = new(api.InterfaceStats)
197 return s.statsConn.GetInterfaceStats(resp.IfaceStats)
199 resp.ErrStats = new(api.ErrorStats)
200 return s.statsConn.GetErrorStats(resp.ErrStats)
202 resp.BufStats = new(api.BufferStats)
203 return s.statsConn.GetBufferStats(resp.BufStats)
205 resp.MemStats = new(api.MemoryStats)
206 return s.statsConn.GetMemoryStats(resp.MemStats)
208 return fmt.Errorf("unknown stats type: %s", req.StatsType)
212 type BinapiRequest struct {
216 Timeout time.Duration
219 type BinapiResponse struct {
224 type BinapiCompatibilityRequest struct {
228 type BinapiCompatibilityResponse struct {
229 CompatibleMsgs map[string][]string
230 IncompatibleMsgs map[string][]string
233 // BinapiRPC is an RPC server for proxying client requests to api.Channel.
234 type BinapiRPC struct {
235 binapiConn *core.Connection
236 binapi adapter.VppAPI
238 events chan core.ConnectionEvent
240 // non-zero if the RPC service is available
242 // non-zero if connected to vpp.
246 // NewBinapiRPC returns a new BinapiRPC to be used as an RPC server
247 // proxying requests to a VPP connection opened over the given adapter.VppAPI.
248 func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) {
249 rpc := new(BinapiRPC)
250 if err := rpc.connect(binapi); err != nil {
256 func (s *BinapiRPC) watchConnection() {
259 case e := <-s.events:
260 // If disconnect was called, exit.
261 if atomic.LoadUint32(&s.isConnected) == 0 {
262 atomic.StoreUint32(&s.available, 0)
268 if !s.serviceAvailable() {
269 atomic.StoreUint32(&s.available, 1)
270 log.Debugln("enabling binapiRPC service")
272 case core.Disconnected:
273 if s.serviceAvailable() {
274 atomic.StoreUint32(&s.available, 0)
275 log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
278 if s.serviceAvailable() {
279 atomic.StoreUint32(&s.available, 0)
280 log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
282 // vpp might have crashed/reset... reconnect
283 s.binapiConn.Disconnect()
286 s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second)
297 func (s *BinapiRPC) connect(binapi adapter.VppAPI) error {
298 if atomic.LoadUint32(&s.isConnected) == 1 {
299 return errors.New("connection already exists")
303 s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second)
307 s.done = make(chan struct{})
308 atomic.StoreUint32(&s.isConnected, 1)
310 go s.watchConnection()
314 func (s *BinapiRPC) disconnect() {
315 if atomic.LoadUint32(&s.isConnected) == 1 {
316 atomic.StoreUint32(&s.isConnected, 0)
318 s.binapiConn.Disconnect()
323 func (s *BinapiRPC) serviceAvailable() bool {
324 return atomic.LoadUint32(&s.available) == 1
327 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
328 if !s.serviceAvailable() {
329 log.Println(binapiErrorMsg)
330 return errors.New("server does not support 'invoke' at this time, try again later")
332 log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
334 ch, err := s.binapiConn.NewAPIChannel()
339 ch.SetReplyTimeout(req.Timeout)
342 multi := ch.SendMultiRequest(req.Msg)
344 // create a new message of the same type as req.ReplyMsg to receive a reply
345 msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
347 stop, err := multi.ReceiveReply(msg)
354 resp.Msgs = append(resp.Msgs, msg)
357 // create a new message of the same type as req.ReplyMsg in the response to receive the reply
358 resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
360 err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg)
369 func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
370 if !s.serviceAvailable() {
371 log.Println(binapiErrorMsg)
372 return errors.New("server does not support 'compatibility check' at this time, try again later")
374 log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
376 ch, err := s.binapiConn.NewAPIChannel()
382 resp.CompatibleMsgs = make(map[string][]string)
383 resp.IncompatibleMsgs = make(map[string][]string)
385 for path, messages := range api.GetRegisteredMessages() {
386 if resp.IncompatibleMsgs[path] == nil {
387 resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
389 if resp.CompatibleMsgs[path] == nil {
390 resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
392 for _, msg := range req.MsgNameCrcs {
393 val, ok := messages[msg]
395 resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
398 if err = ch.CheckCompatiblity(val); err != nil {
399 resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
401 resp.CompatibleMsgs[path] = append(resp.CompatibleMsgs[path], msg)
405 for _, messages := range resp.IncompatibleMsgs {
406 if len(messages) > 0 {
407 return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)