fc0ebd7350079068daad85a63cdc652775bf22fa
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import abc
4 import socket
5 from socket import AF_INET6
6 import six
7 import sys
8 import os.path
9
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
13     IPv6ExtHdrHopByHop
14 from scapy.packet import Raw
15 from scapy.utils import hexdump
16 from scapy.utils6 import in6_mactoifaceid
17
18 from io import BytesIO
19 from vpp_papi import mac_pton
20
21
22 def ppp(headline, packet):
23     """ Return string containing the output of scapy packet.show() call. """
24     return '%s\n%s\n\n%s\n' % (headline,
25                                hexdump(packet, dump=True),
26                                packet.show(dump=True))
27
28
29 def ppc(headline, capture, limit=10):
30     """ Return string containing ppp() printout for a capture.
31
32     :param headline: printed as first line of output
33     :param capture: packets to print
34     :param limit: limit the print to # of packets
35     """
36     if not capture:
37         return headline
38     tail = ""
39     if limit < len(capture):
40         tail = "\nPrint limit reached, %s out of %s packets printed" % (
41             limit, len(capture))
42     body = "".join([ppp("Packet #%s:" % count, p)
43                     for count, p in zip(range(0, limit), capture)])
44     return "%s\n%s%s" % (headline, body, tail)
45
46
47 def ip4_range(ip4, s, e):
48     tmp = ip4.rsplit('.', 1)[0]
49     return ("%s.%d" % (tmp, i) for i in range(s, e))
50
51
52 def ip4n_range(ip4n, s, e):
53     ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
54     return (socket.inet_pton(socket.AF_INET, ip)
55             for ip in ip4_range(ip4, s, e))
56
57
58 # wrapper around scapy library function.
59 def mk_ll_addr(mac):
60     euid = in6_mactoifaceid(mac)
61     addr = "fe80::" + euid
62     return addr
63
64
65 def ip6_normalize(ip6):
66     return socket.inet_ntop(socket.AF_INET6,
67                             socket.inet_pton(socket.AF_INET6, ip6))
68
69
70 def get_core_path(tempdir):
71     return "%s/%s" % (tempdir, get_core_pattern())
72
73
74 def is_core_present(tempdir):
75     return os.path.isfile(get_core_path(tempdir))
76
77
78 def get_core_pattern():
79     with open("/proc/sys/kernel/core_pattern", "r") as f:
80         corefmt = f.read().strip()
81     return corefmt
82
83
84 def check_core_path(logger, core_path):
85     corefmt = get_core_pattern()
86     if corefmt.startswith("|"):
87         logger.error(
88             "WARNING: redirecting the core dump through a"
89             " filter may result in truncated dumps.")
90         logger.error(
91             "   You may want to check the filter settings"
92             " or uninstall it and edit the"
93             " /proc/sys/kernel/core_pattern accordingly.")
94         logger.error(
95             "   current core pattern is: %s" % corefmt)
96
97
98 class NumericConstant(object):
99
100     desc_dict = {}
101
102     def __init__(self, value):
103         self._value = value
104
105     def __int__(self):
106         return self._value
107
108     def __long__(self):
109         return self._value
110
111     def __str__(self):
112         if self._value in self.desc_dict:
113             return self.desc_dict[self._value]
114         return ""
115
116
117 class Host(object):
118     """ Generic test host "connected" to VPPs interface. """
119
120     @property
121     def mac(self):
122         """ MAC address """
123         return self._mac
124
125     @property
126     def bin_mac(self):
127         """ MAC address """
128         return mac_pton(self._mac)
129
130     @property
131     def ip4(self):
132         """ IPv4 address - string """
133         return self._ip4
134
135     @property
136     def ip4n(self):
137         """ IPv4 address of remote host - raw, suitable as API parameter."""
138         return socket.inet_pton(socket.AF_INET, self._ip4)
139
140     @property
141     def ip6(self):
142         """ IPv6 address - string """
143         return self._ip6
144
145     @property
146     def ip6n(self):
147         """ IPv6 address of remote host - raw, suitable as API parameter."""
148         return socket.inet_pton(socket.AF_INET6, self._ip6)
149
150     @property
151     def ip6_ll(self):
152         """ IPv6 link-local address - string """
153         return self._ip6_ll
154
155     @property
156     def ip6n_ll(self):
157         """ IPv6 link-local address of remote host -
158         raw, suitable as API parameter."""
159         return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
160
161     def __eq__(self, other):
162         if isinstance(other, Host):
163             return (self.mac == other.mac and
164                     self.ip4 == other.ip4 and
165                     self.ip6 == other.ip6 and
166                     self.ip6_ll == other.ip6_ll)
167         else:
168             return False
169
170     def __ne__(self, other):
171         return not self.__eq__(other)
172
173     def __repr__(self):
174         return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
175                                                             self.ip4,
176                                                             self.ip6,
177                                                             self.ip6_ll)
178
179     def __hash__(self):
180         return hash(self.__repr__())
181
182     def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
183         self._mac = mac
184         self._ip4 = ip4
185         self._ip6 = ip6
186         self._ip6_ll = ip6_ll
187
188
189 class ForeignAddressFactory(object):
190     count = 0
191     prefix_len = 24
192     net_template = '10.10.10.{}'
193     net = net_template.format(0) + '/' + str(prefix_len)
194
195     def get_ip4(self):
196         if self.count > 255:
197             raise Exception("Network host address exhaustion")
198         self.count += 1
199         return self.net_template.format(self.count)
200
201
202 class L4_Conn():
203     """ L4 'connection' tied to two VPP interfaces """
204
205     def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
206         self.testcase = testcase
207         self.ifs = [None, None]
208         self.ifs[0] = if1
209         self.ifs[1] = if2
210         self.address_family = af
211         self.l4proto = l4proto
212         self.ports = [None, None]
213         self.ports[0] = port1
214         self.ports[1] = port2
215         self
216
217     def pkt(self, side, l4args={}, payload="x"):
218         is_ip6 = 1 if self.address_family == AF_INET6 else 0
219         s0 = side
220         s1 = 1 - side
221         src_if = self.ifs[s0]
222         dst_if = self.ifs[s1]
223         layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
224                    IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
225         merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
226         merged_l4args.update(l4args)
227         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
228              layer_3[is_ip6] /
229              self.l4proto(**merged_l4args) /
230              Raw(payload))
231         return p
232
233     def send(self, side, flags=None, payload=""):
234         l4args = {}
235         if flags is not None:
236             l4args['flags'] = flags
237         self.ifs[side].add_stream(self.pkt(side,
238                                            l4args=l4args, payload=payload))
239         self.ifs[1 - side].enable_capture()
240         self.testcase.pg_start()
241
242     def recv(self, side):
243         p = self.ifs[side].wait_for_packet(1)
244         return p
245
246     def send_through(self, side, flags=None, payload=""):
247         self.send(side, flags, payload)
248         p = self.recv(1 - side)
249         return p
250
251     def send_pingpong(self, side, flags1=None, flags2=None):
252         p1 = self.send_through(side, flags1)
253         p2 = self.send_through(1 - side, flags2)
254         return [p1, p2]
255
256
257 class L4_CONN_SIDE:
258     L4_CONN_SIDE_ZERO = 0
259     L4_CONN_SIDE_ONE = 1
260
261
262 class LoggerWrapper(object):
263     def __init__(self, logger=None):
264         self._logger = logger
265
266     def debug(self, *args, **kwargs):
267         if self._logger:
268             self._logger.debug(*args, **kwargs)
269
270     def error(self, *args, **kwargs):
271         if self._logger:
272             self._logger.error(*args, **kwargs)
273
274
275 def fragment_rfc791(packet, fragsize, _logger=None):
276     """
277     Fragment an IPv4 packet per RFC 791
278     :param packet: packet to fragment
279     :param fragsize: size at which to fragment
280     :note: IP options are not supported
281     :returns: list of fragments
282     """
283     logger = LoggerWrapper(_logger)
284     logger.debug(ppp("Fragmenting packet:", packet))
285     packet = packet.__class__(str(packet))  # recalculate all values
286     if len(packet[IP].options) > 0:
287         raise Exception("Not implemented")
288     if len(packet) <= fragsize:
289         return [packet]
290
291     pre_ip_len = len(packet) - len(packet[IP])
292     ip_header_len = packet[IP].ihl * 4
293     hex_packet = str(packet)
294     hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
295     hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
296
297     pkts = []
298     ihl = packet[IP].ihl
299     otl = len(packet[IP])
300     nfb = (fragsize - pre_ip_len - ihl * 4) / 8
301     fo = packet[IP].frag
302
303     p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
304     p[IP].flags = "MF"
305     p[IP].frag = fo
306     p[IP].len = ihl * 4 + nfb * 8
307     del p[IP].chksum
308     pkts.append(p)
309
310     p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
311     p[IP].len = otl - nfb * 8
312     p[IP].frag = fo + nfb
313     del p[IP].chksum
314
315     more_fragments = fragment_rfc791(p, fragsize, _logger)
316     pkts.extend(more_fragments)
317
318     return pkts
319
320
321 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
322     """
323     Fragment an IPv6 packet per RFC 8200
324     :param packet: packet to fragment
325     :param fragsize: size at which to fragment
326     :note: IP options are not supported
327     :returns: list of fragments
328     """
329     logger = LoggerWrapper(_logger)
330     packet = packet.__class__(str(packet))  # recalculate all values
331     if len(packet) <= fragsize:
332         return [packet]
333     logger.debug(ppp("Fragmenting packet:", packet))
334     pkts = []
335     counter = 0
336     routing_hdr = None
337     hop_by_hop_hdr = None
338     upper_layer = None
339     seen_ipv6 = False
340     ipv6_nr = -1
341     l = packet.getlayer(counter)
342     while l is not None:
343         if l.__class__ is IPv6:
344             if seen_ipv6:
345                 # ignore 2nd IPv6 header and everything below..
346                 break
347             ipv6_nr = counter
348             seen_ipv6 = True
349         elif l.__class__ is IPv6ExtHdrFragment:
350             raise Exception("Already fragmented")
351         elif l.__class__ is IPv6ExtHdrRouting:
352             routing_hdr = counter
353         elif l.__class__ is IPv6ExtHdrHopByHop:
354             hop_by_hop_hdr = counter
355         elif seen_ipv6 and not upper_layer and \
356                 not l.__class__.__name__.startswith('IPv6ExtHdr'):
357             upper_layer = counter
358         counter = counter + 1
359         l = packet.getlayer(counter)
360
361     logger.debug(
362         "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
363         (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
364
365     if upper_layer is None:
366         raise Exception("Upper layer header not found in IPv6 packet")
367
368     last_per_fragment_hdr = ipv6_nr
369     if routing_hdr is None:
370         if hop_by_hop_hdr is not None:
371             last_per_fragment_hdr = hop_by_hop_hdr
372     else:
373         last_per_fragment_hdr = routing_hdr
374     logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
375
376     per_fragment_headers = packet.copy()
377     per_fragment_headers[last_per_fragment_hdr].remove_payload()
378     logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
379
380     ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
381     hex_payload = str(ext_and_upper_layer)
382     logger.debug("Payload length is %s" % len(hex_payload))
383     logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
384
385     fragment_ext_hdr = IPv6ExtHdrFragment()
386     logger.debug(ppp("Fragment header:", fragment_ext_hdr))
387
388     if len(per_fragment_headers) + len(fragment_ext_hdr) +\
389             len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
390             > fragsize:
391         raise Exception("Cannot fragment this packet - MTU too small "
392                         "(%s, %s, %s, %s, %s)" % (
393                             len(per_fragment_headers), len(fragment_ext_hdr),
394                             len(ext_and_upper_layer),
395                             len(ext_and_upper_layer.payload), fragsize))
396
397     orig_nh = packet[IPv6].nh
398     p = per_fragment_headers
399     del p[IPv6].plen
400     del p[IPv6].nh
401     p = p / fragment_ext_hdr
402     del p[IPv6ExtHdrFragment].nh
403     first_payload_len_nfb = (fragsize - len(p)) / 8
404     p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
405     del p[IPv6].plen
406     p[IPv6ExtHdrFragment].nh = orig_nh
407     p[IPv6ExtHdrFragment].id = identification
408     p[IPv6ExtHdrFragment].offset = 0
409     p[IPv6ExtHdrFragment].m = 1
410     p = p.__class__(str(p))
411     logger.debug(ppp("Fragment %s:" % len(pkts), p))
412     pkts.append(p)
413     offset = first_payload_len_nfb * 8
414     logger.debug("Offset after first fragment: %s" % offset)
415     while len(hex_payload) > offset:
416         p = per_fragment_headers
417         del p[IPv6].plen
418         del p[IPv6].nh
419         p = p / fragment_ext_hdr
420         del p[IPv6ExtHdrFragment].nh
421         l_nfb = (fragsize - len(p)) / 8
422         p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
423         p[IPv6ExtHdrFragment].nh = orig_nh
424         p[IPv6ExtHdrFragment].id = identification
425         p[IPv6ExtHdrFragment].offset = offset / 8
426         p[IPv6ExtHdrFragment].m = 1
427         p = p.__class__(str(p))
428         logger.debug(ppp("Fragment %s:" % len(pkts), p))
429         pkts.append(p)
430         offset = offset + l_nfb * 8
431
432     pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment
433
434     return pkts
435
436
437 def reassemble4_core(listoffragments, return_ip):
438     buffer = BytesIO()
439     first = listoffragments[0]
440     buffer.seek(20)
441     for pkt in listoffragments:
442         buffer.seek(pkt[IP].frag*8)
443         buffer.write(bytes(pkt[IP].payload))
444     first.len = len(buffer.getvalue()) + 20
445     first.flags = 0
446     del(first.chksum)
447     if return_ip:
448         header = bytes(first[IP])[:20]
449         return first[IP].__class__(header + buffer.getvalue())
450     else:
451         header = bytes(first[Ether])[:34]
452         return first[Ether].__class__(header + buffer.getvalue())
453
454
455 def reassemble4_ether(listoffragments):
456     return reassemble4_core(listoffragments, False)
457
458
459 def reassemble4(listoffragments):
460     return reassemble4_core(listoffragments, True)