-#!/usr/bin/env python
+#!/usr/bin/env python3
"""IP4 VRF Multi-instance Test Case HLD:
**NOTES:**
**verify 1**
- check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the same
+ VRF
- no packet received in case of pg-ip4 interfaces not in VRF
+ - no packet received in case of pg-ip4 interfaces in different VRFs
**config 2**
- reset 2 VRFs
- send IP4 packets between all pg-ip4 interfaces in all VRF groups
**verify 2**
- - check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the same
+ VRF
- no packet received in case of pg-ip4 interfaces not in VRF
+ - no packet received in case of pg-ip4 interfaces in different VRFs
**config 3**
- add 1 of reset VRFs and 1 new VRF
**verify 3**
- check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the same
+ VRF
- no packet received in case of pg-ip4 interfaces not in VRF
+ - no packet received in case of pg-ip4 interfaces in different VRFs
**config 4**
- reset all created VRFs
**verify 4**
- check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the same
+ VRF
- no packet received in case of pg-ip4 interfaces not in VRF
+ - no packet received in case of pg-ip4 interfaces in different VRFs
"""
import unittest
import random
import socket
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ARP
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
for i in range(len(cls.pg_interfaces)):
- multiplicand = i / cls.pg_ifs_per_vrf
+ multiplicand = i // cls.pg_ifs_per_vrf
pg_list = [
cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
for j in range(cls.pg_ifs_per_vrf)
# Create list of pg_interfaces in VRFs
cls.pg_in_vrf = list()
- # Create list of pg_interfaces not in BDs
+ # Create list of pg_interfaces not in VRFs
cls.pg_not_in_vrf = [pg_if for pg_if in cls.pg_interfaces]
# Create mapping of pg_interfaces to VRF IDs
super(TestIp4VrfMultiInst, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIp4VrfMultiInst, cls).tearDownClass()
+
def setUp(self):
"""
Clear trace and packet infos before running each test.
Show various debug prints after each test.
"""
super(TestIp4VrfMultiInst, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show ip fib"))
- self.logger.info(self.vapi.ppcli("show ip arp"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.ppcli("show ip fib"))
+ self.logger.info(self.vapi.ppcli("show ip arp"))
def create_vrf_and_assign_interfaces(self, count, start=1):
"""
- Create required number of FIB tables / VRFs, put 3 l2-pg interfaces
+ Create required number of FIB tables / VRFs, put 3 pg-ip4 interfaces
to every FIB table / VRF.
:param int count: Number of FIB tables / VRFs to be created.
pg_if = self.pg_if_by_vrf_id[vrf_id][0]
dest_addr = pg_if.local_ip4n
dest_addr_len = 24
- self.vapi.ip_table_add_del(vrf_id, is_add=1)
- self.vapi.ip_add_del_route(
- dest_addr, dest_addr_len, pg_if.local_ip4n,
- table_id=vrf_id, is_multipath=1)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id})
self.logger.info("IPv4 VRF ID %d created" % vrf_id)
if vrf_id not in self.vrf_list:
self.vrf_list.append(vrf_id)
:param int vrf_id: The FIB table / VRF ID to be reset.
"""
- # self.vapi.reset_vrf(vrf_id, is_ipv6=0)
- self.vapi.reset_fib(vrf_id, is_ipv6=0)
+ self.vapi.ip_table_flush(table={'table_id': vrf_id})
if vrf_id in self.vrf_list:
self.vrf_list.remove(vrf_id)
if vrf_id not in self.vrf_reset_list:
self.logger.info("IPv4 VRF ID %d reset finished" % vrf_id)
self.logger.debug(self.vapi.ppcli("show ip fib"))
self.logger.debug(self.vapi.ppcli("show ip arp"))
- self.vapi.ip_table_add_del(vrf_id, is_add=0)
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id})
def create_stream(self, src_if, packet_sizes):
"""
% (src_if.name, len(pkts)))
return pkts
+ def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes):
+ """
+ Create input packet stream for negative test for leaking across
+ different VRFs for defined interface using hosts list.
+
+ :param object src_if: Interface to create packet stream for.
+ :param int vrf_id: The FIB table / VRF ID where src_if is assigned.
+ :param list packet_sizes: List of required packet sizes.
+ :return: Stream of packets.
+ """
+ pkts = []
+ src_hosts = src_if.remote_hosts
+ vrf_lst = list(self.vrf_list)
+ vrf_lst.remove(vrf_id)
+ for vrf in vrf_lst:
+ for dst_if in self.pg_if_by_vrf_id[vrf]:
+ for dst_host in dst_if.remote_hosts:
+ src_host = random.choice(src_hosts)
+ pkt_info = self.create_packet_info(src_if, dst_if)
+ payload = self.info_to_payload(pkt_info)
+ p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
+ IP(src=src_host.ip4, dst=dst_host.ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ pkt_info.data = p.copy()
+ size = random.choice(packet_sizes)
+ self.extend_packet(p, size)
+ pkts.append(p)
+ self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
+ % (src_if.name, len(pkts)))
+ return pkts
+
def verify_capture(self, pg_if, capture):
"""
Verify captured input packet stream for defined interface.
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
:param int vrf_id: The FIB table / VRF ID to be verified.
:return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
"""
- ip_fib_dump = self.vapi.ip_fib_dump()
- vrf_exist = False
+ ip_fib_dump = self.vapi.ip_route_dump(vrf_id)
+ vrf_exist = len(ip_fib_dump)
vrf_count = 0
for ip_fib_details in ip_fib_dump:
- if ip_fib_details.table_id == vrf_id:
- if not vrf_exist:
- vrf_exist = True
- addr = socket.inet_ntoa(ip_fib_details.address)
- found = False
- for pg_if in self.pg_if_by_vrf_id[vrf_id]:
- if found:
+ addr = ip_fib_details.route.prefix.network_address
+ found = False
+ for pg_if in self.pg_if_by_vrf_id[vrf_id]:
+ if found:
+ break
+ for host in pg_if.remote_hosts:
+ if str(addr) == host.ip4:
+ vrf_count += 1
+ found = True
break
- for host in pg_if.remote_hosts:
- if str(addr) == str(host.ip4):
- vrf_count += 1
- found = True
- break
if not vrf_exist and vrf_count == 0:
self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
return VRFState.not_configured
def run_verify_test(self):
"""
- Create packet streams for all configured l2-pg interfaces, send all \
+ Create packet streams for all configured pg interfaces, send all \
prepared packet streams and verify that:
- - all packets received correctly on all pg-l2 interfaces assigned
- to bridge domains
- - no packet received on all pg-l2 interfaces not assigned to bridge
- domains
-
- :raise RuntimeError: If no packet captured on l2-pg interface assigned
- to the bridge domain or if any packet is captured on l2-pg
- interface not assigned to the bridge domain.
+ - all packets received correctly on all pg-ip4 interfaces assigned
+ to VRFs
+ - no packet received on all pg-ip4 interfaces not assigned to VRFs
+
+ :raise RuntimeError: If no packet captured on pg-ip4 interface assigned
+ to VRF or if any packet is captured on pg-ip4 interface not
+ assigned to VRF.
"""
# Test
# Create incoming packet streams for packet-generator interfaces
else:
raise Exception("Unknown interface: %s" % pg_if.name)
+ def run_crosswise_vrf_test(self):
+ """
+ Create packet streams for every pg-ip4 interface in VRF towards all
+ pg-ip4 interfaces in other VRFs, send all prepared packet streams and \
+ verify that:
+ - no packet received on all configured pg-ip4 interfaces
+
+ :raise RuntimeError: If any packet is captured on any pg-ip4 interface.
+ """
+ # Test
+ # Create incoming packet streams for packet-generator interfaces
+ for vrf_id in self.vrf_list:
+ for pg_if in self.pg_if_by_vrf_id[vrf_id]:
+ pkts = self.create_stream_crosswise_vrf(
+ pg_if, vrf_id, self.pg_if_packet_sizes)
+ pg_if.add_stream(pkts)
+
+ # Enable packet capture and start packet sending
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Verify
+ # Verify outgoing packet streams per packet-generator interface
+ for pg_if in self.pg_interfaces:
+ pg_if.assert_nothing_captured(remark="interface is in other VRF",
+ filter_out_fn=is_ipv4_misc)
+ self.logger.debug("No capture for interface %s" % pg_if.name)
+
def test_ip4_vrf_01(self):
""" IP4 VRF Multi-instance test 1 - create 4 VRFs
"""
# Test 1
self.run_verify_test()
+ self.run_crosswise_vrf_test()
def test_ip4_vrf_02(self):
""" IP4 VRF Multi-instance test 2 - reset 2 VRFs
# Test 2
self.run_verify_test()
+ self.run_crosswise_vrf_test()
def test_ip4_vrf_03(self):
""" IP4 VRF Multi-instance 3 - add 2 VRFs
# Test 3
self.run_verify_test()
+ self.run_crosswise_vrf_test()
def test_ip4_vrf_04(self):
""" IP4 VRF Multi-instance test 4 - reset 4 VRFs
# Test 4
self.run_verify_test()
+ self.run_crosswise_vrf_test()
if __name__ == '__main__':