1 """ test framework utilities """
6 from abc import abstractmethod, ABCMeta
7 from scapy.utils6 import in6_mactoifaceid
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
14 from scapy.utils import hexdump
15 from socket import AF_INET6
16 from io import BytesIO
17 from vpp_papi import mac_pton
20 def ppp(headline, packet):
21 """ Return string containing the output of scapy packet.show() call. """
23 old_stdout = sys.stdout
29 sys.stdout = old_stdout
33 def ppc(headline, capture, limit=10):
34 """ Return string containing ppp() printout for a capture.
36 :param headline: printed as first line of output
37 :param capture: packets to print
38 :param limit: limit the print to # of packets
43 if limit < len(capture):
44 tail = "\nPrint limit reached, %s out of %s packets printed" % (
46 body = "".join([ppp("Packet #%s:" % count, p)
47 for count, p in zip(range(0, limit), capture)])
48 return "%s\n%s%s" % (headline, body, tail)
51 def ip4_range(ip4, s, e):
52 tmp = ip4.rsplit('.', 1)[0]
53 return ("%s.%d" % (tmp, i) for i in range(s, e))
56 def ip4n_range(ip4n, s, e):
57 ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
58 return (socket.inet_pton(socket.AF_INET, ip)
59 for ip in ip4_range(ip4, s, e))
63 euid = in6_mactoifaceid(mac)
64 addr = "fe80::" + euid
68 def ip6_normalize(ip6):
69 return socket.inet_ntop(socket.AF_INET6,
70 socket.inet_pton(socket.AF_INET6, ip6))
73 def get_core_path(tempdir):
74 return "%s/%s" % (tempdir, get_core_pattern())
77 def is_core_present(tempdir):
78 return os.path.isfile(get_core_path(tempdir))
81 def get_core_pattern():
82 with open("/proc/sys/kernel/core_pattern", "r") as f:
83 corefmt = f.read().strip()
87 def check_core_path(logger, core_path):
88 corefmt = get_core_pattern()
89 if corefmt.startswith("|"):
91 "WARNING: redirecting the core dump through a"
92 " filter may result in truncated dumps.")
94 " You may want to check the filter settings"
95 " or uninstall it and edit the"
96 " /proc/sys/kernel/core_pattern accordingly.")
98 " current core pattern is: %s" % corefmt)
101 class NumericConstant(object):
102 __metaclass__ = ABCMeta
107 def __init__(self, value):
117 if self._value in self.desc_dict:
118 return self.desc_dict[self._value]
123 """ Generic test host "connected" to VPPs interface. """
133 return mac_pton(self._mac)
137 """ IPv4 address - string """
142 """ IPv4 address of remote host - raw, suitable as API parameter."""
143 return socket.inet_pton(socket.AF_INET, self._ip4)
147 """ IPv6 address - string """
152 """ IPv6 address of remote host - raw, suitable as API parameter."""
153 return socket.inet_pton(socket.AF_INET6, self._ip6)
157 """ IPv6 link-local address - string """
162 """ IPv6 link-local address of remote host -
163 raw, suitable as API parameter."""
164 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
166 def __eq__(self, other):
167 if isinstance(other, Host):
168 return (self.mac == other.mac and
169 self.ip4 == other.ip4 and
170 self.ip6 == other.ip6 and
171 self.ip6_ll == other.ip6_ll)
175 def __ne__(self, other):
176 return not self.__eq__(other)
179 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
185 return hash(self.__repr__())
187 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
191 self._ip6_ll = ip6_ll
194 class ForeignAddressFactory(object):
197 net_template = '10.10.10.{}'
198 net = net_template.format(0) + '/' + str(prefix_len)
202 raise Exception("Network host address exhaustion")
204 return self.net_template.format(self.count)
208 """ L4 'connection' tied to two VPP interfaces """
210 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
211 self.testcase = testcase
212 self.ifs = [None, None]
215 self.address_family = af
216 self.l4proto = l4proto
217 self.ports = [None, None]
218 self.ports[0] = port1
219 self.ports[1] = port2
222 def pkt(self, side, l4args={}, payload="x"):
223 is_ip6 = 1 if self.address_family == AF_INET6 else 0
226 src_if = self.ifs[s0]
227 dst_if = self.ifs[s1]
228 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
229 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
230 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
231 merged_l4args.update(l4args)
232 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
234 self.l4proto(**merged_l4args) /
238 def send(self, side, flags=None, payload=""):
240 if flags is not None:
241 l4args['flags'] = flags
242 self.ifs[side].add_stream(self.pkt(side,
243 l4args=l4args, payload=payload))
244 self.ifs[1 - side].enable_capture()
245 self.testcase.pg_start()
247 def recv(self, side):
248 p = self.ifs[side].wait_for_packet(1)
251 def send_through(self, side, flags=None, payload=""):
252 self.send(side, flags, payload)
253 p = self.recv(1 - side)
256 def send_pingpong(self, side, flags1=None, flags2=None):
257 p1 = self.send_through(side, flags1)
258 p2 = self.send_through(1 - side, flags2)
263 L4_CONN_SIDE_ZERO = 0
267 class LoggerWrapper(object):
268 def __init__(self, logger=None):
269 self._logger = logger
271 def debug(self, *args, **kwargs):
273 self._logger.debug(*args, **kwargs)
275 def error(self, *args, **kwargs):
277 self._logger.error(*args, **kwargs)
280 def fragment_rfc791(packet, fragsize, _logger=None):
282 Fragment an IPv4 packet per RFC 791
283 :param packet: packet to fragment
284 :param fragsize: size at which to fragment
285 :note: IP options are not supported
286 :returns: list of fragments
288 logger = LoggerWrapper(_logger)
289 logger.debug(ppp("Fragmenting packet:", packet))
290 packet = packet.__class__(str(packet)) # recalculate all values
291 if len(packet[IP].options) > 0:
292 raise Exception("Not implemented")
293 if len(packet) <= fragsize:
296 pre_ip_len = len(packet) - len(packet[IP])
297 ip_header_len = packet[IP].ihl * 4
298 hex_packet = str(packet)
299 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
300 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
304 otl = len(packet[IP])
305 nfb = (fragsize - pre_ip_len - ihl * 4) / 8
308 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
311 p[IP].len = ihl * 4 + nfb * 8
315 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
316 p[IP].len = otl - nfb * 8
317 p[IP].frag = fo + nfb
320 more_fragments = fragment_rfc791(p, fragsize, _logger)
321 pkts.extend(more_fragments)
326 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
328 Fragment an IPv6 packet per RFC 8200
329 :param packet: packet to fragment
330 :param fragsize: size at which to fragment
331 :note: IP options are not supported
332 :returns: list of fragments
334 logger = LoggerWrapper(_logger)
335 packet = packet.__class__(str(packet)) # recalculate all values
336 if len(packet) <= fragsize:
338 logger.debug(ppp("Fragmenting packet:", packet))
342 hop_by_hop_hdr = None
346 l = packet.getlayer(counter)
348 if l.__class__ is IPv6:
350 # ignore 2nd IPv6 header and everything below..
354 elif l.__class__ is IPv6ExtHdrFragment:
355 raise Exception("Already fragmented")
356 elif l.__class__ is IPv6ExtHdrRouting:
357 routing_hdr = counter
358 elif l.__class__ is IPv6ExtHdrHopByHop:
359 hop_by_hop_hdr = counter
360 elif seen_ipv6 and not upper_layer and \
361 not l.__class__.__name__.startswith('IPv6ExtHdr'):
362 upper_layer = counter
363 counter = counter + 1
364 l = packet.getlayer(counter)
367 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
368 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
370 if upper_layer is None:
371 raise Exception("Upper layer header not found in IPv6 packet")
373 last_per_fragment_hdr = ipv6_nr
374 if routing_hdr is None:
375 if hop_by_hop_hdr is not None:
376 last_per_fragment_hdr = hop_by_hop_hdr
378 last_per_fragment_hdr = routing_hdr
379 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
381 per_fragment_headers = packet.copy()
382 per_fragment_headers[last_per_fragment_hdr].remove_payload()
383 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
385 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
386 hex_payload = str(ext_and_upper_layer)
387 logger.debug("Payload length is %s" % len(hex_payload))
388 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
390 fragment_ext_hdr = IPv6ExtHdrFragment()
391 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
393 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
394 len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
396 raise Exception("Cannot fragment this packet - MTU too small "
397 "(%s, %s, %s, %s, %s)" % (
398 len(per_fragment_headers), len(fragment_ext_hdr),
399 len(ext_and_upper_layer),
400 len(ext_and_upper_layer.payload), fragsize))
402 orig_nh = packet[IPv6].nh
403 p = per_fragment_headers
406 p = p / fragment_ext_hdr
407 del p[IPv6ExtHdrFragment].nh
408 first_payload_len_nfb = (fragsize - len(p)) / 8
409 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
411 p[IPv6ExtHdrFragment].nh = orig_nh
412 p[IPv6ExtHdrFragment].id = identification
413 p[IPv6ExtHdrFragment].offset = 0
414 p[IPv6ExtHdrFragment].m = 1
415 p = p.__class__(str(p))
416 logger.debug(ppp("Fragment %s:" % len(pkts), p))
418 offset = first_payload_len_nfb * 8
419 logger.debug("Offset after first fragment: %s" % offset)
420 while len(hex_payload) > offset:
421 p = per_fragment_headers
424 p = p / fragment_ext_hdr
425 del p[IPv6ExtHdrFragment].nh
426 l_nfb = (fragsize - len(p)) / 8
427 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
428 p[IPv6ExtHdrFragment].nh = orig_nh
429 p[IPv6ExtHdrFragment].id = identification
430 p[IPv6ExtHdrFragment].offset = offset / 8
431 p[IPv6ExtHdrFragment].m = 1
432 p = p.__class__(str(p))
433 logger.debug(ppp("Fragment %s:" % len(pkts), p))
435 offset = offset + l_nfb * 8
437 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
442 def reassemble4_core(listoffragments, return_ip):
444 first = listoffragments[0]
446 for pkt in listoffragments:
447 buffer.seek(pkt[IP].frag*8)
448 buffer.write(bytes(pkt[IP].payload))
449 first.len = len(buffer.getvalue()) + 20
453 header = bytes(first[IP])[:20]
454 return first[IP].__class__(header + buffer.getvalue())
456 header = bytes(first[Ether])[:34]
457 return first[Ether].__class__(header + buffer.getvalue())
460 def reassemble4_ether(listoffragments):
461 return reassemble4_core(listoffragments, False)
464 def reassemble4(listoffragments):
465 return reassemble4_core(listoffragments, True)