"Finished sleep (%s) - slept %ss (wanted %ss)" % (
remark, after - before, timeout))
+ def send_and_assert_no_replies(self, intf, pkts, remark=""):
+ self.vapi.cli("clear trace")
+ intf.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ timeout = 1
+ for i in self.pg_interfaces:
+ i.get_capture(0, timeout=timeout)
+ i.assert_nothing_captured(remark=remark)
+ timeout = 0.1
+
+ def send_and_expect(self, input, pkts, output):
+ self.vapi.cli("clear trace")
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(len(pkts))
+ return rx
+
class TestCasePrinter(object):
_shared_state = {}
i.admin_down()
super(TestBier, self).tearDown()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.assert_nothing_captured(remark=remark)
-
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("trace add bier-mpls-lookup 10")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
def test_bier_midpoint(self):
"""BIER midpoint"""
i.admin_down()
super(TestDHCP, self).tearDown()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.assert_nothing_captured(remark=remark)
-
def verify_dhcp_has_option(self, pkt, option, value):
dhcp = pkt[DHCP]
found = False
super(TestDVR, self).tearDown()
- def send_and_assert_no_replies(self, intf, pkts):
- self.vapi.cli("clear trace")
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured()
-
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
def assert_same_mac_addr(self, tx, rx):
t_eth = tx[Ether]
for p in rx:
i.unconfig_ip4()
i.admin_down()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_disabled(self):
""" IP Disabled """
for i in self.pg_interfaces:
i.admin_down()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_sub_nets(self):
""" IP Sub Nets """
i.admin_down()
super(TestIPVlan0, self).tearDown()
- def send_and_expect(self, input, pkts, output):
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
-
def test_ip_vlan_0(self):
""" IP VLAN-0 """
i.unconfig_ip4()
i.admin_down()
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
- def send_and_assert_no_replies(self, intf, pkts, remark):
- self.vapi.cli("clear trace")
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_punt(self):
""" IP punt police and redirect """
i.unconfig_ip4()
i.admin_down()
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
- def send_and_assert_no_replies(self, intf, pkts, remark):
- self.vapi.cli("clear trace")
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_deag(self):
""" IP Deag Routes """
i.unconfig_ip4()
i.admin_down()
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
- def send_and_assert_no_replies(self, intf, pkts, remark):
- self.vapi.cli("clear trace")
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_input(self):
""" IP Input Exceptions """
rx = rx[0]
self.validate_ns(rx_intf, rx, tgt_ip)
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def verify_ip(self, rx, smac, dmac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
i.unconfig_ip4()
i.admin_down()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_disabled(self):
""" IP Disabled """
i.unconfig_ip6()
i.admin_down()
- def send_and_expect(self, input, pkts, output):
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
- def send_and_assert_no_replies(self, intf, pkts, remark):
- self.vapi.cli("clear trace")
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_punt(self):
""" IP6 punt police and redirect """
i.unconfig_ip6()
i.admin_down()
- def send_and_expect(self, input, pkts, output):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
- def send_and_assert_no_replies(self, intf, pkts, remark):
- self.vapi.cli("clear trace")
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_ip_input(self):
""" IP6 Input Exceptions """
i.unconfig_ip6()
i.admin_down()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.assert_nothing_captured(remark=remark)
-
def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
if not dmac:
dmac = self.pg1.remote_mac
except:
raise
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.assert_nothing_captured(remark=remark)
-
def test_swap(self):
""" MPLS label swap tests """
#
tx = self.create_stream_labelled_ip6(self.pg0, [34], 255,
dst_ip="ff01::1", hlim=1)
- self.send_and_assert_no_replies(self.pg0, tx, "Hop Limt Expired")
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(257)
+ self.verify_capture_ip6_icmp(self.pg0, rx, tx)
#
# set the RPF-ID of the enrtry to not match the input packet's
self.pg0.disable_mpls()
super(TestMPLSDisabled, self).tearDown()
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.get_capture(0)
- i.assert_nothing_captured(remark=remark)
-
def test_mpls_disabled(self):
""" MPLS Disabled """
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- timeout = 1
- for i in self.pg_interfaces:
- i.get_capture(0, timeout=timeout)
- i.assert_nothing_captured(remark=remark)
- timeout = 0.1
-
def test_arp(self):
""" ARP """
self.assertEqual(rx.dst, tx[IPv6].dst)
self.assertEqual(rx.hlim, tx[IPv6].hlim)
- def send_and_expect(self, input, output, pkts):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(len(pkts))
- return rx
-
def test_udp_encap(self):
""" UDP Encap test
"""
IP(src="2.2.2.2", dst="1.1.0.1") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, self.pg0, p_4o4*65)
+ rx = self.send_and_expect(self.pg0, p_4o4*65, self.pg0)
for p in rx:
self.validate_outer4(p, udp_encap_0)
p = IP(p["UDP"].payload.load)
IP(src="2.2.2.2", dst="1.1.2.1") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, self.pg2, p_4o6*65)
+ rx = self.send_and_expect(self.pg0, p_4o6*65, self.pg2)
for p in rx:
self.validate_outer6(p, udp_encap_2)
p = IP(p["UDP"].payload.load)
IPv6(src="2001::100", dst="2001::1") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, self.pg1, p_6o4*65)
+ rx = self.send_and_expect(self.pg0, p_6o4*65, self.pg1)
for p in rx:
self.validate_outer4(p, udp_encap_1)
p = IPv6(p["UDP"].payload.load)
IPv6(src="2001::100", dst="2001::3") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, self.pg3, p_6o6*65)
+ rx = self.send_and_expect(self.pg0, p_6o6*65, self.pg3)
for p in rx:
self.validate_outer6(p, udp_encap_3)
p = IPv6(p["UDP"].payload.load)
IP(src="2.2.2.2", dst="1.1.2.22") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, self.pg1, p_4omo4*65)
+ rx = self.send_and_expect(self.pg0, p_4omo4*65, self.pg1)
for p in rx:
self.validate_outer4(p, udp_encap_1)
p = MPLS(p["UDP"].payload.load)