1 """ test framework utilities """
5 from abc import abstractmethod, ABCMeta
6 from cStringIO import StringIO
7 from scapy.layers.inet6 import in6_mactoifaceid
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP, UDP, TCP
12 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
13 from scapy.packet import Packet
14 from socket import inet_pton, AF_INET, AF_INET6
17 def ppp(headline, packet):
18 """ Return string containing the output of scapy packet.show() call. """
20 old_stdout = sys.stdout
24 sys.stdout = old_stdout
28 def ppc(headline, capture, limit=10):
29 """ Return string containing ppp() printout for a capture.
31 :param headline: printed as first line of output
32 :param capture: packets to print
33 :param limit: limit the print to # of packets
38 if limit < len(capture):
39 tail = "\nPrint limit reached, %s out of %s packets printed" % (
42 body = "".join([ppp("Packet #%s:" % count, p)
43 for count, p in zip(range(0, limit), capture)])
44 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings that differ only in the last octet.

    :param ip4: dotted IPv4 address string; the part before its last '.'
                is reused as the common prefix of every generated address
    :param s: first value substituted for the last octet (inclusive)
    :param e: end of the substituted values (exclusive)
    :returns: generator yielding "<prefix>.<n>" for each n in range(s, e)
    """
    # The prefix and the range are computed right away; only the string
    # formatting is deferred until the generator is consumed.
    prefix = ip4.rsplit('.', 1)[0]
    last_octets = range(s, e)
    return ("%s.%d" % (prefix, octet) for octet in last_octets)
def ip4n_range(ip4n, s, e):
    """ Packed-binary counterpart of ip4_range().

    :param ip4n: packed IPv4 address, as accepted by socket.inet_ntop
    :param s: first value substituted for the last octet (inclusive)
    :param e: end of the substituted values (exclusive)
    :returns: generator yielding the socket.inet_pton() form of every
              address string produced by ip4_range() for the same range
    """
    # Unpack once up front so a malformed ip4n fails at call time, not
    # when the generator is first consumed.
    dotted = socket.inet_ntop(socket.AF_INET, ip4n)
    addresses = ip4_range(dotted, s, e)
    return (socket.inet_pton(socket.AF_INET, address)
            for address in addresses)
59 """ Convert the : separated format into binary packet data for the API """
60 return mac.replace(':', '').decode('hex')
64 euid = in6_mactoifaceid(mac)
65 addr = "fe80::" + euid
69 class NumericConstant(object):
70 __metaclass__ = ABCMeta
75 def __init__(self, value):
85 if self._value in self.desc_dict:
86 return self.desc_dict[self._value]
91 """ Generic test host "connected" to VPPs interface. """
101 return mactobinary(self._mac)
105 """ IPv4 address - string """
110 """ IPv4 address of remote host - raw, suitable as API parameter."""
111 return socket.inet_pton(socket.AF_INET, self._ip4)
115 """ IPv6 address - string """
120 """ IPv6 address of remote host - raw, suitable as API parameter."""
121 return socket.inet_pton(socket.AF_INET6, self._ip6)
125 """ IPv6 link-local address - string """
130 """ IPv6 link-local address of remote host -
131 raw, suitable as API parameter."""
132 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
134 def __eq__(self, other):
135 if isinstance(other, Host):
136 return (self.mac == other.mac and
137 self.ip4 == other.ip4 and
138 self.ip6 == other.ip6 and
139 self.ip6_ll == other.ip6_ll)
def __ne__(self, other):
    """ Inequality: the boolean negation of whatever __eq__ reports. """
    equal = self.__eq__(other)
    return not equal
147 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
153 return hash(self.__repr__())
155 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
159 self._ip6_ll = ip6_ll
162 class ForeignAddressFactory(object):
165 net_template = '10.10.10.{}'
166 net = net_template.format(0) + '/' + str(prefix_len)
170 raise Exception("Network host address exhaustion")
172 return self.net_template.format(self.count)
176 """ L4 'connection' tied to two VPP interfaces """
177 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
178 self.testcase = testcase
179 self.ifs = [None, None]
182 self.address_family = af
183 self.l4proto = l4proto
184 self.ports = [None, None]
185 self.ports[0] = port1
186 self.ports[1] = port2
189 def pkt(self, side, l4args={}, payload="x"):
190 is_ip6 = 1 if self.address_family == AF_INET6 else 0
193 src_if = self.ifs[s0]
194 dst_if = self.ifs[s1]
195 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
196 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
197 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
198 merged_l4args.update(l4args)
199 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
201 self.l4proto(**merged_l4args) /
205 def send(self, side, flags=None, payload=""):
207 if flags is not None:
208 l4args['flags'] = flags
209 self.ifs[side].add_stream(self.pkt(side,
210 l4args=l4args, payload=payload))
211 self.ifs[1-side].enable_capture()
212 self.testcase.pg_start()
214 def recv(self, side):
215 p = self.ifs[side].wait_for_packet(1)
218 def send_through(self, side, flags=None, payload=""):
219 self.send(side, flags, payload)
220 p = self.recv(1-side)
223 def send_pingpong(self, side, flags1=None, flags2=None):
224 p1 = self.send_through(side, flags1)
225 p2 = self.send_through(1-side, flags2)
230 L4_CONN_SIDE_ZERO = 0