15 "github.com/fsnotify/fsnotify"
16 "github.com/lunixbochs/struc"
17 logger "github.com/sirupsen/logrus"
19 "git.fd.io/govpp.git/adapter"
20 "git.fd.io/govpp.git/codec"
21 "git.fd.io/govpp.git/examples/bin_api/memclnt"
25 // DefaultSocketName is the default VPP API socket file name.
// NewVppClient assigns it to sockAddr as a fallback.
26 DefaultSocketName = "/run/vpp-api.sock"
30 // DefaultConnectTimeout is the default timeout for connecting
// (used by open as the deadline for reading the registration reply).
31 DefaultConnectTimeout = time.Second * 3
32 // DefaultDisconnectTimeout is the default timeout for disconnecting
// (used by close as the deadline for reading the deletion reply).
33 DefaultDisconnectTimeout = time.Second
34 // MaxWaitReady defines maximum duration before waiting for socket file
36 MaxWaitReady = time.Second * 15
37 // ClientName is used for identifying client in socket registration
// (sent as the Name field of the SockclntCreate request).
38 ClientName = "govppsock"
42 // Debug is a global variable that determines debug mode; it is true when
// the DEBUG_GOVPP_SOCK environment variable is set to a non-empty value.
43 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
44 // DebugMsgIds is a global variable that determines debug mode for msg ids;
// it is true when DEBUG_GOVPP_SOCKMSG is set to a non-empty value.
45 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
// Log is the package-level logrus logger used by every function in this file.
47 Log = logger.New() // global logger
50 // init initializes global logger, which logs debug level messages to stdout.
// NOTE(review): the condition guarding the level change is not visible here;
// presumably it only applies when Debug is set — confirm.
// Lower the logger threshold so that Log.Debug* calls are emitted.
54 Log.Level = logger.DebugLevel
// vppClient holds the state of a client connection to the VPP API socket.
58 type vppClient struct {
// connectTimeout bounds the wait for the registration reply in open;
// disconnectTimeout bounds the wait for the deletion reply in close.
64 connectTimeout time.Duration
65 disconnectTimeout time.Duration
// cb is the message callback; NewVppClient installs a default that only
// logs a warning and drops the message.
67 cb adapter.MsgCallback
// msgTable maps a message name (as looked up by GetMsgID: name + "_" + crc)
// to its message ID; it is rebuilt in open from the SockclntCreateReply.
69 msgTable map[string]uint16
// NewVppClient returns a client configured with the default connect and
// disconnect timeouts and a placeholder message callback.
77 func NewVppClient(sockAddr string) *vppClient {
// Fall back to the default socket path.
// NOTE(review): the guard is not visible here; presumably this only
// happens when sockAddr is empty — confirm.
79 sockAddr = DefaultSocketName
83 connectTimeout: DefaultConnectTimeout,
84 disconnectTimeout: DefaultDisconnectTimeout,
// Default callback: warn and drop every message until a real callback
// is installed.
85 cb: func(msgID uint16, data []byte) {
86 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
91 // SetConnectTimeout sets timeout used during connecting.
// The connect timeout is what open uses as the read deadline while waiting
// for the registration reply.
92 func (c *vppClient) SetConnectTimeout(t time.Duration) {
96 // SetDisconnectTimeout sets timeout used during disconnecting.
// The disconnect timeout is what close uses as the read deadline while
// waiting for the deletion reply.
97 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
98 c.disconnectTimeout = t
101 // WaitReady checks socket file existence and waits for it if necessary
// by watching the socket's parent directory for a Create event. It gives up
// with an error after MaxWaitReady.
102 func (c *vppClient) WaitReady() error {
103 // check if file at the path already exists
104 if _, err := os.Stat(c.sockAddr); err == nil {
// NOTE(review): os.IsExist is not expected to be true for an error returned
// by a failed Stat; this probably should be !os.IsNotExist(err) so that
// errors other than "file does not exist" are handled here — confirm.
106 } else if os.IsExist(err) {
110 // if not, watch for it
111 watcher, err := fsnotify.NewWatcher()
// Closing the watcher can fail; the failure is only logged.
116 if err := watcher.Close(); err != nil {
117 Log.Errorf("failed to close file watcher: %v", err)
121 // start watching directory
// (the directory containing the socket path is watched, not the socket
// file itself)
122 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
// NOTE(review): if this select runs inside a loop, time.After creates a new
// timer on every iteration, so unrelated directory events would restart
// the MaxWaitReady countdown — confirm and consider a single timer.
128 case <-time.After(MaxWaitReady):
129 return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
130 case e := <-watcher.Errors:
132 case ev := <-watcher.Events:
133 Log.Debugf("watcher event: %+v", ev)
// Only an event whose name equals the configured socket path and whose
// operation includes Create counts.
134 if ev.Name == c.sockAddr {
135 if (ev.Op & fsnotify.Create) == fsnotify.Create {
136 // socket was created, we are ready
// SetMsgCallback takes the callback to be used for received messages.
// NOTE(review): the assignment itself is not visible here; presumably cb is
// stored in c.cb — confirm.
144 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
145 Log.Debug("SetMsgCallback")
// Connect dials the socket at c.sockAddr (connect), registers this client
// with VPP (open) and creates the quit channel.
149 func (c *vppClient) Connect() error {
150 Log.Debugf("Connecting to: %v", c.sockAddr)
// Step 1: establish the socket connection.
152 if err := c.connect(c.sockAddr); err != nil {
// Step 2: send the SockclntCreate request and read the message table.
156 if err := c.open(); err != nil {
// NOTE(review): presumably used to signal readerLoop to stop — confirm.
160 c.quit = make(chan struct{})
// connect resolves sockAddr as a "unixpacket" address, dials it, and wraps
// the connection in a buffered reader and a buffered writer.
167 func (c *vppClient) connect(sockAddr string) error {
168 addr, err := net.ResolveUnixAddr("unixpacket", sockAddr)
170 Log.Debugln("ResolveUnixAddr error:", err)
// The local address is left nil.
174 conn, err := net.DialUnix("unixpacket", nil, addr)
176 Log.Debugln("Dial error:", err)
// All subsequent reads and writes go through these buffered wrappers.
181 c.reader = bufio.NewReader(c.conn)
182 c.writer = bufio.NewWriter(c.conn)
184 Log.Debugf("Connected to socket: %v", addr)
// sockCreateMsgId is the message ID passed to EncodeMsg for the
// SockclntCreate request in open.
190 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
// createMsgContext is written into byte 5 of the encoded SockclntCreate
// request in open.
191 createMsgContext = byte(123)
// deleteMsgContext is written into byte 5 of the encoded SockclntDelete
// request in close.
192 deleteMsgContext = byte(124)
// open registers this client with VPP: it sends a SockclntCreate request
// carrying ClientName, waits up to c.connectTimeout for the
// SockclntCreateReply, then records the client index and the message table
// from the reply.
195 func (c *vppClient) open() error {
196 msgCodec := new(codec.MsgCodec)
198 req := &memclnt.SockclntCreate{
199 Name: []byte(ClientName),
201 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
203 Log.Debugln("Encode error:", err)
// Stamp the fixed context value into byte 5 of the encoded request.
207 msg[5] = createMsgContext
209 if err := c.write(msg); err != nil {
210 Log.Debugln("Write error: ", err)
// Bound the wait for the reply by the connect timeout.
214 readDeadline := time.Now().Add(c.connectTimeout)
215 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
218 msgReply, err := c.read()
220 Log.Println("Read error:", err)
223 // reset read deadline
224 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
228 reply := new(memclnt.SockclntCreateReply)
229 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
230 Log.Println("Decode error:", err)
234 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
235 reply.Response, reply.Index, reply.Count)
// Remember the client index from the reply and rebuild the name -> ID table.
237 c.clientIndex = reply.Index
238 c.msgTable = make(map[string]uint16, reply.Count)
239 for _, x := range reply.MessageTable {
// Keep the bytes before the first NUL, then strip one trailing 0x13 byte
// if present.
240 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
241 c.msgTable[name] = x.Index
// The ID of the sockclnt_delete message is not hard-coded; it is taken
// from the table and later used by close.
// NOTE(review): this prefix would also match a name starting with
// "sockclnt_delete_reply_", so the stored ID may depend on table order —
// confirm.
242 if strings.HasPrefix(name, "sockclnt_delete_") {
243 c.sockDelMsgId = x.Index
246 Log.Debugf(" - %4d: %q", x.Index, name)
// Disconnect expires the read deadline on the connection, sends the client
// deletion request (close) and finally closes the socket connection.
253 func (c *vppClient) Disconnect() error {
257 Log.Debugf("Disconnecting..")
261 // force readerLoop to timeout
// (a deadline of "now" makes a pending read on the connection fail)
262 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
266 // wait for readerLoop to return
// Tell VPP to delete this client before the connection is closed.
269 if err := c.close(); err != nil {
273 if err := c.conn.Close(); err != nil {
274 Log.Debugln("Close socket conn failed:", err)
// close sends a SockclntDelete request for c.clientIndex, using the message
// ID stored in c.sockDelMsgId, and reads the SockclntDeleteReply with a
// read deadline of c.disconnectTimeout.
281 func (c *vppClient) close() error {
282 msgCodec := new(codec.MsgCodec)
284 req := &memclnt.SockclntDelete{
285 Index: c.clientIndex,
287 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
289 Log.Debugln("Encode error:", err)
// Stamp the fixed context value into byte 5 of the encoded request.
293 msg[5] = deleteMsgContext
// NOTE(review): the log text contains typos ("socklntDel", "byes") and a
// trailing "\n"; left untouched here because it is runtime output.
295 Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
296 if err := c.write(msg); err != nil {
297 Log.Debugln("Write error: ", err)
// Bound the wait for the reply by the disconnect timeout.
301 readDeadline := time.Now().Add(c.disconnectTimeout)
302 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
305 msgReply, err := c.read()
307 Log.Debugln("Read error:", err)
// A read error that is a net.Error reporting Timeout() is treated
// specially.
308 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
309 // we accept timeout for reply
314 // reset read deadline
315 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
319 reply := new(memclnt.SockclntDeleteReply)
320 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
321 Log.Debugln("Decode error:", err)
325 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
// GetMsgID looks up the message ID registered in c.msgTable for the given
// message name and CRC. It returns an error naming the key when the message
// is unknown.
330 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
// Table keys have the form "<name>_<crc>".
331 msg := msgName + "_" + msgCrc
332 msgID, ok := c.msgTable[msg]
334 return 0, fmt.Errorf("unknown message: %q", msg)
// reqHeader describes a request header.
// NOTE(review): the fields are not visible here; presumably this is the
// header SendMsg packs (with ClientIndex set) and copies into the outgoing
// message at byte offset 2 — confirm.
339 type reqHeader struct {
// SendMsg packs a header carrying c.clientIndex, copies it into data
// starting at byte offset 2, and writes the resulting message to the socket.
// Note that data is modified in place.
345 func (c *vppClient) SendMsg(context uint32, data []byte) error {
347 ClientIndex: c.clientIndex,
350 buf := new(bytes.Buffer)
351 if err := struc.Pack(buf, h); err != nil {
// Overwrite the bytes after the first two with the packed header.
// NOTE(review): presumably the first two bytes hold the message ID, and
// data is assumed to be at least 2 bytes long (data[2:] panics otherwise)
// — confirm.
354 copy(data[2:], buf.Bytes())
356 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
358 if err := c.write(data); err != nil {
359 Log.Debugln("write error: ", err)
// write sends one message on the socket: first a packed header whose DataLen
// is len(msg), then msg itself in slices of at most c.writer.Size() bytes.
366 func (c *vppClient) write(msg []byte) error {
368 DataLen: uint32(len(msg)),
370 buf := new(bytes.Buffer)
371 if err := struc.Pack(buf, h); err != nil {
374 header := buf.Bytes()
376 // we lock to prevent mixing multiple message sends
378 defer c.writeMu.Unlock()
380 if n, err := c.writer.Write(header); err != nil {
383 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
// Flush the buffered writer after the header has been written.
386 if err := c.writer.Flush(); err != nil {
// Walk msg in steps of the writer's buffer size; i is the chunk index and
// x the exclusive end offset of the chunk.
// NOTE(review): x can exceed len(msg) for the last chunk; the clamp is
// presumably on lines not visible here — confirm.
390 for i := 0; i <= len(msg)/c.writer.Size(); i++ {
391 x := i*c.writer.Size() + c.writer.Size()
395 Log.Debugf("x=%v i=%v len=%v mod=%v\n", x, i, len(msg), len(msg)/c.writer.Size())
396 if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
399 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
401 if err := c.writer.Flush(); err != nil {
// msgHeader describes the header of a received message.
// NOTE(review): the fields are not visible here; presumably this is the
// type readerLoop unpacks to obtain MsgID and Context — confirm. Not to be
// confused with the lower-case msgheader type.
410 type msgHeader struct {
// readerLoop handles incoming messages: a failed read is checked with
// isClosedError and logged, and the header of each message is unpacked to
// obtain its MsgID and Context.
415 func (c *vppClient) readerLoop() {
420 Log.Debugf("reader quit")
// A read error caused by the connection being closed is distinguished
// from other read failures.
427 if isClosedError(err) {
430 Log.Debugf("READ FAILED: %v", err)
// Decode the message header from the start of msg.
434 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
435 Log.Debugf("unpacking header failed: %v", err)
439 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
// msgheader is the header that precedes a message on the socket. With the
// struc tags below it packs to 8 + 4 + 4 = 16 bytes, which matches the
// 16-byte header that read expects.
444 type msgheader struct {
// NOTE(review): meaning of Q is not evident from this file — confirm.
445 Q int `struc:"uint64"`
// DataLen is the length in bytes of the message that follows the header
// (write sets it to len(msg); read allocates a buffer of this size).
446 DataLen uint32 `struc:"uint32"`
// NOTE(review): meaning of GcMarkTimestamp is not evident from this file.
447 GcMarkTimestamp uint32 `struc:"uint32"`
// read reads the next message from the socket: a 16-byte header first, then
// h.DataLen bytes of message data. If a single Read does not deliver the
// whole message, further reads continue until it is complete. It returns
// the message bytes, or an error for a short header or a read that yields
// zero bytes.
450 func (c *vppClient) read() ([]byte, error) {
451 Log.Debug("reading next msg..")
// The header is always 16 bytes long.
453 header := make([]byte, 16)
455 n, err := io.ReadAtLeast(c.reader, header, 16)
459 Log.Debugln("zero bytes header")
// Debugf, not Debug: the message carries format verbs, which Debug would
// print literally followed by the raw arguments.
463 Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
464 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
466 Log.Debugf(" - read header %d bytes: % 0X", n, header)
469 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
472 Log.Debugf(" - decoded header: %+v", h)
// Allocate the message buffer from the length announced in the header.
474 msgLen := int(h.DataLen)
475 msg := make([]byte, msgLen)
// A single Read may return fewer than msgLen bytes.
477 n, err = c.reader.Read(msg)
481 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
485 Log.Debugf("continue read for another %d bytes", remain)
490 nbytes, err := c.reader.Read(view)
// A read that returns no data and no error is reported as an error.
493 } else if nbytes == 0 {
494 return nil, fmt.Errorf("zero nbytes")
498 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
// isClosedError reports whether err indicates that the network connection
// was already closed. The check is a match on the error text.
// NOTE(review): on Go 1.16+ errors.Is(err, net.ErrClosed) is the supported
// way to detect this — consider switching if the minimum Go version allows.
507 func isClosedError(err error) bool {
511 return strings.HasSuffix(err.Error(), "use of closed network connection")