"""IP6 VRF Multi-instance Test Case HLD:

**NOTES:**
    - higher number of pg-ip6 interfaces causes problems => only 15 pg-ip6 \
    interfaces in 5 VRFs are tested
    - jumbo packets in configuration with 15 pg-ip6 interfaces lead to \
    problems too

**config 1**
    - add 15 pg-ip6 interfaces
    - configure 5 hosts per pg-ip6 interface
    - configure 4 VRFs
    - add 3 pg-ip6 interfaces per VRF

**test 1**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 1**
    - check VRF data by parsing output of ip_route_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in the
      same VRF
    - no packet received in case of pg-ip6 interfaces not in VRF
    - no packet received in case of pg-ip6 interfaces in different VRFs

**config 2**
    - reset 2 VRFs

**test 2**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 2**
    - check VRF data by parsing output of ip_route_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in the
      same VRF
    - no packet received in case of pg-ip6 interfaces not in VRF
    - no packet received in case of pg-ip6 interfaces in different VRFs

**config 3**
    - add 1 of reset VRFs and 1 new VRF

**test 3**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 3**
    - check VRF data by parsing output of ip_route_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in the
      same VRF
    - no packet received in case of pg-ip6 interfaces not in VRF
    - no packet received in case of pg-ip6 interfaces in different VRFs

**config 4**
    - reset all VRFs (i.e. no VRF except VRF=0 created)

**test 4**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 4**
    - check VRF data by parsing output of ip_route_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in the
      same VRF
    - no packet received in case of pg-ip6 interfaces not in VRF
    - no packet received in case of pg-ip6 interfaces in different VRFs
"""

import random
import unittest

from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet6 import (
    UDP,
    IPv6,
    ICMPv6ND_NS,
    ICMPv6ND_RA,
    RouterAlert,
    IPv6ExtHdrHopByHop,
)
from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
from scapy.pton_ntop import inet_ntop

from framework import VppTestCase, VppTestRunner
from util import ppp
from vrf import VRFState


def is_ipv6_misc_ext(p):
    """Tell whether a packet is uninteresting IPv6 background traffic.

    Used as a capture filter so that unsolicited control-plane packets do not
    count as "something was captured".

    :param p: Scapy packet to classify.
    :return: True if the packet is one of:

        - an ICMPv6 ND Router Advertisement sent to a multicast address,
        - an ICMPv6 ND Neighbor Solicitation sent to a link-local
          solicited-node multicast address,
        - a packet whose Hop-by-Hop extension header carries a Router Alert
          option;

        False otherwise.
    """
    if p.haslayer(ICMPv6ND_RA) and in6_ismaddr(p[IPv6].dst):
        return True
    if p.haslayer(ICMPv6ND_NS) and in6_isllsnmaddr(p[IPv6].dst):
        return True
    if p.haslayer(IPv6ExtHdrHopByHop):
        return any(
            isinstance(option, RouterAlert)
            for option in p[IPv6ExtHdrHopByHop].options
        )
    return False


class TestIP6VrfMultiInst(VppTestCase):
    """IP6 VRF Multi-instance Test Case"""

    # NOTE: the test_ip6_vrf_0N methods share class-level state (vrf_list,
    # vrf_reset_list, pg_in_vrf, pg_not_in_vrf) and build on each other's
    # configuration, so they are meant to run in name order.

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(TestIP6VrfMultiInst, cls).setUpClass()

        # Test variables
        cls.hosts_per_pg = 5
        cls.nr_of_vrfs = 5
        cls.pg_ifs_per_vrf = 3

        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))

            # Packet flows mapping pg0 -> pg1, pg2 etc.: every interface
            # sends to the other interfaces of its own group of
            # pg_ifs_per_vrf consecutive interfaces.
            cls.flows = dict()
            for i, src_if in enumerate(cls.pg_interfaces):
                group_start = (i // cls.pg_ifs_per_vrf) * cls.pg_ifs_per_vrf
                cls.flows[src_if] = [
                    cls.pg_interfaces[k]
                    for k in range(group_start, group_start + cls.pg_ifs_per_vrf)
                    if k != i
                ]

            # Packet sizes - jumbo packet (9018 bytes) skipped
            cls.pg_if_packet_sizes = [64, 512, 1518]

            # Set up all interfaces
            for pg_if in cls.pg_interfaces:
                pg_if.admin_up()
                pg_if.generate_remote_hosts(cls.hosts_per_pg)

            # Create list of VRFs
            cls.vrf_list = list()

            # Create list of reset VRFs
            cls.vrf_reset_list = list()

            # Create list of pg_interfaces in VRFs
            cls.pg_in_vrf = list()

            # Create list of pg_interfaces not in VRFs
            cls.pg_not_in_vrf = list(cls.pg_interfaces)

            # Create mapping of interface-set IDs (1-based) to pg_interfaces
            cls.pg_if_sets = dict()
            for i in range(cls.nr_of_vrfs):
                set_id = i + 1
                cls.pg_if_sets[set_id] = [
                    cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
                    for j in range(cls.pg_ifs_per_vrf)
                ]

        except Exception:
            # Release whatever the base class set up before propagating.
            super(TestIP6VrfMultiInst, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to VppTestCase."""
        super(TestIP6VrfMultiInst, cls).tearDownClass()

    def setUp(self):
        """
        Clear trace and packet infos before running each test.
        """
        super(TestIP6VrfMultiInst, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Show various debug prints after each test.
        """
        super(TestIP6VrfMultiInst, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the IPv6 FIB and neighbor tables."""
        self.logger.info(self.vapi.ppcli("show ip6 fib"))
        self.logger.info(self.vapi.ppcli("show ip6 neighbors"))

    def _log_ip6_state(self):
        """Log the IPv6 FIB and neighbor tables at debug level."""
        self.logger.debug(self.vapi.ppcli("show ip6 fib"))
        self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))

    def _register_vrf(self, vrf_id):
        """Record vrf_id as configured and drop it from the reset list."""
        if vrf_id not in self.vrf_list:
            self.vrf_list.append(vrf_id)
        if vrf_id in self.vrf_reset_list:
            self.vrf_reset_list.remove(vrf_id)

    def _assign_interfaces(self, vrf_id, if_set_id):
        """
        Move every pg interface of one interface set into a VRF.

        Each interface is bound to the IPv6 table, tracked as "in VRF",
        and then gets its IPv6 address, RA suppression and neighbors
        configured.

        :param int vrf_id: The FIB table / VRF ID to bind the interfaces to.
        :param int if_set_id: Key of the interface set in pg_if_sets.
        """
        for pg_if in self.pg_if_sets[if_set_id][: self.pg_ifs_per_vrf]:
            pg_if.set_table_ip6(vrf_id)
            self.logger.info(
                "pg-interface %s added to IPv6 VRF ID %d" % (pg_if.name, vrf_id)
            )
            if pg_if not in self.pg_in_vrf:
                self.pg_in_vrf.append(pg_if)
            if pg_if in self.pg_not_in_vrf:
                self.pg_not_in_vrf.remove(pg_if)
            pg_if.config_ip6()
            pg_if.disable_ipv6_ra()
            pg_if.configure_ipv6_neighbors()

    def create_vrf_and_assign_interfaces(self, count, start=1):
        """
        Create required number of FIB tables / VRFs, put 3 pg-ip6 interfaces
        to every FIB table / VRF.

        The interface set with the same ID as the VRF is used for each VRF.

        :param int count: Number of FIB tables / VRFs to be created.
        :param int start: Starting number of the FIB table / VRF ID.
            (Default value = 1)
        """
        for vrf_id in range(start, start + count):
            self.vapi.ip_table_add_del(
                is_add=1, table={"table_id": vrf_id, "is_ip6": 1}
            )
            self.logger.info("IPv6 VRF ID %d created" % vrf_id)
            self._register_vrf(vrf_id)
            self._assign_interfaces(vrf_id, vrf_id)
        self._log_ip6_state()

    def create_vrf_by_id_and_assign_interfaces(self, set_id, vrf_id=0xFFFFFFFF):
        """
        Create a FIB table / VRF by vrf_id, put 3 pg-ip6 interfaces
        to FIB table / VRF.

        :param int set_id: Key of the interface set (in pg_if_sets) to move
            into the new VRF.
        :param int vrf_id: Required table ID / VRF ID.
            (Default value = 0xffffffff, ID will be selected automatically)
        :return: The table ID / VRF ID reported by ip_table_allocate.
        """
        ret = self.vapi.ip_table_allocate(table={"table_id": vrf_id, "is_ip6": 1})
        vrf_id = ret.table.table_id
        self.logger.info("IPv6 VRF ID %d created" % vrf_id)
        self._register_vrf(vrf_id)
        self._assign_interfaces(vrf_id, set_id)
        self._log_ip6_state()

        return vrf_id

    def reset_vrf_and_remove_from_vrf_list(self, vrf_id, if_set_id=None):
        """
        Reset required FIB table / VRF and remove it from VRF list.

        The table is flushed, the VRF is moved from vrf_list to
        vrf_reset_list and the interfaces of the set are returned to table 0.

        :param int vrf_id: The FIB table / VRF ID to be reset.
        :param int if_set_id: Key of the interface set bound to the VRF.
            (Default value = None, meaning the set with the same ID as vrf_id)
        """
        if if_set_id is None:
            if_set_id = vrf_id
        self.vapi.ip_table_flush(table={"table_id": vrf_id, "is_ip6": 1})
        if vrf_id in self.vrf_list:
            self.vrf_list.remove(vrf_id)
        if vrf_id not in self.vrf_reset_list:
            self.vrf_reset_list.append(vrf_id)
        for pg_if in self.pg_if_sets[if_set_id][: self.pg_ifs_per_vrf]:
            pg_if.unconfig_ip6()
            pg_if.set_table_ip6(0)
            if pg_if in self.pg_in_vrf:
                self.pg_in_vrf.remove(pg_if)
            if pg_if not in self.pg_not_in_vrf:
                self.pg_not_in_vrf.append(pg_if)
        self.logger.info("IPv6 VRF ID %d reset finished" % vrf_id)
        self._log_ip6_state()

    def delete_vrf(self, vrf_id):
        """
        Forget a VRF in both tracking lists and delete its IPv6 table.

        :param int vrf_id: The FIB table / VRF ID to be deleted.
        """
        if vrf_id in self.vrf_list:
            self.vrf_list.remove(vrf_id)
        if vrf_id in self.vrf_reset_list:
            self.vrf_reset_list.remove(vrf_id)
        self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id, "is_ip6": 1})

    def _build_stream(self, src_if, dst_ifs, packet_sizes):
        """
        Build one UDP/IPv6 packet per remote host of every destination
        interface, sourced from a randomly chosen remote host of src_if.

        :param object src_if: Interface to create packet stream for.
        :param list dst_ifs: Destination interfaces, in sending order.
        :param list packet_sizes: List of required packet sizes.
        :return: Stream of packets.
        """
        pkts = []
        src_hosts = src_if.remote_hosts
        for dst_if in dst_ifs:
            for dst_host in dst_if.remote_hosts:
                src_host = random.choice(src_hosts)
                pkt_info = self.create_packet_info(src_if, dst_if)
                payload = self.info_to_payload(pkt_info)
                p = (
                    Ether(dst=src_if.local_mac, src=src_host.mac)
                    / IPv6(src=src_host.ip6, dst=dst_host.ip6)
                    / UDP(sport=1234, dport=1234)
                    / Raw(payload)
                )
                # Keep the unpadded packet for later field comparison.
                pkt_info.data = p.copy()
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
                pkts.append(p)
        self.logger.debug(
            "Input stream created for port %s. Length: %u pkt(s)"
            % (src_if.name, len(pkts))
        )
        return pkts

    def create_stream(self, src_if, packet_sizes):
        """
        Create input packet stream for defined interface using hosts list.

        Destinations are the interfaces mapped to src_if in self.flows.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :return: Stream of packets.
        """
        return self._build_stream(src_if, self.flows[src_if], packet_sizes)

    def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes):
        """
        Create input packet stream for negative test for leaking across
        different VRFs for defined interface using hosts list.

        :param object src_if: Interface to create packet stream for.
        :param int vrf_id: The FIB table / VRF ID where src_if is assigned.
        :param list packet_sizes: List of required packet sizes.
        :return: Stream of packets.
        """
        other_vrfs = list(self.vrf_list)
        other_vrfs.remove(vrf_id)
        # NOTE: pg_if_sets is indexed by the VRF ID here, i.e. this assumes
        # every configured VRF uses the interface set with the same ID.
        dst_ifs = [
            dst_if for vrf in other_vrfs for dst_if in self.pg_if_sets[vrf]
        ]
        return self._build_stream(src_if, dst_ifs, packet_sizes)

    def verify_capture(self, pg_if, capture):
        """
        Verify captured input packet stream for defined interface.

        Every captured packet must be addressed to pg_if, arrive in the
        order it was generated per source interface, and match the saved
        packet's IPv6 addresses and UDP ports.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        """
        last_info = {i.sw_if_index: None for i in self.pg_interfaces}
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                ip = packet[IPv6]
                udp = packet[UDP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)"
                    % (pg_if.name, payload_info.src, packet_index)
                )
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IPv6].src)
                self.assertEqual(ip.dst, saved_packet[IPv6].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                # Dump the offending packet, then let the failure propagate.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for i in self.pg_interfaces:
            # NOTE(review): the interface object `i` is passed as the source
            # index here, while the loop above passes an integer
            # (payload_info.src) - confirm the helper handles both, otherwise
            # this "nothing left over" check can never fail.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't arrive"
                % (dst_sw_if_index, i.sw_if_index),
            )

    def verify_vrf(self, vrf_id, if_set_id=None):
        """
        Check if the FIB table / VRF ID is configured.

        :param int vrf_id: The FIB table / VRF ID to be verified.
        :param int if_set_id: Key of the interface set expected in the VRF.
            (Default value = None, meaning the set with the same ID as vrf_id)
        :return: VRFState.not_configured if the route dump is empty,
            VRFState.reset if it has routes but none towards a remote host
            of the interface set, VRFState.configured otherwise.
        """
        if if_set_id is None:
            if_set_id = vrf_id
        ip6_fib_dump = self.vapi.ip_route_dump(vrf_id, True)
        vrf_exist = len(ip6_fib_dump)
        # Remote-host addresses of the set; built once instead of rescanning
        # every host for every route.
        host_addrs = {
            host.ip6
            for pg_if in self.pg_if_sets[if_set_id]
            for host in pg_if.remote_hosts
        }
        # Number of dumped routes whose prefix address is one of the hosts.
        vrf_count = sum(
            1
            for ip6_fib_details in ip6_fib_dump
            if str(ip6_fib_details.route.prefix.network_address) in host_addrs
        )
        if vrf_count:
            self.logger.info("IPv6 VRF ID %d is configured" % vrf_id)
            return VRFState.configured
        if vrf_exist:
            self.logger.info("IPv6 VRF ID %d has been reset" % vrf_id)
            return VRFState.reset
        self.logger.info("IPv6 VRF ID %d is not configured" % vrf_id)
        return VRFState.not_configured

    def run_verify_test(self):
        """
        Create packet streams for all configured pg interfaces, send all
        prepared packet streams and verify that:

        - all packets received correctly on all pg-ip6 interfaces assigned
          to VRFs
        - no packet received on all pg-ip6 interfaces not assigned to VRFs

        :raise Exception: If an interface is tracked neither as being in a
            VRF nor as being outside of any VRF. Capture mismatches are
            reported through the usual test assertions.
        """
        # Test
        # Create incoming packet streams for packet-generator interfaces
        for pg_if in self.pg_interfaces:
            pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
            pg_if.add_stream(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify
        # Verify outgoing packet streams per packet-generator interface
        for pg_if in self.pg_interfaces:
            if pg_if in self.pg_in_vrf:
                capture = pg_if.get_capture(remark="interface is in VRF")
                self.verify_capture(pg_if, capture)
            elif pg_if in self.pg_not_in_vrf:
                pg_if.assert_nothing_captured(
                    remark="interface is not in VRF", filter_out_fn=is_ipv6_misc_ext
                )
                self.logger.debug("No capture for interface %s" % pg_if.name)
            else:
                raise Exception("Unknown interface: %s" % pg_if.name)

    def run_crosswise_vrf_test(self):
        """
        Create packet streams for every pg-ip6 interface in VRF towards all
        pg-ip6 interfaces in other VRFs, send all prepared packet streams and
        verify that:

        - no packet received on all configured pg-ip6 interfaces

        A captured packet is reported by assert_nothing_captured.
        """
        # Test
        # Create incoming packet streams for packet-generator interfaces
        for vrf_id in self.vrf_list:
            for pg_if in self.pg_if_sets[vrf_id]:
                pkts = self.create_stream_crosswise_vrf(
                    pg_if, vrf_id, self.pg_if_packet_sizes
                )
                pg_if.add_stream(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify
        # Verify outgoing packet streams per packet-generator interface
        for pg_if in self.pg_interfaces:
            pg_if.assert_nothing_captured(
                remark="interface is in other VRF", filter_out_fn=is_ipv6_misc_ext
            )
            self.logger.debug("No capture for interface %s" % pg_if.name)

    def test_ip6_vrf_01(self):
        """IP6 VRF Multi-instance test 1 - create 4 VRFs"""
        # Config 1
        # Create 4 VRFs
        self.create_vrf_and_assign_interfaces(4)

        # Verify 1
        for vrf_id in self.vrf_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)

        # Test 1
        self.run_verify_test()
        self.run_crosswise_vrf_test()

    def test_ip6_vrf_02(self):
        """IP6 VRF Multi-instance test 2 - reset 2 VRFs"""
        # Config 2
        # Reset 2 VRFs
        self.reset_vrf_and_remove_from_vrf_list(1)
        self.reset_vrf_and_remove_from_vrf_list(2)

        # Verify 2
        for vrf_id in self.vrf_reset_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
        for vrf_id in self.vrf_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)

        # Test 2
        self.run_verify_test()
        self.run_crosswise_vrf_test()

        # Reset routes learned from ICMPv6 Neighbor Discovery
        # for vrf_id in self.vrf_reset_list:
        #     self.reset_vrf_and_remove_from_vrf_list(vrf_id)

    def test_ip6_vrf_03(self):
        """IP6 VRF Multi-instance 3 - add 2 VRFs"""
        # Config 3
        # Add 1 of reset VRFs and 1 new VRF
        self.create_vrf_and_assign_interfaces(1)
        self.create_vrf_and_assign_interfaces(1, start=5)

        # Verify 3
        for vrf_id in self.vrf_reset_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
        for vrf_id in self.vrf_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)

        # Test 3
        self.run_verify_test()
        self.run_crosswise_vrf_test()

        # Reset routes learned from ICMPv6 Neighbor Discovery
        # for vrf_id in self.vrf_reset_list:
        #     self.reset_vrf_and_remove_from_vrf_list(vrf_id)

    def test_ip6_vrf_04(self):
        """IP6 VRF Multi-instance test 4 - reset 4 VRFs"""
        # Config 4
        # Reset all VRFs (i.e. no VRF except VRF=0 configured).
        # Iterate over a copy: each call removes the VRF from vrf_list.
        for vrf_id in list(self.vrf_list):
            self.reset_vrf_and_remove_from_vrf_list(vrf_id)

        # Verify 4
        for vrf_id in self.vrf_reset_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
        vrf_list_length = len(self.vrf_list)
        self.assertEqual(
            vrf_list_length,
            0,
            "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
        )

        # Test 4
        self.run_verify_test()
        self.run_crosswise_vrf_test()

    def test_ip6_vrf_05(self):
        """IP6 VRF Multi-instance test 5 - auto allocate vrf id"""
        # Config 5
        # Create several VRFs
        # Set vrf_id manually first
        self.create_vrf_by_id_and_assign_interfaces(1, 10)
        # Set vrf_id automatically a few times
        auto_vrf_id = [
            self.create_vrf_by_id_and_assign_interfaces(i) for i in range(2, 5)
        ]

        # Verify 5 (auto-allocated VRFs use interface sets 2, 3 and 4)
        self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, VRFState)
        for i, vrf in enumerate(auto_vrf_id):
            self.assert_equal(
                self.verify_vrf(vrf, i + 2), VRFState.configured, VRFState
            )

        # Test 5
        self.run_verify_test()

        # Config 5.1
        # Reset VRFs
        self.reset_vrf_and_remove_from_vrf_list(10, 1)
        for i, vrf in enumerate(auto_vrf_id):
            self.reset_vrf_and_remove_from_vrf_list(vrf, i + 2)

        # Verify 5.1
        self.assert_equal(self.verify_vrf(10, 1), VRFState.reset, VRFState)
        for i, vrf in enumerate(auto_vrf_id):
            self.assert_equal(self.verify_vrf(vrf, i + 2), VRFState.reset, VRFState)

        vrf_list_length = len(self.vrf_list)
        self.assertEqual(
            vrf_list_length,
            0,
            "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
        )

        # Cleanup our extra created VRFs: the auto-allocated ones, VRF 5
        # (added by test 3) and VRF 10 (added manually above).
        for vrf in auto_vrf_id:
            self.delete_vrf(vrf)
        self.delete_vrf(5)
        self.delete_vrf(10)

    def test_ip6_vrf_06(self):
        """IP6 VRF Multi-instance test 6 - recreate 4 VRFs"""
        # Reconfigure all the VRFs
        self.create_vrf_and_assign_interfaces(4)
        # Verify
        for vrf_id in self.vrf_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
        # Test
        self.run_verify_test()
        self.run_crosswise_vrf_test()
        # Cleanup; iterate over a copy because each call shrinks vrf_list
        for vrf_id in list(self.vrf_list):
            self.reset_vrf_and_remove_from_vrf_list(vrf_id)
        # Verify
        for vrf_id in self.vrf_reset_list:
            self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
        vrf_list_length = len(self.vrf_list)
        self.assertEqual(
            vrf_list_length,
            0,
            "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
        )
        # Test
        self.run_verify_test()
        self.run_crosswise_vrf_test()


# Allow running this module directly with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)