1 """ test framework utilities """
6 from abc import abstractmethod, ABCMeta
7 from scapy.utils6 import in6_mactoifaceid
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
14 from scapy.utils import hexdump
15 from socket import AF_INET6
16 from io import BytesIO
17 from vpp_papi import mac_pton
20 def ppp(headline, packet):
21 """ Return string containing the output of scapy packet.show() call. """
22 return '%s\n%s\n\n%s\n' % (headline,
23 hexdump(packet, dump=True),
24 packet.show(dump=True))
27 def ppc(headline, capture, limit=10):
28 """ Return string containing ppp() printout for a capture.
30 :param headline: printed as first line of output
31 :param capture: packets to print
32 :param limit: limit the print to # of packets
37 if limit < len(capture):
38 tail = "\nPrint limit reached, %s out of %s packets printed" % (
40 body = "".join([ppp("Packet #%s:" % count, p)
41 for count, p in zip(range(0, limit), capture)])
42 return "%s\n%s%s" % (headline, body, tail)
45 def ip4_range(ip4, s, e):
46 tmp = ip4.rsplit('.', 1)[0]
47 return ("%s.%d" % (tmp, i) for i in range(s, e))
50 def ip4n_range(ip4n, s, e):
51 ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
52 return (socket.inet_pton(socket.AF_INET, ip)
53 for ip in ip4_range(ip4, s, e))
57 euid = in6_mactoifaceid(mac)
58 addr = "fe80::" + euid
62 def ip6_normalize(ip6):
63 return socket.inet_ntop(socket.AF_INET6,
64 socket.inet_pton(socket.AF_INET6, ip6))
67 def get_core_path(tempdir):
68 return "%s/%s" % (tempdir, get_core_pattern())
71 def is_core_present(tempdir):
72 return os.path.isfile(get_core_path(tempdir))
75 def get_core_pattern():
76 with open("/proc/sys/kernel/core_pattern", "r") as f:
77 corefmt = f.read().strip()
81 def check_core_path(logger, core_path):
82 corefmt = get_core_pattern()
83 if corefmt.startswith("|"):
85 "WARNING: redirecting the core dump through a"
86 " filter may result in truncated dumps.")
88 " You may want to check the filter settings"
89 " or uninstall it and edit the"
90 " /proc/sys/kernel/core_pattern accordingly.")
92 " current core pattern is: %s" % corefmt)
95 class NumericConstant(object):
96 __metaclass__ = ABCMeta
101 def __init__(self, value):
111 if self._value in self.desc_dict:
112 return self.desc_dict[self._value]
117 """ Generic test host "connected" to VPPs interface. """
127 return mac_pton(self._mac)
131 """ IPv4 address - string """
136 """ IPv4 address of remote host - raw, suitable as API parameter."""
137 return socket.inet_pton(socket.AF_INET, self._ip4)
141 """ IPv6 address - string """
146 """ IPv6 address of remote host - raw, suitable as API parameter."""
147 return socket.inet_pton(socket.AF_INET6, self._ip6)
151 """ IPv6 link-local address - string """
156 """ IPv6 link-local address of remote host -
157 raw, suitable as API parameter."""
158 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
160 def __eq__(self, other):
161 if isinstance(other, Host):
162 return (self.mac == other.mac and
163 self.ip4 == other.ip4 and
164 self.ip6 == other.ip6 and
165 self.ip6_ll == other.ip6_ll)
169 def __ne__(self, other):
170 return not self.__eq__(other)
173 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
179 return hash(self.__repr__())
181 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
185 self._ip6_ll = ip6_ll
188 class ForeignAddressFactory(object):
191 net_template = '10.10.10.{}'
192 net = net_template.format(0) + '/' + str(prefix_len)
196 raise Exception("Network host address exhaustion")
198 return self.net_template.format(self.count)
202 """ L4 'connection' tied to two VPP interfaces """
204 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
205 self.testcase = testcase
206 self.ifs = [None, None]
209 self.address_family = af
210 self.l4proto = l4proto
211 self.ports = [None, None]
212 self.ports[0] = port1
213 self.ports[1] = port2
216 def pkt(self, side, l4args={}, payload="x"):
217 is_ip6 = 1 if self.address_family == AF_INET6 else 0
220 src_if = self.ifs[s0]
221 dst_if = self.ifs[s1]
222 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
223 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
224 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
225 merged_l4args.update(l4args)
226 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
228 self.l4proto(**merged_l4args) /
232 def send(self, side, flags=None, payload=""):
234 if flags is not None:
235 l4args['flags'] = flags
236 self.ifs[side].add_stream(self.pkt(side,
237 l4args=l4args, payload=payload))
238 self.ifs[1 - side].enable_capture()
239 self.testcase.pg_start()
241 def recv(self, side):
242 p = self.ifs[side].wait_for_packet(1)
245 def send_through(self, side, flags=None, payload=""):
246 self.send(side, flags, payload)
247 p = self.recv(1 - side)
250 def send_pingpong(self, side, flags1=None, flags2=None):
251 p1 = self.send_through(side, flags1)
252 p2 = self.send_through(1 - side, flags2)
257 L4_CONN_SIDE_ZERO = 0
261 class LoggerWrapper(object):
262 def __init__(self, logger=None):
263 self._logger = logger
265 def debug(self, *args, **kwargs):
267 self._logger.debug(*args, **kwargs)
269 def error(self, *args, **kwargs):
271 self._logger.error(*args, **kwargs)
274 def fragment_rfc791(packet, fragsize, _logger=None):
276 Fragment an IPv4 packet per RFC 791
277 :param packet: packet to fragment
278 :param fragsize: size at which to fragment
279 :note: IP options are not supported
280 :returns: list of fragments
282 logger = LoggerWrapper(_logger)
283 logger.debug(ppp("Fragmenting packet:", packet))
284 packet = packet.__class__(str(packet)) # recalculate all values
285 if len(packet[IP].options) > 0:
286 raise Exception("Not implemented")
287 if len(packet) <= fragsize:
290 pre_ip_len = len(packet) - len(packet[IP])
291 ip_header_len = packet[IP].ihl * 4
292 hex_packet = str(packet)
293 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
294 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
298 otl = len(packet[IP])
299 nfb = (fragsize - pre_ip_len - ihl * 4) / 8
302 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
305 p[IP].len = ihl * 4 + nfb * 8
309 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
310 p[IP].len = otl - nfb * 8
311 p[IP].frag = fo + nfb
314 more_fragments = fragment_rfc791(p, fragsize, _logger)
315 pkts.extend(more_fragments)
320 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
322 Fragment an IPv6 packet per RFC 8200
323 :param packet: packet to fragment
324 :param fragsize: size at which to fragment
325 :note: IP options are not supported
326 :returns: list of fragments
328 logger = LoggerWrapper(_logger)
329 packet = packet.__class__(str(packet)) # recalculate all values
330 if len(packet) <= fragsize:
332 logger.debug(ppp("Fragmenting packet:", packet))
336 hop_by_hop_hdr = None
340 l = packet.getlayer(counter)
342 if l.__class__ is IPv6:
344 # ignore 2nd IPv6 header and everything below..
348 elif l.__class__ is IPv6ExtHdrFragment:
349 raise Exception("Already fragmented")
350 elif l.__class__ is IPv6ExtHdrRouting:
351 routing_hdr = counter
352 elif l.__class__ is IPv6ExtHdrHopByHop:
353 hop_by_hop_hdr = counter
354 elif seen_ipv6 and not upper_layer and \
355 not l.__class__.__name__.startswith('IPv6ExtHdr'):
356 upper_layer = counter
357 counter = counter + 1
358 l = packet.getlayer(counter)
361 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
362 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
364 if upper_layer is None:
365 raise Exception("Upper layer header not found in IPv6 packet")
367 last_per_fragment_hdr = ipv6_nr
368 if routing_hdr is None:
369 if hop_by_hop_hdr is not None:
370 last_per_fragment_hdr = hop_by_hop_hdr
372 last_per_fragment_hdr = routing_hdr
373 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
375 per_fragment_headers = packet.copy()
376 per_fragment_headers[last_per_fragment_hdr].remove_payload()
377 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
379 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
380 hex_payload = str(ext_and_upper_layer)
381 logger.debug("Payload length is %s" % len(hex_payload))
382 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
384 fragment_ext_hdr = IPv6ExtHdrFragment()
385 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
387 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
388 len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
390 raise Exception("Cannot fragment this packet - MTU too small "
391 "(%s, %s, %s, %s, %s)" % (
392 len(per_fragment_headers), len(fragment_ext_hdr),
393 len(ext_and_upper_layer),
394 len(ext_and_upper_layer.payload), fragsize))
396 orig_nh = packet[IPv6].nh
397 p = per_fragment_headers
400 p = p / fragment_ext_hdr
401 del p[IPv6ExtHdrFragment].nh
402 first_payload_len_nfb = (fragsize - len(p)) / 8
403 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
405 p[IPv6ExtHdrFragment].nh = orig_nh
406 p[IPv6ExtHdrFragment].id = identification
407 p[IPv6ExtHdrFragment].offset = 0
408 p[IPv6ExtHdrFragment].m = 1
409 p = p.__class__(str(p))
410 logger.debug(ppp("Fragment %s:" % len(pkts), p))
412 offset = first_payload_len_nfb * 8
413 logger.debug("Offset after first fragment: %s" % offset)
414 while len(hex_payload) > offset:
415 p = per_fragment_headers
418 p = p / fragment_ext_hdr
419 del p[IPv6ExtHdrFragment].nh
420 l_nfb = (fragsize - len(p)) / 8
421 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
422 p[IPv6ExtHdrFragment].nh = orig_nh
423 p[IPv6ExtHdrFragment].id = identification
424 p[IPv6ExtHdrFragment].offset = offset / 8
425 p[IPv6ExtHdrFragment].m = 1
426 p = p.__class__(str(p))
427 logger.debug(ppp("Fragment %s:" % len(pkts), p))
429 offset = offset + l_nfb * 8
431 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
436 def reassemble4_core(listoffragments, return_ip):
438 first = listoffragments[0]
440 for pkt in listoffragments:
441 buffer.seek(pkt[IP].frag*8)
442 buffer.write(bytes(pkt[IP].payload))
443 first.len = len(buffer.getvalue()) + 20
447 header = bytes(first[IP])[:20]
448 return first[IP].__class__(header + buffer.getvalue())
450 header = bytes(first[Ether])[:34]
451 return first[Ether].__class__(header + buffer.getvalue())
454 def reassemble4_ether(listoffragments):
455 return reassemble4_core(listoffragments, False)
458 def reassemble4(listoffragments):
459 return reassemble4_core(listoffragments, True)