test: new test infrastructure
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import logging
4 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
5
6 import unittest
7 from framework import VppTestCase, VppTestRunner
8 from util import Util
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, Dot1Q
12 from scapy.layers.inet6 import (IPv6, UDP,
13     ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr,
14     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr)
15
16
17 @unittest.skip('Not finished yet.\n')
18 class TestIPv6(Util, VppTestCase):
19     """ IPv6 Test Case """
20
21     @classmethod
22     def setUpClass(cls):
23         super(TestIPv6, cls).setUpClass()
24
25         try:
26             cls.create_interfaces_and_subinterfaces()
27
28             # configure IPv6 on hardware interfaces
29             cls.config_ip6(cls.interfaces)
30
31             cls.config_ip6_on_software_interfaces(cls.interfaces)
32
33             # resolve ICMPv6 ND using hardware interfaces
34             cls.resolve_icmpv6_nd(cls.interfaces)
35
36             # let VPP know MAC addresses of peer (sub)interfaces
37             # cls.resolve_icmpv6_nd_on_software_interfaces(cls.interfaces)
38             cls.send_neighbour_advertisement_on_software_interfaces(cls.interfaces)
39
40             # config 2M FIB enries
41             #cls.config_fib_entries(2000000)
42             cls.config_fib_entries(1000000)
43
44         except Exception as e:
45             super(TestIPv6, cls).tearDownClass()
46             raise
47
48     def tearDown(self):
49         self.cli(2, "show int")
50         self.cli(2, "show trace")
51         self.cli(2, "show hardware")
52         self.cli(2, "show ip arp")
53         # self.cli(2, "show ip fib")  # 2M entries
54         self.cli(2, "show error")
55         self.cli(2, "show run")
56
57     @classmethod
58     def create_vlan_subif(cls, pg_index, vlan):
59         cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan))
60
61     @classmethod
62     def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, inner_vlan_id):
63         cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad"
64                  % (pg_index, sub_id, outer_vlan_id, inner_vlan_id))
65
66     class SoftInt(object):
67         pass
68
69     class HardInt(SoftInt):
70         pass
71
72     class Subint(SoftInt):
73         def __init__(self, sub_id):
74             self.sub_id = sub_id
75
76     class Dot1QSubint(Subint):
77         def __init__(self, sub_id, vlan=None):
78             if vlan is None:
79                 vlan = sub_id
80             super(TestIPv6.Dot1QSubint, self).__init__(sub_id)
81             self.vlan = vlan
82
83     class Dot1ADSubint(Subint):
84         def __init__(self, sub_id, outer_vlan, inner_vlan):
85             super(TestIPv6.Dot1ADSubint, self).__init__(sub_id)
86             self.outer_vlan = outer_vlan
87             self.inner_vlan = inner_vlan
88
89     @classmethod
90     def create_interfaces_and_subinterfaces(cls):
91         cls.interfaces = range(3)
92
93         cls.create_interfaces(cls.interfaces)
94
95         # Make vpp_api_test see interfaces created using debug CLI (in function create_interfaces)
96         cls.api("sw_interface_dump")
97
98         cls.INT_DETAILS = dict()
99
100         cls.INT_DETAILS[0] = cls.HardInt()
101
102         cls.INT_DETAILS[1] = cls.Dot1QSubint(100)
103         cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan)
104
105         # FIXME: Wrong packet format/wrong layer on output of interface 2
106         #self.INT_DETAILS[2] = self.Dot1ADSubint(10, 200, 300)
107         #self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan)
108
109         # Use dor1q for now
110         cls.INT_DETAILS[2] = cls.Dot1QSubint(200)
111         cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan)
112
113         for i in cls.interfaces:
114             det = cls.INT_DETAILS[i]
115             if isinstance(det, cls.Subint):
116                 cls.api("sw_interface_set_flags pg%u.%u admin-up" % (i, det.sub_id))
117
118     # IP adresses on subinterfaces
119     MY_SOFT_IP6S = {}
120     VPP_SOFT_IP6S = {}
121
122     @classmethod
123     def config_ip6_on_software_interfaces(cls, args):
124         for i in args:
125             cls.MY_SOFT_IP6S[i] = "fd01:%u::2" % i
126             cls.VPP_SOFT_IP6S[i] = "fd01:%u::1" % i
127             if isinstance(cls.INT_DETAILS[i], cls.Subint):
128                 interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id)
129             else:
130                 interface = "pg%u" % i
131             cls.api("sw_interface_add_del_address %s %s/32" % (interface, cls.VPP_SOFT_IP6S[i]))
132             cls.log("My subinterface IPv6 address is %s" % (cls.MY_SOFT_IP6S[i]))
133
134     # let VPP know MAC addresses of peer (sub)interfaces
135     @classmethod
136     def resolve_icmpv6_nd_on_software_interfaces(cls, args):
137         for i in args:
138             ip = cls.VPP_SOFT_IP6S[i]
139             cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i))
140             nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
141                       IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) /
142                       ICMPv6ND_NS(tgt=ip) /
143                       ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i]))
144             cls.pg_add_stream(i, nd_req)
145             cls.pg_enable_capture([i])
146
147             cls.cli(2, "trace add pg-input 1")
148             cls.pg_start()
149
150             # We don't need to read output
151
152     # let VPP know MAC addresses of peer (sub)interfaces
153     @classmethod
154     def send_neighbour_advertisement_on_software_interfaces(cls, args):
155         for i in args:
156             ip = cls.VPP_SOFT_IP6S[i]
157             cls.log("Sending ICMPv6ND_NA message for %s on port %u" % (ip, i))
158             pkt = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
159                    IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) /
160                    ICMPv6ND_NA(tgt=ip, R=0, S=0) /
161                    ICMPv6NDOptDstLLAddr(lladdr=cls.MY_MACS[i]))
162             cls.pg_add_stream(i, pkt)
163             cls.pg_enable_capture([i])
164
165             cls.cli(2, "trace add pg-input 1")
166             cls.pg_start()
167
168     @classmethod
169     def config_fib_entries(cls, count):
170         n_int = len(cls.interfaces)
171         for i in cls.interfaces:
172             cls.api("ip_add_del_route fd02::1/128 via %s count %u" % (cls.VPP_SOFT_IP6S[i], count / n_int))
173
174     @classmethod
175     def add_dot1_layers(cls, i, packet):
176         assert(type(packet) is Ether)
177         payload = packet.payload
178         det = cls.INT_DETAILS[i]
179         if isinstance(det, cls.Dot1QSubint):
180             packet.remove_payload()
181             packet.add_payload(Dot1Q(vlan=det.sub_id) / payload)
182         elif isinstance(det, cls.Dot1ADSubint):
183             packet.remove_payload()
184             packet.add_payload(Dot1Q(vlan=det.outer_vlan) / Dot1Q(vlan=det.inner_vlan) / payload)
185             packet.type = 0x88A8
186
187     def remove_dot1_layers(self, i, packet):
188         self.assertEqual(type(packet), Ether)
189         payload = packet.payload
190         det = self.INT_DETAILS[i]
191         if isinstance(det, self.Dot1QSubint):
192             self.assertEqual(type(payload), Dot1Q)
193             self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan)
194             payload = payload.payload
195         elif isinstance(det, self.Dot1ADSubint):  # TODO: change 88A8 type
196             self.assertEqual(type(payload), Dot1Q)
197             self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan)
198             payload = payload.payload
199             self.assertEqual(type(payload), Dot1Q)
200             self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan)
201             payload = payload.payload
202         packet.remove_payload()
203         packet.add_payload(payload)
204
205     def create_stream(self, pg_id):
206         pg_targets = [None] * 3
207         pg_targets[0] = [1, 2]
208         pg_targets[1] = [0, 2]
209         pg_targets[2] = [0, 1]
210         pkts = []
211         for i in range(0, 257):
212             target_pg_id = pg_targets[pg_id][i % 2]
213             info = self.create_packet_info(pg_id, target_pg_id)
214             payload = self.info_to_payload(info)
215             p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) /
216                  IPv6(src=self.MY_SOFT_IP6S[pg_id], dst=self.MY_SOFT_IP6S[target_pg_id]) /
217                  UDP(sport=1234, dport=1234) /
218                  Raw(payload))
219             info.data = p.copy()
220             self.add_dot1_layers(pg_id, p)
221             if not isinstance(self.INT_DETAILS[pg_id], self.Subint):
222                 packet_sizes = [76, 512, 1518, 9018]
223             else:
224                 packet_sizes = [76, 512, 1518+4, 9018+4]
225             size = packet_sizes[(i / 2) % len(packet_sizes)]
226             self.extend_packet(p, size)
227             pkts.append(p)
228         return pkts
229
230     def verify_capture(self, o, capture):
231         last_info = {}
232         for i in self.interfaces:
233             last_info[i] = None
234         for packet in capture:
235             self.remove_dot1_layers(o, packet)  # Check VLAN tags and Ethernet header
236             self.assertTrue(Dot1Q not in packet)
237             try:
238                 ip = packet[IPv6]
239                 udp = packet[UDP]
240                 payload_info = self.payload_to_info(str(packet[Raw]))
241                 packet_index = payload_info.index
242                 src_pg = payload_info.src
243                 dst_pg = payload_info.dst
244                 self.assertEqual(dst_pg, o)
245                 self.log("Got packet on port %u: src=%u (id=%u)" % (o, src_pg, packet_index), 2)
246                 next_info = self.get_next_packet_info_for_interface2(src_pg, dst_pg, last_info[src_pg])
247                 last_info[src_pg] = next_info
248                 self.assertTrue(next_info is not None)
249                 self.assertEqual(packet_index, next_info.index)
250                 saved_packet = next_info.data
251                 # Check standard fields
252                 self.assertEqual(ip.src, saved_packet[IPv6].src)
253                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
254                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
255                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
256             except:
257                 self.log("Unexpected or invalid packet:")
258                 packet.show()
259                 raise
260         for i in self.interfaces:
261             remaining_packet = self.get_next_packet_info_for_interface2(i, o, last_info[i])
262             self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (o, i))
263
264     def test_fib(self):
265         """ IPv6 FIB test """
266
267         for i in self.interfaces:
268             pkts = self.create_stream(i)
269             self.pg_add_stream(i, pkts)
270
271         self.pg_enable_capture(self.interfaces)
272         self.pg_start()
273
274         for i in self.interfaces:
275             out = self.pg_get_capture(i)
276             self.log("Verifying capture %u" % i)
277             self.verify_capture(i, out)
278
279
280 if __name__ == '__main__':
281     unittest.main(testRunner = VppTestRunner)