1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
29 "github.com/fsnotify/fsnotify"
30 "github.com/lunixbochs/struc"
31 logger "github.com/sirupsen/logrus"
33 "git.fd.io/govpp.git/adapter"
34 "git.fd.io/govpp.git/codec"
35 "git.fd.io/govpp.git/examples/binapi/memclnt"
39 // DefaultSocketName is default VPP API socket file path.
40 DefaultSocketName = adapter.DefaultBinapiSocket
43 const socketMissing = `
44 ------------------------------------------------------------
45 VPP binary API socket file %s is missing!
47 - is VPP running with socket for binapi enabled?
48 - is the correct socket name configured?
50 To enable it add following section to your VPP config:
54 ------------------------------------------------------------
58 // DefaultConnectTimeout is default timeout for connecting
59 DefaultConnectTimeout = time.Second * 3
60 // DefaultDisconnectTimeout is default timeout for disconnecting
61 DefaultDisconnectTimeout = time.Millisecond * 100
62 // MaxWaitReady defines maximum duration before waiting for socket file
64 MaxWaitReady = time.Second * 10
65 // ClientName is used for identifying client in socket registration
66 ClientName = "govppsock"
70 // Debug is global variable that determines debug mode
71 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
72 // DebugMsgIds is global variable that determines debug mode for msg ids
73 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
75 // Log is global logger
79 // init initializes global logger, which logs debug level messages to stdout.
83 Log.Level = logger.DebugLevel
84 Log.Debug("govpp/socketclient: enabled debug mode")
88 type vppClient struct {
95 connectTimeout time.Duration
96 disconnectTimeout time.Duration
98 cb adapter.MsgCallback
100 msgTable map[string]uint16
108 func NewVppClient(sockAddr string) *vppClient {
110 sockAddr = DefaultSocketName
114 connectTimeout: DefaultConnectTimeout,
115 disconnectTimeout: DefaultDisconnectTimeout,
116 cb: func(msgID uint16, data []byte) {
117 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
122 // SetConnectTimeout sets timeout used during connecting.
123 func (c *vppClient) SetConnectTimeout(t time.Duration) {
127 // SetDisconnectTimeout sets timeout used during disconnecting.
128 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
129 c.disconnectTimeout = t
132 // WaitReady checks socket file existence and waits for it if necessary
133 func (c *vppClient) WaitReady() error {
134 // check if socket already exists
135 if _, err := os.Stat(c.sockAddr); err == nil {
136 return nil // socket exists, we are ready
137 } else if !os.IsNotExist(err) {
138 return err // some other error occurred
141 // socket does not exist, watch for it
142 watcher, err := fsnotify.NewWatcher()
147 if err := watcher.Close(); err != nil {
148 Log.Warnf("failed to close file watcher: %v", err)
152 // start directory watcher
153 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
157 timeout := time.NewTimer(MaxWaitReady)
161 return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
163 case e := <-watcher.Errors:
166 case ev := <-watcher.Events:
167 Log.Debugf("watcher event: %+v", ev)
168 if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
169 // socket created, we are ready
176 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
177 Log.Debug("SetMsgCallback")
181 func (c *vppClient) Connect() error {
182 // check if socket exists
183 if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) {
184 fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
185 return fmt.Errorf("VPP API socket file %s does not exist", c.sockAddr)
186 } else if err != nil {
187 return fmt.Errorf("VPP API socket error: %v", err)
190 if err := c.connect(c.sockAddr); err != nil {
194 if err := c.open(); err != nil {
199 c.quit = make(chan struct{})
206 func (c *vppClient) Disconnect() error {
210 Log.Debugf("Disconnecting..")
214 if err := c.conn.CloseRead(); err != nil {
215 Log.Debugf("closing read failed: %v", err)
218 // wait for readerLoop to return
221 if err := c.close(); err != nil {
222 Log.Debugf("closing failed: %v", err)
225 if err := c.disconnect(); err != nil {
232 func (c *vppClient) connect(sockAddr string) error {
233 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
235 Log.Debugf("Connecting to: %v", c.sockAddr)
237 conn, err := net.DialUnix("unix", nil, addr)
239 // we try different type of socket for backwards compatibility with VPP<=19.04
240 if strings.Contains(err.Error(), "wrong type for socket") {
241 addr.Net = "unixpacket"
242 Log.Debugf("%s, retrying connect with type unixpacket", err)
243 conn, err = net.DialUnix("unixpacket", nil, addr)
246 Log.Debugf("Connecting to socket %s failed: %s", addr, err)
252 Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
254 c.reader = bufio.NewReader(c.conn)
255 c.writer = bufio.NewWriter(c.conn)
260 func (c *vppClient) disconnect() error {
261 Log.Debugf("Closing socket")
262 if err := c.conn.Close(); err != nil {
263 Log.Debugln("Closing socket failed:", err)
270 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
271 createMsgContext = byte(123)
272 deleteMsgContext = byte(124)
275 func (c *vppClient) open() error {
276 msgCodec := new(codec.MsgCodec)
278 req := &memclnt.SockclntCreate{
279 Name: []byte(ClientName),
281 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
283 Log.Debugln("Encode error:", err)
287 msg[5] = createMsgContext
289 if err := c.write(msg); err != nil {
290 Log.Debugln("Write error: ", err)
294 readDeadline := time.Now().Add(c.connectTimeout)
295 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
298 msgReply, err := c.read()
300 Log.Println("Read error:", err)
303 // reset read deadline
304 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
308 reply := new(memclnt.SockclntCreateReply)
309 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
310 Log.Println("Decode error:", err)
314 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
315 reply.Response, reply.Index, reply.Count)
317 c.clientIndex = reply.Index
318 c.msgTable = make(map[string]uint16, reply.Count)
319 for _, x := range reply.MessageTable {
320 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
321 c.msgTable[name] = x.Index
322 if strings.HasPrefix(name, "sockclnt_delete_") {
323 c.sockDelMsgId = x.Index
326 Log.Debugf(" - %4d: %q", x.Index, name)
333 func (c *vppClient) close() error {
334 msgCodec := new(codec.MsgCodec)
336 req := &memclnt.SockclntDelete{
337 Index: c.clientIndex,
339 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
341 Log.Debugln("Encode error:", err)
345 msg[5] = deleteMsgContext
347 Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
348 if err := c.write(msg); err != nil {
349 Log.Debugln("Write error: ", err)
353 readDeadline := time.Now().Add(c.disconnectTimeout)
354 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
357 msgReply, err := c.read()
359 Log.Debugln("Read error:", err)
360 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
361 // we accept timeout for reply
366 // reset read deadline
367 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
371 reply := new(memclnt.SockclntDeleteReply)
372 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
373 Log.Debugln("Decode error:", err)
377 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
382 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
383 msg := msgName + "_" + msgCrc
384 msgID, ok := c.msgTable[msg]
386 return 0, fmt.Errorf("unknown message: %q", msg)
391 type reqHeader struct {
397 func (c *vppClient) SendMsg(context uint32, data []byte) error {
399 ClientIndex: c.clientIndex,
402 buf := new(bytes.Buffer)
403 if err := struc.Pack(buf, h); err != nil {
406 copy(data[2:], buf.Bytes())
408 Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
410 if err := c.write(data); err != nil {
411 Log.Debugln("write error: ", err)
418 func (c *vppClient) write(msg []byte) error {
420 DataLen: uint32(len(msg)),
422 buf := new(bytes.Buffer)
423 if err := struc.Pack(buf, h); err != nil {
426 header := buf.Bytes()
428 // we lock to prevent mixing multiple message sends
430 defer c.writeMu.Unlock()
432 if n, err := c.writer.Write(header); err != nil {
435 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
438 writerSize := c.writer.Size()
439 for i := 0; i <= len(msg)/writerSize; i++ {
440 x := i*writerSize + writerSize
444 Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
445 if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
448 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
451 if err := c.writer.Flush(); err != nil {
455 Log.Debugf(" -- write done")
460 type msgHeader struct {
465 func (c *vppClient) readerLoop() {
467 defer Log.Debugf("reader quit")
478 if isClosedError(err) {
481 Log.Debugf("read failed: %v", err)
486 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
487 Log.Debugf("unpacking header failed: %v", err)
491 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
496 type msgheader struct {
497 Q int `struc:"uint64"`
498 DataLen uint32 `struc:"uint32"`
499 GcMarkTimestamp uint32 `struc:"uint32"`
502 func (c *vppClient) read() ([]byte, error) {
503 Log.Debug(" reading next msg..")
505 header := make([]byte, 16)
507 n, err := io.ReadAtLeast(c.reader, header, 16)
512 Log.Debugln("zero bytes header")
515 Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
516 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
518 Log.Debugf(" read header %d bytes: % 0X", n, header)
521 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
524 Log.Debugf(" - decoded header: %+v", h)
526 msgLen := int(h.DataLen)
527 msg := make([]byte, msgLen)
529 n, err = c.reader.Read(msg)
533 Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
537 Log.Debugf("continue read for another %d bytes", remain)
541 nbytes, err := c.reader.Read(view)
544 } else if nbytes == 0 {
545 return nil, fmt.Errorf("zero nbytes")
549 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
555 Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
560 func isClosedError(err error) bool {
564 return strings.HasSuffix(err.Error(), "use of closed network connection")