X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=test%2Ftest_udp.py;h=230335ff169c96ee2681386c710eb23e86325057;hb=66446b9888e7507929f1665fda0308b86fe7e70b;hp=322d8133b0d55f8abf541107e18f9553e4badc90;hpb=31ed74407643595fdce206e9d7487108fb8b33ab;p=vpp.git diff --git a/test/test_udp.py b/test/test_udp.py index 322d8133b0d..230335ff169 100644 --- a/test/test_udp.py +++ b/test/test_udp.py @@ -1,11 +1,11 @@ #!/usr/bin/env python - +import unittest from framework import VppTestCase, VppTestRunner -from vpp_udp_encap import * +from vpp_udp_encap import VppUdpEncap from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, VppMplsLabel from scapy.packet import Raw -from scapy.layers.l2 import Ether, ARP +from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from scapy.contrib.mpls import MPLS @@ -223,5 +223,79 @@ class TestUdpEncap(VppTestCase): self.validate_inner4(p, p_4omo4, ttl=63) +class TestUDP(VppTestCase): + """ UDP Test Case """ + + @classmethod + def setUpClass(cls): + super(TestUDP, cls).setUpClass() + + def setUp(self): + super(TestUDP, self).setUp() + self.vapi.session_enable_disable(is_enabled=1) + self.create_loopback_interfaces(2) + + table_id = 0 + + for i in self.lo_interfaces: + i.admin_up() + + if table_id != 0: + tbl = VppIpTable(self, table_id) + tbl.add_vpp_config() + + i.set_table_ip4(table_id) + i.config_ip4() + table_id += 1 + + # Configure namespaces + self.vapi.app_namespace_add(namespace_id="0", + sw_if_index=self.loop0.sw_if_index) + self.vapi.app_namespace_add(namespace_id="1", + sw_if_index=self.loop1.sw_if_index) + + def tearDown(self): + for i in self.lo_interfaces: + i.unconfig_ip4() + i.set_table_ip4(0) + i.admin_down() + self.vapi.session_enable_disable(is_enabled=0) + super(TestUDP, self).tearDown() + + def test_udp_transfer(self): + """ UDP echo client/server transfer """ + + # Add inter-table routes + ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32, + [VppRoutePath("0.0.0.0", + 0xffffffff, + nh_table_id=1)]) + ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32, + [VppRoutePath("0.0.0.0", + 0xffffffff, + nh_table_id=0)], table_id=1) + ip_t01.add_vpp_config() + ip_t10.add_vpp_config() + + # Start builtin server and client + uri = "udp://" + self.loop0.local_ip4 + "/1234" + error = self.vapi.cli("test echo server appns 0 fifo-size 4 no-echo" + + "uri " + uri) + if error: + self.logger.critical(error) + self.assertEqual(error.find("failed"), -1) + + error = self.vapi.cli("test echo client mbytes 10 appns 1 " + + "fifo-size 4 no-output test-bytes " + + "syn-timeout 2 no-return uri " + uri) + if error: + self.logger.critical(error) + self.assertEqual(error.find("failed"), -1) + + # Delete inter-table routes + ip_t01.remove_vpp_config() + ip_t10.remove_vpp_config() + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)