-#!/usr/bin/env python
+#!/usr/bin/env python3
"""IP4 VRF Multi-instance Test Case HLD:
**NOTES:**
- higher number of pg-ip4 interfaces causes problems => only 15 pg-ip4 \
- interfaces in 5 VRFs are tested
+ interfaces in 5 VRFs are tested
- jumbo packets in configuration with 15 pg-ip4 interfaces leads to \
- problems too
+ problems too
**config 1**
- add 15 pg-ip4 interfaces
**verify 1**
- check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in the same
- VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the
+ same VRF
- no packet received in case of pg-ip4 interfaces not in VRF
- no packet received in case of pg-ip4 interfaces in different VRFs
- send IP4 packets between all pg-ip4 interfaces in all VRF groups
**verify 2**
- - all packets received correctly in case of pg-ip4 interfaces in the same
- VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the
+ same VRF
- no packet received in case of pg-ip4 interfaces not in VRF
- no packet received in case of pg-ip4 interfaces in different VRFs
**verify 3**
- check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in the same
- VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the
+ same VRF
- no packet received in case of pg-ip4 interfaces not in VRF
- no packet received in case of pg-ip4 interfaces in different VRFs
**verify 4**
- check VRF data by parsing output of ip_fib_dump API command
- - all packets received correctly in case of pg-ip4 interfaces in the same
- VRF
+ - all packets received correctly in case of pg-ip4 interfaces in the
+ same VRF
- no packet received in case of pg-ip4 interfaces not in VRF
- no packet received in case of pg-ip4 interfaces in different VRFs
"""
import random
import socket
+import scapy.compat
from scapy.packet import Raw
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, ARP
+from scapy.layers.l2 import Ether, ARP
+from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
from util import ppp
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
for i in range(len(cls.pg_interfaces)):
- multiplicand = i / cls.pg_ifs_per_vrf
+ multiplicand = i // cls.pg_ifs_per_vrf
pg_list = [
cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
for j in range(cls.pg_ifs_per_vrf)
cls.pg_not_in_vrf = [pg_if for pg_if in cls.pg_interfaces]
# Create mapping of pg_interfaces to VRF IDs
- cls.pg_if_by_vrf_id = dict()
+ cls.pg_if_sets = dict()
for i in range(cls.nr_of_vrfs):
- vrf_id = i + 1
+ set_id = i + 1
pg_list = [
cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
for j in range(cls.pg_ifs_per_vrf)]
- cls.pg_if_by_vrf_id[vrf_id] = pg_list
+ cls.pg_if_sets[set_id] = pg_list
except Exception:
super(TestIp4VrfMultiInst, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIp4VrfMultiInst, cls).tearDownClass()
+
def setUp(self):
"""
Clear trace and packet infos before running each test.
Show various debug prints after each test.
"""
super(TestIp4VrfMultiInst, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show ip fib"))
- self.logger.info(self.vapi.ppcli("show ip arp"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.ppcli("show ip fib"))
+ self.logger.info(self.vapi.ppcli("show ip4 neighbors"))
+
+ def _assign_interfaces(self, vrf_id, if_set_id):
+ for i in range(self.pg_ifs_per_vrf):
+ pg_if = self.pg_if_sets[if_set_id][i]
+ pg_if.set_table_ip4(vrf_id)
+ self.logger.info("pg-interface %s added to IPv4 VRF ID %d"
+ % (pg_if.name, vrf_id))
+ if pg_if not in self.pg_in_vrf:
+ self.pg_in_vrf.append(pg_if)
+ if pg_if in self.pg_not_in_vrf:
+ self.pg_not_in_vrf.remove(pg_if)
+ pg_if.config_ip4()
+ pg_if.configure_ipv4_neighbors()
def create_vrf_and_assign_interfaces(self, count, start=1):
"""
for i in range(count):
vrf_id = i + start
- pg_if = self.pg_if_by_vrf_id[vrf_id][0]
- dest_addr = pg_if.local_ip4n
- dest_addr_len = 24
- self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id)
+ self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id})
self.logger.info("IPv4 VRF ID %d created" % vrf_id)
if vrf_id not in self.vrf_list:
self.vrf_list.append(vrf_id)
if vrf_id in self.vrf_reset_list:
self.vrf_reset_list.remove(vrf_id)
- for j in range(self.pg_ifs_per_vrf):
- pg_if = self.pg_if_by_vrf_id[vrf_id][j]
- pg_if.set_table_ip4(vrf_id)
- self.logger.info("pg-interface %s added to IPv4 VRF ID %d"
- % (pg_if.name, vrf_id))
- if pg_if not in self.pg_in_vrf:
- self.pg_in_vrf.append(pg_if)
- if pg_if in self.pg_not_in_vrf:
- self.pg_not_in_vrf.remove(pg_if)
- pg_if.config_ip4()
- pg_if.configure_ipv4_neighbors()
+ self._assign_interfaces(vrf_id, vrf_id)
+ self.logger.debug(self.vapi.ppcli("show ip fib"))
+ self.logger.debug(self.vapi.ppcli("show ip4 neighbors"))
+
+ def create_vrf_by_id_and_assign_interfaces(self, set_id,
+ vrf_id=0xffffffff):
+ """
+ Create a FIB table / VRF by vrf_id, put 3 pg-ip4 interfaces
+ to FIB table / VRF.
+
+ :param int vrf_id: Required table ID / VRF ID. \
+ (Default value = 0xffffffff, ID will be selected automatically)
+ """
+ ret = self.vapi.ip_table_allocate(table={'table_id': vrf_id})
+ vrf_id = ret.table.table_id
+ self.logger.info("IPv4 VRF ID %d created" % vrf_id)
+ if vrf_id not in self.vrf_list:
+ self.vrf_list.append(vrf_id)
+ if vrf_id in self.vrf_reset_list:
+ self.vrf_reset_list.remove(vrf_id)
+ self._assign_interfaces(vrf_id, set_id)
self.logger.debug(self.vapi.ppcli("show ip fib"))
- self.logger.debug(self.vapi.ppcli("show ip arp"))
+ self.logger.debug(self.vapi.ppcli("show ip4 neighbors"))
+
+ return vrf_id
- def reset_vrf_and_remove_from_vrf_list(self, vrf_id):
+ def reset_vrf_and_remove_from_vrf_list(self, vrf_id, if_set_id=None):
"""
Reset required FIB table / VRF and remove it from VRF list.
:param int vrf_id: The FIB table / VRF ID to be reset.
"""
- # self.vapi.reset_vrf(vrf_id, is_ipv6=0)
- self.vapi.reset_fib(vrf_id, is_ipv6=0)
+ if if_set_id is None:
+ if_set_id = vrf_id
+ self.vapi.ip_table_flush(table={'table_id': vrf_id})
if vrf_id in self.vrf_list:
self.vrf_list.remove(vrf_id)
if vrf_id not in self.vrf_reset_list:
self.vrf_reset_list.append(vrf_id)
for j in range(self.pg_ifs_per_vrf):
- pg_if = self.pg_if_by_vrf_id[vrf_id][j]
+ pg_if = self.pg_if_sets[if_set_id][j]
pg_if.unconfig_ip4()
if pg_if in self.pg_in_vrf:
self.pg_in_vrf.remove(pg_if)
self.pg_not_in_vrf.append(pg_if)
self.logger.info("IPv4 VRF ID %d reset finished" % vrf_id)
self.logger.debug(self.vapi.ppcli("show ip fib"))
- self.logger.debug(self.vapi.ppcli("show ip arp"))
- self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id)
+ self.logger.debug(self.vapi.ppcli("show ip neighbors"))
+ self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id})
def create_stream(self, src_if, packet_sizes):
"""
vrf_lst = list(self.vrf_list)
vrf_lst.remove(vrf_id)
for vrf in vrf_lst:
- for dst_if in self.pg_if_by_vrf_id[vrf]:
+ for dst_if in self.pg_if_sets[vrf]:
for dst_host in dst_if.remote_hosts:
src_host = random.choice(src_hosts)
pkt_info = self.create_packet_info(src_if, dst_if)
"Port %u: Packet expected from source %u didn't arrive" %
(dst_sw_if_index, i.sw_if_index))
- def verify_vrf(self, vrf_id):
+ def verify_vrf(self, vrf_id, if_set_id=None):
"""
Check if the FIB table / VRF ID is configured.
:param int vrf_id: The FIB table / VRF ID to be verified.
:return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
"""
- ip_fib_dump = self.vapi.ip_fib_dump()
- vrf_exist = False
+ if if_set_id is None:
+ if_set_id = vrf_id
+ ip_fib_dump = self.vapi.ip_route_dump(vrf_id)
+ vrf_exist = len(ip_fib_dump)
vrf_count = 0
for ip_fib_details in ip_fib_dump:
- if ip_fib_details.table_id == vrf_id:
- if not vrf_exist:
- vrf_exist = True
- addr = socket.inet_ntoa(ip_fib_details.address)
- found = False
- for pg_if in self.pg_if_by_vrf_id[vrf_id]:
- if found:
+ addr = ip_fib_details.route.prefix.network_address
+ found = False
+ for pg_if in self.pg_if_sets[if_set_id]:
+ if found:
+ break
+ for host in pg_if.remote_hosts:
+ if str(addr) == host.ip4:
+ vrf_count += 1
+ found = True
break
- for host in pg_if.remote_hosts:
- if str(addr) == str(host.ip4):
- vrf_count += 1
- found = True
- break
if not vrf_exist and vrf_count == 0:
self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
return VRFState.not_configured
def run_crosswise_vrf_test(self):
"""
Create packet streams for every pg-ip4 interface in VRF towards all
- pg-ip4 interfaces in other VRFs, send all prepared packet streams and \
+ pg-ip4 interfaces in other VRFs, send all prepared packet streams and
verify that:
- - no packet received on all configured pg-ip4 interfaces
+
+ - no packet received on all configured pg-ip4 interfaces
:raise RuntimeError: If any packet is captured on any pg-ip4 interface.
"""
# Test
# Create incoming packet streams for packet-generator interfaces
for vrf_id in self.vrf_list:
- for pg_if in self.pg_if_by_vrf_id[vrf_id]:
+ for pg_if in self.pg_if_sets[vrf_id]:
pkts = self.create_stream_crosswise_vrf(
pg_if, vrf_id, self.pg_if_packet_sizes)
pg_if.add_stream(pkts)
self.run_verify_test()
self.run_crosswise_vrf_test()
+ def test_ip4_vrf_05(self):
+ """ IP4 VRF Multi-instance test 5 - id allocation
+ """
+ # Config 5
+ # Create several VRFs
+ # Set vrf_id manually first
+ self.create_vrf_by_id_and_assign_interfaces(1, 1)
+ # Set vrf_id automatically a few times
+ auto_vrf_id = [
+ self.create_vrf_by_id_and_assign_interfaces(i) for i in range(2, 5)
+ ]
+
+ # Verify 5
+ self.assert_equal(self.verify_vrf(1, 1), VRFState.configured, VRFState)
+ for i, vrf in enumerate(auto_vrf_id):
+ self.assert_equal(self.verify_vrf(vrf, i+2),
+ VRFState.configured, VRFState)
+
+ # Test 5
+ self.run_verify_test()
+
+ # Config 5.1
+ # Reset VRFs
+ self.reset_vrf_and_remove_from_vrf_list(1)
+ for i, vrf in enumerate(auto_vrf_id):
+ self.reset_vrf_and_remove_from_vrf_list(vrf, i+2)
+
+ # Verify 5.1
+ self.assert_equal(self.verify_vrf(1, 1), VRFState.reset, VRFState)
+ for i, vrf in enumerate(auto_vrf_id):
+ self.assert_equal(self.verify_vrf(vrf, i+2),
+ VRFState.reset, VRFState)
+
+ vrf_list_length = len(self.vrf_list)
+ self.assertEqual(
+ vrf_list_length, 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)