import socket
import unittest
+import scapy.compat
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr, is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
def _find_ip_match(self, find_in, pkt):
for p in find_in:
- if self.payload_to_info(str(p[Raw])) == \
- self.payload_to_info(str(pkt[Raw])):
+ if self.payload_to_info(p[Raw]) == \
+ self.payload_to_info(pkt[Raw]):
if p[IP].src != pkt[IP].src:
break
if p[IP].dst != pkt[IP].dst:
for ip in ips:
self.assertTrue(_ip_in_route_dump(ip, fib_dump),
- 'IP {} is not in fib dump.'.format(ip))
+ 'IP {!s} is not in fib dump.'.format(ip))
def verify_not_in_route_dump(self, fib_dump, ips):
for ip in ips:
self.assertFalse(_ip_in_route_dump(ip, fib_dump),
- 'IP {} is in fib dump.'.format(ip))
+ 'IP {!s} is in fib dump.'.format(ip))
@classmethod
def setUpClass(cls):
super(TestIPv4FibCrud, self).setUp()
self.reset_packet_infos()
+ self.configured_routes = []
+ self.deleted_routes = []
+
def test_1_add_routes(self):
""" Add 1k routes
- delete 10 routes check with traffic script.
"""
+ # config 1M FIB entries
+ self.configured_routes.extend(self.config_fib_many_to_one(
+ "10.0.0.0", self.pg0.remote_ip4, 100))
self.deleted_routes.extend(self.unconfig_fib_many_to_one(
"10.0.0.10", self.pg0.remote_ip4, 10))
for x in self.deleted_routes:
- re-add 5 routes check with traffic script.
- add 100 routes check with traffic script.
"""
+ # config 1M FIB entries
+ self.configured_routes.extend(self.config_fib_many_to_one(
+ "10.0.0.0", self.pg0.remote_ip4, 100))
+ self.deleted_routes.extend(self.unconfig_fib_many_to_one(
+ "10.0.0.10", self.pg0.remote_ip4, 10))
+ for x in self.deleted_routes:
+ self.configured_routes.remove(x)
+
tmp = self.config_fib_many_to_one(
"10.0.0.10", self.pg0.remote_ip4, 5)
self.configured_routes.extend(tmp)
#
ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10")
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 16)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=16)
pn = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
self.send_and_assert_no_replies(self.pg1, pb, "IP Broadcast address")
# remove the sub-net and we are forwarding via the cover again
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 16,
- is_add=0)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=16, is_add=0)
self.pg1.add_stream(pn)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
#
ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10")
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 31)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=31)
pn = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
rx[ARP]
# remove the sub-net and we are forwarding via the cover again
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 31,
- is_add=0)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=31, is_add=0)
self.pg1.add_stream(pn)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()