1 """ test framework utilities """
5 from abc import abstractmethod, ABCMeta
6 from cStringIO import StringIO
7 from scapy.utils6 import in6_mactoifaceid
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
14 from scapy.utils import hexdump
15 from socket import AF_INET6
18 def ppp(headline, packet):
19 """ Return string containing the output of scapy packet.show() call. """
21 old_stdout = sys.stdout
27 sys.stdout = old_stdout
31 def ppc(headline, capture, limit=10):
32 """ Return string containing ppp() printout for a capture.
34 :param headline: printed as first line of output
35 :param capture: packets to print
36 :param limit: limit the print to # of packets
41 if limit < len(capture):
42 tail = "\nPrint limit reached, %s out of %s packets printed" % (
44 body = "".join([ppp("Packet #%s:" % count, p)
45 for count, p in zip(range(0, limit), capture)])
46 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Produce IPv4 address strings that differ only in their last field.

    :param ip4: IPv4 address - string; the part after its last '.' is
        discarded and replaced by the generated numbers
    :param s: first number to generate (inclusive)
    :param e: number at which to stop (exclusive)
    :returns: generator of address strings, one per number in range(s, e)
    """
    # keep everything in front of the last dot as the common prefix
    prefix = ip4.rsplit('.', 1)[0]
    last_fields = range(s, e)
    return ('.'.join((prefix, "%d" % field)) for field in last_fields)
def ip4n_range(ip4n, s, e):
    """ Packed-binary counterpart of ip4_range().

    :param ip4n: IPv4 address in the packed form accepted by
        socket.inet_ntop
    :param s: first number to generate (inclusive)
    :param e: number at which to stop (exclusive)
    :returns: generator of packed IPv4 addresses, one per string produced
        by ip4_range() for the same arguments
    """
    family = socket.AF_INET
    # convert to text, let ip4_range() do the enumeration, then pack again
    dotted = socket.inet_ntop(family, ip4n)
    return (socket.inet_pton(family, address)
            for address in ip4_range(dotted, s, e))
61 """ Convert the : separated format into binary packet data for the API """
62 return mac.replace(':', '').decode('hex')
66 euid = in6_mactoifaceid(mac)
67 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Return the IPv6 address re-rendered by the socket library.

    The textual address is packed into binary form and converted back to
    text, so the result uses the spelling chosen by socket.inet_ntop.

    :param ip6: IPv6 address - string
    :returns: IPv6 address - string
    """
    family = socket.AF_INET6
    packed = socket.inet_pton(family, ip6)
    return socket.inet_ntop(family, packed)
76 def check_core_path(logger, core_path):
77 with open("/proc/sys/kernel/core_pattern", "r") as f:
79 if corefmt.startswith("|"):
81 "WARNING: redirecting the core dump through a"
82 " filter may result in truncated dumps.")
84 " You may want to check the filter settings"
85 " or uninstall it and edit the"
86 " /proc/sys/kernel/core_pattern accordingly.")
88 " current core pattern is: %s" % corefmt)
91 class NumericConstant(object):
92 __metaclass__ = ABCMeta
97 def __init__(self, value):
107 if self._value in self.desc_dict:
108 return self.desc_dict[self._value]
113 """ Generic test host "connected" to VPPs interface. """
123 return mactobinary(self._mac)
127 """ IPv4 address - string """
132 """ IPv4 address of remote host - raw, suitable as API parameter."""
133 return socket.inet_pton(socket.AF_INET, self._ip4)
137 """ IPv6 address - string """
142 """ IPv6 address of remote host - raw, suitable as API parameter."""
143 return socket.inet_pton(socket.AF_INET6, self._ip6)
147 """ IPv6 link-local address - string """
152 """ IPv6 link-local address of remote host -
153 raw, suitable as API parameter."""
154 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
156 def __eq__(self, other):
157 if isinstance(other, Host):
158 return (self.mac == other.mac and
159 self.ip4 == other.ip4 and
160 self.ip6 == other.ip6 and
161 self.ip6_ll == other.ip6_ll)
def __ne__(self, other):
    """ Inequality is defined as the negation of __eq__(). """
    are_equal = self.__eq__(other)
    return not are_equal
169 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
175 return hash(self.__repr__())
177 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
181 self._ip6_ll = ip6_ll
184 class ForeignAddressFactory(object):
187 net_template = '10.10.10.{}'
188 net = net_template.format(0) + '/' + str(prefix_len)
192 raise Exception("Network host address exhaustion")
194 return self.net_template.format(self.count)
198 """ L4 'connection' tied to two VPP interfaces """
200 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
201 self.testcase = testcase
202 self.ifs = [None, None]
205 self.address_family = af
206 self.l4proto = l4proto
207 self.ports = [None, None]
208 self.ports[0] = port1
209 self.ports[1] = port2
212 def pkt(self, side, l4args={}, payload="x"):
213 is_ip6 = 1 if self.address_family == AF_INET6 else 0
216 src_if = self.ifs[s0]
217 dst_if = self.ifs[s1]
218 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
219 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
220 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
221 merged_l4args.update(l4args)
222 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
224 self.l4proto(**merged_l4args) /
228 def send(self, side, flags=None, payload=""):
230 if flags is not None:
231 l4args['flags'] = flags
232 self.ifs[side].add_stream(self.pkt(side,
233 l4args=l4args, payload=payload))
234 self.ifs[1 - side].enable_capture()
235 self.testcase.pg_start()
237 def recv(self, side):
238 p = self.ifs[side].wait_for_packet(1)
241 def send_through(self, side, flags=None, payload=""):
242 self.send(side, flags, payload)
243 p = self.recv(1 - side)
246 def send_pingpong(self, side, flags1=None, flags2=None):
247 p1 = self.send_through(side, flags1)
248 p2 = self.send_through(1 - side, flags2)
253 L4_CONN_SIDE_ZERO = 0
class LoggerWrapper(object):
    """ Forward debug/error messages to an optional logger.

    When no logger was supplied the messages are dropped, so callers can
    log unconditionally without checking for None themselves.
    """

    def __init__(self, logger=None):
        """
        :param logger: object providing debug() and error() methods,
            or None to discard all messages
        """
        self._logger = logger

    def debug(self, *args, **kwargs):
        """ Pass all arguments to the wrapped logger's debug(), if any. """
        # guard against the default logger=None
        if self._logger is not None:
            self._logger.debug(*args, **kwargs)

    def error(self, *args, **kwargs):
        """ Pass all arguments to the wrapped logger's error(), if any. """
        if self._logger is not None:
            self._logger.error(*args, **kwargs)
270 def fragment_rfc791(packet, fragsize, _logger=None):
272 Fragment an IPv4 packet per RFC 791
273 :param packet: packet to fragment
274 :param fragsize: size at which to fragment
275 :note: IP options are not supported
276 :returns: list of fragments
278 logger = LoggerWrapper(_logger)
279 logger.debug(ppp("Fragmenting packet:", packet))
280 packet = packet.__class__(str(packet)) # recalculate all values
281 if len(packet[IP].options) > 0:
282 raise Exception("Not implemented")
283 if len(packet) <= fragsize:
286 pre_ip_len = len(packet) - len(packet[IP])
287 ip_header_len = packet[IP].ihl * 4
288 hex_packet = str(packet)
289 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
290 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
294 otl = len(packet[IP])
295 nfb = (fragsize - pre_ip_len - ihl * 4) / 8
298 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
301 p[IP].len = ihl * 4 + nfb * 8
305 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
306 p[IP].len = otl - nfb * 8
307 p[IP].frag = fo + nfb
310 more_fragments = fragment_rfc791(p, fragsize, _logger)
311 pkts.extend(more_fragments)
316 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
318 Fragment an IPv6 packet per RFC 8200
319 :param packet: packet to fragment
320 :param fragsize: size at which to fragment
321 :note: IP options are not supported
322 :returns: list of fragments
324 logger = LoggerWrapper(_logger)
325 packet = packet.__class__(str(packet)) # recalculate all values
326 if len(packet) <= fragsize:
328 logger.debug(ppp("Fragmenting packet:", packet))
332 hop_by_hop_hdr = None
336 l = packet.getlayer(counter)
338 if l.__class__ is IPv6:
340 # ignore 2nd IPv6 header and everything below..
344 elif l.__class__ is IPv6ExtHdrFragment:
345 raise Exception("Already fragmented")
346 elif l.__class__ is IPv6ExtHdrRouting:
347 routing_hdr = counter
348 elif l.__class__ is IPv6ExtHdrHopByHop:
349 hop_by_hop_hdr = counter
350 elif seen_ipv6 and not upper_layer and \
351 not l.__class__.__name__.startswith('IPv6ExtHdr'):
352 upper_layer = counter
353 counter = counter + 1
354 l = packet.getlayer(counter)
357 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
358 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
360 if upper_layer is None:
361 raise Exception("Upper layer header not found in IPv6 packet")
363 last_per_fragment_hdr = ipv6_nr
364 if routing_hdr is None:
365 if hop_by_hop_hdr is not None:
366 last_per_fragment_hdr = hop_by_hop_hdr
368 last_per_fragment_hdr = routing_hdr
369 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
371 per_fragment_headers = packet.copy()
372 per_fragment_headers[last_per_fragment_hdr].remove_payload()
373 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
375 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
376 hex_payload = str(ext_and_upper_layer)
377 logger.debug("Payload length is %s" % len(hex_payload))
378 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
380 fragment_ext_hdr = IPv6ExtHdrFragment()
381 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
383 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
384 len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
386 raise Exception("Cannot fragment this packet - MTU too small "
387 "(%s, %s, %s, %s, %s)" % (
388 len(per_fragment_headers), len(fragment_ext_hdr),
389 len(ext_and_upper_layer),
390 len(ext_and_upper_layer.payload), fragsize))
392 orig_nh = packet[IPv6].nh
393 p = per_fragment_headers
396 p = p / fragment_ext_hdr
397 del p[IPv6ExtHdrFragment].nh
398 first_payload_len_nfb = (fragsize - len(p)) / 8
399 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
401 p[IPv6ExtHdrFragment].nh = orig_nh
402 p[IPv6ExtHdrFragment].id = identification
403 p[IPv6ExtHdrFragment].offset = 0
404 p[IPv6ExtHdrFragment].m = 1
405 p = p.__class__(str(p))
406 logger.debug(ppp("Fragment %s:" % len(pkts), p))
408 offset = first_payload_len_nfb * 8
409 logger.debug("Offset after first fragment: %s" % offset)
410 while len(hex_payload) > offset:
411 p = per_fragment_headers
414 p = p / fragment_ext_hdr
415 del p[IPv6ExtHdrFragment].nh
416 l_nfb = (fragsize - len(p)) / 8
417 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
418 p[IPv6ExtHdrFragment].nh = orig_nh
419 p[IPv6ExtHdrFragment].id = identification
420 p[IPv6ExtHdrFragment].offset = offset / 8
421 p[IPv6ExtHdrFragment].m = 1
422 p = p.__class__(str(p))
423 logger.debug(ppp("Fragment %s:" % len(pkts), p))
425 offset = offset + l_nfb * 8
427 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment