from framework import VppTestCase, VppTestRunner
from vpp_neighbor import VppNeighbor, find_nbr
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \
- VppIpTable
+ VppIpTable, DpoProto
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import IPv6
# not exported by scapy, so redefined here
arp_opts = {"who-has": 1, "is-at": 2}
self.pg1.sw_if_index,
self.pg1.remote_hosts[2].ip4))
+ def test_arp_incomplete(self):
+ """ Incomplete Entries """
+
+ #
+ # ensure that we throttle the ARP and ND requests
+ #
+ self.pg0.generate_remote_hosts(2)
+
+ #
+ # IPv4/ARP
+ #
+ ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(self.pg0.remote_hosts[1].ip4,
+ self.pg0.sw_if_index)])
+ ip_10_0_0_1.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4,
+ dst="10.0.0.1") /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg1.add_stream(p1 * 257)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg0._get_capture(1)
+
+ #
+ # how many we get is going to be dependent on the time for packet
+ # processing but it should be small
+ #
+ self.assertLess(len(rx), 64)
+
+ #
+ # IPv6/ND
+ #
+ ip_10_1 = VppIpRoute(self, "10::1", 128,
+ [VppRoutePath(self.pg0.remote_hosts[1].ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ ip_10_1.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6,
+ dst="10::1") /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg1.add_stream(p1 * 257)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg0._get_capture(1)
+
+ #
+ # how many we get is going to be dependent on the time for packet
+ # processing but it should be small
+ #
+ self.assertLess(len(rx), 64)
+
+
+class NeighborStatsTestCase(VppTestCase):
+ """ ARP Test Case """
+
+ def setUp(self):
+ super(NeighborStatsTestCase, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ # pg0 configured with ip4 and 6 addresses used for input
+ # pg1 configured with ip4 and 6 addresses used for output
+ # pg2 is unnumbered to pg0
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborStatsTestCase, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_arp_stats(self):
+ """ ARP Counters """
+
+ self.vapi.cli("adj counters enable")
+ self.pg1.generate_remote_hosts(2)
+
+ arp1 = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[0].mac,
+ self.pg1.remote_hosts[0].ip4)
+ arp1.add_vpp_config()
+ arp2 = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].mac,
+ self.pg1.remote_hosts[1].ip4)
+ arp2.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[0].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+ p2 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[1].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rx = self.send_and_expect(self.pg0, p1 * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p2 * 65, self.pg1)
+
+ self.assertEqual(65, arp1.get_stats()['packets'])
+ self.assertEqual(65, arp2.get_stats()['packets'])
+
+ rx = self.send_and_expect(self.pg0, p1 * 65, self.pg1)
+ self.assertEqual(130, arp1.get_stats()['packets'])
+
+ def test_nd_stats(self):
+ """ ND Counters """
+
+ self.vapi.cli("adj counters enable")
+ self.pg0.generate_remote_hosts(3)
+
+ nd1 = VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[1].mac,
+ self.pg0.remote_hosts[1].ip6,
+ af=AF_INET6)
+ nd1.add_vpp_config()
+ nd2 = VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[2].mac,
+ self.pg0.remote_hosts[2].ip6,
+ af=AF_INET6)
+ nd2.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6,
+ dst=self.pg0.remote_hosts[1].ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+ p2 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6,
+ dst=self.pg0.remote_hosts[2].ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rx = self.send_and_expect(self.pg1, p1 * 16, self.pg0)
+ rx = self.send_and_expect(self.pg1, p2 * 16, self.pg0)
+
+ self.assertEqual(16, nd1.get_stats()['packets'])
+ self.assertEqual(16, nd2.get_stats()['packets'])
+
+ rx = self.send_and_expect(self.pg1, p1 * 65, self.pg0)
+ self.assertEqual(81, nd1.get_stats()['packets'])
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)