test: factor out "L4_Conn" into a class within util.py (VPP-931)
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import socket
4 import sys
5 from abc import abstractmethod, ABCMeta
6 from cStringIO import StringIO
7 from scapy.layers.inet6 import in6_mactoifaceid
8
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP, UDP, TCP
12 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
13 from scapy.packet import Packet
14 from socket import inet_pton, AF_INET, AF_INET6
15
16
17 def ppp(headline, packet):
18     """ Return string containing the output of scapy packet.show() call. """
19     o = StringIO()
20     old_stdout = sys.stdout
21     sys.stdout = o
22     print(headline)
23     packet.show()
24     sys.stdout = old_stdout
25     return o.getvalue()
26
27
28 def ppc(headline, capture, limit=10):
29     """ Return string containing ppp() printout for a capture.
30
31     :param headline: printed as first line of output
32     :param capture: packets to print
33     :param limit: limit the print to # of packets
34     """
35     if not capture:
36         return headline
37     tail = ""
38     if limit < len(capture):
39         tail = "\nPrint limit reached, %s out of %s packets printed" % (
40             len(capture), limit)
41         limit = len(capture)
42     body = "".join([ppp("Packet #%s:" % count, p)
43                     for count, p in zip(range(0, limit), capture)])
44     return "%s\n%s%s" % (headline, body, tail)
45
46
47 def ip4_range(ip4, s, e):
48     tmp = ip4.rsplit('.', 1)[0]
49     return ("%s.%d" % (tmp, i) for i in range(s, e))
50
51
52 def ip4n_range(ip4n, s, e):
53     ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
54     return (socket.inet_pton(socket.AF_INET, ip)
55             for ip in ip4_range(ip4, s, e))
56
57
58 def mactobinary(mac):
59     """ Convert the : separated format into binary packet data for the API """
60     return mac.replace(':', '').decode('hex')
61
62
63 def mk_ll_addr(mac):
64     euid = in6_mactoifaceid(mac)
65     addr = "fe80::" + euid
66     return addr
67
68
69 class NumericConstant(object):
70     __metaclass__ = ABCMeta
71
72     desc_dict = {}
73
74     @abstractmethod
75     def __init__(self, value):
76         self._value = value
77
78     def __int__(self):
79         return self._value
80
81     def __long__(self):
82         return self._value
83
84     def __str__(self):
85         if self._value in self.desc_dict:
86             return self.desc_dict[self._value]
87         return ""
88
89
90 class Host(object):
91     """ Generic test host "connected" to VPPs interface. """
92
93     @property
94     def mac(self):
95         """ MAC address """
96         return self._mac
97
98     @property
99     def bin_mac(self):
100         """ MAC address """
101         return mactobinary(self._mac)
102
103     @property
104     def ip4(self):
105         """ IPv4 address - string """
106         return self._ip4
107
108     @property
109     def ip4n(self):
110         """ IPv4 address of remote host - raw, suitable as API parameter."""
111         return socket.inet_pton(socket.AF_INET, self._ip4)
112
113     @property
114     def ip6(self):
115         """ IPv6 address - string """
116         return self._ip6
117
118     @property
119     def ip6n(self):
120         """ IPv6 address of remote host - raw, suitable as API parameter."""
121         return socket.inet_pton(socket.AF_INET6, self._ip6)
122
123     @property
124     def ip6_ll(self):
125         """ IPv6 link-local address - string """
126         return self._ip6_ll
127
128     @property
129     def ip6n_ll(self):
130         """ IPv6 link-local address of remote host -
131         raw, suitable as API parameter."""
132         return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
133
134     def __eq__(self, other):
135         if isinstance(other, Host):
136             return (self.mac == other.mac and
137                     self.ip4 == other.ip4 and
138                     self.ip6 == other.ip6 and
139                     self.ip6_ll == other.ip6_ll)
140         else:
141             return False
142
143     def __ne__(self, other):
144         return not self.__eq__(other)
145
146     def __repr__(self):
147         return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
148                                                             self.ip4,
149                                                             self.ip6,
150                                                             self.ip6_ll)
151
152     def __hash__(self):
153         return hash(self.__repr__())
154
155     def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
156         self._mac = mac
157         self._ip4 = ip4
158         self._ip6 = ip6
159         self._ip6_ll = ip6_ll
160
161
162 class ForeignAddressFactory(object):
163     count = 0
164     prefix_len = 24
165     net_template = '10.10.10.{}'
166     net = net_template.format(0) + '/' + str(prefix_len)
167
168     def get_ip4(self):
169         if self.count > 255:
170             raise Exception("Network host address exhaustion")
171         self.count += 1
172         return self.net_template.format(self.count)
173
174
175 class L4_Conn():
176     """ L4 'connection' tied to two VPP interfaces """
177     def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
178         self.testcase = testcase
179         self.ifs = [None, None]
180         self.ifs[0] = if1
181         self.ifs[1] = if2
182         self.address_family = af
183         self.l4proto = l4proto
184         self.ports = [None, None]
185         self.ports[0] = port1
186         self.ports[1] = port2
187         self
188
189     def pkt(self, side, l4args={}, payload="x"):
190         is_ip6 = 1 if self.address_family == AF_INET6 else 0
191         s0 = side
192         s1 = 1-side
193         src_if = self.ifs[s0]
194         dst_if = self.ifs[s1]
195         layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
196                    IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
197         merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
198         merged_l4args.update(l4args)
199         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
200              layer_3[is_ip6] /
201              self.l4proto(**merged_l4args) /
202              Raw(payload))
203         return p
204
205     def send(self, side, flags=None, payload=""):
206         l4args = {}
207         if flags is not None:
208             l4args['flags'] = flags
209         self.ifs[side].add_stream(self.pkt(side,
210                                            l4args=l4args, payload=payload))
211         self.ifs[1-side].enable_capture()
212         self.testcase.pg_start()
213
214     def recv(self, side):
215         p = self.ifs[side].wait_for_packet(1)
216         return p
217
218     def send_through(self, side, flags=None, payload=""):
219         self.send(side, flags, payload)
220         p = self.recv(1-side)
221         return p
222
223     def send_pingpong(self, side, flags1=None, flags2=None):
224         p1 = self.send_through(side, flags1)
225         p2 = self.send_through(1-side, flags2)
226         return [p1, p2]
227
228
229 class L4_CONN_SIDE:
230     L4_CONN_SIDE_ZERO = 0
231     L4_CONN_SIDE_ONE = 1