TLDK udwfwd test case:
[csit.git] / resources / libraries / python / TLDK / UdpTest.py
diff --git a/resources/libraries/python/TLDK/UdpTest.py b/resources/libraries/python/TLDK/UdpTest.py
new file mode 100644 (file)
index 0000000..adf0888
--- /dev/null
@@ -0,0 +1,126 @@
+# Copyright (c) 2017 Cisco and/or its affiliates.\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at:\r
+#\r
+#     http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+\r
+"""\r
+This module exists to provide the UDP test for TLDK on topology nodes.\r
+"""\r
+\r
+from scapy.utils import rdpcap\r
+from scapy.layers.inet import IP\r
+from scapy.layers.inet6 import IPv6\r
+\r
+from robot.libraries.BuiltIn import BuiltIn\r
+\r
+from resources.libraries.python.ssh import SSH\r
+from resources.libraries.python.TLDK.TLDKConstants import TLDKConstants as con\r
+from resources.libraries.python.topology import Topology\r
+\r
+class UdpTest(object):\r
+    """Test the TLDK UDP function."""\r
+\r
+    @staticmethod\r
+    def get_pcap_info(file_prefix):\r
+        """Get the Dest IP from the RX pcap file\r
+\r
+        :param file_prefix: the test case pcap file prefix\r
+        :type file_prefix: str\r
+        :returns: packet counts, dest ip, is or not ipv4\r
+        :rtype: tuple(int, str, bool).\r
+        """\r
+        exec_dir = BuiltIn().get_variable_value("${EXECDIR}")\r
+\r
+        rx_pcapfile = '{0}/{1}/{2}_rx.pcap' \\r
+            .format(exec_dir, con.TLDK_TESTCONFIG, file_prefix)\r
+        packets = rdpcap(rx_pcapfile)\r
+        count = len(packets)\r
+\r
+        ### the first packet\r
+        pkt = packets[0]\r
+        if pkt.type == 0x0800:\r
+            ### this is a IPv4 packet\r
+            dest_ip = pkt[IP].dst\r
+            is_ipv4 = True\r
+        elif pkt.type == 0x86dd:\r
+            ### this is a IPv6 packet\r
+            dest_ip = pkt[IPv6].dst\r
+            is_ipv4 = False\r
+\r
+        return count, dest_ip, is_ipv4\r
+\r
+    @staticmethod\r
+    def exec_the_udpfwd_test(dut_node, dut_if, file_prefix, \\r
+            dest_ip, is_ipv4=True):\r
+        """Execute the udpfwd on the dut_node.\r
+\r
+        :param dut_node: Will execute the udpfwd on this node.\r
+        :param dut_if: DUT interface name.\r
+        :param file_prefix: The test case config file prefix.\r
+        :param dest_ip: The UDP packet dest IP.\r
+        :param is_ipv4: Execute the IPv4 or IPv6 test.\r
+        :type dut_node: dict\r
+        :type dut_if: str\r
+        :type file_prefix: str\r
+        :type dest_ip: str\r
+        :type is_ipv4: bool\r
+        :returns: none.\r
+        :raises RuntimeError: If failed to execute udpfwd test on the dut node.\r
+        """\r
+        pci_address = Topology.get_interface_pci_addr(dut_node, dut_if)\r
+        ssh = SSH()\r
+        ssh.connect(dut_node)\r
+        if is_ipv4:\r
+            cmd = 'cd {0}/{4} && ./run_tldk.sh {0}/{5}/{2}_rx.pcap ' \\r
+                '{0}/{5}/{2}_tx.pcap {1} {0}/{5}/{2}_fe.cfg ' \\r
+                '{0}/{5}/{2}_be.cfg {3} NONE' \\r
+                .format(con.REMOTE_FW_DIR, pci_address, file_prefix, \\r
+                dest_ip, con.TLDK_SCRIPTS, con.TLDK_TESTCONFIG)\r
+        else:\r
+            cmd = 'cd {0}/{4} && ./run_tldk.sh {0}/{5}/{2}_rx.pcap ' \\r
+                '{0}/{5}/{2}_tx.pcap {1} {0}/{5}/{2}_fe.cfg ' \\r
+                '{0}/{5}/{2}_be.cfg NONE {3}' \\r
+                .format(con.REMOTE_FW_DIR, pci_address, file_prefix, \\r
+                dest_ip, con.TLDK_SCRIPTS, con.TLDK_TESTCONFIG)\r
+\r
+        (ret_code, _, _) = ssh.exec_command(cmd, timeout=600)\r
+        if ret_code != 0:\r
+            raise RuntimeError('Failed to execute udpfwd test at node {0}'\r
+                               .format(dut_node['host']))\r
+\r
+    @staticmethod\r
+    def get_the_test_result(dut_node, file_prefix):\r
+        """\r
+        After execute the udpfwd cmd, use this to get the test result.\r
+\r
+        :param dut_node: will get the test result in this dut node\r
+        :param dut_if: the dut interface name\r
+        :param file_prefix: the test case output file prefix\r
+        :type dut_node: dice\r
+        :type dut_if: str\r
+        :type file_prefix: str\r
+        :returns: str.\r
+        :rtype: str\r
+        :raises RuntimeError: If failed to get the test result.\r
+        """\r
+        ssh = SSH()\r
+        ssh.connect(dut_node)\r
+        cmd = 'cd {0}; tcpdump -nnnn -vvv -r ./{2}/{1}_tx.pcap | ' \\r
+            'grep \'udp sum ok\' | wc -l' \\r
+            .format(con.REMOTE_FW_DIR, file_prefix, con.TLDK_TESTCONFIG)\r
+\r
+        (ret_code, stdout, _) = ssh.exec_command(cmd, timeout=100)\r
+        if ret_code != 0:\r
+            raise RuntimeError('Failed to get test result at node {0}'\r
+                               .format(dut_node['host']))\r
+\r
+        return stdout\r