from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes
from util import ppp
pkts = []
for i in range(0, 257):
dst_if = self.flows[src_if][i % 2]
- info = self.create_packet_info(
- src_if.sw_if_index, dst_if.sw_if_index)
+ info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
payload_info = self.payload_to_info(str(packet[Raw]))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)" %
+ (dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
- add new 1k,
- del 1.5k
- ..note:: Python API is to slow to add many routes, needs C code replacement.
+ ..note:: Python API is too slow to add many routes, needs replacement.
"""
def config_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
:return list: added ips with 32 prefix
"""
added_ips = []
- dest_addr = int(
- socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16)
+ dest_addr = int(socket.inet_pton(socket.AF_INET,
+ start_dest_addr).encode('hex'),
+ 16)
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
removed_ips = []
- dest_addr = int(
- socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16)
+ dest_addr = int(socket.inet_pton(socket.AF_INET,
+ start_dest_addr).encode('hex'),
+ 16)
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
for _ in range(count):
dst_addr = random.choice(dst_ips)
- info = self.create_packet_info(
- src_if.sw_if_index, dst_if.sw_if_index)
+ info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=dst_addr) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
- size = random.choice(self.pg_if_packet_sizes)
self.extend_packet(p, random.choice(self.pg_if_packet_sizes))
pkts.append(p)
def _find_ip_match(self, find_in, pkt):
for p in find_in:
- if self.payload_to_info(str(p[Raw])) == self.payload_to_info(str(pkt[Raw])):
+ if self.payload_to_info(str(p[Raw])) == \
+ self.payload_to_info(str(pkt[Raw])):
if p[IP].src != pkt[IP].src:
break
if p[IP].dst != pkt[IP].dst:
def setUp(self):
super(TestIPv4FibCrud, self).setUp()
- self.packet_infos = {}
+ self.reset_packet_infos()
def test_1_add_routes(self):
""" Add 1k routes
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- pkts = self.pg0.get_capture()
+ pkts = self.pg0.get_capture(len(self.stream_1) + len(self.stream_2))
self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2)
-
def test_2_del_routes(self):
""" Delete 100 routes
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- pkts = self.pg0.get_capture()
+ pkts = self.pg0.get_capture(len(self.stream_1) + len(self.stream_2))
self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2)
def test_3_add_new_routes(self):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- pkts = self.pg0.get_capture()
+ pkts = self.pg0.get_capture(len(self.stream_1) + len(self.stream_2))
self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2)
def test_4_del_routes(self):
self.verify_not_in_route_dump(fib_dump, self.deleted_routes)
+class TestIPNull(VppTestCase):
+ """ IPv4 routes via NULL """
+
+ def setUp(self):
+ super(TestIPNull, self).setUp()
+
+ # create 2 pg interfaces
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestIPNull, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def test_ip_null(self):
+ """ IP NULL route """
+
+ #
+ # A route via IP NULL that will reply with ICMP unreachables
+ #
+ ip_unreach = VppIpRoute(self, "10.0.0.1", 32, [], is_unreach=1)
+ ip_unreach.add_vpp_config()
+
+ p_unreach = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.pg0.add_stream(p_unreach)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+ rx = rx[0]
+ icmp = rx[ICMP]
+
+ self.assertEqual(icmptypes[icmp.type], "dest-unreach")
+ self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-unreachable")
+ self.assertEqual(icmp.src, self.pg0.remote_ip4)
+ self.assertEqual(icmp.dst, "10.0.0.1")
+
+ #
+ # ICMP replies are rate limited. so sit and spin.
+ #
+ self.sleep(1)
+
+ #
+ # A route via IP NULL that will reply with ICMP prohibited
+ #
+ ip_prohibit = VppIpRoute(self, "10.0.0.2", 32, [], is_prohibit=1)
+ ip_prohibit.add_vpp_config()
+
+ p_prohibit = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.0.0.2") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.pg0.add_stream(p_prohibit)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+
+ rx = rx[0]
+ icmp = rx[ICMP]
+
+ self.assertEqual(icmptypes[icmp.type], "dest-unreach")
+ self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-prohibited")
+ self.assertEqual(icmp.src, self.pg0.remote_ip4)
+ self.assertEqual(icmp.dst, "10.0.0.2")
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)