1 """ test framework utilities """
6 from socket import AF_INET6
8 from copy import deepcopy
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
15 from scapy.packet import Raw
16 from scapy.utils import hexdump
17 from scapy.utils6 import in6_mactoifaceid
19 from io import BytesIO
20 from vpp_papi import mac_pton
22 # Set up an empty logger for the testcase that can be overridden as necessary
23 null_logger = logging.getLogger('VppTestCase.util')
24 null_logger.addHandler(logging.NullHandler())
27 def ppp(headline, packet):
28 """ Return string containing the output of scapy packet.show() call. """
29 return '%s\n%s\n\n%s\n' % (headline,
30 hexdump(packet, dump=True),
31 packet.show(dump=True))
34 def ppc(headline, capture, limit=10):
35 """ Return string containing ppp() printout for a capture.
37 :param headline: printed as first line of output
38 :param capture: packets to print
39 :param limit: limit the print to # of packets
44 if limit < len(capture):
45 tail = "\nPrint limit reached, %s out of %s packets printed" % (
47 body = "".join([ppp("Packet #%s:" % count, p)
48 for count, p in zip(range(0, limit), capture)])
49 return "%s\n%s%s" % (headline, body, tail)
52 def ip4_range(ip4, s, e):
53 tmp = ip4.rsplit('.', 1)[0]
54 return ("%s.%d" % (tmp, i) for i in range(s, e))
57 def mcast_ip_to_mac(ip):
58 ip = ipaddress.ip_address(ip)
59 if not ip.is_multicast:
60 raise ValueError("Must be multicast address.")
63 mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
64 (ip_as_int >> 8) & 0xff,
67 mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
68 (ip_as_int >> 16) & 0xff,
69 (ip_as_int >> 8) & 0xff,
74 # wrapper around scapy library function.
76 euid = in6_mactoifaceid(str(mac))
77 addr = "fe80::" + euid
81 def ip6_normalize(ip6):
82 return socket.inet_ntop(socket.AF_INET6,
83 socket.inet_pton(socket.AF_INET6, ip6))
86 def get_core_path(tempdir):
87 return "%s/%s" % (tempdir, get_core_pattern())
90 def is_core_present(tempdir):
91 return os.path.isfile(get_core_path(tempdir))
94 def get_core_pattern():
95 with open("/proc/sys/kernel/core_pattern", "r") as f:
96 corefmt = f.read().strip()
100 def check_core_path(logger, core_path):
101 corefmt = get_core_pattern()
102 if corefmt.startswith("|"):
104 "WARNING: redirecting the core dump through a"
105 " filter may result in truncated dumps.")
107 " You may want to check the filter settings"
108 " or uninstall it and edit the"
109 " /proc/sys/kernel/core_pattern accordingly.")
111 " current core pattern is: %s" % corefmt)
114 class NumericConstant:
118 def __init__(self, value):
128 if self._value in self.desc_dict:
129 return self.desc_dict[self._value]
134 """ Generic test host "connected" to VPPs interface. """
144 return mac_pton(self._mac)
148 """ IPv4 address - string """
153 """ IPv4 address of remote host - raw, suitable as API parameter."""
154 return socket.inet_pton(socket.AF_INET, self._ip4)
158 """ IPv6 address - string """
163 """ IPv6 address of remote host - raw, suitable as API parameter."""
164 return socket.inet_pton(socket.AF_INET6, self._ip6)
168 """ IPv6 link-local address - string """
173 """ IPv6 link-local address of remote host -
174 raw, suitable as API parameter."""
175 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
177 def __eq__(self, other):
178 if isinstance(other, Host):
179 return (self.mac == other.mac and
180 self.ip4 == other.ip4 and
181 self.ip6 == other.ip6 and
182 self.ip6_ll == other.ip6_ll)
186 def __ne__(self, other):
187 return not self.__eq__(other)
190 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
196 return hash(self.__repr__())
198 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
202 self._ip6_ll = ip6_ll
206 """ L4 'connection' tied to two VPP interfaces """
208 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
209 self.testcase = testcase
210 self.ifs = [None, None]
213 self.address_family = af
214 self.l4proto = l4proto
215 self.ports = [None, None]
216 self.ports[0] = port1
217 self.ports[1] = port2
220 def pkt(self, side, l4args={}, payload="x"):
221 is_ip6 = 1 if self.address_family == AF_INET6 else 0
224 src_if = self.ifs[s0]
225 dst_if = self.ifs[s1]
226 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
227 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
228 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
229 merged_l4args.update(l4args)
230 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
232 self.l4proto(**merged_l4args) /
236 def send(self, side, flags=None, payload=""):
238 if flags is not None:
239 l4args['flags'] = flags
240 self.ifs[side].add_stream(self.pkt(side,
241 l4args=l4args, payload=payload))
242 self.ifs[1 - side].enable_capture()
243 self.testcase.pg_start()
245 def recv(self, side):
246 p = self.ifs[side].wait_for_packet(1)
249 def send_through(self, side, flags=None, payload=""):
250 self.send(side, flags, payload)
251 p = self.recv(1 - side)
254 def send_pingpong(self, side, flags1=None, flags2=None):
255 p1 = self.send_through(side, flags1)
256 p2 = self.send_through(1 - side, flags2)
261 L4_CONN_SIDE_ZERO = 0
265 def fragment_rfc791(packet, fragsize, logger=null_logger):
267 Fragment an IPv4 packet per RFC 791
268 :param packet: packet to fragment
269 :param fragsize: size at which to fragment
270 :note: IP options are not supported
271 :returns: list of fragments
273 logger.debug(ppp("Fragmenting packet:", packet))
274 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
275 if len(packet[IP].options) > 0:
276 raise Exception("Not implemented")
277 if len(packet) <= fragsize:
280 pre_ip_len = len(packet) - len(packet[IP])
281 ip_header_len = packet[IP].ihl * 4
282 hex_packet = scapy.compat.raw(packet)
283 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
284 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
288 otl = len(packet[IP])
289 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
292 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
295 p[IP].len = ihl * 4 + nfb * 8
299 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
300 p[IP].len = otl - nfb * 8
301 p[IP].frag = fo + nfb
304 more_fragments = fragment_rfc791(p, fragsize, logger)
305 pkts.extend(more_fragments)
310 def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
312 Fragment an IPv6 packet per RFC 8200
313 :param packet: packet to fragment
314 :param fragsize: size at which to fragment
315 :note: IP options are not supported
316 :returns: list of fragments
318 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
319 if len(packet) <= fragsize:
321 logger.debug(ppp("Fragmenting packet:", packet))
325 hop_by_hop_hdr = None
329 l = packet.getlayer(counter)
331 if l.__class__ is IPv6:
333 # ignore 2nd IPv6 header and everything below..
337 elif l.__class__ is IPv6ExtHdrFragment:
338 raise Exception("Already fragmented")
339 elif l.__class__ is IPv6ExtHdrRouting:
340 routing_hdr = counter
341 elif l.__class__ is IPv6ExtHdrHopByHop:
342 hop_by_hop_hdr = counter
343 elif seen_ipv6 and not upper_layer and \
344 not l.__class__.__name__.startswith('IPv6ExtHdr'):
345 upper_layer = counter
346 counter = counter + 1
347 l = packet.getlayer(counter)
350 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
351 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
353 if upper_layer is None:
354 raise Exception("Upper layer header not found in IPv6 packet")
356 last_per_fragment_hdr = ipv6_nr
357 if routing_hdr is None:
358 if hop_by_hop_hdr is not None:
359 last_per_fragment_hdr = hop_by_hop_hdr
361 last_per_fragment_hdr = routing_hdr
362 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
364 per_fragment_headers = packet.copy()
365 per_fragment_headers[last_per_fragment_hdr].remove_payload()
366 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
368 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
369 hex_payload = scapy.compat.raw(ext_and_upper_layer)
370 logger.debug("Payload length is %s" % len(hex_payload))
371 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
373 fragment_ext_hdr = IPv6ExtHdrFragment()
374 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
376 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
377 if not len_ext_and_upper_layer_payload and \
378 hasattr(ext_and_upper_layer, "data"):
379 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
381 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
382 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
384 raise Exception("Cannot fragment this packet - MTU too small "
385 "(%s, %s, %s, %s, %s)" % (
386 len(per_fragment_headers), len(fragment_ext_hdr),
387 len(ext_and_upper_layer),
388 len_ext_and_upper_layer_payload, fragsize))
390 orig_nh = packet[IPv6].nh
391 p = per_fragment_headers
394 p = p / fragment_ext_hdr
395 del p[IPv6ExtHdrFragment].nh
396 first_payload_len_nfb = int((fragsize - len(p)) / 8)
397 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
399 p[IPv6ExtHdrFragment].nh = orig_nh
400 p[IPv6ExtHdrFragment].id = identification
401 p[IPv6ExtHdrFragment].offset = 0
402 p[IPv6ExtHdrFragment].m = 1
403 p = p.__class__(scapy.compat.raw(p))
404 logger.debug(ppp("Fragment %s:" % len(pkts), p))
406 offset = first_payload_len_nfb * 8
407 logger.debug("Offset after first fragment: %s" % offset)
408 while len(hex_payload) > offset:
409 p = per_fragment_headers
412 p = p / fragment_ext_hdr
413 del p[IPv6ExtHdrFragment].nh
414 l_nfb = int((fragsize - len(p)) / 8)
415 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
416 p[IPv6ExtHdrFragment].nh = orig_nh
417 p[IPv6ExtHdrFragment].id = identification
418 p[IPv6ExtHdrFragment].offset = int(offset / 8)
419 p[IPv6ExtHdrFragment].m = 1
420 p = p.__class__(scapy.compat.raw(p))
421 logger.debug(ppp("Fragment %s:" % len(pkts), p))
423 offset = offset + l_nfb * 8
425 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
430 def reassemble4_core(listoffragments, return_ip):
432 first = listoffragments[0]
434 for pkt in listoffragments:
435 buffer.seek(pkt[IP].frag*8)
436 buffer.write(bytes(pkt[IP].payload))
437 first.len = len(buffer.getvalue()) + 20
441 header = bytes(first[IP])[:20]
442 return first[IP].__class__(header + buffer.getvalue())
444 header = bytes(first[Ether])[:34]
445 return first[Ether].__class__(header + buffer.getvalue())
448 def reassemble4_ether(listoffragments):
449 return reassemble4_core(listoffragments, False)
452 def reassemble4(listoffragments):
453 return reassemble4_core(listoffragments, True)
456 def recursive_dict_merge(dict_base, dict_update):
457 """Recursively merge base dict with update dict, return merged dict"""
458 for key in dict_update:
460 if type(dict_update[key]) is dict:
461 dict_base[key] = recursive_dict_merge(dict_base[key],
464 dict_base[key] = dict_update[key]
466 dict_base[key] = dict_update[key]
    A diff dictionary is a dictionary of dictionaries of interesting stats:
476 "err" : { '/error/counter1' : 4, },
477 sw_if_index1 : { '/stat/segment/counter1' : 5,
478 '/stat/segment/counter2' : 6,
480 sw_if_index2 : { '/stat/segment/counter1' : 7,
484 It describes a per sw-if-index diffset, where each key is stat segment
485 path and value is the expected change for that counter for sw-if-index.
486 Special case string "err" is used for error counters, which are not per
490 def __init__(self, stats_diff={}):
491 self.stats_diff = stats_diff
493 def update(self, sw_if_index, key, value):
494 if sw_if_index in self.stats_diff:
495 self.stats_diff[sw_if_index][key] = value
497 self.stats_diff[sw_if_index] = {key: value}
499 def __or__(self, other):
500 return recursive_dict_merge(deepcopy(self.stats_diff), other)