15 "github.com/fsnotify/fsnotify"
16 "github.com/lunixbochs/struc"
17 logger "github.com/sirupsen/logrus"
19 "git.fd.io/govpp.git/adapter"
20 "git.fd.io/govpp.git/codec"
21 "git.fd.io/govpp.git/examples/bin_api/memclnt"
25 // DefaultSocketName is default VPP API socket file name
26 DefaultSocketName = "/run/vpp-api.sock"
30 // DefaultConnectTimeout is default timeout for connecting
31 DefaultConnectTimeout = time.Second * 3
32 // DefaultDisconnectTimeout is default timeout for discconnecting
33 DefaultDisconnectTimeout = time.Second
34 // MaxWaitReady defines maximum duration before waiting for socket file
36 MaxWaitReady = time.Second * 15
37 // ClientName is used for identifying client in socket registration
38 ClientName = "govppsock"
42 // Debug is global variable that determines debug mode
43 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
44 // DebugMsgIds is global variable that determines debug mode for msg ids
45 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
47 Log = logger.New() // global logger
50 // init initializes global logger, which logs debug level messages to stdout.
54 Log.Level = logger.DebugLevel
58 type vppClient struct {
64 connectTimeout time.Duration
65 disconnectTimeout time.Duration
67 cb adapter.MsgCallback
69 msgTable map[string]uint16
77 func NewVppClient(sockAddr string) *vppClient {
79 sockAddr = DefaultSocketName
83 connectTimeout: DefaultConnectTimeout,
84 disconnectTimeout: DefaultDisconnectTimeout,
85 cb: func(msgID uint16, data []byte) {
86 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
91 // SetConnectTimeout sets timeout used during connecting.
92 func (c *vppClient) SetConnectTimeout(t time.Duration) {
96 // SetDisconnectTimeout sets timeout used during disconnecting.
97 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
98 c.disconnectTimeout = t
101 // WaitReady checks socket file existence and waits for it if necessary
102 func (c *vppClient) WaitReady() error {
103 // check if file at the path already exists
104 if _, err := os.Stat(c.sockAddr); err == nil {
106 } else if os.IsExist(err) {
110 // if not, watch for it
111 watcher, err := fsnotify.NewWatcher()
116 if err := watcher.Close(); err != nil {
117 Log.Errorf("failed to close file watcher: %v", err)
121 // start watching directory
122 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
128 case <-time.After(MaxWaitReady):
129 return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
130 case e := <-watcher.Errors:
132 case ev := <-watcher.Events:
133 Log.Debugf("watcher event: %+v", ev)
134 if ev.Name == c.sockAddr {
135 if (ev.Op & fsnotify.Create) == fsnotify.Create {
136 // socket was created, we are ready
144 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
145 Log.Debug("SetMsgCallback")
149 func (c *vppClient) Connect() error {
150 Log.Debugf("Connecting to: %v", c.sockAddr)
152 if err := c.connect(c.sockAddr); err != nil {
156 if err := c.open(); err != nil {
160 c.quit = make(chan struct{})
167 func (c *vppClient) connect(sockAddr string) error {
168 addr, err := net.ResolveUnixAddr("unixpacket", sockAddr)
170 Log.Debugln("ResolveUnixAddr error:", err)
174 conn, err := net.DialUnix("unix", nil, addr)
176 if strings.Contains(err.Error(), "wrong type for socket") {
177 conn, err = net.DialUnix("unixpacket", nil, addr)
180 Log.Debugln("Dial error:", err)
186 c.reader = bufio.NewReader(c.conn)
187 c.writer = bufio.NewWriter(c.conn)
189 Log.Debugf("Connected to socket: %v", addr)
195 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
196 createMsgContext = byte(123)
197 deleteMsgContext = byte(124)
200 func (c *vppClient) open() error {
201 msgCodec := new(codec.MsgCodec)
203 req := &memclnt.SockclntCreate{
204 Name: []byte(ClientName),
206 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
208 Log.Debugln("Encode error:", err)
212 msg[5] = createMsgContext
214 if err := c.write(msg); err != nil {
215 Log.Debugln("Write error: ", err)
219 readDeadline := time.Now().Add(c.connectTimeout)
220 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
223 msgReply, err := c.read()
225 Log.Println("Read error:", err)
228 // reset read deadline
229 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
233 reply := new(memclnt.SockclntCreateReply)
234 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
235 Log.Println("Decode error:", err)
239 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
240 reply.Response, reply.Index, reply.Count)
242 c.clientIndex = reply.Index
243 c.msgTable = make(map[string]uint16, reply.Count)
244 for _, x := range reply.MessageTable {
245 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
246 c.msgTable[name] = x.Index
247 if strings.HasPrefix(name, "sockclnt_delete_") {
248 c.sockDelMsgId = x.Index
251 Log.Debugf(" - %4d: %q", x.Index, name)
258 func (c *vppClient) Disconnect() error {
262 Log.Debugf("Disconnecting..")
266 // force readerLoop to timeout
267 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
271 // wait for readerLoop to return
274 if err := c.close(); err != nil {
278 if err := c.conn.Close(); err != nil {
279 Log.Debugln("Close socket conn failed:", err)
286 func (c *vppClient) close() error {
287 msgCodec := new(codec.MsgCodec)
289 req := &memclnt.SockclntDelete{
290 Index: c.clientIndex,
292 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
294 Log.Debugln("Encode error:", err)
298 msg[5] = deleteMsgContext
300 Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
301 if err := c.write(msg); err != nil {
302 Log.Debugln("Write error: ", err)
306 readDeadline := time.Now().Add(c.disconnectTimeout)
307 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
310 msgReply, err := c.read()
312 Log.Debugln("Read error:", err)
313 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
314 // we accept timeout for reply
319 // reset read deadline
320 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
324 reply := new(memclnt.SockclntDeleteReply)
325 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
326 Log.Debugln("Decode error:", err)
330 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
335 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
336 msg := msgName + "_" + msgCrc
337 msgID, ok := c.msgTable[msg]
339 return 0, fmt.Errorf("unknown message: %q", msg)
344 type reqHeader struct {
350 func (c *vppClient) SendMsg(context uint32, data []byte) error {
352 ClientIndex: c.clientIndex,
355 buf := new(bytes.Buffer)
356 if err := struc.Pack(buf, h); err != nil {
359 copy(data[2:], buf.Bytes())
361 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
363 if err := c.write(data); err != nil {
364 Log.Debugln("write error: ", err)
371 func (c *vppClient) write(msg []byte) error {
373 DataLen: uint32(len(msg)),
375 buf := new(bytes.Buffer)
376 if err := struc.Pack(buf, h); err != nil {
379 header := buf.Bytes()
381 // we lock to prevent mixing multiple message sends
383 defer c.writeMu.Unlock()
385 if n, err := c.writer.Write(header); err != nil {
388 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
391 if err := c.writer.Flush(); err != nil {
395 for i := 0; i <= len(msg)/c.writer.Size(); i++ {
396 x := i*c.writer.Size() + c.writer.Size()
400 Log.Debugf("x=%v i=%v len=%v mod=%v\n", x, i, len(msg), len(msg)/c.writer.Size())
401 if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
404 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
406 if err := c.writer.Flush(); err != nil {
415 type msgHeader struct {
420 func (c *vppClient) readerLoop() {
425 Log.Debugf("reader quit")
432 if isClosedError(err) {
435 Log.Debugf("READ FAILED: %v", err)
439 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
440 Log.Debugf("unpacking header failed: %v", err)
444 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
449 type msgheader struct {
450 Q int `struc:"uint64"`
451 DataLen uint32 `struc:"uint32"`
452 GcMarkTimestamp uint32 `struc:"uint32"`
455 func (c *vppClient) read() ([]byte, error) {
456 Log.Debug("reading next msg..")
458 header := make([]byte, 16)
460 n, err := io.ReadAtLeast(c.reader, header, 16)
464 Log.Debugln("zero bytes header")
468 Log.Debug("invalid header data (%d): % 0X", n, header[:n])
469 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
471 Log.Debugf(" - read header %d bytes: % 0X", n, header)
474 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
477 Log.Debugf(" - decoded header: %+v", h)
479 msgLen := int(h.DataLen)
480 msg := make([]byte, msgLen)
482 n, err = c.reader.Read(msg)
486 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
490 Log.Debugf("continue read for another %d bytes", remain)
495 nbytes, err := c.reader.Read(view)
498 } else if nbytes == 0 {
499 return nil, fmt.Errorf("zero nbytes")
503 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
512 func isClosedError(err error) bool {
516 return strings.HasSuffix(err.Error(), "use of closed network connection")