1 """ test framework utilities """
5 from socket import AF_INET6
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
15 from scapy.packet import Raw
16 from scapy.utils import hexdump
17 from scapy.utils6 import in6_mactoifaceid
19 from io import BytesIO
20 from vpp_papi import mac_pton
23 def ppp(headline, packet):
24 """ Return string containing the output of scapy packet.show() call. """
25 return '%s\n%s\n\n%s\n' % (headline,
26 hexdump(packet, dump=True),
27 packet.show(dump=True))
30 def ppc(headline, capture, limit=10):
31 """ Return string containing ppp() printout for a capture.
33 :param headline: printed as first line of output
34 :param capture: packets to print
35 :param limit: limit the print to # of packets
40 if limit < len(capture):
41 tail = "\nPrint limit reached, %s out of %s packets printed" % (
43 body = "".join([ppp("Packet #%s:" % count, p)
44 for count, p in zip(range(0, limit), capture)])
45 return "%s\n%s%s" % (headline, body, tail)
48 def ip4_range(ip4, s, e):
49 tmp = ip4.rsplit('.', 1)[0]
50 return ("%s.%d" % (tmp, i) for i in range(s, e))
53 def ip4n_range(ip4n, s, e):
54 ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
55 return (socket.inet_pton(socket.AF_INET, ip)
56 for ip in ip4_range(ip4, s, e))
59 # wrapper around scapy library function.
61 euid = in6_mactoifaceid(str(mac))
62 addr = "fe80::" + euid
66 def ip6_normalize(ip6):
67 return socket.inet_ntop(socket.AF_INET6,
68 socket.inet_pton(socket.AF_INET6, ip6))
71 def get_core_path(tempdir):
72 return "%s/%s" % (tempdir, get_core_pattern())
75 def is_core_present(tempdir):
76 return os.path.isfile(get_core_path(tempdir))
79 def get_core_pattern():
80 with open("/proc/sys/kernel/core_pattern", "r") as f:
81 corefmt = f.read().strip()
85 def check_core_path(logger, core_path):
86 corefmt = get_core_pattern()
87 if corefmt.startswith("|"):
89 "WARNING: redirecting the core dump through a"
90 " filter may result in truncated dumps.")
92 " You may want to check the filter settings"
93 " or uninstall it and edit the"
94 " /proc/sys/kernel/core_pattern accordingly.")
96 " current core pattern is: %s" % corefmt)
99 class NumericConstant(object):
103 def __init__(self, value):
113 if self._value in self.desc_dict:
114 return self.desc_dict[self._value]
119 """ Generic test host "connected" to VPPs interface. """
129 return mac_pton(self._mac)
133 """ IPv4 address - string """
138 """ IPv4 address of remote host - raw, suitable as API parameter."""
139 return socket.inet_pton(socket.AF_INET, self._ip4)
143 """ IPv6 address - string """
148 """ IPv6 address of remote host - raw, suitable as API parameter."""
149 return socket.inet_pton(socket.AF_INET6, self._ip6)
153 """ IPv6 link-local address - string """
158 """ IPv6 link-local address of remote host -
159 raw, suitable as API parameter."""
160 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
162 def __eq__(self, other):
163 if isinstance(other, Host):
164 return (self.mac == other.mac and
165 self.ip4 == other.ip4 and
166 self.ip6 == other.ip6 and
167 self.ip6_ll == other.ip6_ll)
171 def __ne__(self, other):
172 return not self.__eq__(other)
175 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
181 return hash(self.__repr__())
183 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
187 self._ip6_ll = ip6_ll
190 class ForeignAddressFactory(object):
193 net_template = '10.10.10.{}'
194 net = net_template.format(0) + '/' + str(prefix_len)
198 raise Exception("Network host address exhaustion")
200 return self.net_template.format(self.count)
204 """ L4 'connection' tied to two VPP interfaces """
206 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
207 self.testcase = testcase
208 self.ifs = [None, None]
211 self.address_family = af
212 self.l4proto = l4proto
213 self.ports = [None, None]
214 self.ports[0] = port1
215 self.ports[1] = port2
218 def pkt(self, side, l4args={}, payload="x"):
219 is_ip6 = 1 if self.address_family == AF_INET6 else 0
222 src_if = self.ifs[s0]
223 dst_if = self.ifs[s1]
224 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
225 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
226 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
227 merged_l4args.update(l4args)
228 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
230 self.l4proto(**merged_l4args) /
234 def send(self, side, flags=None, payload=""):
236 if flags is not None:
237 l4args['flags'] = flags
238 self.ifs[side].add_stream(self.pkt(side,
239 l4args=l4args, payload=payload))
240 self.ifs[1 - side].enable_capture()
241 self.testcase.pg_start()
243 def recv(self, side):
244 p = self.ifs[side].wait_for_packet(1)
247 def send_through(self, side, flags=None, payload=""):
248 self.send(side, flags, payload)
249 p = self.recv(1 - side)
252 def send_pingpong(self, side, flags1=None, flags2=None):
253 p1 = self.send_through(side, flags1)
254 p2 = self.send_through(1 - side, flags2)
259 L4_CONN_SIDE_ZERO = 0
263 class LoggerWrapper(object):
264 def __init__(self, logger=None):
265 self._logger = logger
267 def debug(self, *args, **kwargs):
269 self._logger.debug(*args, **kwargs)
271 def error(self, *args, **kwargs):
273 self._logger.error(*args, **kwargs)
276 def fragment_rfc791(packet, fragsize, _logger=None):
278 Fragment an IPv4 packet per RFC 791
279 :param packet: packet to fragment
280 :param fragsize: size at which to fragment
281 :note: IP options are not supported
282 :returns: list of fragments
284 logger = LoggerWrapper(_logger)
285 logger.debug(ppp("Fragmenting packet:", packet))
286 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
287 if len(packet[IP].options) > 0:
288 raise Exception("Not implemented")
289 if len(packet) <= fragsize:
292 pre_ip_len = len(packet) - len(packet[IP])
293 ip_header_len = packet[IP].ihl * 4
294 hex_packet = scapy.compat.raw(packet)
295 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
296 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
300 otl = len(packet[IP])
301 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
304 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
307 p[IP].len = ihl * 4 + nfb * 8
311 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
312 p[IP].len = otl - nfb * 8
313 p[IP].frag = fo + nfb
316 more_fragments = fragment_rfc791(p, fragsize, _logger)
317 pkts.extend(more_fragments)
322 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
324 Fragment an IPv6 packet per RFC 8200
325 :param packet: packet to fragment
326 :param fragsize: size at which to fragment
327 :note: IP options are not supported
328 :returns: list of fragments
330 logger = LoggerWrapper(_logger)
331 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
332 if len(packet) <= fragsize:
334 logger.debug(ppp("Fragmenting packet:", packet))
338 hop_by_hop_hdr = None
342 l = packet.getlayer(counter)
344 if l.__class__ is IPv6:
346 # ignore 2nd IPv6 header and everything below..
350 elif l.__class__ is IPv6ExtHdrFragment:
351 raise Exception("Already fragmented")
352 elif l.__class__ is IPv6ExtHdrRouting:
353 routing_hdr = counter
354 elif l.__class__ is IPv6ExtHdrHopByHop:
355 hop_by_hop_hdr = counter
356 elif seen_ipv6 and not upper_layer and \
357 not l.__class__.__name__.startswith('IPv6ExtHdr'):
358 upper_layer = counter
359 counter = counter + 1
360 l = packet.getlayer(counter)
363 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
364 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
366 if upper_layer is None:
367 raise Exception("Upper layer header not found in IPv6 packet")
369 last_per_fragment_hdr = ipv6_nr
370 if routing_hdr is None:
371 if hop_by_hop_hdr is not None:
372 last_per_fragment_hdr = hop_by_hop_hdr
374 last_per_fragment_hdr = routing_hdr
375 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
377 per_fragment_headers = packet.copy()
378 per_fragment_headers[last_per_fragment_hdr].remove_payload()
379 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
381 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
382 hex_payload = scapy.compat.raw(ext_and_upper_layer)
383 logger.debug("Payload length is %s" % len(hex_payload))
384 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
386 fragment_ext_hdr = IPv6ExtHdrFragment()
387 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
389 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
390 if not len_ext_and_upper_layer_payload and \
391 hasattr(ext_and_upper_layer, "data"):
392 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
394 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
395 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
397 raise Exception("Cannot fragment this packet - MTU too small "
398 "(%s, %s, %s, %s, %s)" % (
399 len(per_fragment_headers), len(fragment_ext_hdr),
400 len(ext_and_upper_layer),
401 len_ext_and_upper_layer_payload, fragsize))
403 orig_nh = packet[IPv6].nh
404 p = per_fragment_headers
407 p = p / fragment_ext_hdr
408 del p[IPv6ExtHdrFragment].nh
409 first_payload_len_nfb = int((fragsize - len(p)) / 8)
410 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
412 p[IPv6ExtHdrFragment].nh = orig_nh
413 p[IPv6ExtHdrFragment].id = identification
414 p[IPv6ExtHdrFragment].offset = 0
415 p[IPv6ExtHdrFragment].m = 1
416 p = p.__class__(scapy.compat.raw(p))
417 logger.debug(ppp("Fragment %s:" % len(pkts), p))
419 offset = first_payload_len_nfb * 8
420 logger.debug("Offset after first fragment: %s" % offset)
421 while len(hex_payload) > offset:
422 p = per_fragment_headers
425 p = p / fragment_ext_hdr
426 del p[IPv6ExtHdrFragment].nh
427 l_nfb = int((fragsize - len(p)) / 8)
428 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
429 p[IPv6ExtHdrFragment].nh = orig_nh
430 p[IPv6ExtHdrFragment].id = identification
431 p[IPv6ExtHdrFragment].offset = int(offset / 8)
432 p[IPv6ExtHdrFragment].m = 1
433 p = p.__class__(scapy.compat.raw(p))
434 logger.debug(ppp("Fragment %s:" % len(pkts), p))
436 offset = offset + l_nfb * 8
438 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
443 def reassemble4_core(listoffragments, return_ip):
445 first = listoffragments[0]
447 for pkt in listoffragments:
448 buffer.seek(pkt[IP].frag*8)
449 buffer.write(bytes(pkt[IP].payload))
450 first.len = len(buffer.getvalue()) + 20
454 header = bytes(first[IP])[:20]
455 return first[IP].__class__(header + buffer.getvalue())
457 header = bytes(first[Ether])[:34]
458 return first[Ether].__class__(header + buffer.getvalue())
461 def reassemble4_ether(listoffragments):
462 return reassemble4_core(listoffragments, False)
465 def reassemble4(listoffragments):
466 return reassemble4_core(listoffragments, True)