tests: remove py2/py3 six compatability library
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import abc
4 import ipaddress
5 import logging
6 import socket
7 from socket import AF_INET6
8 import sys
9 import os.path
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP
14 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
15     IPv6ExtHdrHopByHop
16 from scapy.packet import Raw
17 from scapy.utils import hexdump
18 from scapy.utils6 import in6_mactoifaceid
19
20 from io import BytesIO
21 from vpp_papi import mac_pton
22
23 # Set up an empty logger for the testcase that can be overridden as necessary
24 null_logger = logging.getLogger('VppTestCase.util')
25 null_logger.addHandler(logging.NullHandler())
26
27
28 def ppp(headline, packet):
29     """ Return string containing the output of scapy packet.show() call. """
30     return '%s\n%s\n\n%s\n' % (headline,
31                                hexdump(packet, dump=True),
32                                packet.show(dump=True))
33
34
35 def ppc(headline, capture, limit=10):
36     """ Return string containing ppp() printout for a capture.
37
38     :param headline: printed as first line of output
39     :param capture: packets to print
40     :param limit: limit the print to # of packets
41     """
42     if not capture:
43         return headline
44     tail = ""
45     if limit < len(capture):
46         tail = "\nPrint limit reached, %s out of %s packets printed" % (
47             limit, len(capture))
48     body = "".join([ppp("Packet #%s:" % count, p)
49                     for count, p in zip(range(0, limit), capture)])
50     return "%s\n%s%s" % (headline, body, tail)
51
52
53 def ip4_range(ip4, s, e):
54     tmp = ip4.rsplit('.', 1)[0]
55     return ("%s.%d" % (tmp, i) for i in range(s, e))
56
57
58 def mcast_ip_to_mac(ip):
59     ip = ipaddress.ip_address(ip)
60     if not ip.is_multicast:
61         raise ValueError("Must be multicast address.")
62     ip_as_int = int(ip)
63     if ip.version == 4:
64         mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
65                                                  (ip_as_int >> 8) & 0xff,
66                                                  ip_as_int & 0xff)
67     else:
68         mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
69                                                    (ip_as_int >> 16) & 0xff,
70                                                    (ip_as_int >> 8) & 0xff,
71                                                    ip_as_int & 0xff)
72     return mcast_mac
73
74
75 # wrapper around scapy library function.
76 def mk_ll_addr(mac):
77     euid = in6_mactoifaceid(str(mac))
78     addr = "fe80::" + euid
79     return addr
80
81
82 def ip6_normalize(ip6):
83     return socket.inet_ntop(socket.AF_INET6,
84                             socket.inet_pton(socket.AF_INET6, ip6))
85
86
87 def get_core_path(tempdir):
88     return "%s/%s" % (tempdir, get_core_pattern())
89
90
91 def is_core_present(tempdir):
92     return os.path.isfile(get_core_path(tempdir))
93
94
95 def get_core_pattern():
96     with open("/proc/sys/kernel/core_pattern", "r") as f:
97         corefmt = f.read().strip()
98     return corefmt
99
100
101 def check_core_path(logger, core_path):
102     corefmt = get_core_pattern()
103     if corefmt.startswith("|"):
104         logger.error(
105             "WARNING: redirecting the core dump through a"
106             " filter may result in truncated dumps.")
107         logger.error(
108             "   You may want to check the filter settings"
109             " or uninstall it and edit the"
110             " /proc/sys/kernel/core_pattern accordingly.")
111         logger.error(
112             "   current core pattern is: %s" % corefmt)
113
114
115 class NumericConstant(object):
116
117     desc_dict = {}
118
119     def __init__(self, value):
120         self._value = value
121
122     def __int__(self):
123         return self._value
124
125     def __long__(self):
126         return self._value
127
128     def __str__(self):
129         if self._value in self.desc_dict:
130             return self.desc_dict[self._value]
131         return ""
132
133
134 class Host(object):
135     """ Generic test host "connected" to VPPs interface. """
136
137     @property
138     def mac(self):
139         """ MAC address """
140         return self._mac
141
142     @property
143     def bin_mac(self):
144         """ MAC address """
145         return mac_pton(self._mac)
146
147     @property
148     def ip4(self):
149         """ IPv4 address - string """
150         return self._ip4
151
152     @property
153     def ip4n(self):
154         """ IPv4 address of remote host - raw, suitable as API parameter."""
155         return socket.inet_pton(socket.AF_INET, self._ip4)
156
157     @property
158     def ip6(self):
159         """ IPv6 address - string """
160         return self._ip6
161
162     @property
163     def ip6n(self):
164         """ IPv6 address of remote host - raw, suitable as API parameter."""
165         return socket.inet_pton(socket.AF_INET6, self._ip6)
166
167     @property
168     def ip6_ll(self):
169         """ IPv6 link-local address - string """
170         return self._ip6_ll
171
172     @property
173     def ip6n_ll(self):
174         """ IPv6 link-local address of remote host -
175         raw, suitable as API parameter."""
176         return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
177
178     def __eq__(self, other):
179         if isinstance(other, Host):
180             return (self.mac == other.mac and
181                     self.ip4 == other.ip4 and
182                     self.ip6 == other.ip6 and
183                     self.ip6_ll == other.ip6_ll)
184         else:
185             return False
186
187     def __ne__(self, other):
188         return not self.__eq__(other)
189
190     def __repr__(self):
191         return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
192                                                             self.ip4,
193                                                             self.ip6,
194                                                             self.ip6_ll)
195
196     def __hash__(self):
197         return hash(self.__repr__())
198
199     def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
200         self._mac = mac
201         self._ip4 = ip4
202         self._ip6 = ip6
203         self._ip6_ll = ip6_ll
204
205
206 class ForeignAddressFactory(object):
207     count = 0
208     prefix_len = 24
209     net_template = '10.10.10.{}'
210     net = net_template.format(0) + '/' + str(prefix_len)
211
212     def get_ip4(self):
213         if self.count > 255:
214             raise Exception("Network host address exhaustion")
215         self.count += 1
216         return self.net_template.format(self.count)
217
218
219 class L4_Conn():
220     """ L4 'connection' tied to two VPP interfaces """
221
222     def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
223         self.testcase = testcase
224         self.ifs = [None, None]
225         self.ifs[0] = if1
226         self.ifs[1] = if2
227         self.address_family = af
228         self.l4proto = l4proto
229         self.ports = [None, None]
230         self.ports[0] = port1
231         self.ports[1] = port2
232         self
233
234     def pkt(self, side, l4args={}, payload="x"):
235         is_ip6 = 1 if self.address_family == AF_INET6 else 0
236         s0 = side
237         s1 = 1 - side
238         src_if = self.ifs[s0]
239         dst_if = self.ifs[s1]
240         layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
241                    IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
242         merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
243         merged_l4args.update(l4args)
244         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
245              layer_3[is_ip6] /
246              self.l4proto(**merged_l4args) /
247              Raw(payload))
248         return p
249
250     def send(self, side, flags=None, payload=""):
251         l4args = {}
252         if flags is not None:
253             l4args['flags'] = flags
254         self.ifs[side].add_stream(self.pkt(side,
255                                            l4args=l4args, payload=payload))
256         self.ifs[1 - side].enable_capture()
257         self.testcase.pg_start()
258
259     def recv(self, side):
260         p = self.ifs[side].wait_for_packet(1)
261         return p
262
263     def send_through(self, side, flags=None, payload=""):
264         self.send(side, flags, payload)
265         p = self.recv(1 - side)
266         return p
267
268     def send_pingpong(self, side, flags1=None, flags2=None):
269         p1 = self.send_through(side, flags1)
270         p2 = self.send_through(1 - side, flags2)
271         return [p1, p2]
272
273
274 class L4_CONN_SIDE:
275     L4_CONN_SIDE_ZERO = 0
276     L4_CONN_SIDE_ONE = 1
277
278
279 def fragment_rfc791(packet, fragsize, logger=null_logger):
280     """
281     Fragment an IPv4 packet per RFC 791
282     :param packet: packet to fragment
283     :param fragsize: size at which to fragment
284     :note: IP options are not supported
285     :returns: list of fragments
286     """
287     logger.debug(ppp("Fragmenting packet:", packet))
288     packet = packet.__class__(scapy.compat.raw(packet))  # recalc. all values
289     if len(packet[IP].options) > 0:
290         raise Exception("Not implemented")
291     if len(packet) <= fragsize:
292         return [packet]
293
294     pre_ip_len = len(packet) - len(packet[IP])
295     ip_header_len = packet[IP].ihl * 4
296     hex_packet = scapy.compat.raw(packet)
297     hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
298     hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
299
300     pkts = []
301     ihl = packet[IP].ihl
302     otl = len(packet[IP])
303     nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
304     fo = packet[IP].frag
305
306     p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
307     p[IP].flags = "MF"
308     p[IP].frag = fo
309     p[IP].len = ihl * 4 + nfb * 8
310     del p[IP].chksum
311     pkts.append(p)
312
313     p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
314     p[IP].len = otl - nfb * 8
315     p[IP].frag = fo + nfb
316     del p[IP].chksum
317
318     more_fragments = fragment_rfc791(p, fragsize, logger)
319     pkts.extend(more_fragments)
320
321     return pkts
322
323
324 def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
325     """
326     Fragment an IPv6 packet per RFC 8200
327     :param packet: packet to fragment
328     :param fragsize: size at which to fragment
329     :note: IP options are not supported
330     :returns: list of fragments
331     """
332     packet = packet.__class__(scapy.compat.raw(packet))  # recalc. all values
333     if len(packet) <= fragsize:
334         return [packet]
335     logger.debug(ppp("Fragmenting packet:", packet))
336     pkts = []
337     counter = 0
338     routing_hdr = None
339     hop_by_hop_hdr = None
340     upper_layer = None
341     seen_ipv6 = False
342     ipv6_nr = -1
343     l = packet.getlayer(counter)
344     while l is not None:
345         if l.__class__ is IPv6:
346             if seen_ipv6:
347                 # ignore 2nd IPv6 header and everything below..
348                 break
349             ipv6_nr = counter
350             seen_ipv6 = True
351         elif l.__class__ is IPv6ExtHdrFragment:
352             raise Exception("Already fragmented")
353         elif l.__class__ is IPv6ExtHdrRouting:
354             routing_hdr = counter
355         elif l.__class__ is IPv6ExtHdrHopByHop:
356             hop_by_hop_hdr = counter
357         elif seen_ipv6 and not upper_layer and \
358                 not l.__class__.__name__.startswith('IPv6ExtHdr'):
359             upper_layer = counter
360         counter = counter + 1
361         l = packet.getlayer(counter)
362
363     logger.debug(
364         "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
365         (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
366
367     if upper_layer is None:
368         raise Exception("Upper layer header not found in IPv6 packet")
369
370     last_per_fragment_hdr = ipv6_nr
371     if routing_hdr is None:
372         if hop_by_hop_hdr is not None:
373             last_per_fragment_hdr = hop_by_hop_hdr
374     else:
375         last_per_fragment_hdr = routing_hdr
376     logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
377
378     per_fragment_headers = packet.copy()
379     per_fragment_headers[last_per_fragment_hdr].remove_payload()
380     logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
381
382     ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
383     hex_payload = scapy.compat.raw(ext_and_upper_layer)
384     logger.debug("Payload length is %s" % len(hex_payload))
385     logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
386
387     fragment_ext_hdr = IPv6ExtHdrFragment()
388     logger.debug(ppp("Fragment header:", fragment_ext_hdr))
389
390     len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
391     if not len_ext_and_upper_layer_payload and \
392        hasattr(ext_and_upper_layer, "data"):
393         len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
394
395     if len(per_fragment_headers) + len(fragment_ext_hdr) +\
396             len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
397             > fragsize:
398         raise Exception("Cannot fragment this packet - MTU too small "
399                         "(%s, %s, %s, %s, %s)" % (
400                             len(per_fragment_headers), len(fragment_ext_hdr),
401                             len(ext_and_upper_layer),
402                             len_ext_and_upper_layer_payload, fragsize))
403
404     orig_nh = packet[IPv6].nh
405     p = per_fragment_headers
406     del p[IPv6].plen
407     del p[IPv6].nh
408     p = p / fragment_ext_hdr
409     del p[IPv6ExtHdrFragment].nh
410     first_payload_len_nfb = int((fragsize - len(p)) / 8)
411     p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
412     del p[IPv6].plen
413     p[IPv6ExtHdrFragment].nh = orig_nh
414     p[IPv6ExtHdrFragment].id = identification
415     p[IPv6ExtHdrFragment].offset = 0
416     p[IPv6ExtHdrFragment].m = 1
417     p = p.__class__(scapy.compat.raw(p))
418     logger.debug(ppp("Fragment %s:" % len(pkts), p))
419     pkts.append(p)
420     offset = first_payload_len_nfb * 8
421     logger.debug("Offset after first fragment: %s" % offset)
422     while len(hex_payload) > offset:
423         p = per_fragment_headers
424         del p[IPv6].plen
425         del p[IPv6].nh
426         p = p / fragment_ext_hdr
427         del p[IPv6ExtHdrFragment].nh
428         l_nfb = int((fragsize - len(p)) / 8)
429         p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
430         p[IPv6ExtHdrFragment].nh = orig_nh
431         p[IPv6ExtHdrFragment].id = identification
432         p[IPv6ExtHdrFragment].offset = int(offset / 8)
433         p[IPv6ExtHdrFragment].m = 1
434         p = p.__class__(scapy.compat.raw(p))
435         logger.debug(ppp("Fragment %s:" % len(pkts), p))
436         pkts.append(p)
437         offset = offset + l_nfb * 8
438
439     pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment
440
441     return pkts
442
443
444 def reassemble4_core(listoffragments, return_ip):
445     buffer = BytesIO()
446     first = listoffragments[0]
447     buffer.seek(20)
448     for pkt in listoffragments:
449         buffer.seek(pkt[IP].frag*8)
450         buffer.write(bytes(pkt[IP].payload))
451     first.len = len(buffer.getvalue()) + 20
452     first.flags = 0
453     del(first.chksum)
454     if return_ip:
455         header = bytes(first[IP])[:20]
456         return first[IP].__class__(header + buffer.getvalue())
457     else:
458         header = bytes(first[Ether])[:34]
459         return first[Ether].__class__(header + buffer.getvalue())
460
461
462 def reassemble4_ether(listoffragments):
463     return reassemble4_core(listoffragments, False)
464
465
466 def reassemble4(listoffragments):
467     return reassemble4_core(listoffragments, True)