ipsec: add missing ipv6 ah code & ipv6 tests
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import socket
4 import sys
5 from abc import abstractmethod, ABCMeta
6 from cStringIO import StringIO
7 from scapy.utils6 import in6_mactoifaceid
8
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
13     IPv6ExtHdrHopByHop
14 from scapy.utils import hexdump
15 from socket import AF_INET6
16
17
18 def ppp(headline, packet):
19     """ Return string containing the output of scapy packet.show() call. """
20     o = StringIO()
21     old_stdout = sys.stdout
22     sys.stdout = o
23     print(headline)
24     hexdump(packet)
25     print("")
26     packet.show()
27     sys.stdout = old_stdout
28     return o.getvalue()
29
30
31 def ppc(headline, capture, limit=10):
32     """ Return string containing ppp() printout for a capture.
33
34     :param headline: printed as first line of output
35     :param capture: packets to print
36     :param limit: limit the print to # of packets
37     """
38     if not capture:
39         return headline
40     tail = ""
41     if limit < len(capture):
42         tail = "\nPrint limit reached, %s out of %s packets printed" % (
43             limit, len(capture))
44     body = "".join([ppp("Packet #%s:" % count, p)
45                     for count, p in zip(range(0, limit), capture)])
46     return "%s\n%s%s" % (headline, body, tail)
47
48
49 def ip4_range(ip4, s, e):
50     tmp = ip4.rsplit('.', 1)[0]
51     return ("%s.%d" % (tmp, i) for i in range(s, e))
52
53
54 def ip4n_range(ip4n, s, e):
55     ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
56     return (socket.inet_pton(socket.AF_INET, ip)
57             for ip in ip4_range(ip4, s, e))
58
59
60 def mactobinary(mac):
61     """ Convert the : separated format into binary packet data for the API """
62     return mac.replace(':', '').decode('hex')
63
64
65 def mk_ll_addr(mac):
66     euid = in6_mactoifaceid(mac)
67     addr = "fe80::" + euid
68     return addr
69
70
71 def ip6_normalize(ip6):
72     return socket.inet_ntop(socket.AF_INET6,
73                             socket.inet_pton(socket.AF_INET6, ip6))
74
75
76 def check_core_path(logger, core_path):
77     with open("/proc/sys/kernel/core_pattern", "r") as f:
78         corefmt = f.read()
79         if corefmt.startswith("|"):
80             logger.error(
81                 "WARNING: redirecting the core dump through a"
82                 " filter may result in truncated dumps.")
83             logger.error(
84                 "   You may want to check the filter settings"
85                 " or uninstall it and edit the"
86                 " /proc/sys/kernel/core_pattern accordingly.")
87             logger.error(
88                 "   current core pattern is: %s" % corefmt)
89
90
91 class NumericConstant(object):
92     __metaclass__ = ABCMeta
93
94     desc_dict = {}
95
96     @abstractmethod
97     def __init__(self, value):
98         self._value = value
99
100     def __int__(self):
101         return self._value
102
103     def __long__(self):
104         return self._value
105
106     def __str__(self):
107         if self._value in self.desc_dict:
108             return self.desc_dict[self._value]
109         return ""
110
111
112 class Host(object):
113     """ Generic test host "connected" to VPPs interface. """
114
115     @property
116     def mac(self):
117         """ MAC address """
118         return self._mac
119
120     @property
121     def bin_mac(self):
122         """ MAC address """
123         return mactobinary(self._mac)
124
125     @property
126     def ip4(self):
127         """ IPv4 address - string """
128         return self._ip4
129
130     @property
131     def ip4n(self):
132         """ IPv4 address of remote host - raw, suitable as API parameter."""
133         return socket.inet_pton(socket.AF_INET, self._ip4)
134
135     @property
136     def ip6(self):
137         """ IPv6 address - string """
138         return self._ip6
139
140     @property
141     def ip6n(self):
142         """ IPv6 address of remote host - raw, suitable as API parameter."""
143         return socket.inet_pton(socket.AF_INET6, self._ip6)
144
145     @property
146     def ip6_ll(self):
147         """ IPv6 link-local address - string """
148         return self._ip6_ll
149
150     @property
151     def ip6n_ll(self):
152         """ IPv6 link-local address of remote host -
153         raw, suitable as API parameter."""
154         return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
155
156     def __eq__(self, other):
157         if isinstance(other, Host):
158             return (self.mac == other.mac and
159                     self.ip4 == other.ip4 and
160                     self.ip6 == other.ip6 and
161                     self.ip6_ll == other.ip6_ll)
162         else:
163             return False
164
165     def __ne__(self, other):
166         return not self.__eq__(other)
167
168     def __repr__(self):
169         return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
170                                                             self.ip4,
171                                                             self.ip6,
172                                                             self.ip6_ll)
173
174     def __hash__(self):
175         return hash(self.__repr__())
176
177     def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
178         self._mac = mac
179         self._ip4 = ip4
180         self._ip6 = ip6
181         self._ip6_ll = ip6_ll
182
183
184 class ForeignAddressFactory(object):
185     count = 0
186     prefix_len = 24
187     net_template = '10.10.10.{}'
188     net = net_template.format(0) + '/' + str(prefix_len)
189
190     def get_ip4(self):
191         if self.count > 255:
192             raise Exception("Network host address exhaustion")
193         self.count += 1
194         return self.net_template.format(self.count)
195
196
197 class L4_Conn():
198     """ L4 'connection' tied to two VPP interfaces """
199
200     def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
201         self.testcase = testcase
202         self.ifs = [None, None]
203         self.ifs[0] = if1
204         self.ifs[1] = if2
205         self.address_family = af
206         self.l4proto = l4proto
207         self.ports = [None, None]
208         self.ports[0] = port1
209         self.ports[1] = port2
210         self
211
212     def pkt(self, side, l4args={}, payload="x"):
213         is_ip6 = 1 if self.address_family == AF_INET6 else 0
214         s0 = side
215         s1 = 1 - side
216         src_if = self.ifs[s0]
217         dst_if = self.ifs[s1]
218         layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
219                    IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
220         merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
221         merged_l4args.update(l4args)
222         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
223              layer_3[is_ip6] /
224              self.l4proto(**merged_l4args) /
225              Raw(payload))
226         return p
227
228     def send(self, side, flags=None, payload=""):
229         l4args = {}
230         if flags is not None:
231             l4args['flags'] = flags
232         self.ifs[side].add_stream(self.pkt(side,
233                                            l4args=l4args, payload=payload))
234         self.ifs[1 - side].enable_capture()
235         self.testcase.pg_start()
236
237     def recv(self, side):
238         p = self.ifs[side].wait_for_packet(1)
239         return p
240
241     def send_through(self, side, flags=None, payload=""):
242         self.send(side, flags, payload)
243         p = self.recv(1 - side)
244         return p
245
246     def send_pingpong(self, side, flags1=None, flags2=None):
247         p1 = self.send_through(side, flags1)
248         p2 = self.send_through(1 - side, flags2)
249         return [p1, p2]
250
251
252 class L4_CONN_SIDE:
253     L4_CONN_SIDE_ZERO = 0
254     L4_CONN_SIDE_ONE = 1
255
256
257 class LoggerWrapper(object):
258     def __init__(self, logger=None):
259         self._logger = logger
260
261     def debug(self, *args, **kwargs):
262         if self._logger:
263             self._logger.debug(*args, **kwargs)
264
265     def error(self, *args, **kwargs):
266         if self._logger:
267             self._logger.error(*args, **kwargs)
268
269
270 def fragment_rfc791(packet, fragsize, _logger=None):
271     """
272     Fragment an IPv4 packet per RFC 791
273     :param packet: packet to fragment
274     :param fragsize: size at which to fragment
275     :note: IP options are not supported
276     :returns: list of fragments
277     """
278     logger = LoggerWrapper(_logger)
279     logger.debug(ppp("Fragmenting packet:", packet))
280     packet = packet.__class__(str(packet))  # recalculate all values
281     if len(packet[IP].options) > 0:
282         raise Exception("Not implemented")
283     if len(packet) <= fragsize:
284         return [packet]
285
286     pre_ip_len = len(packet) - len(packet[IP])
287     ip_header_len = packet[IP].ihl * 4
288     hex_packet = str(packet)
289     hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
290     hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
291
292     pkts = []
293     ihl = packet[IP].ihl
294     otl = len(packet[IP])
295     nfb = (fragsize - pre_ip_len - ihl * 4) / 8
296     fo = packet[IP].frag
297
298     p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
299     p[IP].flags = "MF"
300     p[IP].frag = fo
301     p[IP].len = ihl * 4 + nfb * 8
302     del p[IP].chksum
303     pkts.append(p)
304
305     p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
306     p[IP].len = otl - nfb * 8
307     p[IP].frag = fo + nfb
308     del p[IP].chksum
309
310     more_fragments = fragment_rfc791(p, fragsize, _logger)
311     pkts.extend(more_fragments)
312
313     return pkts
314
315
316 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
317     """
318     Fragment an IPv6 packet per RFC 8200
319     :param packet: packet to fragment
320     :param fragsize: size at which to fragment
321     :note: IP options are not supported
322     :returns: list of fragments
323     """
324     logger = LoggerWrapper(_logger)
325     packet = packet.__class__(str(packet))  # recalculate all values
326     if len(packet) <= fragsize:
327         return [packet]
328     logger.debug(ppp("Fragmenting packet:", packet))
329     pkts = []
330     counter = 0
331     routing_hdr = None
332     hop_by_hop_hdr = None
333     upper_layer = None
334     seen_ipv6 = False
335     ipv6_nr = -1
336     l = packet.getlayer(counter)
337     while l is not None:
338         if l.__class__ is IPv6:
339             if seen_ipv6:
340                 # ignore 2nd IPv6 header and everything below..
341                 break
342             ipv6_nr = counter
343             seen_ipv6 = True
344         elif l.__class__ is IPv6ExtHdrFragment:
345             raise Exception("Already fragmented")
346         elif l.__class__ is IPv6ExtHdrRouting:
347             routing_hdr = counter
348         elif l.__class__ is IPv6ExtHdrHopByHop:
349             hop_by_hop_hdr = counter
350         elif seen_ipv6 and not upper_layer and \
351                 not l.__class__.__name__.startswith('IPv6ExtHdr'):
352             upper_layer = counter
353         counter = counter + 1
354         l = packet.getlayer(counter)
355
356     logger.debug(
357         "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
358         (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
359
360     if upper_layer is None:
361         raise Exception("Upper layer header not found in IPv6 packet")
362
363     last_per_fragment_hdr = ipv6_nr
364     if routing_hdr is None:
365         if hop_by_hop_hdr is not None:
366             last_per_fragment_hdr = hop_by_hop_hdr
367     else:
368         last_per_fragment_hdr = routing_hdr
369     logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
370
371     per_fragment_headers = packet.copy()
372     per_fragment_headers[last_per_fragment_hdr].remove_payload()
373     logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
374
375     ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
376     hex_payload = str(ext_and_upper_layer)
377     logger.debug("Payload length is %s" % len(hex_payload))
378     logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
379
380     fragment_ext_hdr = IPv6ExtHdrFragment()
381     logger.debug(ppp("Fragment header:", fragment_ext_hdr))
382
383     if len(per_fragment_headers) + len(fragment_ext_hdr) +\
384             len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
385             > fragsize:
386         raise Exception("Cannot fragment this packet - MTU too small "
387                         "(%s, %s, %s, %s, %s)" % (
388                             len(per_fragment_headers), len(fragment_ext_hdr),
389                             len(ext_and_upper_layer),
390                             len(ext_and_upper_layer.payload), fragsize))
391
392     orig_nh = packet[IPv6].nh
393     p = per_fragment_headers
394     del p[IPv6].plen
395     del p[IPv6].nh
396     p = p / fragment_ext_hdr
397     del p[IPv6ExtHdrFragment].nh
398     first_payload_len_nfb = (fragsize - len(p)) / 8
399     p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
400     del p[IPv6].plen
401     p[IPv6ExtHdrFragment].nh = orig_nh
402     p[IPv6ExtHdrFragment].id = identification
403     p[IPv6ExtHdrFragment].offset = 0
404     p[IPv6ExtHdrFragment].m = 1
405     p = p.__class__(str(p))
406     logger.debug(ppp("Fragment %s:" % len(pkts), p))
407     pkts.append(p)
408     offset = first_payload_len_nfb * 8
409     logger.debug("Offset after first fragment: %s" % offset)
410     while len(hex_payload) > offset:
411         p = per_fragment_headers
412         del p[IPv6].plen
413         del p[IPv6].nh
414         p = p / fragment_ext_hdr
415         del p[IPv6ExtHdrFragment].nh
416         l_nfb = (fragsize - len(p)) / 8
417         p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
418         p[IPv6ExtHdrFragment].nh = orig_nh
419         p[IPv6ExtHdrFragment].id = identification
420         p[IPv6ExtHdrFragment].offset = offset / 8
421         p[IPv6ExtHdrFragment].m = 1
422         p = p.__class__(str(p))
423         logger.debug(ppp("Fragment %s:" % len(pkts), p))
424         pkts.append(p)
425         offset = offset + l_nfb * 8
426
427     pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment
428
429     return pkts