2 """IP6 VRF Multi-instance Test Case HLD:
5 - higher number of pg-ip6 interfaces causes problems => only 15 pg-ip6 \
6 interfaces in 5 VRFs are tested
7 - jumbo packets in configuration with 15 pg-ip6 interfaces leads to \
11 - add 15 pg-ip6 interfaces
12 - configure 5 hosts per pg-ip6 interface
14 - add 3 pg-ip6 interfaces per VRF
17 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
20 - check VRF data by parsing output of ip6_fib_dump API command
21 - all packets received correctly in case of pg-ip6 interfaces in the same
23 - no packet received in case of pg-ip6 interfaces not in VRF
24 - no packet received in case of pg-ip6 interfaces in different VRFs
30 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
33 - check VRF data by parsing output of ip6_fib_dump API command
34 - all packets received correctly in case of pg-ip6 interfaces in the same
36 - no packet received in case of pg-ip6 interfaces not in VRF
37 - no packet received in case of pg-ip6 interfaces in different VRFs
40 - add 1 of reset VRFs and 1 new VRF
43 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
46 - check VRF data by parsing output of ip6_fib_dump API command
47 - all packets received correctly in case of pg-ip6 interfaces in the same
49 - no packet received in case of pg-ip6 interfaces not in VRF
50 - no packet received in case of pg-ip6 interfaces in different VRFs
53 - reset all VRFs (i.e. no VRF except VRF=0 created)
56 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
59 - check VRF data by parsing output of ip6_fib_dump API command
60 - all packets received correctly in case of pg-ip6 interfaces in the same
62 - no packet received in case of pg-ip6 interfaces not in VRF
63 - no packet received in case of pg-ip6 interfaces in different VRFs
70 from scapy.packet import Raw
71 from scapy.layers.l2 import Ether
72 from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \
73 RouterAlert, IPv6ExtHdrHopByHop
74 from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
75 from scapy.pton_ntop import inet_ntop
77 from framework import VppTestCase, VppTestRunner
79 from vrf import VRFState
82 def is_ipv6_misc_ext(p):
83 """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out
84 ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """
85 if p.haslayer(ICMPv6ND_RA):
86 if in6_ismaddr(p[IPv6].dst):
88 if p.haslayer(ICMPv6ND_NS):
89 if in6_isllsnmaddr(p[IPv6].dst):
91 if p.haslayer(IPv6ExtHdrHopByHop):
92 for o in p[IPv6ExtHdrHopByHop].options:
93 if isinstance(o, RouterAlert):
98 class TestIP6VrfMultiInst(VppTestCase):
99 """ IP6 VRF Multi-instance Test Case """
104 Perform standard class setup (defined by class method setUpClass in
105 class VppTestCase) before running the test case, set test case related
106 variables and configure VPP.
108 super(TestIP6VrfMultiInst, cls).setUpClass()
113 cls.pg_ifs_per_vrf = 3
116 # Create pg interfaces
117 cls.create_pg_interfaces(
118 range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
120 # Packet flows mapping pg0 -> pg1, pg2 etc.
122 for i in range(len(cls.pg_interfaces)):
123 multiplicand = i / cls.pg_ifs_per_vrf
125 cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
126 for j in range(cls.pg_ifs_per_vrf)
127 if (multiplicand * cls.pg_ifs_per_vrf + j) != i]
128 cls.flows[cls.pg_interfaces[i]] = pg_list
130 # Packet sizes - jumbo packet (9018 bytes) skipped
131 cls.pg_if_packet_sizes = [64, 512, 1518]
133 # Set up all interfaces
134 for pg_if in cls.pg_interfaces:
136 pg_if.generate_remote_hosts(cls.hosts_per_pg)
138 # Create list of VRFs
139 cls.vrf_list = list()
141 # Create list of reset VRFs
142 cls.vrf_reset_list = list()
144 # Create list of pg_interfaces in VRFs
145 cls.pg_in_vrf = list()
147 # Create list of pg_interfaces not in VRFs
148 cls.pg_not_in_vrf = [pg_if for pg_if in cls.pg_interfaces]
150 # Create mapping of pg_interfaces to VRF IDs
151 cls.pg_if_by_vrf_id = dict()
152 for i in range(cls.nr_of_vrfs):
155 cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
156 for j in range(cls.pg_ifs_per_vrf)]
157 cls.pg_if_by_vrf_id[vrf_id] = pg_list
160 super(TestIP6VrfMultiInst, cls).tearDownClass()
165 Clear trace and packet infos before running each test.
167 super(TestIP6VrfMultiInst, self).setUp()
168 self.reset_packet_infos()
172 Show various debug prints after each test.
174 super(TestIP6VrfMultiInst, self).tearDown()
175 if not self.vpp_dead:
176 self.logger.info(self.vapi.ppcli("show ip6 fib"))
177 self.logger.info(self.vapi.ppcli("show ip6 neighbors"))
179 def create_vrf_and_assign_interfaces(self, count, start=1):
181 Create required number of FIB tables / VRFs, put 3 pg-ip6 interfaces
182 to every FIB table / VRF.
184 :param int count: Number of FIB tables / VRFs to be created.
185 :param int start: Starting number of the FIB table / VRF ID. \
188 for i in range(count):
190 pg_if = self.pg_if_by_vrf_id[vrf_id][0]
191 dest_addr = pg_if.local_ip6n
193 self.vapi.ip_table_add_del(vrf_id, is_add=1, is_ipv6=1)
194 self.logger.info("IPv6 VRF ID %d created" % vrf_id)
195 if vrf_id not in self.vrf_list:
196 self.vrf_list.append(vrf_id)
197 if vrf_id in self.vrf_reset_list:
198 self.vrf_reset_list.remove(vrf_id)
199 for j in range(self.pg_ifs_per_vrf):
200 pg_if = self.pg_if_by_vrf_id[vrf_id][j]
201 pg_if.set_table_ip6(vrf_id)
202 self.logger.info("pg-interface %s added to IPv6 VRF ID %d"
203 % (pg_if.name, vrf_id))
204 if pg_if not in self.pg_in_vrf:
205 self.pg_in_vrf.append(pg_if)
206 if pg_if in self.pg_not_in_vrf:
207 self.pg_not_in_vrf.remove(pg_if)
209 pg_if.disable_ipv6_ra()
210 pg_if.configure_ipv6_neighbors()
211 self.logger.debug(self.vapi.ppcli("show ip6 fib"))
212 self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
214 def reset_vrf_and_remove_from_vrf_list(self, vrf_id):
216 Reset required FIB table / VRF and remove it from VRF list.
218 :param int vrf_id: The FIB table / VRF ID to be reset.
220 # self.vapi.reset_vrf(vrf_id, is_ipv6=1)
221 self.vapi.reset_fib(vrf_id, is_ipv6=1)
222 if vrf_id in self.vrf_list:
223 self.vrf_list.remove(vrf_id)
224 if vrf_id not in self.vrf_reset_list:
225 self.vrf_reset_list.append(vrf_id)
226 for j in range(self.pg_ifs_per_vrf):
227 pg_if = self.pg_if_by_vrf_id[vrf_id][j]
229 if pg_if in self.pg_in_vrf:
230 self.pg_in_vrf.remove(pg_if)
231 if pg_if not in self.pg_not_in_vrf:
232 self.pg_not_in_vrf.append(pg_if)
233 self.logger.info("IPv6 VRF ID %d reset finished" % vrf_id)
234 self.logger.debug(self.vapi.ppcli("show ip6 fib"))
235 self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
236 self.vapi.ip_table_add_del(vrf_id, is_add=0, is_ipv6=1)
238 def create_stream(self, src_if, packet_sizes):
240 Create input packet stream for defined interface using hosts list.
242 :param object src_if: Interface to create packet stream for.
243 :param list packet_sizes: List of required packet sizes.
244 :return: Stream of packets.
247 src_hosts = src_if.remote_hosts
248 for dst_if in self.flows[src_if]:
249 for dst_host in dst_if.remote_hosts:
250 src_host = random.choice(src_hosts)
251 pkt_info = self.create_packet_info(src_if, dst_if)
252 payload = self.info_to_payload(pkt_info)
253 p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
254 IPv6(src=src_host.ip6, dst=dst_host.ip6) /
255 UDP(sport=1234, dport=1234) /
257 pkt_info.data = p.copy()
258 size = random.choice(packet_sizes)
259 self.extend_packet(p, size)
261 self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
262 % (src_if.name, len(pkts)))
265 def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes):
267 Create input packet stream for negative test for leaking across
268 different VRFs for defined interface using hosts list.
270 :param object src_if: Interface to create packet stream for.
271 :param int vrf_id: The FIB table / VRF ID where src_if is assigned.
272 :param list packet_sizes: List of required packet sizes.
273 :return: Stream of packets.
276 src_hosts = src_if.remote_hosts
277 vrf_lst = list(self.vrf_list)
278 vrf_lst.remove(vrf_id)
280 for dst_if in self.pg_if_by_vrf_id[vrf]:
281 for dst_host in dst_if.remote_hosts:
282 src_host = random.choice(src_hosts)
283 pkt_info = self.create_packet_info(src_if, dst_if)
284 payload = self.info_to_payload(pkt_info)
285 p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
286 IPv6(src=src_host.ip6, dst=dst_host.ip6) /
287 UDP(sport=1234, dport=1234) /
289 pkt_info.data = p.copy()
290 size = random.choice(packet_sizes)
291 self.extend_packet(p, size)
293 self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
294 % (src_if.name, len(pkts)))
297 def verify_capture(self, pg_if, capture):
299 Verify captured input packet stream for defined interface.
301 :param object pg_if: Interface to verify captured packet stream for.
302 :param list capture: Captured packet stream.
305 for i in self.pg_interfaces:
306 last_info[i.sw_if_index] = None
307 dst_sw_if_index = pg_if.sw_if_index
308 for packet in capture:
312 payload_info = self.payload_to_info(str(packet[Raw]))
313 packet_index = payload_info.index
314 self.assertEqual(payload_info.dst, dst_sw_if_index)
315 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
316 (pg_if.name, payload_info.src, packet_index))
317 next_info = self.get_next_packet_info_for_interface2(
318 payload_info.src, dst_sw_if_index,
319 last_info[payload_info.src])
320 last_info[payload_info.src] = next_info
321 self.assertIsNotNone(next_info)
322 self.assertEqual(packet_index, next_info.index)
323 saved_packet = next_info.data
324 # Check standard fields
325 self.assertEqual(ip.src, saved_packet[IPv6].src)
326 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
327 self.assertEqual(udp.sport, saved_packet[UDP].sport)
328 self.assertEqual(udp.dport, saved_packet[UDP].dport)
330 self.logger.error(ppp("Unexpected or invalid packet:", packet))
332 for i in self.pg_interfaces:
333 remaining_packet = self.get_next_packet_info_for_interface2(
334 i, dst_sw_if_index, last_info[i.sw_if_index])
337 "Port %u: Packet expected from source %u didn't arrive" %
338 (dst_sw_if_index, i.sw_if_index))
340 def verify_vrf(self, vrf_id):
342 Check if the FIB table / VRF ID is configured.
344 :param int vrf_id: The FIB table / VRF ID to be verified.
345 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
347 ip6_fib_dump = self.vapi.ip6_fib_dump()
350 for ip6_fib_details in ip6_fib_dump:
351 if ip6_fib_details.table_id == vrf_id:
354 addr = inet_ntop(socket.AF_INET6, ip6_fib_details.address)
356 for pg_if in self.pg_if_by_vrf_id[vrf_id]:
359 for host in pg_if.remote_hosts:
360 if str(addr) == str(host.ip6):
364 if not vrf_exist and vrf_count == 0:
365 self.logger.info("IPv6 VRF ID %d is not configured" % vrf_id)
366 return VRFState.not_configured
367 elif vrf_exist and vrf_count == 0:
368 self.logger.info("IPv6 VRF ID %d has been reset" % vrf_id)
369 return VRFState.reset
371 self.logger.info("IPv6 VRF ID %d is configured" % vrf_id)
372 return VRFState.configured
374 def run_verify_test(self):
376 Create packet streams for all configured pg interfaces, send all \
377 prepared packet streams and verify that:
378 - all packets received correctly on all pg-ip6 interfaces assigned
380 - no packet received on all pg-ip6 interfaces not assigned to VRFs
382 :raise RuntimeError: If no packet captured on pg-ip6 interface assigned
383 to VRF or if any packet is captured on pg-ip6 interface not
387 # Create incoming packet streams for packet-generator interfaces
388 for pg_if in self.pg_interfaces:
389 pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
390 pg_if.add_stream(pkts)
392 # Enable packet capture and start packet sending
393 self.pg_enable_capture(self.pg_interfaces)
397 # Verify outgoing packet streams per packet-generator interface
398 for pg_if in self.pg_interfaces:
399 if pg_if in self.pg_in_vrf:
400 capture = pg_if.get_capture(remark="interface is in VRF")
401 self.verify_capture(pg_if, capture)
402 elif pg_if in self.pg_not_in_vrf:
403 pg_if.assert_nothing_captured(remark="interface is not in VRF",
404 filter_out_fn=is_ipv6_misc_ext)
405 self.logger.debug("No capture for interface %s" % pg_if.name)
407 raise Exception("Unknown interface: %s" % pg_if.name)
409 def run_crosswise_vrf_test(self):
411 Create packet streams for every pg-ip6 interface in VRF towards all
412 pg-ip6 interfaces in other VRFs, send all prepared packet streams and \
414 - no packet received on all configured pg-ip6 interfaces
416 :raise RuntimeError: If any packet is captured on any pg-ip6 interface.
419 # Create incoming packet streams for packet-generator interfaces
420 for vrf_id in self.vrf_list:
421 for pg_if in self.pg_if_by_vrf_id[vrf_id]:
422 pkts = self.create_stream_crosswise_vrf(
423 pg_if, vrf_id, self.pg_if_packet_sizes)
424 pg_if.add_stream(pkts)
426 # Enable packet capture and start packet sending
427 self.pg_enable_capture(self.pg_interfaces)
431 # Verify outgoing packet streams per packet-generator interface
432 for pg_if in self.pg_interfaces:
433 pg_if.assert_nothing_captured(remark="interface is in other VRF",
434 filter_out_fn=is_ipv6_misc_ext)
435 self.logger.debug("No capture for interface %s" % pg_if.name)
437 def test_ip6_vrf_01(self):
438 """ IP6 VRF Multi-instance test 1 - create 4 VRFs
442 self.create_vrf_and_assign_interfaces(4)
445 for vrf_id in self.vrf_list:
446 self.assert_equal(self.verify_vrf(vrf_id),
447 VRFState.configured, VRFState)
450 self.run_verify_test()
451 self.run_crosswise_vrf_test()
453 def test_ip6_vrf_02(self):
454 """ IP6 VRF Multi-instance test 2 - reset 2 VRFs
458 self.reset_vrf_and_remove_from_vrf_list(1)
459 self.reset_vrf_and_remove_from_vrf_list(2)
462 for vrf_id in self.vrf_reset_list:
463 self.assert_equal(self.verify_vrf(vrf_id),
464 VRFState.reset, VRFState)
465 for vrf_id in self.vrf_list:
466 self.assert_equal(self.verify_vrf(vrf_id),
467 VRFState.configured, VRFState)
470 self.run_verify_test()
471 self.run_crosswise_vrf_test()
473 # Reset routes learned from ICMPv6 Neighbor Discovery
474 for vrf_id in self.vrf_reset_list:
475 self.reset_vrf_and_remove_from_vrf_list(vrf_id)
477 def test_ip6_vrf_03(self):
478 """ IP6 VRF Multi-instance 3 - add 2 VRFs
481 # Add 1 of reset VRFs and 1 new VRF
482 self.create_vrf_and_assign_interfaces(1)
483 self.create_vrf_and_assign_interfaces(1, start=5)
486 for vrf_id in self.vrf_reset_list:
487 self.assert_equal(self.verify_vrf(vrf_id),
488 VRFState.reset, VRFState)
489 for vrf_id in self.vrf_list:
490 self.assert_equal(self.verify_vrf(vrf_id),
491 VRFState.configured, VRFState)
494 self.run_verify_test()
495 self.run_crosswise_vrf_test()
497 # Reset routes learned from ICMPv6 Neighbor Discovery
498 for vrf_id in self.vrf_reset_list:
499 self.reset_vrf_and_remove_from_vrf_list(vrf_id)
501 def test_ip6_vrf_04(self):
502 """ IP6 VRF Multi-instance test 4 - reset 4 VRFs
505 # Reset all VRFs (i.e. no VRF except VRF=0 configured)
506 for i in range(len(self.vrf_list)):
507 self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0])
510 for vrf_id in self.vrf_reset_list:
511 self.assert_equal(self.verify_vrf(vrf_id),
512 VRFState.reset, VRFState)
513 vrf_list_length = len(self.vrf_list)
516 "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
519 self.run_verify_test()
520 self.run_crosswise_vrf_test()
523 if __name__ == '__main__':
524 unittest.main(testRunner=VppTestRunner)