X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ip4.py;h=5b5cf09c86f8be24f97b7cd8a13087f49941022a;hb=8a9c8f1412cb1258340b18a8eb622a835ef3c37b;hp=42fd1164a5f8dcb2694475cd8b050efe6e9fef79;hpb=054c03ac9c20a5e38121590b83f01fd91f82acf0;p=vpp.git diff --git a/test/test_ip4.py b/test/test_ip4.py index 42fd1164a5f..5b5cf09c86f 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -572,14 +572,6 @@ class TestIPDisabled(VppTestCase): i.unconfig_ip4() i.admin_down() - def send_and_assert_no_replies(self, intf, pkts, remark): - intf.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - for i in self.pg_interfaces: - i.get_capture(0) - i.assert_nothing_captured(remark=remark) - def test_ip_disabled(self): """ IP Disabled """ @@ -667,14 +659,6 @@ class TestIPSubNets(VppTestCase): for i in self.pg_interfaces: i.admin_down() - def send_and_assert_no_replies(self, intf, pkts, remark): - intf.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - for i in self.pg_interfaces: - i.get_capture(0) - i.assert_nothing_captured(remark=remark) - def test_ip_sub_nets(self): """ IP Sub Nets """ @@ -985,12 +969,6 @@ class TestIPVlan0(VppTestCase): i.admin_down() super(TestIPVlan0, self).tearDown() - def send_and_expect(self, input, pkts, output): - input.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - rx = output.get_capture(len(pkts)) - def test_ip_vlan_0(self): """ IP VLAN-0 """ @@ -1028,23 +1006,6 @@ class TestIPPunt(VppTestCase): i.unconfig_ip4() i.admin_down() - def send_and_expect(self, input, pkts, output): - self.vapi.cli("clear trace") - input.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - rx = output.get_capture(len(pkts)) - return rx - - def send_and_assert_no_replies(self, intf, pkts, remark): - self.vapi.cli("clear trace") - intf.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - for i in self.pg_interfaces: - i.get_capture(0) - i.assert_nothing_captured(remark=remark) - def test_ip_punt(self): """ IP punt police and redirect """ @@ -1138,23 +1099,6 @@ class TestIPDeag(VppTestCase): i.unconfig_ip4() i.admin_down() - def send_and_expect(self, input, pkts, output): - self.vapi.cli("clear trace") - input.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - rx = output.get_capture(len(pkts)) - return rx - - def send_and_assert_no_replies(self, intf, pkts, remark): - self.vapi.cli("clear trace") - intf.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - for i in self.pg_interfaces: - i.get_capture(0) - i.assert_nothing_captured(remark=remark) - def test_ip_deag(self): """ IP Deag Routes """ @@ -1227,5 +1171,148 @@ class TestIPDeag(VppTestCase): self.send_and_expect(self.pg0, pkts_src, self.pg2) +class TestIPInput(VppTestCase): + """ IPv4 Input Exceptions """ + + def setUp(self): + super(TestIPInput, self).setUp() + + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def tearDown(self): + super(TestIPInput, self).tearDown() + for i in self.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + + def test_ip_input(self): + """ IP Input Exceptions """ + + # i can't find a way in scapy to construct an IP packet + # with a length less than the IP header length + + # + # Packet too short - this is forwarded + # + p_short = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + len=40) / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + rx = self.send_and_expect(self.pg0, p_short * 65, self.pg1) + + # + # Packet too long - this is dropped + # + p_long = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + len=400) / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + rx = self.send_and_assert_no_replies(self.pg0, p_long * 65, + "too long") + + # + # bad chksum - this is dropped + # + p_chksum = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + chksum=400) / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + rx = self.send_and_assert_no_replies(self.pg0, p_chksum * 65, + "bad checksum") + + # + # bad version - this is dropped + # + p_ver = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + version=3) / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + rx = self.send_and_assert_no_replies(self.pg0, p_ver * 65, + "funky version") + + # + # fragment offset 1 - this is dropped + # + p_frag = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + frag=1) / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + rx = self.send_and_assert_no_replies(self.pg0, p_frag * 65, + "frag offset") + + # + # TTL expired packet + # + p_ttl = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + ttl=1) / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + rx = self.send_and_expect(self.pg0, p_ttl * 65, self.pg0) + + rx = rx[0] + icmp = rx[ICMP] + + self.assertEqual(icmptypes[icmp.type], "time-exceeded") + self.assertEqual(icmpcodes[icmp.type][icmp.code], + "ttl-zero-during-transit") + self.assertEqual(icmp.src, self.pg0.remote_ip4) + self.assertEqual(icmp.dst, self.pg1.remote_ip4) + + # + # MTU exceeded + # + p_mtu = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4, + ttl=10, flags='DF') / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 2000)) + + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 1500) + + rx = self.send_and_expect(self.pg0, p_mtu * 65, self.pg0) + rx = rx[0] + icmp = rx[ICMP] + + self.assertEqual(icmptypes[icmp.type], "dest-unreach") + self.assertEqual(icmpcodes[icmp.type][icmp.code], + "fragmentation-needed") + self.assertEqual(icmp.src, self.pg0.remote_ip4) + self.assertEqual(icmp.dst, self.pg1.remote_ip4) + + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 2500) + rx = self.send_and_expect(self.pg0, p_mtu * 65, self.pg1) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)