2 """IP6 VRF Multi-instance Test Case HLD:
5 - higher number of pg-ip6 interfaces causes problems => only 15 pg-ip6 \
6 interfaces in 5 VRFs are tested
7 - jumbo packets in configuration with 15 pg-ip6 interfaces leads to \
11 - add 15 pg-ip6 interfaces
12 - configure 5 hosts per pg-ip6 interface
14 - add 3 pg-ip6 interfaces per VRF
17 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
20 - check VRF data by parsing output of ip6_fib_dump API command
21 - all packets received correctly in case of pg-ip6 interfaces in VRF
22 - no packet received in case of pg-ip6 interfaces not in VRF
28 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
31 - check VRF data by parsing output of ip6_fib_dump API command
32 - all packets received correctly in case of pg-ip6 interfaces in VRF
33 - no packet received in case of pg-ip6 interfaces not in VRF
36 - add 1 of reset VRFs and 1 new VRF
39 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
42 - check VRF data by parsing output of ip6_fib_dump API command
43 - all packets received correctly in case of pg-ip6 interfaces in VRF
44 - no packet received in case of pg-ip6 interfaces not in VRF
47 - reset all VRFs (i.e. no VRF except VRF=0 created)
50 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
53 - check VRF data by parsing output of ip6_fib_dump API command
54 - all packets received correctly in case of pg-ip6 interfaces in VRF
55 - no packet received in case of pg-ip6 interfaces not in VRF
62 from scapy.packet import Raw
63 from scapy.layers.l2 import Ether
64 from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \
65 RouterAlert, IPv6ExtHdrHopByHop
66 from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
67 from scapy.pton_ntop import inet_ntop
68 from scapy.data import IPV6_ADDR_UNICAST
70 from framework import VppTestCase, VppTestRunner
73 # VRF status constants
74 VRF_NOT_CONFIGURED = 0
79 def is_ipv6_misc_ext(p):
80 """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out
81 ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """
82 if p.haslayer(ICMPv6ND_RA):
83 if in6_ismaddr(p[IPv6].dst):
85 if p.haslayer(ICMPv6ND_NS):
86 if in6_isllsnmaddr(p[IPv6].dst):
88 if p.haslayer(IPv6ExtHdrHopByHop):
89 for o in p[IPv6ExtHdrHopByHop].options:
90 if isinstance(o, RouterAlert):
95 class TestIP6VrfMultiInst(VppTestCase):
96 """ IP6 VRF Multi-instance Test Case """
101 Perform standard class setup (defined by class method setUpClass in
102 class VppTestCase) before running the test case, set test case related
103 variables and configure VPP.
105 super(TestIP6VrfMultiInst, cls).setUpClass()
110 cls.pg_ifs_per_vrf = 3
113 # Create pg interfaces
114 cls.create_pg_interfaces(
115 range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
117 # Packet flows mapping pg0 -> pg1, pg2 etc.
119 for i in range(len(cls.pg_interfaces)):
120 multiplicand = i / cls.pg_ifs_per_vrf
122 cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
123 for j in range(cls.pg_ifs_per_vrf)
124 if (multiplicand * cls.pg_ifs_per_vrf + j) != i]
125 cls.flows[cls.pg_interfaces[i]] = pg_list
127 # Packet sizes - jumbo packet (9018 bytes) skipped
128 cls.pg_if_packet_sizes = [64, 512, 1518]
130 # Set up all interfaces
131 for pg_if in cls.pg_interfaces:
133 pg_if.generate_remote_hosts(cls.hosts_per_pg)
135 # Create list of VRFs
136 cls.vrf_list = list()
138 # Create list of reset VRFs
139 cls.vrf_reset_list = list()
141 # Create list of pg_interfaces in VRFs
142 cls.pg_in_vrf = list()
144 # Create list of pg_interfaces not in BDs
145 cls.pg_not_in_vrf = [pg_if for pg_if in cls.pg_interfaces]
147 # Create mapping of pg_interfaces to VRF IDs
148 cls.pg_if_by_vrf_id = dict()
149 for i in range(cls.nr_of_vrfs):
152 cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
153 for j in range(cls.pg_ifs_per_vrf)]
154 cls.pg_if_by_vrf_id[vrf_id] = pg_list
157 super(TestIP6VrfMultiInst, cls).tearDownClass()
162 Clear trace and packet infos before running each test.
164 super(TestIP6VrfMultiInst, self).setUp()
165 self.reset_packet_infos()
169 Show various debug prints after each test.
171 super(TestIP6VrfMultiInst, self).tearDown()
172 if not self.vpp_dead:
173 self.logger.info(self.vapi.ppcli("show ip6 fib"))
174 self.logger.info(self.vapi.ppcli("show ip6 neighbors"))
176 def create_vrf_and_assign_interfaces(self, count, start=1):
178 Create required number of FIB tables / VRFs, put 3 l2-pg interfaces
179 to every FIB table / VRF.
181 :param int count: Number of FIB tables / VRFs to be created.
182 :param int start: Starting number of the FIB table / VRF ID. \
185 for i in range(count):
187 pg_if = self.pg_if_by_vrf_id[vrf_id][0]
188 dest_addr = pg_if.remote_hosts[0].ip6n
190 self.vapi.ip_table_add_del(vrf_id, is_add=1, is_ipv6=1)
191 self.vapi.ip_add_del_route(
192 dest_addr, dest_addr_len, pg_if.local_ip6n, is_ipv6=1,
193 table_id=vrf_id, is_multipath=1)
194 self.logger.info("IPv6 VRF ID %d created" % vrf_id)
195 if vrf_id not in self.vrf_list:
196 self.vrf_list.append(vrf_id)
197 if vrf_id in self.vrf_reset_list:
198 self.vrf_reset_list.remove(vrf_id)
199 for j in range(self.pg_ifs_per_vrf):
200 pg_if = self.pg_if_by_vrf_id[vrf_id][j]
201 pg_if.set_table_ip6(vrf_id)
202 self.logger.info("pg-interface %s added to IPv6 VRF ID %d"
203 % (pg_if.name, vrf_id))
204 if pg_if not in self.pg_in_vrf:
205 self.pg_in_vrf.append(pg_if)
206 if pg_if in self.pg_not_in_vrf:
207 self.pg_not_in_vrf.remove(pg_if)
209 pg_if.disable_ipv6_ra()
210 pg_if.configure_ipv6_neighbors()
211 self.logger.debug(self.vapi.ppcli("show ip6 fib"))
212 self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
214 def reset_vrf(self, vrf_id):
216 Reset required FIB table / VRF.
218 :param int vrf_id: The FIB table / VRF ID to be reset.
220 # self.vapi.reset_vrf(vrf_id, is_ipv6=1)
221 self.vapi.reset_fib(vrf_id, is_ipv6=1)
222 if vrf_id in self.vrf_list:
223 self.vrf_list.remove(vrf_id)
224 if vrf_id not in self.vrf_reset_list:
225 self.vrf_reset_list.append(vrf_id)
226 for j in range(self.pg_ifs_per_vrf):
227 pg_if = self.pg_if_by_vrf_id[vrf_id][j]
229 if pg_if in self.pg_in_vrf:
230 self.pg_in_vrf.remove(pg_if)
231 if pg_if not in self.pg_not_in_vrf:
232 self.pg_not_in_vrf.append(pg_if)
233 self.logger.info("IPv6 VRF ID %d reset" % vrf_id)
234 self.logger.debug(self.vapi.ppcli("show ip6 fib"))
235 self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
236 self.vapi.ip_table_add_del(vrf_id, is_add=0, is_ipv6=1)
238 def create_stream(self, src_if, packet_sizes):
240 Create input packet stream for defined interface using hosts list.
242 :param object src_if: Interface to create packet stream for.
243 :param list packet_sizes: List of required packet sizes.
244 :return: Stream of packets.
247 src_hosts = src_if.remote_hosts
248 for dst_if in self.flows[src_if]:
249 for dst_host in dst_if.remote_hosts:
250 src_host = random.choice(src_hosts)
251 pkt_info = self.create_packet_info(src_if, dst_if)
252 payload = self.info_to_payload(pkt_info)
253 p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
254 IPv6(src=src_host.ip6, dst=dst_host.ip6) /
255 UDP(sport=1234, dport=1234) /
257 pkt_info.data = p.copy()
258 size = random.choice(packet_sizes)
259 self.extend_packet(p, size)
261 self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
262 % (src_if.name, len(pkts)))
265 def verify_capture(self, pg_if, capture):
267 Verify captured input packet stream for defined interface.
269 :param object pg_if: Interface to verify captured packet stream for.
270 :param list capture: Captured packet stream.
273 for i in self.pg_interfaces:
274 last_info[i.sw_if_index] = None
275 dst_sw_if_index = pg_if.sw_if_index
276 for packet in capture:
280 payload_info = self.payload_to_info(str(packet[Raw]))
281 packet_index = payload_info.index
282 self.assertEqual(payload_info.dst, dst_sw_if_index)
283 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
284 (pg_if.name, payload_info.src, packet_index))
285 next_info = self.get_next_packet_info_for_interface2(
286 payload_info.src, dst_sw_if_index,
287 last_info[payload_info.src])
288 last_info[payload_info.src] = next_info
289 self.assertIsNotNone(next_info)
290 self.assertEqual(packet_index, next_info.index)
291 saved_packet = next_info.data
292 # Check standard fields
293 self.assertEqual(ip.src, saved_packet[IPv6].src)
294 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
295 self.assertEqual(udp.sport, saved_packet[UDP].sport)
296 self.assertEqual(udp.dport, saved_packet[UDP].dport)
298 self.logger.error(ppp("Unexpected or invalid packet:", packet))
300 for i in self.pg_interfaces:
301 remaining_packet = self.get_next_packet_info_for_interface2(
302 i, dst_sw_if_index, last_info[i.sw_if_index])
305 "Port %u: Packet expected from source %u didn't arrive" %
306 (dst_sw_if_index, i.sw_if_index))
308 def verify_vrf(self, vrf_id):
310 Check if the FIB table / VRF ID is configured.
312 :param int vrf_id: The FIB table / VRF ID to be verified.
313 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
315 ip6_fib_dump = self.vapi.ip6_fib_dump()
318 for ip6_fib_details in ip6_fib_dump:
319 if ip6_fib_details.table_id == vrf_id:
322 addr = inet_ntop(socket.AF_INET6, ip6_fib_details.address)
323 addrtype = in6_getAddrType(addr)
324 vrf_count += 1 if addrtype == IPV6_ADDR_UNICAST else 0
325 if not vrf_exist and vrf_count == 0:
326 self.logger.info("IPv6 VRF ID %d is not configured" % vrf_id)
327 return VRF_NOT_CONFIGURED
328 elif vrf_exist and vrf_count == 0:
329 self.logger.info("IPv6 VRF ID %d has been reset" % vrf_id)
332 self.logger.info("IPv6 VRF ID %d is configured" % vrf_id)
333 return VRF_CONFIGURED
335 def run_verify_test(self):
337 Create packet streams for all configured l2-pg interfaces, send all \
338 prepared packet streams and verify that:
339 - all packets received correctly on all pg-l2 interfaces assigned
341 - no packet received on all pg-l2 interfaces not assigned to bridge
344 :raise RuntimeError: If no packet captured on l2-pg interface assigned
345 to the bridge domain or if any packet is captured on l2-pg
346 interface not assigned to the bridge domain.
349 # Create incoming packet streams for packet-generator interfaces
350 for pg_if in self.pg_interfaces:
351 pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
352 pg_if.add_stream(pkts)
354 # Enable packet capture and start packet sending
355 self.pg_enable_capture(self.pg_interfaces)
359 # Verify outgoing packet streams per packet-generator interface
360 for pg_if in self.pg_interfaces:
361 if pg_if in self.pg_in_vrf:
362 capture = pg_if.get_capture(remark="interface is in VRF")
363 self.verify_capture(pg_if, capture)
364 elif pg_if in self.pg_not_in_vrf:
365 pg_if.assert_nothing_captured(remark="interface is not in VRF",
366 filter_out_fn=is_ipv6_misc_ext)
367 self.logger.debug("No capture for interface %s" % pg_if.name)
369 raise Exception("Unknown interface: %s" % pg_if.name)
371 def test_ip6_vrf_01(self):
372 """ IP6 VRF Multi-instance test 1 - create 4 VRFs
376 self.create_vrf_and_assign_interfaces(4)
379 for vrf_id in self.vrf_list:
380 self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)
383 self.run_verify_test()
385 def test_ip6_vrf_02(self):
386 """ IP6 VRF Multi-instance test 2 - reset 2 VRFs
394 for vrf_id in self.vrf_reset_list:
395 self.assertEqual(self.verify_vrf(vrf_id), VRF_RESET)
396 for vrf_id in self.vrf_list:
397 self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)
400 self.run_verify_test()
402 # Reset routes learned from ICMPv6 Neighbor Discovery
403 for vrf_id in self.vrf_reset_list:
404 self.reset_vrf(vrf_id)
406 def test_ip6_vrf_03(self):
407 """ IP6 VRF Multi-instance 3 - add 2 VRFs
410 # Add 1 of reset VRFs and 1 new VRF
411 self.create_vrf_and_assign_interfaces(1)
412 self.create_vrf_and_assign_interfaces(1, start=5)
415 for vrf_id in self.vrf_reset_list:
416 self.assertEqual(self.verify_vrf(vrf_id), VRF_RESET)
417 for vrf_id in self.vrf_list:
418 self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)
421 self.run_verify_test()
423 # Reset routes learned from ICMPv6 Neighbor Discovery
424 for vrf_id in self.vrf_reset_list:
425 self.reset_vrf(vrf_id)
427 def test_ip6_vrf_04(self):
428 """ IP6 VRF Multi-instance test 4 - reset 4 VRFs
431 # Reset all VRFs (i.e. no VRF except VRF=0 created)
432 for i in range(len(self.vrf_list)):
433 self.reset_vrf(self.vrf_list[0])
436 for vrf_id in self.vrf_reset_list:
437 self.assertEqual(self.verify_vrf(vrf_id), VRF_RESET)
438 for vrf_id in self.vrf_list:
439 self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)
442 self.run_verify_test()
445 if __name__ == '__main__':
446 unittest.main(testRunner=VppTestRunner)