add unit tests for scapy service
authorAnton Kiselev <[email protected]>
Tue, 18 Oct 2016 12:46:06 +0000 (19:46 +0700)
committerAnton Kiselev <[email protected]>
Tue, 18 Oct 2016 12:46:06 +0000 (19:46 +0700)
Signed-off-by: Anton Kiselev <[email protected]>
scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py [new file with mode: 0644]
scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py [new file with mode: 0644]

diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
new file mode 100644 (file)
index 0000000..79773f9
--- /dev/null
@@ -0,0 +1,70 @@
+import os
+import sys
+import json
+import base64
+import inspect
+from inspect import getcallargs
+# add paths to scapy_service and trex_stl_lib.api
+sys.path.append(os.path.abspath(os.pardir))
+sys.path.append(os.path.abspath(os.path.join(os.pardir, os.pardir, os.pardir)))
+
+from scapy_service import *
+from scapy.all import *
+
+service = Scapy_service()
+v_handler = service.get_version_handler('1','01')
+
+def pretty_json(obj):
+    return json.dumps(obj, indent=4)
+
+def pprint(obj):
+    print(pretty_json(obj))
+
+def is_verbose():
+    return True
+
+def pass_result(result, *args):
+    # returns result unchanged, but can display debug info if enabled
+    if is_verbose():
+        fargs = (inspect.stack()[-1][4])
+        print(fargs[0])
+        pprint(result)
+    return result
+
+def pass_pkt(result):
+    # returns packet unchanged, but can display debug info if enabled
+    if is_verbose() and result is not None:
+        result.show2()
+    return result
+
+# utility functions for tests
+
+def layer_def(layerId, **layerfields):
+    # test helper method to generate JSON-like protocol definition object for scapy
+    # ex. { "id": "Ether", "fields": [ { "id": "dst", "value": "10:10:10:10:10:10" } ] }
+    res = { "id": layerId }
+    if layerfields:
+        res["fields"] = [ {"id": k, "value": v} for k,v in layerfields.items() ]
+    return res
+
+def get_version_handler():
+    return pass_result(service.get_version_handler("1", "01"))
+
+def build_pkt(model_def):
+    return pass_result(service.build_pkt(v_handler, model_def))
+
+def build_pkt_get_scapy(model_def):
+    return build_pkt_to_scapy(build_pkt(model_def))
+
+def reconstruct_pkt(bytes_b64, model_def):
+    return pass_result(service.reconstruct_pkt(v_handler, bytes_b64, model_def))
+
+def get_definitions(def_filter):
+    return pass_result(service.get_definitions(v_handler, def_filter))
+
+def get_payload_classes(def_filter):
+    return pass_result(service.get_payload_classes(v_handler, def_filter))
+
+def build_pkt_to_scapy(buildpkt_result):
+    return pass_pkt(Ether(b64_to_bytes(buildpkt_result['binary'])))
+
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
new file mode 100644 (file)
index 0000000..5ea8cf0
--- /dev/null
@@ -0,0 +1,105 @@
+#
+# run with 'nosetests' utility
+
+import tempfile
+from basetest import *
+
+TEST_MAC_1 = "10:10:10:10:10:10"
+# Test scapy structure
+TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443)
+
+# Corresponding JSON-like structure
+TEST_PKT_DEF = [
+        layer_def("Ether", dst=TEST_MAC_1),
+        layer_def("IP", dst="127.0.0.1"),
+        layer_def("TCP", sport="443")
+        ]
+
+def test_build_pkt():
+    pkt = build_pkt_get_scapy(TEST_PKT_DEF)
+    assert(pkt[TCP].sport == 443)
+
+def test_build_invalid_structure_pkt():
+    ether_fields = {"dst": TEST_MAC_1, "type": "LOOP"}
+    pkt = build_pkt_get_scapy([
+        layer_def("Ether", **ether_fields),
+        layer_def("IP"),
+        layer_def("TCP", sport=8080)
+        ])
+    assert(pkt[Ether].dst == TEST_MAC_1)
+    assert(isinstance(pkt[Ether].payload, Raw))
+
+def test_reconstruct_pkt():
+    res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), None)
+    pkt = build_pkt_to_scapy(res)
+    assert(pkt[TCP].sport == 443)
+
+def test_layer_del():
+    modif = [
+            {"id": "Ether"},
+            {"id": "IP"},
+            {"id": "TCP", "delete": True},
+    ]
+    res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+    pkt = build_pkt_to_scapy(res)
+    assert(not pkt[IP].payload)
+
+def test_layer_field_edit():
+    modif = [
+            {"id": "Ether"},
+            {"id": "IP"},
+            {"id": "TCP", "fields": [{"id": "dport", "value": 777}]},
+    ]
+    res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+    pkt = build_pkt_to_scapy(res)
+    assert(pkt[TCP].dport == 777)
+    assert(pkt[TCP].sport == 443)
+
+def test_layer_add():
+    modif = [
+            {"id": "Ether"},
+            {"id": "IP"},
+            {"id": "TCP"},
+            {"id": "Raw", "fields": [{"id": "load", "value": "GET /helloworld HTTP/1.0\n\n"}]},
+    ]
+    res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+    pkt = build_pkt_to_scapy(res)
+    assert("GET /helloworld" in str(pkt[TCP].payload.load))
+
+def test_build_Raw():
+    pkt = build_pkt_get_scapy([
+        layer_def("Ether"),
+        layer_def("IP"),
+        layer_def("TCP"),
+        layer_def("Raw", load={"vtype": "BYTES", "base64": bytes_to_b64(b"hi")})
+        ])
+    assert(str(pkt[Raw].load == "hi"))
+
+def test_get_all():
+    service.get_all(v_handler)
+
+def test_get_definitions_all():
+    get_definitions(None)
+    def_classnames = [pdef['id'] for pdef in get_definitions(None)['protocols']]
+    assert("IP" in def_classnames)
+    assert("Dot1Q" in def_classnames)
+    assert("TCP" in def_classnames)
+
+def test_get_definitions_ether():
+    res = get_definitions(["Ether"])
+    assert(len(res) == 1)
+    assert(res['protocols'][0]['id'] == "Ether")
+
+def test_get_payload_classes():
+    eth_payloads = get_payload_classes([{"id":"Ether"}])
+    assert("IP" in eth_payloads)
+    assert("Dot1Q" in eth_payloads)
+    assert("TCP" not in eth_payloads)
+
+def test_pcap_read_and_write():
+    pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))]
+    pcap_b64 = service.write_pcap(v_handler, pkts_to_write)
+    array_pkt = service.read_pcap(v_handler, pcap_b64)
+    pkt = build_pkt_to_scapy(array_pkt[0])
+    assert(pkt[Ether].dst == TEST_MAC_1)
+