1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
29 "github.com/fsnotify/fsnotify"
30 "github.com/lunixbochs/struc"
31 logger "github.com/sirupsen/logrus"
33 "git.fd.io/govpp.git/adapter"
34 "git.fd.io/govpp.git/codec"
35 "git.fd.io/govpp.git/examples/binapi/memclnt"
39 // DefaultSocketName is default VPP API socket file path.
40 DefaultSocketName = adapter.DefaultBinapiSocket
44 // DefaultConnectTimeout is default timeout for connecting
45 DefaultConnectTimeout = time.Second * 3
46 // DefaultDisconnectTimeout is default timeout for discconnecting
47 DefaultDisconnectTimeout = time.Millisecond * 100
48 // MaxWaitReady defines maximum duration before waiting for socket file
50 MaxWaitReady = time.Second * 15
51 // ClientName is used for identifying client in socket registration
52 ClientName = "govppsock"
56 // Debug is global variable that determines debug mode
57 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
58 // DebugMsgIds is global variable that determines debug mode for msg ids
59 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
61 // Log is global logger
65 // init initializes global logger, which logs debug level messages to stdout.
69 Log.Level = logger.DebugLevel
70 Log.Debug("govpp/socketclient: enabled debug mode")
74 type vppClient struct {
80 connectTimeout time.Duration
81 disconnectTimeout time.Duration
83 cb adapter.MsgCallback
85 msgTable map[string]uint16
93 func NewVppClient(sockAddr string) *vppClient {
95 sockAddr = DefaultSocketName
99 connectTimeout: DefaultConnectTimeout,
100 disconnectTimeout: DefaultDisconnectTimeout,
101 cb: func(msgID uint16, data []byte) {
102 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
107 // SetConnectTimeout sets timeout used during connecting.
108 func (c *vppClient) SetConnectTimeout(t time.Duration) {
112 // SetDisconnectTimeout sets timeout used during disconnecting.
113 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
114 c.disconnectTimeout = t
117 // WaitReady checks socket file existence and waits for it if necessary
118 func (c *vppClient) WaitReady() error {
119 // check if file at the path already exists
120 if _, err := os.Stat(c.sockAddr); err == nil {
122 } else if os.IsExist(err) {
126 // if not, watch for it
127 watcher, err := fsnotify.NewWatcher()
132 if err := watcher.Close(); err != nil {
133 Log.Errorf("failed to close file watcher: %v", err)
137 // start watching directory
138 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
144 case <-time.After(MaxWaitReady):
145 return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
146 case e := <-watcher.Errors:
148 case ev := <-watcher.Events:
149 Log.Debugf("watcher event: %+v", ev)
150 if ev.Name == c.sockAddr {
151 if (ev.Op & fsnotify.Create) == fsnotify.Create {
152 // socket was created, we are ready
160 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
161 Log.Debug("SetMsgCallback")
165 func (c *vppClient) Connect() error {
166 Log.Debugf("Connecting to: %v", c.sockAddr)
168 if err := c.connect(c.sockAddr); err != nil {
172 if err := c.open(); err != nil {
176 c.quit = make(chan struct{})
183 func (c *vppClient) connect(sockAddr string) error {
184 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
186 conn, err := net.DialUnix("unix", nil, addr)
188 // we try different type of socket for backwards compatbility with VPP<=19.04
189 if strings.Contains(err.Error(), "wrong type for socket") {
190 addr.Net = "unixpacket"
191 Log.Debugf("%s, retrying connect with type unixpacket", err)
192 conn, err = net.DialUnix("unixpacket", nil, addr)
195 Log.Debugf("Connecting to socket %s failed: %s", addr, err)
201 c.reader = bufio.NewReader(c.conn)
202 c.writer = bufio.NewWriter(c.conn)
204 Log.Debugf("Connected to socket: %v", addr)
210 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
211 createMsgContext = byte(123)
212 deleteMsgContext = byte(124)
215 func (c *vppClient) open() error {
216 msgCodec := new(codec.MsgCodec)
218 req := &memclnt.SockclntCreate{
219 Name: []byte(ClientName),
221 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
223 Log.Debugln("Encode error:", err)
227 msg[5] = createMsgContext
229 if err := c.write(msg); err != nil {
230 Log.Debugln("Write error: ", err)
234 readDeadline := time.Now().Add(c.connectTimeout)
235 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
238 msgReply, err := c.read()
240 Log.Println("Read error:", err)
243 // reset read deadline
244 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
248 reply := new(memclnt.SockclntCreateReply)
249 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
250 Log.Println("Decode error:", err)
254 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
255 reply.Response, reply.Index, reply.Count)
257 c.clientIndex = reply.Index
258 c.msgTable = make(map[string]uint16, reply.Count)
259 for _, x := range reply.MessageTable {
260 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
261 c.msgTable[name] = x.Index
262 if strings.HasPrefix(name, "sockclnt_delete_") {
263 c.sockDelMsgId = x.Index
266 Log.Debugf(" - %4d: %q", x.Index, name)
273 func (c *vppClient) Disconnect() error {
277 Log.Debugf("Disconnecting..")
281 // force readerLoop to timeout
282 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
286 // wait for readerLoop to return
289 if err := c.close(); err != nil {
293 if err := c.conn.Close(); err != nil {
294 Log.Debugln("Closing socket failed:", err)
301 func (c *vppClient) close() error {
302 msgCodec := new(codec.MsgCodec)
304 req := &memclnt.SockclntDelete{
305 Index: c.clientIndex,
307 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
309 Log.Debugln("Encode error:", err)
313 msg[5] = deleteMsgContext
315 Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
316 if err := c.write(msg); err != nil {
317 Log.Debugln("Write error: ", err)
321 readDeadline := time.Now().Add(c.disconnectTimeout)
322 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
325 msgReply, err := c.read()
327 Log.Debugln("Read error:", err)
328 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
329 // we accept timeout for reply
334 // reset read deadline
335 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
339 reply := new(memclnt.SockclntDeleteReply)
340 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
341 Log.Debugln("Decode error:", err)
345 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
350 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
351 msg := msgName + "_" + msgCrc
352 msgID, ok := c.msgTable[msg]
354 return 0, fmt.Errorf("unknown message: %q", msg)
359 type reqHeader struct {
365 func (c *vppClient) SendMsg(context uint32, data []byte) error {
367 ClientIndex: c.clientIndex,
370 buf := new(bytes.Buffer)
371 if err := struc.Pack(buf, h); err != nil {
374 copy(data[2:], buf.Bytes())
376 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
378 if err := c.write(data); err != nil {
379 Log.Debugln("write error: ", err)
386 func (c *vppClient) write(msg []byte) error {
388 DataLen: uint32(len(msg)),
390 buf := new(bytes.Buffer)
391 if err := struc.Pack(buf, h); err != nil {
394 header := buf.Bytes()
396 // we lock to prevent mixing multiple message sends
398 defer c.writeMu.Unlock()
400 if n, err := c.writer.Write(header); err != nil {
403 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
406 if err := c.writer.Flush(); err != nil {
410 for i := 0; i <= len(msg)/c.writer.Size(); i++ {
411 x := i*c.writer.Size() + c.writer.Size()
415 Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
416 if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
419 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
421 if err := c.writer.Flush(); err != nil {
430 type msgHeader struct {
435 func (c *vppClient) readerLoop() {
437 defer Log.Debugf("reader quit")
447 if isClosedError(err) {
450 Log.Debugf("read failed: %v", err)
454 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
455 Log.Debugf("unpacking header failed: %v", err)
459 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
464 type msgheader struct {
465 Q int `struc:"uint64"`
466 DataLen uint32 `struc:"uint32"`
467 GcMarkTimestamp uint32 `struc:"uint32"`
470 func (c *vppClient) read() ([]byte, error) {
471 Log.Debug("reading next msg..")
473 header := make([]byte, 16)
475 n, err := io.ReadAtLeast(c.reader, header, 16)
479 Log.Debugln("zero bytes header")
483 Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
484 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
486 Log.Debugf(" - read header %d bytes: % 0X", n, header)
489 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
492 Log.Debugf(" - decoded header: %+v", h)
494 msgLen := int(h.DataLen)
495 msg := make([]byte, msgLen)
497 n, err = c.reader.Read(msg)
501 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
505 Log.Debugf("continue read for another %d bytes", remain)
509 nbytes, err := c.reader.Read(view)
512 } else if nbytes == 0 {
513 return nil, fmt.Errorf("zero nbytes")
517 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
526 func isClosedError(err error) bool {
530 return strings.HasSuffix(err.Error(), "use of closed network connection")