14 "github.com/lunixbochs/struc"
15 logger "github.com/sirupsen/logrus"
17 "git.fd.io/govpp.git/adapter"
18 "git.fd.io/govpp.git/codec"
19 "git.fd.io/govpp.git/examples/bin_api/memclnt"
23 // DefaultSocketName is default VPP API socket file name
24 DefaultSocketName = "/run/vpp-api.sock"
26 sockCreateMsgId = 15 // hard-coded id for sockclnt_create message
27 govppClientName = "govppsock" // client name used for socket registration
31 ConnectTimeout = time.Second * 3
32 DisconnectTimeout = time.Second
34 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
35 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
37 Log = logger.New() // global logger
40 // init initializes global logger, which logs debug level messages to stdout.
44 Log.Level = logger.DebugLevel
48 type vppClient struct {
52 cb adapter.MsgCallback
54 msgTable map[string]uint16
61 func NewVppClient(sockAddr string) *vppClient {
63 sockAddr = DefaultSocketName
71 func nilCallback(msgID uint16, data []byte) {
72 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
75 func (*vppClient) WaitReady() error {
76 // TODO: add watcher for socket file?
80 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
81 Log.Debug("SetMsgCallback")
85 func (c *vppClient) Connect() error {
86 Log.Debugf("Connecting to: %v", c.sockAddr)
88 if err := c.connect(c.sockAddr); err != nil {
92 if err := c.open(); err != nil {
96 c.quit = make(chan struct{})
103 func (c *vppClient) connect(sockAddr string) error {
104 addr, err := net.ResolveUnixAddr("unixpacket", sockAddr)
106 Log.Debugln("ResolveUnixAddr error:", err)
110 conn, err := net.DialUnix("unixpacket", nil, addr)
112 Log.Debugln("Dial error:", err)
117 c.reader = bufio.NewReader(c.conn)
119 Log.Debugf("Connected to socket: %v", addr)
124 func (c *vppClient) open() error {
125 msgCodec := new(codec.MsgCodec)
127 req := &memclnt.SockclntCreate{
128 Name: []byte(govppClientName),
130 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
132 Log.Debugln("Encode error:", err)
138 if err := c.write(msg); err != nil {
139 Log.Debugln("Write error: ", err)
143 readDeadline := time.Now().Add(ConnectTimeout)
144 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
147 msgReply, err := c.read()
149 Log.Println("Read error:", err)
152 // reset read deadline
153 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
157 //log.Printf("Client got (%d): % 0X", len(msgReply), msgReply)
159 reply := new(memclnt.SockclntCreateReply)
160 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
161 Log.Println("Decode error:", err)
165 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
166 reply.Response, reply.Index, reply.Count)
168 c.clientIndex = reply.Index
169 c.msgTable = make(map[string]uint16, reply.Count)
170 for _, x := range reply.MessageTable {
171 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
172 c.msgTable[name] = x.Index
173 if strings.HasPrefix(name, "sockclnt_delete_") {
174 c.sockDelMsgId = x.Index
177 Log.Debugf(" - %4d: %q", x.Index, name)
184 func (c *vppClient) Disconnect() error {
188 Log.Debugf("Disconnecting..")
192 // force readerLoop to timeout
193 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
197 // wait for readerLoop to return
200 if err := c.close(); err != nil {
204 if err := c.conn.Close(); err != nil {
205 Log.Debugln("Close socket conn failed:", err)
212 func (c *vppClient) close() error {
213 msgCodec := new(codec.MsgCodec)
215 req := &memclnt.SockclntDelete{
216 Index: c.clientIndex,
218 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
220 Log.Debugln("Encode error:", err)
226 Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
227 if err := c.write(msg); err != nil {
228 Log.Debugln("Write error: ", err)
232 readDeadline := time.Now().Add(DisconnectTimeout)
233 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
236 msgReply, err := c.read()
238 Log.Debugln("Read error:", err)
239 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
240 // we accept timeout for reply
245 // reset read deadline
246 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
250 reply := new(memclnt.SockclntDeleteReply)
251 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
252 Log.Debugln("Decode error:", err)
256 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
261 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
262 msg := msgName + "_" + msgCrc
263 msgID, ok := c.msgTable[msg]
265 return 0, fmt.Errorf("unknown message: %q", msg)
270 type reqHeader struct {
276 func (c *vppClient) SendMsg(context uint32, data []byte) error {
278 ClientIndex: c.clientIndex,
281 buf := new(bytes.Buffer)
282 if err := struc.Pack(buf, h); err != nil {
285 copy(data[2:], buf.Bytes())
287 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
289 if err := c.write(data); err != nil {
290 Log.Debugln("write error: ", err)
297 func (c *vppClient) write(msg []byte) error {
299 Data_len: uint32(len(msg)),
301 buf := new(bytes.Buffer)
302 if err := struc.Pack(buf, h); err != nil {
305 header := buf.Bytes()
307 // we lock to prevent mixing multiple message sends
309 defer c.writeMu.Unlock()
311 var w io.Writer = c.conn
313 if n, err := w.Write(header); err != nil {
316 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
318 if n, err := w.Write(msg); err != nil {
321 Log.Debugf(" - msg sent (%d/%d): % 0X", n, len(msg), msg)
327 type msgHeader struct {
332 func (c *vppClient) readerLoop() {
337 Log.Debugf("reader quit")
344 if isClosedError(err) {
347 Log.Debugf("READ FAILED: %v", err)
351 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
352 Log.Debugf("unpacking header failed: %v", err)
356 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
361 type msgheader struct {
362 Q int `struc:"uint64"`
363 Data_len uint32 `struc:"uint32"`
364 Gc_mark_timestamp uint32 `struc:"uint32"`
368 func (c *vppClient) read() ([]byte, error) {
369 Log.Debug("reading next msg..")
371 header := make([]byte, 16)
373 n, err := io.ReadAtLeast(c.reader, header, 16)
377 Log.Debugln("zero bytes header")
381 Log.Debug("invalid header data (%d): % 0X", n, header[:n])
382 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
384 Log.Debugf(" - read header %d bytes: % 0X", n, header)
387 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
390 Log.Debugf(" - decoded header: %+v", h)
392 msgLen := int(h.Data_len)
393 msg := make([]byte, msgLen)
395 n, err = c.reader.Read(msg)
399 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
403 Log.Debugf("continue read for another %d bytes", remain)
408 nbytes, err := c.reader.Read(view)
411 } else if nbytes == 0 {
412 return nil, fmt.Errorf("zero nbytes")
416 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
425 func isClosedError(err error) bool {
429 return strings.HasSuffix(err.Error(), "use of closed network connection")