api: clean up use of deprecated flag
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import abc
4 import ipaddress
5 import logging
6 import socket
7 from socket import AF_INET6
8 import six
9 import sys
10 import os.path
11
12 import scapy.compat
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP
15 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
16     IPv6ExtHdrHopByHop
17 from scapy.packet import Raw
18 from scapy.utils import hexdump
19 from scapy.utils6 import in6_mactoifaceid
20
21 from io import BytesIO
22 from vpp_papi import mac_pton
23
# Set up an empty logger for the testcase that can be overridden as necessary.
# The attached NullHandler emits nothing itself; this logger is the default
# value of the `logger` argument of the fragmentation helpers in this module.
null_logger = logging.getLogger('VppTestCase.util')
null_logger.addHandler(logging.NullHandler())
27
28
def ppp(headline, packet):
    """ Format a packet as headline, hex dump and scapy show() output.

    :param headline: printed as first line of output
    :param packet: scapy packet to print
    :returns: string with the headline, the hexdump and the show() text
    """
    hex_text = hexdump(packet, dump=True)
    show_text = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, hex_text, show_text)
34
35
def ppc(headline, capture, limit=10):
    """ Format the first packets of a capture, one ppp() section each.

    :param headline: printed as first line of output
    :param capture: packets to print
    :param limit: limit the print to # of packets
    :returns: the headline alone for an empty capture; otherwise the
        headline, the per-packet sections and, if the capture holds more
        than `limit` packets, a closing note saying so
    """
    if not capture:
        return headline

    total = len(capture)
    if limit < total:
        note = "\nPrint limit reached, %s out of %s packets printed" % (
            limit, total)
    else:
        note = ""

    sections = []
    for index, pkt in zip(range(0, limit), capture):
        sections.append(ppp("Packet #%s:" % index, pkt))

    return "%s\n%s%s" % (headline, "".join(sections), note)
52
53
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings that differ only in the last octet.

    :param ip4: dotted address string; the part after its last dot is dropped
    :param s: first value of the last octet (inclusive)
    :param e: end value of the last octet (exclusive)
    :returns: generator yielding one address string per value in range(s, e)
    """
    network_part = ip4.rsplit('.', 1)[0]
    return ("%s.%d" % (network_part, host) for host in range(s, e))
57
58
def mcast_ip_to_mac(ip):
    """ Map a multicast IP address to the corresponding multicast MAC.

    :param ip: IPv4 or IPv6 address in any form ipaddress.ip_address() accepts
    :returns: MAC string - "01:00:5e" followed by the low 23 bits of an IPv4
        address, or "33:33" followed by the low 32 bits of an IPv6 address
    :raises ValueError: if the address is not a multicast address
    """
    addr = ipaddress.ip_address(ip)
    if not addr.is_multicast:
        raise ValueError("Must be multicast address.")

    value = int(addr)
    # four least significant octets, most significant first
    low_octets = [(value >> shift) & 0xff for shift in (24, 16, 8, 0)]

    if addr.version == 4:
        # only 23 bits of the group address fit, so the top bit is masked off
        return "01:00:5e:%02x:%02x:%02x" % (low_octets[1] & 0x7f,
                                            low_octets[2],
                                            low_octets[3])
    return "33:33:%02x:%02x:%02x:%02x" % tuple(low_octets)
74
75
76 # wrapper around scapy library function.
def mk_ll_addr(mac):
    """ Build an IPv6 link-local address string from a MAC address.

    :param mac: MAC address; converted with str() before use
    :returns: "fe80::" followed by the interface id that scapy's
        in6_mactoifaceid() derives from the MAC
    """
    return "fe80::" + in6_mactoifaceid(str(mac))
81
82
def ip6_normalize(ip6):
    """ Return the text form socket.inet_ntop() produces for an IPv6 address.

    :param ip6: IPv6 address string
    :returns: the address converted to packed form and back to text
    """
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
86
87
def get_core_path(tempdir):
    """ Return the kernel core pattern appended to tempdir with a '/'.

    :param tempdir: directory to prepend
    :returns: "<tempdir>/<core pattern>"
    """
    pattern = get_core_pattern()
    return "%s/%s" % (tempdir, pattern)
90
91
def is_core_present(tempdir):
    """ Tell whether the path given by get_core_path(tempdir) is a file.

    :param tempdir: directory passed on to get_core_path()
    :returns: True if that path exists and is a regular file
    """
    core_path = get_core_path(tempdir)
    return os.path.isfile(core_path)
94
95
def get_core_pattern():
    """ Read the kernel core pattern.

    :returns: content of /proc/sys/kernel/core_pattern with surrounding
        whitespace stripped
    """
    with open("/proc/sys/kernel/core_pattern", "r") as pattern_file:
        return pattern_file.read().strip()
100
101
def check_core_path(logger, core_path):
    """ Log a warning if the kernel pipes core dumps through a filter.

    :param logger: logger whose error() receives the warning lines
    :param core_path: not used by this function
    """
    pattern = get_core_pattern()
    # a core pattern starting with "|" is the case being warned about
    if not pattern.startswith("|"):
        return

    messages = (
        "WARNING: redirecting the core dump through a"
        " filter may result in truncated dumps.",
        "   You may want to check the filter settings"
        " or uninstall it and edit the"
        " /proc/sys/kernel/core_pattern accordingly.",
        "   current core pattern is: %s" % pattern,
    )
    for message in messages:
        logger.error(message)
114
115
class NumericConstant(object):
    """ Wrapper around a value with an optional textual description.

    int() yields the wrapped value; str() yields the entry of desc_dict
    for that value, or an empty string if there is none.
    """

    # value -> description lookup used by __str__
    desc_dict = {}

    def __init__(self, value):
        self._value = value

    def __int__(self):
        return self._value

    def __long__(self):
        return self._value

    def __str__(self):
        if self._value not in self.desc_dict:
            return ""
        return self.desc_dict[self._value]
133
134
class Host(object):
    """ Generic test host "connected" to VPPs interface.

    Holds a MAC address and IPv4, IPv6 and IPv6 link-local addresses as
    given to the constructor and exposes them read-only, both as stored
    and in packed binary form.
    """

    def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
        self._mac = mac
        self._ip4 = ip4
        self._ip6 = ip6
        self._ip6_ll = ip6_ll

    # --- addresses as stored -------------------------------------------

    @property
    def mac(self):
        """ MAC address - as passed to the constructor """
        return self._mac

    @property
    def ip4(self):
        """ IPv4 address - string """
        return self._ip4

    @property
    def ip6(self):
        """ IPv6 address - string """
        return self._ip6

    @property
    def ip6_ll(self):
        """ IPv6 link-local address - string """
        return self._ip6_ll

    # --- addresses in binary form --------------------------------------

    @property
    def bin_mac(self):
        """ MAC address converted by mac_pton() """
        return mac_pton(self._mac)

    @property
    def ip4n(self):
        """ IPv4 address - packed, suitable as API parameter """
        return socket.inet_pton(socket.AF_INET, self._ip4)

    @property
    def ip6n(self):
        """ IPv6 address - packed, suitable as API parameter """
        return socket.inet_pton(socket.AF_INET6, self._ip6)

    @property
    def ip6n_ll(self):
        """ IPv6 link-local address - packed, suitable as API parameter """
        return socket.inet_pton(socket.AF_INET6, self._ip6_ll)

    # --- comparison, hashing and printing ------------------------------

    def __eq__(self, other):
        # anything that is not a Host never compares equal
        if not isinstance(other, Host):
            return False
        return (self.mac == other.mac and
                self.ip4 == other.ip4 and
                self.ip6 == other.ip6 and
                self.ip6_ll == other.ip6_ll)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # the repr covers exactly the fields compared by __eq__
        return hash(self.__repr__())

    def __repr__(self):
        fields = (self.mac, self.ip4, self.ip6, self.ip6_ll)
        return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % fields
205
206
class ForeignAddressFactory(object):
    """ Hands out consecutive host addresses from the 10.10.10.0/24 net. """

    # number of addresses handed out so far; also the last octet returned
    count = 0
    prefix_len = 24
    net_template = '10.10.10.{}'
    net = net_template.format(0) + '/' + str(prefix_len)
    # highest usable last octet: .255 is the broadcast address of the /24
    # and anything above it is not a valid IPv4 octet
    _last_host = 254

    def get_ip4(self):
        """ Return the next unused host address of the network.

        :returns: address string, "10.10.10.1" on the first call
        :raises Exception: once 10.10.10.254 has been handed out
        """
        if self.count >= self._last_host:
            raise Exception("Network host address exhaustion")
        self.count += 1
        return self.net_template.format(self.count)
218
219
class L4_Conn(object):
    """ L4 'connection' tied to two VPP interfaces

    Sides are numbered 0 and 1; side N sends from and receives on
    interface ifs[N] and uses ports[N] as its own port.
    """

    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
        """
        :param testcase: object providing pg_start(), used to send packets
        :param if1: interface of side 0
        :param if2: interface of side 1
        :param af: address family; AF_INET6 selects IPv6, anything else IPv4
        :param l4proto: callable building the L4 header from keyword args
        :param port1: port of side 0
        :param port2: port of side 1
        """
        self.testcase = testcase
        self.ifs = [if1, if2]
        self.address_family = af
        self.l4proto = l4proto
        self.ports = [port1, port2]

    def pkt(self, side, l4args=None, payload="x"):
        """ Build a packet travelling from `side` to the other side.

        :param side: sending side, 0 or 1
        :param l4args: extra keyword arguments for the L4 header; they
            override the default sport/dport
        :param payload: raw payload of the packet
        :returns: Ether / IP or IPv6 / L4 / Raw packet
        """
        s0 = side
        s1 = 1 - side
        src_if = self.ifs[s0]
        dst_if = self.ifs[s1]
        # build only the L3 header that is actually used
        if self.address_family == AF_INET6:
            layer_3 = IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
        else:
            layer_3 = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
        merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
        if l4args:
            merged_l4args.update(l4args)
        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             layer_3 /
             self.l4proto(**merged_l4args) /
             Raw(payload))
        return p

    def send(self, side, flags=None, payload=""):
        """ Send one packet from `side` and start capturing on the other.

        :param side: sending side, 0 or 1
        :param flags: value for the L4 'flags' field; left out if None
        :param payload: raw payload of the packet
        """
        l4args = {}
        if flags is not None:
            l4args['flags'] = flags
        self.ifs[side].add_stream(self.pkt(side,
                                           l4args=l4args, payload=payload))
        self.ifs[1 - side].enable_capture()
        self.testcase.pg_start()

    def recv(self, side):
        """ Return the result of wait_for_packet(1) on the interface of
        `side`. """
        p = self.ifs[side].wait_for_packet(1)
        return p

    def send_through(self, side, flags=None, payload=""):
        """ Send from `side` and return what recv() yields on the other
        side. """
        self.send(side, flags, payload)
        p = self.recv(1 - side)
        return p

    def send_pingpong(self, side, flags1=None, flags2=None):
        """ Send one packet from `side`, then one from the other side.

        :param side: side sending the first packet
        :param flags1: L4 flags of the first packet
        :param flags2: L4 flags of the second packet
        :returns: list of the two packets returned by send_through()
        """
        p1 = self.send_through(side, flags1)
        p2 = self.send_through(1 - side, flags2)
        return [p1, p2]
273
274
class L4_CONN_SIDE:
    """ Symbolic names for the values 0 and 1.

    Presumably meant as the `side` argument of the L4_Conn methods -
    verify against callers.
    """
    L4_CONN_SIDE_ZERO, L4_CONN_SIDE_ONE = 0, 1
278
279
def fragment_rfc791(packet, fragsize, logger=null_logger):
    """
    Fragment an IPv4 packet per RFC 791
    :param packet: packet to fragment
    :param fragsize: size at which to fragment
    :param logger: logger receiving a debug dump of the packet
    :note: IP options are not supported
    :returns: list of fragments
    :raises Exception: if the packet carries IP options, or if fragsize
        leaves no room for at least 8 bytes of payload per fragment
    """
    logger.debug(ppp("Fragmenting packet:", packet))
    packet = packet.__class__(scapy.compat.raw(packet))  # recalc. all values
    if len(packet[IP].options) > 0:
        raise Exception("Not implemented")
    if len(packet) <= fragsize:
        return [packet]

    # split the raw bytes into "everything up to the end of the IP header"
    # and "IP payload"
    pre_ip_len = len(packet) - len(packet[IP])
    ip_header_len = packet[IP].ihl * 4
    hex_packet = scapy.compat.raw(packet)
    hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
    hex_payload = hex_packet[(pre_ip_len + ip_header_len):]

    pkts = []
    ihl = packet[IP].ihl
    otl = len(packet[IP])
    # number of 8-byte payload blocks that fit into one fragment
    nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
    if nfb < 1:
        # without this guard the remainder below would never shrink and
        # the recursion would not terminate
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s, %s)" % (pre_ip_len, ihl * 4, fragsize))
    fo = packet[IP].frag

    # first fragment: headers plus the first nfb blocks, more-fragments set
    p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
    p[IP].flags = "MF"
    p[IP].frag = fo
    p[IP].len = ihl * 4 + nfb * 8
    del p[IP].chksum
    pkts.append(p)

    # remainder: headers plus the rest of the payload, offset advanced;
    # it is fragmented further by the recursive call if still too large
    p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
    p[IP].len = otl - nfb * 8
    p[IP].frag = fo + nfb
    del p[IP].chksum

    more_fragments = fragment_rfc791(p, fragsize, logger)
    pkts.extend(more_fragments)

    return pkts
323
324
def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
    """
    Fragment an IPv6 packet per RFC 8200
    :param packet: packet to fragment
    :param identification: value written to the id field of the fragment
        header of every fragment
    :param fragsize: size at which to fragment
    :param logger: logger receiving debug output
    :note: hop-by-hop and routing headers are repeated in every fragment;
        everything after them is treated as fragmentable payload
    :returns: list of fragments; a list holding just the re-parsed packet
        if it already fits into fragsize
    :raises Exception: if the packet already has a fragment header, if no
        upper layer follows the IPv6 header, or if fragsize is too small
    """
    packet = packet.__class__(scapy.compat.raw(packet))  # recalc. all values
    if len(packet) <= fragsize:
        return [packet]
    logger.debug(ppp("Fragmenting packet:", packet))
    pkts = []
    counter = 0
    routing_hdr = None
    hop_by_hop_hdr = None
    upper_layer = None
    seen_ipv6 = False
    ipv6_nr = -1
    # walk the layers and remember the layer index of the first IPv6
    # header, of the routing and hop-by-hop headers and of the first layer
    # after IPv6 whose class name does not start with 'IPv6ExtHdr'
    l = packet.getlayer(counter)
    while l is not None:
        if l.__class__ is IPv6:
            if seen_ipv6:
                # ignore 2nd IPv6 header and everything below..
                break
            ipv6_nr = counter
            seen_ipv6 = True
        elif l.__class__ is IPv6ExtHdrFragment:
            raise Exception("Already fragmented")
        elif l.__class__ is IPv6ExtHdrRouting:
            routing_hdr = counter
        elif l.__class__ is IPv6ExtHdrHopByHop:
            hop_by_hop_hdr = counter
        elif seen_ipv6 and not upper_layer and \
                not l.__class__.__name__.startswith('IPv6ExtHdr'):
            upper_layer = counter
        counter = counter + 1
        l = packet.getlayer(counter)

    logger.debug(
        "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
        (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))

    if upper_layer is None:
        raise Exception("Upper layer header not found in IPv6 packet")

    # the per-fragment part ends at the routing header if there is one,
    # else at the hop-by-hop header if there is one, else at the IPv6 header
    last_per_fragment_hdr = ipv6_nr
    if routing_hdr is None:
        if hop_by_hop_hdr is not None:
            last_per_fragment_hdr = hop_by_hop_hdr
    else:
        last_per_fragment_hdr = routing_hdr
    logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))

    # copy of the packet cut off after the last per-fragment header
    per_fragment_headers = packet.copy()
    per_fragment_headers[last_per_fragment_hdr].remove_payload()
    logger.debug(ppp("Per-fragment headers:", per_fragment_headers))

    # presumably the layer directly following the last per-fragment
    # header; its raw bytes are the data distributed over the fragments
    ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
    hex_payload = scapy.compat.raw(ext_and_upper_layer)
    logger.debug("Payload length is %s" % len(hex_payload))
    logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))

    fragment_ext_hdr = IPv6ExtHdrFragment()
    logger.debug(ppp("Fragment header:", fragment_ext_hdr))

    # length of what that layer carries: its scapy payload, or its `data`
    # attribute if the payload is empty and such an attribute exists
    len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
    if not len_ext_and_upper_layer_payload and \
       hasattr(ext_and_upper_layer, "data"):
        len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)

    # per-fragment headers + fragment header + the header part of the
    # following layer must fit into one fragment
    if len(per_fragment_headers) + len(fragment_ext_hdr) +\
            len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
            > fragsize:
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s, %s, %s, %s)" % (
                            len(per_fragment_headers), len(fragment_ext_hdr),
                            len(ext_and_upper_layer),
                            len_ext_and_upper_layer_payload, fragsize))

    # NOTE(review): this is the next-header value of the IPv6 header itself
    # and is written into every fragment header below; when hop-by-hop or
    # routing headers are present it may not be the value that belongs
    # there - confirm
    orig_nh = packet[IPv6].nh
    # first fragment; plen and nh are deleted so that they get recomputed
    # when the packet is rebuilt from its raw bytes
    p = per_fragment_headers
    del p[IPv6].plen
    del p[IPv6].nh
    p = p / fragment_ext_hdr
    del p[IPv6ExtHdrFragment].nh
    # number of 8-byte payload blocks fitting behind the headers
    first_payload_len_nfb = int((fragsize - len(p)) / 8)
    p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
    del p[IPv6].plen
    p[IPv6ExtHdrFragment].nh = orig_nh
    p[IPv6ExtHdrFragment].id = identification
    p[IPv6ExtHdrFragment].offset = 0
    p[IPv6ExtHdrFragment].m = 1
    p = p.__class__(scapy.compat.raw(p))
    logger.debug(ppp("Fragment %s:" % len(pkts), p))
    pkts.append(p)
    offset = first_payload_len_nfb * 8
    logger.debug("Offset after first fragment: %s" % offset)
    # remaining fragments, built the same way from the rest of the payload
    # NOTE(review): if l_nfb comes out as 0 the offset does not advance and
    # this loop does not end; whether the size check above rules that out
    # for every kind of following layer should be confirmed
    while len(hex_payload) > offset:
        p = per_fragment_headers
        del p[IPv6].plen
        del p[IPv6].nh
        p = p / fragment_ext_hdr
        del p[IPv6ExtHdrFragment].nh
        l_nfb = int((fragsize - len(p)) / 8)
        p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
        p[IPv6ExtHdrFragment].nh = orig_nh
        p[IPv6ExtHdrFragment].id = identification
        # the offset field is written in units of 8 bytes
        p[IPv6ExtHdrFragment].offset = int(offset / 8)
        p[IPv6ExtHdrFragment].m = 1
        p = p.__class__(scapy.compat.raw(p))
        logger.debug(ppp("Fragment %s:" % len(pkts), p))
        pkts.append(p)
        offset = offset + l_nfb * 8

    pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment

    return pkts
443
444
def reassemble4_core(listoffragments, return_ip):
    """ Reassemble IPv4 fragments into one packet.

    :param listoffragments: fragments to reassemble; the first element
        supplies the headers of the result
    :param return_ip: if true the result starts at the IP layer, otherwise
        at the Ether layer
    :note: a 20 byte IP header (no options) and a 14 byte Ethernet header
        are assumed
    :note: len, flags and chksum of the first fragment in the list are
        modified in place
    :returns: packet built from the header of the first fragment followed
        by the collected payload
    """
    ip_header_len = 20
    ether_header_len = 14

    payload = BytesIO()
    first = listoffragments[0]
    for pkt in listoffragments:
        # the fragment offset is used in units of 8 bytes; each fragment's
        # payload is written at its own position
        payload.seek(pkt[IP].frag * 8)
        payload.write(bytes(pkt[IP].payload))
    data = payload.getvalue()

    first.len = len(data) + ip_header_len
    first.flags = 0
    # deleted so that it is recomputed when the header bytes are built
    del first.chksum

    if return_ip:
        header = bytes(first[IP])[:ip_header_len]
        return first[IP].__class__(header + data)
    header = bytes(first[Ether])[:ether_header_len + ip_header_len]
    return first[Ether].__class__(header + data)
461
462
def reassemble4_ether(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core() with return_ip
    false, i.e. the result starts at the Ether layer. """
    return reassemble4_core(listoffragments, return_ip=False)
465
466
def reassemble4(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core() with return_ip
    true, i.e. the result starts at the IP layer. """
    return reassemble4_core(listoffragments, return_ip=True)