preserve abstract model structure, calculate values and show structure changes
authorAnton Kiselev <[email protected]>
Thu, 20 Oct 2016 07:49:44 +0000 (14:49 +0700)
committerAnton Kiselev <[email protected]>
Fri, 21 Oct 2016 08:09:08 +0000 (15:09 +0700)
scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py

index 2d023dc..1ad6843 100755 (executable)
@@ -434,19 +434,33 @@ class Scapy_service(Scapy_service_api):
         return { "vtype": "BYTES", "base64": bytes_to_b64(payload_bytes) }
 
     def _pkt_to_field_tree(self,pkt):
-        pkt = self._fully_define(pkt)
+        pkt.build()
         result = []
+        pcap_struct = self._fully_define(pkt) # structure, which will appear in pcap binary
         while pkt:
             layer_id = type(pkt).__name__ # Scapy classname
-            layer_name = pkt.name # Display name
+            layer_full = self._fully_define(pkt) # current layer recreated from binary to get auto-calculated vals
+            real_layer_id = type(pcap_struct).__name__ if pcap_struct else None
+            valid_struct = True # shows if packet is mapped correctly to the binary representation
+            if not pcap_struct:
+                valid_struct = False
+            elif not issubclass(type(pkt), type(pcap_struct)) and not issubclass(type(pcap_struct), type(pkt)):
+                # structure mismatch. no need to go deeper in pcap_struct
+                valid_struct = False
+                pcap_struct = None
             fields = []
             for field_desc in pkt.fields_desc:
                 field_id = field_desc.name
-                ignored = field_id not in pkt.fields
+                ignored = field_id not in layer_full.fields
                 offset = field_desc.offset
                 protocol_offset = pkt.offset
                 field_sz = field_desc.get_size_bytes()
-                fieldval = getattr(pkt, field_id)
+                # some values are unavailable in pkt(original model)
+                # at the same time,
+                fieldval = pkt.getfieldval(field_id)
+                pkt_fieldval_defined = is_string(fieldval) or is_number(fieldval) or is_bytes3(fieldval)
+                if not pkt_fieldval_defined:
+                    fieldval = layer_full.getfieldval(field_id)
                 value = None
                 hvalue = None
                 value_base64 = None
@@ -487,7 +501,7 @@ class Scapy_service(Scapy_service_api):
                         hvalue = '<binary>'
                 if field_desc.name == 'load':
                     # show Padding(and possible similar classes) as Raw
-                    layer_id = layer_name ='Raw'
+                    layer_id = 'Raw'
                     field_sz = len(pkt)
                     value = self._bytes_to_value(fieldval)
                 field_data = {
@@ -502,12 +516,15 @@ class Scapy_service(Scapy_service_api):
                 fields.append(field_data)
             layer_data = {
                     "id": layer_id,
-                    "name": layer_name,
                     "offset": pkt.offset,
                     "fields": fields,
+                    "real_id": real_layer_id,
+                    "valid_structure": valid_struct,
                     }
             result.append(layer_data)
             pkt = pkt.payload
+            if pcap_struct:
+                pcap_struct = pcap_struct.payload or None
         return result
 
 #input: container
index 79773f9..17dd304 100644 (file)
@@ -68,3 +68,17 @@ def get_payload_classes(def_filter):
 def build_pkt_to_scapy(buildpkt_result):
     return pass_pkt(Ether(b64_to_bytes(buildpkt_result['binary'])))
 
+def fields_to_map(field_array):
+    # [{id, value, hvalue, offset}, ...] to map id -> {value, hvalue, offset}
+    res = {}
+    if field_array:
+        for f in field_array:
+            res[ f["id"] ] = f
+    return res
+
+def adapt_json_protocol_fields(protocols_array):
+    # replaces layer.fields(array) with map for easier access in tests
+    for protocol in protocols_array:
+        # change structure for easier
+        if protocol.get("fields"):
+            protocol["fields"] = fields_to_map(protocol["fields"])
index 5ea8cf0..9cd473d 100644 (file)
@@ -2,8 +2,11 @@
 # run with 'nosetests' utility
 
 import tempfile
+import re
 from basetest import *
 
+RE_MAC = "^([0-9A-Fa-f]{2}:){5}([0-9A-Fa-f]{2})$"
+
 TEST_MAC_1 = "10:10:10:10:10:10"
 # Test scapy structure
 TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443)
@@ -103,3 +106,50 @@ def test_pcap_read_and_write():
     pkt = build_pkt_to_scapy(array_pkt[0])
     assert(pkt[Ether].dst == TEST_MAC_1)
 
+def test_layer_default_value():
+    res = build_pkt([
+        layer_def("Ether", src={"vtype": "UNDEFINED"})
+        ])
+    ether_fields = fields_to_map(res['data'][0]['fields'])
+    assert(re.match(RE_MAC, ether_fields['src']['value']))
+
+def test_layer_random_value():
+    res = build_pkt([
+        layer_def("Ether", src={"vtype": "RANDOM"})
+        ])
+    ether_fields = fields_to_map(res['data'][0]['fields'])
+    assert(re.match(RE_MAC, ether_fields['src']['value']))
+
+def test_layer_wrong_structure():
+    payload = [
+            layer_def("Ether"),
+            layer_def("IP"),
+            layer_def("Raw", load="dummy"),
+            layer_def("Ether"),
+            layer_def("IP"),
+            ]
+    res = build_pkt(payload)
+    pkt = build_pkt_to_scapy(res)
+    assert(type(pkt[0]) is Ether)
+    assert(type(pkt[1]) is IP)
+    assert(isinstance(pkt[2], Raw))
+    assert(not pkt[2].payload)
+    model = res["data"]
+    assert(len(payload) == len(model))
+    # verify same protocol structure as in abstract model
+    # and all fields defined
+    for depth in range(len(payload)):
+        layer_model = model[depth]
+        layer_fields = fields_to_map(layer_model["fields"])
+        assert(payload[depth]["id"] == model[depth]["id"])
+        for field in layer_model["fields"]:
+            required_field_properties = ["value", "hvalue", "offset"]
+            for field_property in required_field_properties:
+                assert(field[field_property] is not None)
+        if (model[depth]["id"] == "Ether"):
+            assert(layer_fields["type"]["hvalue"] == "IPv4")
+    real_structure = [layer["real_id"] for layer in model]
+    valid_structure_flags = [layer["valid_structure"] for layer in model]
+    assert(real_structure == ["Ether", "IP", "Raw", None, None])
+    assert(valid_structure_flags == [True, True, True, False, False])
+