1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
29 "github.com/fsnotify/fsnotify"
30 "github.com/lunixbochs/struc"
31 logger "github.com/sirupsen/logrus"
33 "git.fd.io/govpp.git/adapter"
34 "git.fd.io/govpp.git/codec"
38 // DefaultSocketName is the default VPP API socket file path.
39 DefaultSocketName = adapter.DefaultBinapiSocket
// legacySocketName is the legacy socket path that checkLegacySocket falls
// back to when the configured socket file does not exist.
40 legacySocketName = "/run/vpp-api.sock"
44 // DefaultConnectTimeout is the default timeout for connecting; NewVppClient
// uses it as the initial connectTimeout.
45 DefaultConnectTimeout = time.Second * 3
46 // DefaultDisconnectTimeout is the default timeout for disconnecting;
// NewVppClient uses it as the initial disconnectTimeout.
47 DefaultDisconnectTimeout = time.Millisecond * 100
48 // MaxWaitReady defines the maximum duration WaitReady waits for the socket
// file to appear before returning a timeout error.
50 MaxWaitReady = time.Second * 10
51 // ClientName is used for identifying the client in socket registration; it
// is sent as the Name of the SockclntCreate request.
52 ClientName = "govppsock"
56 // Debug is a global variable that determines debug mode; it is true when
// the DEBUG_GOVPP_SOCK environment variable is non-empty.
57 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
58 // DebugMsgIds is a global variable that determines debug mode for msg ids;
// it is true when the DEBUG_GOVPP_SOCKMSG environment variable is non-empty.
59 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
61 // Log is the global logger used by this client.
65 // init initializes global logger, which logs debug level messages to stdout.
// It raises the logger to debug level and logs a notice that debug mode is
// enabled. NOTE(review): presumably only when Debug is set — confirm.
69 Log.Level = logger.DebugLevel
70 Log.Debug("govpp/socketclient: enabled debug mode")
// socketMissing is the help text that printMissingSocketMsg writes to stderr
// when the socket file is missing; its %s verb receives the socket path.
74 const socketMissing = `
75 ------------------------------------------------------------
76 No socket file found at: %s
77 VPP binary API socket file is missing!
79 - is VPP running with socket for binapi enabled?
80 - is the correct socket name configured?
82 To enable it add following section to your VPP config:
86 ------------------------------------------------------------
// warnOnce guards the missing-socket help text so that Connect prints it at
// most once.
89 var warnOnce sync.Once
// printMissingSocketMsg writes the socketMissing help text to stderr, filling
// in the client's socket path.
91 func (c *vppClient) printMissingSocketMsg() {
92 fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
// vppClient holds the state of a VPP binary API client that talks to VPP
// over a unix socket.
95 type vppClient struct {
// connectTimeout bounds the wait for the SockclntCreate reply in open.
102 connectTimeout time.Duration
// disconnectTimeout bounds the wait for the SockclntDelete reply in close.
103 disconnectTimeout time.Duration
// cb is the message callback; NewVppClient installs a default that only logs
// a warning and drops the message.
105 cb adapter.MsgCallback
// msgTable maps "<name>_<crc>" message identifiers to message IDs; open fills
// it from the message table of the SockclntCreateReply.
107 msgTable map[string]uint16
// NewVppClient returns a client for the VPP API socket at sockAddr, set up
// with the default connect and disconnect timeouts and a placeholder callback
// that logs a warning and drops every message.
// NOTE(review): sockAddr falls back to DefaultSocketName — presumably only
// when it is empty; confirm.
115 func NewVppClient(sockAddr string) *vppClient {
117 sockAddr = DefaultSocketName
121 connectTimeout: DefaultConnectTimeout,
122 disconnectTimeout: DefaultDisconnectTimeout,
// default callback: report the dropped message until a real one is set
123 cb: func(msgID uint16, data []byte) {
124 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
129 // SetConnectTimeout sets the timeout used during connecting.
// open applies the connect timeout as the read deadline while waiting for
// the reply to the SockclntCreate request.
130 func (c *vppClient) SetConnectTimeout(t time.Duration) {
134 // SetDisconnectTimeout sets the timeout used during disconnecting.
// close applies it as the read deadline while waiting for the reply to the
// SockclntDelete request.
135 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
136 c.disconnectTimeout = t
// SetMsgCallback logs the call at debug level.
// NOTE(review): presumably it also stores cb as the client's message
// callback, replacing the default installed by NewVppClient — confirm.
139 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
140 Log.Debug("SetMsgCallback")
// checkLegacySocket switches the client to the legacy socket path when the
// configured socket file is missing but the legacy one exists.
// It returns false when the configured socket file exists or when stat fails
// with an error other than "not exist".
// NOTE(review): presumably returns true only after sockAddr was switched to
// legacySocketName — confirm.
144 func (c *vppClient) checkLegacySocket() bool {
// the legacy path is already the configured one
145 if c.sockAddr == legacySocketName {
148 Log.Debugf("checking legacy socket: %s", legacySocketName)
149 // check if socket exists
150 if _, err := os.Stat(c.sockAddr); err == nil {
151 return false // socket exists
152 } else if !os.IsNotExist(err) {
153 return false // some other error occurred
155 // check if legacy socket exists
156 if _, err := os.Stat(legacySocketName); err == nil {
157 // legacy socket exists, update sockAddr
158 c.sockAddr = legacySocketName
161 // no socket found
165 // WaitReady checks socket file existence and waits for it if necessary.
// It returns nil immediately when the socket file already exists and returns
// the stat error when stat fails for any reason other than the file being
// absent. Otherwise it watches the socket's parent directory with fsnotify
// for a Create event naming the socket path, bounded by a MaxWaitReady timer.
166 func (c *vppClient) WaitReady() error {
167 // check if socket already exists
168 if _, err := os.Stat(c.sockAddr); err == nil {
169 return nil // socket exists, we are ready
170 } else if !os.IsNotExist(err) {
171 return err // some other error occurred
// configured socket is absent; try the legacy path before watching
174 if c.checkLegacySocket() {
178 // socket does not exist, watch for it
179 watcher, err := fsnotify.NewWatcher()
// closing the watcher is best-effort: a failure is only logged
184 if err := watcher.Close(); err != nil {
185 Log.Warnf("failed to close file watcher: %v", err)
189 // start directory watcher
// the directory is watched because the socket file itself does not exist yet
190 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
194 timeout := time.NewTimer(MaxWaitReady)
// re-check the legacy socket before reporting the timeout
198 if c.checkLegacySocket() {
201 return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
203 case e := <-watcher.Errors:
206 case ev := <-watcher.Events:
207 Log.Debugf("watcher event: %+v", ev)
// only a Create event for exactly the socket path counts
208 if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
209 // socket created, we are ready
// Connect establishes the session: it applies the legacy socket fallback,
// verifies that the socket file exists, dials it via connect and registers
// the client via open.
// It returns an error when the socket file does not exist (printing the
// missing-socket help text at most once) or when stat fails otherwise.
216 func (c *vppClient) Connect() error {
// may switch c.sockAddr to the legacy path; the result is not needed here
217 c.checkLegacySocket()
219 // check if socket exists
220 if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) {
221 warnOnce.Do(c.printMissingSocketMsg)
222 return fmt.Errorf("VPP API socket file %s does not exist", c.sockAddr)
223 } else if err != nil {
224 return fmt.Errorf("VPP API socket error: %v", err)
227 if err := c.connect(c.sockAddr); err != nil {
231 if err := c.open(); err != nil {
236 c.quit = make(chan struct{})
// Disconnect shuts the session down: it closes the read side of the
// connection, waits for readerLoop to return, then calls close and
// disconnect. Failures of CloseRead and close are logged at debug level.
243 func (c *vppClient) Disconnect() error {
247 Log.Debugf("Disconnecting..")
251 if err := c.conn.CloseRead(); err != nil {
252 Log.Debugf("closing read failed: %v", err)
255 // wait for readerLoop to return
258 if err := c.close(); err != nil {
259 Log.Debugf("closing failed: %v", err)
262 if err := c.disconnect(); err != nil {
// connect dials the unix socket at sockAddr and wraps the connection in a
// buffered reader and writer.
// If dialing fails with a "wrong type for socket" error, it retries with the
// "unixpacket" network.
269 func (c *vppClient) connect(sockAddr string) error {
270 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
// NOTE(review): this logs c.sockAddr rather than the sockAddr argument.
272 Log.Debugf("Connecting to: %v", c.sockAddr)
274 conn, err := net.DialUnix("unix", nil, addr)
276 // we try different type of socket for backwards compatibility with VPP<=19.04
277 if strings.Contains(err.Error(), "wrong type for socket") {
278 addr.Net = "unixpacket"
279 Log.Debugf("%s, retrying connect with type unixpacket", err)
280 conn, err = net.DialUnix("unixpacket", nil, addr)
283 Log.Debugf("Connecting to socket %s failed: %s", addr, err)
289 Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
// all later reads and writes go through these buffered wrappers
291 c.reader = bufio.NewReader(c.conn)
292 c.writer = bufio.NewWriter(c.conn)
// disconnect closes the underlying socket connection, logging a failure of
// Close at debug level.
297 func (c *vppClient) disconnect() error {
298 Log.Debugf("Closing socket")
299 if err := c.conn.Close(); err != nil {
300 Log.Debugln("Closing socket failed:", err)
307 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
// createMsgContext is written to byte 5 of the encoded SockclntCreate request.
308 createMsgContext = byte(123)
// deleteMsgContext is written to byte 5 of the encoded SockclntDelete request.
309 deleteMsgContext = byte(124)
// open registers the client with VPP: it sends a SockclntCreate request
// carrying ClientName, waits up to connectTimeout for the reply, and stores
// the returned client index and message table.
312 func (c *vppClient) open() error {
313 msgCodec := new(codec.MsgCodec)
315 req := &SockclntCreate{Name: ClientName}
316 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
318 Log.Debugln("Encode error:", err)
// stamp the context byte of the encoded request
322 msg[5] = createMsgContext
324 if err := c.write(msg); err != nil {
325 Log.Debugln("Write error: ", err)
// bound the wait for the reply with connectTimeout
329 readDeadline := time.Now().Add(c.connectTimeout)
330 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
333 msgReply, err := c.read()
335 Log.Println("Read error:", err)
338 // reset read deadline
339 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
343 reply := new(SockclntCreateReply)
344 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
345 Log.Println("Decode error:", err)
349 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
350 reply.Response, reply.Index, reply.Count)
352 c.clientIndex = reply.Index
353 c.msgTable = make(map[string]uint16, reply.Count)
// each table entry name is cut at the first NUL byte and stripped of a
// trailing \x13 before it is used as the map key
354 for _, x := range reply.MessageTable {
355 msgName := strings.Split(x.Name, "\x00")[0]
356 name := strings.TrimSuffix(msgName, "\x13")
357 c.msgTable[name] = x.Index
// remember the ID used later by close for the SockclntDelete request
// NOTE(review): this prefix also matches "sockclnt_delete_reply_..." names,
// so sockDelMsgId may end up holding the reply's ID — confirm.
358 if strings.HasPrefix(name, "sockclnt_delete_") {
359 c.sockDelMsgId = x.Index
362 Log.Debugf(" - %4d: %q", x.Index, name)
// close deregisters the client from VPP: it sends a SockclntDelete request
// for clientIndex and waits up to disconnectTimeout for the reply.
// A timeout while reading the reply is accepted.
369 func (c *vppClient) close() error {
370 msgCodec := new(codec.MsgCodec)
372 req := &SockclntDelete{
373 Index: c.clientIndex,
// the delete message ID comes from the message table received in open
375 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
377 Log.Debugln("Encode error:", err)
// stamp the context byte of the encoded request
381 msg[5] = deleteMsgContext
// NOTE(review): the log text below has typos ("socklntDel", "byes").
383 Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
384 if err := c.write(msg); err != nil {
385 Log.Debugln("Write error: ", err)
// bound the wait for the reply with disconnectTimeout
389 readDeadline := time.Now().Add(c.disconnectTimeout)
390 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
393 msgReply, err := c.read()
395 Log.Debugln("Read error:", err)
396 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
397 // we accept timeout for reply
402 // reset read deadline
403 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
407 reply := new(SockclntDeleteReply)
408 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
409 Log.Debugln("Decode error:", err)
413 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
// GetMsgID looks up the message ID for the given message name and CRC in the
// client's message table. The lookup key is "<msgName>_<msgCrc>".
// It returns an *adapter.UnknownMsgError — presumably when the key is not in
// the table (TODO confirm).
418 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
419 msg := msgName + "_" + msgCrc
420 msgID, ok := c.msgTable[msg]
422 return 0, &adapter.UnknownMsgError{msgName, msgCrc}
// reqHeader is the request header that SendMsg packs (including the client
// index) and copies into the outgoing message starting at byte offset 2.
427 type reqHeader struct {
// SendMsg sends data as a request: it packs a request header carrying the
// client index, copies it into data starting at offset 2, and writes the
// result to the socket.
// The caller's slice is modified in place, and data must be at least 2 bytes
// long or the data[2:] slice expression panics.
// NOTE(review): presumably context is also placed in the header — confirm.
433 func (c *vppClient) SendMsg(context uint32, data []byte) error {
435 ClientIndex: c.clientIndex,
438 buf := new(bytes.Buffer)
439 if err := struc.Pack(buf, h); err != nil {
// overwrite the bytes after the first two with the packed header
442 copy(data[2:], buf.Bytes())
444 Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
446 if err := c.write(data); err != nil {
447 Log.Debugln("write error: ", err)
// write frames msg and sends it on the connection: a packed header carrying
// the message length goes first, then the message in chunks sized by the
// buffered writer, followed by a flush.
454 func (c *vppClient) write(msg []byte) error {
456 DataLen: uint32(len(msg)),
458 buf := new(bytes.Buffer)
459 if err := struc.Pack(buf, h); err != nil {
462 header := buf.Bytes()
464 // we lock to prevent mixing multiple message sends
466 defer c.writeMu.Unlock()
468 if n, err := c.writer.Write(header); err != nil {
471 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
// send the payload in slices of writerSize bytes
474 writerSize := c.writer.Size()
475 for i := 0; i <= len(msg)/writerSize; i++ {
// x is the end offset of the current chunk
// NOTE(review): presumably x is clamped to len(msg) before slicing — confirm.
476 x := i*writerSize + writerSize
// NOTE(review): the value logged as "mod" is the quotient len(msg)/writerSize.
480 Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
481 if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
484 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
// push any buffered bytes out to the connection
487 if err := c.writer.Flush(); err != nil {
491 Log.Debugf(" -- write done")
// msgHeader is a message header type.
// NOTE(review): presumably the type readerLoop unpacks from each received
// message to obtain MsgID and Context — confirm.
496 type msgHeader struct {
// readerLoop processes received messages: it unpacks a header from each
// message and logs its message ID and context. A read error recognised by
// isClosedError is handled separately from other read failures, which are
// logged at debug level.
// NOTE(review): presumably each message is then passed to the message
// callback — confirm.
501 func (c *vppClient) readerLoop() {
503 defer Log.Debugf("reader quit")
514 if isClosedError(err) {
517 Log.Debugf("read failed: %v", err)
522 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
523 Log.Debugf("unpacking header failed: %v", err)
527 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
// msgheader describes a 16-byte header as laid out by its struc tags: a
// 64-bit Q field, the 32-bit DataLen and a 32-bit GcMarkTimestamp.
// NOTE(review): presumably this is the frame header that read decodes to
// learn the payload length — confirm.
532 type msgheader struct {
533 Q int `struc:"uint64"`
534 DataLen uint32 `struc:"uint32"`
535 GcMarkTimestamp uint32 `struc:"uint32"`
// read reads one framed message from the connection: a 16-byte header first,
// then DataLen bytes of payload, reading repeatedly until the whole payload
// has arrived.
// It reports an invalid-header error for a short header read and a
// "zero nbytes" error when a follow-up payload read yields no bytes.
538 func (c *vppClient) read() ([]byte, error) {
539 Log.Debug(" reading next msg..")
541 header := make([]byte, 16)
543 n, err := io.ReadAtLeast(c.reader, header, 16)
548 Log.Debugln("zero bytes header")
551 Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
552 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
554 Log.Debugf(" read header %d bytes: % 0X", n, header)
557 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
560 Log.Debugf(" - decoded header: %+v", h)
// the header's DataLen gives the payload size to allocate and read
562 msgLen := int(h.DataLen)
563 msg := make([]byte, msgLen)
// a single Read may return fewer bytes than requested; the rest is read below
565 n, err = c.reader.Read(msg)
569 Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
573 Log.Debugf("continue read for another %d bytes", remain)
577 nbytes, err := c.reader.Read(view)
580 } else if nbytes == 0 {
581 return nil, fmt.Errorf("zero nbytes")
585 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
591 Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
// isClosedError reports whether err indicates a closed connection.
// Its final check matches the error message against the suffix
// "use of closed network connection".
596 func isClosedError(err error) bool {
600 return strings.HasSuffix(err.Error(), "use of closed network connection")