FIB Interpose Source
[vpp.git] / test / vpp_punt_socket.py
1 from socket import socket, AF_UNIX, SOCK_DGRAM
2 from select import select
3 from time import time
4 from struct import unpack, calcsize
5 from util import ppc
6 from scapy.layers.l2 import Ether
7
8 client_uds_socket_name = "client-uds-socket"
9 vpp_uds_socket_name = "vpp-uds-socket"
10
11 VPP_PUNT_HEADER_FMT = '=Ii'
12 VPP_PUNT_HEADER_SIZE = calcsize(VPP_PUNT_HEADER_FMT)
13
14
15 class VppPuntAction:
16     PUNT_L2 = 0
17     PUNT_IP4_ROUTED = 1
18     PUNT_IP6_ROUTED = 2
19
20
21 class VppUDSPuntSocket(object):
22     def __init__(self, testcase, port, is_ip4=1, l4_protocol=0x11):
23         client_path = '%s/%s-%s-%s' % (testcase.tempdir,
24                                        client_uds_socket_name,
25                                        "4" if is_ip4 else "6", port)
26         testcase.vapi.punt_socket_register(
27             port, client_path, is_ip4=is_ip4, l4_protocol=l4_protocol)
28         self.testcase = testcase
29         self.uds = socket(AF_UNIX, SOCK_DGRAM)
30         self.uds.bind(client_path)
31         self.uds.connect(testcase.punt_socket_path)
32
33     def wait_for_packets(self, count, timeout=1):
34         packets = []
35         now = time()
36         deadline = now + timeout
37         while len(packets) < count and now < deadline:
38             r, w, e = select([self.uds], [], [self.uds], deadline - now)
39             if self.uds in r:
40                 x = self.uds.recv(1024 * 1024)
41                 sw_if_index, punt_action = unpack(
42                     VPP_PUNT_HEADER_FMT, x[:VPP_PUNT_HEADER_SIZE])
43                 packets.append({'sw_if_index': sw_if_index,
44                                 'punt_action': punt_action,
45                                 'packet': x[VPP_PUNT_HEADER_SIZE:]})
46
47             if self.uds in e:
48                 raise Exception("select() indicates error on UDS socket")
49             now = time()
50
51         if len(packets) != count:
52             raise Exception("Unexpected packet count received, got %s packets,"
53                             " expected %s packets" % (len(packets), count))
54         self.testcase.logger.debug(
55             "Got %s packets via punt socket" % len(packets))
56         return packets
57
58     def assert_nothing_captured(self, timeout=.25):
59         packets = []
60         now = time()
61         deadline = now + timeout
62         while now < deadline:
63             r, w, e = select([self.uds], [], [self.uds], deadline - now)
64             if self.uds in r:
65                 x = self.uds.recv(1024 * 1024)
66                 packets.append(Ether(x[VPP_PUNT_HEADER_SIZE:]))
67             if self.uds in e:
68                 raise Exception("select() indicates error on UDS socket")
69             now = time()
70
71         if len(packets) > 0:
72             self.testcase.logger.error(
73                 ppc("Unexpected packets captured:", packets))
74             raise Exception("Unexpected packet count received, got %s packets,"
75                             " expected no packets" % len(packets))