scapy_service: support {vtype: BYTES, generate: ascii/bytes/template/template code }
authorAnton Kiselev <[email protected]>
Mon, 31 Oct 2016 21:11:57 +0000 (04:11 +0700)
committerAnton Kiselev <[email protected]>
Fri, 4 Nov 2016 12:54:44 +0000 (19:54 +0700)
Signed-off-by: Anton Kiselev <[email protected]>
scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py [new file with mode: 0644]

index 4ce9e4a..16261b8 100755 (executable)
@@ -9,6 +9,7 @@ import tempfile
 import hashlib
 import base64
 import numbers
+import random
 import inspect
 import json
 from pprint import pprint
@@ -279,6 +280,48 @@ def get_sample_field_val(scapy_layer, fieldId):
     except:
         pass
 
+def generate_random_bytes(sz, seed, start, end):
+    # generate bytes of specified range with a fixed seed and size
+    rnd = random.Random(seed)
+    end = end + 1 # to include end value
+    res = [rnd.randrange(start, end) for _i in range(sz)]
+    if is_python(2):
+        return ''.join(chr(x) for x in res)
+    else:
+        return bytes(res)
+
+def generate_bytes_from_template(sz, template):
+    # generate bytes by repeating a template
+    res = str_to_bytes('') # new bytes array
+    if len(template) == 0:
+        return res
+    while len(res) < sz:
+        res = res + template
+    return res[:sz]
+
+def parse_template_code(template_code):
+    template_code = re.sub("0[xX]", '', template_code) # remove 0x
+    template_code = re.sub("[\s]", '', template_code) # remove spaces
+    return bytearray.fromhex(template_code)
+
+def generate_bytes(bytes_definition):
+    # accepts a bytes definition object
+    # {generate: random_bytes or random_ascii, seed: <seed_number>, size: <size_bytes>}
+    # {generate: template, template_base64: '<base64str>',  size: <size_bytes>}
+    # {generate: template_code, template_text_code: '<template_code_str>',  size: <size_bytes>}
+    gen_type = bytes_definition.get('generate')
+    bytes_size = bytes_definition.get('size') 
+    seed = bytes_definition.get('seed') or 12345
+    if gen_type == 'random_bytes':
+        return generate_random_bytes(bytes_size, seed, 0, 0xFF)
+    elif gen_type == 'random_ascii':
+        return generate_random_bytes(bytes_size, seed, 0x20, 0x7E)
+    elif gen_type == 'template':
+        return generate_bytes_from_template(bytes_size, b64_to_bytes(bytes_definition["template_base64"]))
+    elif gen_type == 'template_code':
+        return generate_bytes_from_template(bytes_size, bytes_definition["template_code"])
+
+
 class ScapyException(Exception): pass
 class Scapy_service(Scapy_service_api):
 
@@ -381,7 +424,10 @@ class Scapy_service(Scapy_service_api):
             if value_type == 'EXPRESSION':
                 return eval(val['expr'], {})
             elif value_type == 'BYTES':   # bytes payload(ex Raw.load)
-                return b64_to_bytes(val['base64'])
+                if 'generate' in val:
+                    return generate_bytes(val)
+                else:
+                    return b64_to_bytes(val['base64'])
             elif value_type == 'OBJECT':
                 return val['value']
             else:
index 7126ef4..b2bc8e0 100644 (file)
@@ -163,7 +163,7 @@ def test_layer_wrong_structure():
     assert(real_structure == ["Ether", "IP", "Raw", None, None])
     assert(valid_structure_flags == [True, True, True, False, False])
 
-def test_hand_crafted_definitions():
+def test_ether_definitions():
     etherDef = get_definition_of("Ether")
     assert(etherDef['name'] == "Ethernet II")
     etherFields = etherDef['fields']
@@ -174,3 +174,27 @@ def test_hand_crafted_definitions():
     assert(etherFields[2]['id'] == 'type')
     assert(etherFields[2]['name'] == 'Type')
 
+def test_ether_definitions():
+    pdef = get_definition_of("ICMP")
+    assert(pdef['id'] == "ICMP")
+    assert(pdef['name'])
+    assert(pdef['fields'])
+
+def test_ip_definitions():
+    pdef = get_definition_of("IP")
+    fields = pdef['fields']
+    assert(fields[0]['id'] == 'version')
+
+    assert(fields[1]['id'] == 'ihl')
+    assert(fields[1]['auto'] == True)
+
+    assert(fields[3]['id'] == 'len')
+    assert(fields[3]['auto'] == True)
+
+    assert(fields[5]['id'] == 'flags')
+    assert(fields[5]['type'] == 'BITMASK')
+    assert(fields[5]['bits'][0]['name'] == 'Reserved')
+
+    assert(fields[9]['id'] == 'chksum')
+    assert(fields[9]['auto'] == True)
+
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
new file mode 100644 (file)
index 0000000..8ae186a
--- /dev/null
@@ -0,0 +1,58 @@
+# run with 'nosetests' utility
+
+from basetest import *
+from scapy_service import *
+
+def test_generate_random_bytes():
+    res = generate_random_bytes(10, 333, ord('0'), ord('9'))
+    print(res)
+    assert(len(res) == 10)
+    assert(res == '5390532937') # random value with this seed
+
+def test_generate_bytes_from_template_empty():
+    res = generate_bytes_from_template(5, b"")
+    print(res)
+    assert(res == b"")
+
+def test_generate_bytes_from_template_less():
+    res = generate_bytes_from_template(5, b"qwe")
+    print(res)
+    assert(res == b"qweqw")
+
+def test_generate_bytes_from_template_same():
+    res = generate_bytes_from_template(5, b"qwert")
+    print(res)
+    assert(res == b"qwert")
+
+def test_generate_bytes_from_template_more():
+    res = generate_bytes_from_template(5, b"qwerty")
+    print(res)
+    assert(res == b"qwert")
+
+def test_parse_template_code_with_trash():
+    res = parse_template_code("0xDE AD\n be ef \t0xDEAD")
+    print(res)
+    assert(res == bytearray.fromhex('DEADBEEFDEAD'))
+
+def test_generate_bytes():
+    res = generate_bytes({"generate":"random_bytes", "seed": 123, "size": 12})
+    print(res)
+    assert(len(res) == 12)
+
+def test_generate_ascii_default_seed():
+    res = generate_bytes({"generate":"random_ascii", "size": 14})
+    print(res)
+    assert(len(res) == 14)
+
+
+def test_generate_template_code():
+    res = generate_bytes({"generate":"template_code", "template_code": "0xDEAD 0xBEEF", "size": 6})
+    print(res)
+    assert(res == bytearray.fromhex('DE AD BE EF DE AD'))
+
+def test_generate_template_code():
+    res = generate_bytes({"generate":"template", "template_base64": bytes_to_b64(b'hi'), "size": 5})
+    print(res)
+    assert(res == b'hihih')
+
+