1 """ test framework utilities """
6 from socket import AF_INET6
8 from copy import deepcopy
9 from collections import UserDict
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP
14 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
16 from scapy.packet import Raw
17 from scapy.utils import hexdump
18 from scapy.utils6 import in6_mactoifaceid
20 from io import BytesIO
21 from vpp_papi import mac_pton
23 # Set up an empty logger for the testcase that can be overridden as necessary
24 null_logger = logging.getLogger('VppTestCase.util')
25 null_logger.addHandler(logging.NullHandler())
29 return packet.__repr__()
def ppp(headline, packet):
    """ Return a printable dump of a scapy packet.

    The result is the headline, followed by the packet's hexdump and then
    the output of packet.show().

    :param headline: printed as first line of output
    :param packet: packet to print
    """
    hex_view = hexdump(packet, dump=True)
    layer_view = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, hex_view, layer_view)
39 def ppc(headline, capture, limit=10):
40 """ Return string containing ppp() printout for a capture.
42 :param headline: printed as first line of output
43 :param capture: packets to print
44 :param limit: limit the print to # of packets
49 if limit < len(capture):
50 tail = "\nPrint limit reached, %s out of %s packets printed" % (
52 body = "".join([ppp("Packet #%s:" % count, p)
53 for count, p in zip(range(0, limit), capture)])
54 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Generate dotted addresses that share the leading part of ip4.

    Everything before the last dot of ip4 is kept; the final component
    runs over range(s, e), i.e. s inclusive, e exclusive.

    :param ip4: address string supplying the leading part
    :param s: first value of the last component
    :param e: end of the range (not included)
    :returns: generator of address strings
    """
    # Split eagerly so a bad ip4 fails at call time, not on first iteration.
    leading_part = ip4.rsplit('.', 1)[0]
    return ("%s.%d" % (leading_part, last_component)
            for last_component in range(s, e))
62 def mcast_ip_to_mac(ip):
63 ip = ipaddress.ip_address(ip)
64 if not ip.is_multicast:
65 raise ValueError("Must be multicast address.")
68 mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
69 (ip_as_int >> 8) & 0xff,
72 mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
73 (ip_as_int >> 16) & 0xff,
74 (ip_as_int >> 8) & 0xff,
79 # wrapper around scapy library function.
81 euid = in6_mactoifaceid(str(mac))
82 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Return the IPv6 address string as re-printed by the socket library.

    The address is converted to its packed binary form and back to text,
    so equivalent spellings of one address map to the same string.

    :param ip6: IPv6 address - string
    """
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
def get_core_path(tempdir):
    """ Return tempdir joined by '/' with the result of get_core_pattern().

    :param tempdir: directory used as the leading part of the path
    """
    pattern = get_core_pattern()
    return "%s/%s" % (tempdir, pattern)
def is_core_present(tempdir):
    """ Return True if the path from get_core_path(tempdir) is a file.

    :param tempdir: directory passed on to get_core_path()
    """
    core_file = get_core_path(tempdir)
    return os.path.isfile(core_file)
99 def get_core_pattern():
100 with open("/proc/sys/kernel/core_pattern", "r") as f:
101 corefmt = f.read().strip()
105 def check_core_path(logger, core_path):
106 corefmt = get_core_pattern()
107 if corefmt.startswith("|"):
109 "WARNING: redirecting the core dump through a"
110 " filter may result in truncated dumps.")
112 " You may want to check the filter settings"
113 " or uninstall it and edit the"
114 " /proc/sys/kernel/core_pattern accordingly.")
116 " current core pattern is: %s" % corefmt)
119 class NumericConstant:
123 def __init__(self, value):
133 if self._value in self.desc_dict:
134 return self.desc_dict[self._value]
139 """ Generic test host "connected" to VPPs interface. """
149 return mac_pton(self._mac)
153 """ IPv4 address - string """
158 """ IPv4 address of remote host - raw, suitable as API parameter."""
159 return socket.inet_pton(socket.AF_INET, self._ip4)
163 """ IPv6 address - string """
168 """ IPv6 address of remote host - raw, suitable as API parameter."""
169 return socket.inet_pton(socket.AF_INET6, self._ip6)
173 """ IPv6 link-local address - string """
178 """ IPv6 link-local address of remote host -
179 raw, suitable as API parameter."""
180 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
182 def __eq__(self, other):
183 if isinstance(other, Host):
184 return (self.mac == other.mac and
185 self.ip4 == other.ip4 and
186 self.ip6 == other.ip6 and
187 self.ip6_ll == other.ip6_ll)
191 def __ne__(self, other):
192 return not self.__eq__(other)
195 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
201 return hash(self.__repr__())
203 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
207 self._ip6_ll = ip6_ll
211 """ L4 'connection' tied to two VPP interfaces """
213 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
214 self.testcase = testcase
215 self.ifs = [None, None]
218 self.address_family = af
219 self.l4proto = l4proto
220 self.ports = [None, None]
221 self.ports[0] = port1
222 self.ports[1] = port2
225 def pkt(self, side, l4args={}, payload="x"):
226 is_ip6 = 1 if self.address_family == AF_INET6 else 0
229 src_if = self.ifs[s0]
230 dst_if = self.ifs[s1]
231 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
232 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
233 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
234 merged_l4args.update(l4args)
235 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
237 self.l4proto(**merged_l4args) /
241 def send(self, side, flags=None, payload=""):
243 if flags is not None:
244 l4args['flags'] = flags
245 self.ifs[side].add_stream(self.pkt(side,
246 l4args=l4args, payload=payload))
247 self.ifs[1 - side].enable_capture()
248 self.testcase.pg_start()
250 def recv(self, side):
251 p = self.ifs[side].wait_for_packet(1)
254 def send_through(self, side, flags=None, payload=""):
255 self.send(side, flags, payload)
256 p = self.recv(1 - side)
259 def send_pingpong(self, side, flags1=None, flags2=None):
260 p1 = self.send_through(side, flags1)
261 p2 = self.send_through(1 - side, flags2)
266 L4_CONN_SIDE_ZERO = 0
270 def fragment_rfc791(packet, fragsize, logger=null_logger):
272 Fragment an IPv4 packet per RFC 791
273 :param packet: packet to fragment
274 :param fragsize: size at which to fragment
275 :note: IP options are not supported
276 :returns: list of fragments
278 logger.debug(ppp("Fragmenting packet:", packet))
279 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
280 if len(packet[IP].options) > 0:
281 raise Exception("Not implemented")
282 if len(packet) <= fragsize:
285 pre_ip_len = len(packet) - len(packet[IP])
286 ip_header_len = packet[IP].ihl * 4
287 hex_packet = scapy.compat.raw(packet)
288 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
289 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
293 otl = len(packet[IP])
294 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
297 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
300 p[IP].len = ihl * 4 + nfb * 8
304 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
305 p[IP].len = otl - nfb * 8
306 p[IP].frag = fo + nfb
309 more_fragments = fragment_rfc791(p, fragsize, logger)
310 pkts.extend(more_fragments)
315 def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
317 Fragment an IPv6 packet per RFC 8200
318 :param packet: packet to fragment
319 :param fragsize: size at which to fragment
320 :note: IP options are not supported
321 :returns: list of fragments
323 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
324 if len(packet) <= fragsize:
326 logger.debug(ppp("Fragmenting packet:", packet))
330 hop_by_hop_hdr = None
334 l = packet.getlayer(counter)
336 if l.__class__ is IPv6:
338 # ignore 2nd IPv6 header and everything below..
342 elif l.__class__ is IPv6ExtHdrFragment:
343 raise Exception("Already fragmented")
344 elif l.__class__ is IPv6ExtHdrRouting:
345 routing_hdr = counter
346 elif l.__class__ is IPv6ExtHdrHopByHop:
347 hop_by_hop_hdr = counter
348 elif seen_ipv6 and not upper_layer and \
349 not l.__class__.__name__.startswith('IPv6ExtHdr'):
350 upper_layer = counter
351 counter = counter + 1
352 l = packet.getlayer(counter)
355 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
356 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
358 if upper_layer is None:
359 raise Exception("Upper layer header not found in IPv6 packet")
361 last_per_fragment_hdr = ipv6_nr
362 if routing_hdr is None:
363 if hop_by_hop_hdr is not None:
364 last_per_fragment_hdr = hop_by_hop_hdr
366 last_per_fragment_hdr = routing_hdr
367 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
369 per_fragment_headers = packet.copy()
370 per_fragment_headers[last_per_fragment_hdr].remove_payload()
371 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
373 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
374 hex_payload = scapy.compat.raw(ext_and_upper_layer)
375 logger.debug("Payload length is %s" % len(hex_payload))
376 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
378 fragment_ext_hdr = IPv6ExtHdrFragment()
379 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
381 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
382 if not len_ext_and_upper_layer_payload and \
383 hasattr(ext_and_upper_layer, "data"):
384 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
386 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
387 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
389 raise Exception("Cannot fragment this packet - MTU too small "
390 "(%s, %s, %s, %s, %s)" % (
391 len(per_fragment_headers), len(fragment_ext_hdr),
392 len(ext_and_upper_layer),
393 len_ext_and_upper_layer_payload, fragsize))
395 orig_nh = packet[IPv6].nh
396 p = per_fragment_headers
399 p = p / fragment_ext_hdr
400 del p[IPv6ExtHdrFragment].nh
401 first_payload_len_nfb = int((fragsize - len(p)) / 8)
402 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
404 p[IPv6ExtHdrFragment].nh = orig_nh
405 p[IPv6ExtHdrFragment].id = identification
406 p[IPv6ExtHdrFragment].offset = 0
407 p[IPv6ExtHdrFragment].m = 1
408 p = p.__class__(scapy.compat.raw(p))
409 logger.debug(ppp("Fragment %s:" % len(pkts), p))
411 offset = first_payload_len_nfb * 8
412 logger.debug("Offset after first fragment: %s" % offset)
413 while len(hex_payload) > offset:
414 p = per_fragment_headers
417 p = p / fragment_ext_hdr
418 del p[IPv6ExtHdrFragment].nh
419 l_nfb = int((fragsize - len(p)) / 8)
420 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
421 p[IPv6ExtHdrFragment].nh = orig_nh
422 p[IPv6ExtHdrFragment].id = identification
423 p[IPv6ExtHdrFragment].offset = int(offset / 8)
424 p[IPv6ExtHdrFragment].m = 1
425 p = p.__class__(scapy.compat.raw(p))
426 logger.debug(ppp("Fragment %s:" % len(pkts), p))
428 offset = offset + l_nfb * 8
430 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
435 def reassemble4_core(listoffragments, return_ip):
437 first = listoffragments[0]
439 for pkt in listoffragments:
440 buffer.seek(pkt[IP].frag*8)
441 buffer.write(bytes(pkt[IP].payload))
442 first.len = len(buffer.getvalue()) + 20
446 header = bytes(first[IP])[:20]
447 return first[IP].__class__(header + buffer.getvalue())
449 header = bytes(first[Ether])[:34]
450 return first[Ether].__class__(header + buffer.getvalue())
def reassemble4_ether(listoffragments):
    """ Reassemble fragments via reassemble4_core() with return_ip=False.

    :param listoffragments: fragments handed to reassemble4_core()
    """
    return reassemble4_core(listoffragments, return_ip=False)
def reassemble4(listoffragments):
    """ Reassemble fragments via reassemble4_core() with return_ip=True.

    :param listoffragments: fragments handed to reassemble4_core()
    """
    return reassemble4_core(listoffragments, return_ip=True)
461 class UnexpectedPacketError(Exception):
462 def __init__(self, packet, msg=""):
467 return f"\nUnexpected packet:\n{pr(self.packet)}{self.msg}"
470 def recursive_dict_merge(dict_base, dict_update):
471 """Recursively merge base dict with update dict, return merged dict"""
472 for key in dict_update:
474 if type(dict_update[key]) is dict:
475 dict_base[key] = recursive_dict_merge(dict_base[key],
478 dict_base[key] = dict_update[key]
480 dict_base[key] = dict_update[key]
484 class StatsDiff(UserDict):
486 Diff dictionary is a dictionary of dictionaries of interesting stats:
490 "err" : { '/error/counter1' : 4, },
491 sw_if_index1 : { '/stat/segment/counter1' : 5,
492 '/stat/segment/counter2' : 6,
494 sw_if_index2 : { '/stat/segment/counter1' : 7,
498 It describes a per sw-if-index diffset, where each key is stat segment
499 path and value is the expected change for that counter for sw-if-index.
500 Special case string "err" is used for error counters, which are not per
504 __slots__ = () # prevent setting properties to act like a dictionary
506 def __init__(self, data):
507 super().__init__(data)
def __or__(self, other):
    """ Return the recursive merge of this diff with other.

    The merge works on a deep copy of the stored data, so this object is
    not modified; the value returned is whatever recursive_dict_merge()
    returns.

    :param other: dictionary with updates to merge on top of this one
    """
    base_copy = deepcopy(self.data)
    return recursive_dict_merge(base_copy, other)