-#!/usr/bin/env python
+#!/usr/bin/env python3
""" L2BD ARP term Test """
import unittest
import random
import copy
-from socket import AF_INET6
+from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP
-from scapy.utils import inet_pton, inet_ntop
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
in6_mactoifaceid, in6_ismaddr
from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
from framework import VppTestCase, VppTestRunner
-from util import Host, ppp, mactobinary
+from util import Host, ppp
class TestL2bdArpTerm(VppTestCase):
super(TestL2bdArpTerm, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestL2bdArpTerm, cls).tearDownClass()
+
def setUp(self):
"""
Clear trace and packet infos before running each test.
Show various debug prints after each test.
"""
super(TestL2bdArpTerm, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+ # many tests delete bridge-domain 1 as the last task. don't output
+ # the details of a non-existent bridge-domain.
+ if self.vapi.l2_fib_table_dump(bd_id=1):
self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
for e in entries:
- ip = e.ip4n if is_ipv6 == 0 else e.ip6n
- self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
- mac=e.bin_mac,
- ip=ip,
- is_ipv6=is_ipv6,
- is_add=is_add)
+ ip = e.ip4 if is_ipv6 == 0 else e.ip6
+ self.vapi.bd_ip_mac_add_del(is_add=is_add,
+ entry={
+ 'bd_id': bd_id,
+ 'ip': ip,
+ 'mac': e.mac})
@classmethod
def mac_list(cls, b6_range):
self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
for swif in self.bd_swifs(bd_id):
swif_idx = swif.sw_if_index
- self.vapi.sw_interface_set_l2_bridge(
- swif_idx, bd_id=bd_id, enable=is_add)
+ self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=swif_idx,
+ bd_id=bd_id, enable=is_add)
if not is_add:
self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
@classmethod
- def arp(cls, src_host, host):
+ def arp_req(cls, src_host, host):
return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
ARP(op="who-has",
hwsrc=src_host.bin_mac,
@classmethod
def arp_reqs(cls, src_host, entries):
- return [cls.arp(src_host, e) for e in entries]
+ return [cls.arp_req(src_host, e) for e in entries]
+
+ @classmethod
+ def garp_req(cls, host):
+ return cls.arp_req(host, host)
+
+ @classmethod
+ def garp_reqs(cls, entries):
+ return [cls.garp_req(e) for e in entries]
def arp_resp_host(self, src_host, arp_resp):
ether = arp_resp[Ether]
def arp_resp_hosts(self, src_host, pkts):
return {self.arp_resp_host(src_host, p) for p in pkts}
+ @staticmethod
+ def inttoip4(ip):
+ o1 = int(ip / 16777216) % 256
+ o2 = int(ip / 65536) % 256
+ o3 = int(ip / 256) % 256
+ o4 = int(ip) % 256
+ return '%s.%s.%s.%s' % (o1, o2, o3, o4)
+
+ def arp_event_host(self, e):
+ return Host(str(e.mac), ip4=str(e.ip))
+
+ def arp_event_hosts(self, evs):
+ return {self.arp_event_host(e) for e in evs}
+
+ def nd_event_host(self, e):
+ return Host(str(e.mac), ip6=str(e.ip))
+
+ def nd_event_hosts(self, evs):
+ return {self.nd_event_host(e) for e in evs}
+
@classmethod
def ns_req(cls, src_host, host):
nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
@classmethod
- def ns_reqs(cls, src_host, entries):
+ def ns_reqs_dst(cls, entries, dst_host):
+ return [cls.ns_req(e, dst_host) for e in entries]
+
+ @classmethod
+ def ns_reqs_src(cls, src_host, entries):
return [cls.ns_req(src_host, e) for e in entries]
def na_resp_host(self, src_host, rx):
else:
raise ValueError("Unknown feature used: %s" % flag)
is_set = 1 if args[flag] else 0
- self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
+ self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set,
+ flags=feature_bitmap)
self.logger.info("Bridge domain ID %d updated" % bd_id)
def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
self.assertEqual(len(resps ^ resp_hosts), 0)
def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
- reqs = self.ns_reqs(src_host, req_hosts)
+ reqs = self.ns_reqs_src(src_host, req_hosts)
for swif in self.bd_swifs(bd_id):
swif.add_stream(reqs)
macs = self.mac_list(range(1, 5))
hosts = self.ip4_hosts(4, 1, macs)
self.add_del_arp_term_hosts(hosts, is_add=1)
+
self.verify_arp(src_host, hosts, hosts)
type(self).hosts = hosts
self.verify_nd(src_host, hosts, updated)
self.bd_add_del(1, is_add=0)
+ def test_l2bd_arp_term_09(self):
+ """ L2BD arp term - send garps, verify arp event reports
+ """
+ self.vapi.want_l2_arp_term_events(enable=1)
+ self.bd_add_del(1, is_add=1)
+ self.set_bd_flags(1, arp_term=True, flood=False,
+ uu_flood=False, learn=False)
+ macs = self.mac_list(range(90, 95))
+ hosts = self.ip4_hosts(5, 1, macs)
+
+ garps = self.garp_reqs(hosts)
+ self.bd_swifs(1)[0].add_stream(garps)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ evs = [self.vapi.wait_for_event(1, "l2_arp_term_event")
+ for i in range(len(hosts))]
+ ev_hosts = self.arp_event_hosts(evs)
+ self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+ def test_l2bd_arp_term_10(self):
+ """ L2BD arp term - send duplicate garps, verify suppression
+ """
+ macs = self.mac_list(range(70, 71))
+ hosts = self.ip4_hosts(6, 1, macs)
+
+ """ send the packet 5 times expect one event
+ """
+ garps = self.garp_reqs(hosts) * 5
+ self.bd_swifs(1)[0].add_stream(garps)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ evs = [self.vapi.wait_for_event(1, "l2_arp_term_event")
+ for i in range(len(hosts))]
+ ev_hosts = self.arp_event_hosts(evs)
+ self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+ def test_l2bd_arp_term_11(self):
+ """ L2BD arp term - disable ip4 arp events,send garps, verify no events
+ """
+ self.vapi.want_l2_arp_term_events(enable=0)
+ macs = self.mac_list(range(90, 95))
+ hosts = self.ip4_hosts(5, 1, macs)
+
+ garps = self.garp_reqs(hosts)
+ self.bd_swifs(1)[0].add_stream(garps)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.sleep(1)
+ self.assertEqual(len(self.vapi.collect_events()), 0)
+ self.bd_add_del(1, is_add=0)
+
+ def test_l2bd_arp_term_12(self):
+ """ L2BD ND term - send NS packets verify reports
+ """
+ self.vapi.want_l2_arp_term_events(enable=1)
+ dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+ self.bd_add_del(1, is_add=1)
+ self.set_bd_flags(1, arp_term=True, flood=False,
+ uu_flood=False, learn=False)
+ macs = self.mac_list(range(10, 15))
+ hosts = self.ip6_hosts(5, 1, macs)
+ reqs = self.ns_reqs_dst(hosts, dst_host)
+ self.bd_swifs(1)[0].add_stream(reqs)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ evs = [self.vapi.wait_for_event(2, "l2_arp_term_event")
+ for i in range(len(hosts))]
+ ev_hosts = self.nd_event_hosts(evs)
+ self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+ def test_l2bd_arp_term_13(self):
+ """ L2BD ND term - send duplicate ns, verify suppression
+ """
+ dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+ macs = self.mac_list(range(16, 17))
+ hosts = self.ip6_hosts(5, 1, macs)
+ reqs = self.ns_reqs_dst(hosts, dst_host) * 5
+ self.bd_swifs(1)[0].add_stream(reqs)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ evs = [self.vapi.wait_for_event(2, "l2_arp_term_event")
+ for i in range(len(hosts))]
+ ev_hosts = self.nd_event_hosts(evs)
+ self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+ def test_l2bd_arp_term_14(self):
+ """ L2BD ND term - disable ip4 arp events,send ns, verify no events
+ """
+ self.vapi.want_l2_arp_term_events(enable=0)
+ dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+ macs = self.mac_list(range(10, 15))
+ hosts = self.ip6_hosts(5, 1, macs)
+ reqs = self.ns_reqs_dst(hosts, dst_host)
+ self.bd_swifs(1)[0].add_stream(reqs)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.sleep(1)
+ self.assertEqual(len(self.vapi.collect_events()), 0)
+ self.bd_add_del(1, is_add=0)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)