import base64
import numbers
import random
-import inspect
+from inspect import getdoc
import json
from pprint import pprint
self.protocol_fields_fe_aware = {}
self.instruction_parameter_meta_definitions = []
self.field_engine_parameter_meta_definitions = []
+ self.field_engine_instructions_meta = []
self.field_engine_instruction_expressions = []
self._load_definitions_from_json()
- self._load_instruction_parameter_definitions_from_json()
+ self._load_field_engine_meta_from_json()
def _load_definitions_from_json(self):
# load protocol definitions from a json file
for protocol in protocols:
self.protocol_definitions[ protocol['id'] ] = protocol
- def _load_instruction_parameter_definitions_from_json(self):
+ def _load_field_engine_meta_from_json(self):
# load protocol definitions from a json file
self.instruction_parameter_meta_definitions = []
self.protocol_fields_fe_aware = {}
self.field_engine_parameter_meta_definitions = []
with open('field_engine.json', 'r') as f:
metas = json.load(f)
- for meta in metas["instruction_params_meta"]:
- self.instruction_parameter_meta_definitions.append(meta)
+ self.instruction_parameter_meta_definitions = metas["instruction_params_meta"]
+ self.field_engine_instructions_meta = metas["instructions"]
+ self._append_intructions_help()
self.protocol_fields_fe_aware = metas["protocol_fields"]
self.field_engine_parameter_meta_definitions = metas["global_params_meta"]
+ def _append_intructions_help(self):
+ for instruction_meta in self.field_engine_instructions_meta:
+ clazz = eval(instruction_meta['id'])
+ instruction_meta['help'] = base64.b64encode(getdoc(clazz.__init__)).decode('ascii')
+
def _all_protocol_structs(self):
old_stdout = sys.stdout
sys.stdout = mystdout = StringIO()
self.field_engine_instruction_expressions.append(expression)
fe_parameters = field_engine_model_descriptor['global_parameters']
- split_by_field = None
- if "split_by_field" in fe_parameters:
- split_by_field = str(fe_parameters['split_by_field'])
cache_size = None
if "cache_size" in fe_parameters:
cache_size = int(fe_parameters['cache_size'])
- self.field_engine_instruction_expressions += self._get_instruction_expressions_footer(split_by_field, cache_size)
+ self.field_engine_instruction_expressions += self._get_instruction_expressions_footer(cache_size)
- pkt_builder = STLPktBuilder(pkt=pkt, vm=STLScVmRaw(instructions, split_by_field=split_by_field, cache_size=cache_size))
+ pkt_builder = STLPktBuilder(pkt=pkt, vm=STLScVmRaw(instructions, cache_size=cache_size))
pkt_builder.compile()
return pkt_builder.get_vm_data()
def _get_instruction_expressions_header(self):
return {'free_form': "vm = STLScVmRaw(["}
- def _get_instruction_expressions_footer(self, split_by_field, cache_size):
+ def _get_instruction_expressions_footer(self, cache_size):
instructions = []
footer = ']'
- if split_by_field != None and split_by_field != "":
- footer += ',{0}="{1}"'.format("split_by_field", split_by_field)
if cache_size != None and cache_size > 0:
footer += ',{0}={1}'.format("cache_size", cache_size)
})
res = {"protocols": protocols,
"feInstructionParameters": self.instruction_parameter_meta_definitions,
+ "feInstructions": self.field_engine_instructions_meta,
"feParameters": self.field_engine_parameter_meta_definitions}
return res
def test_get_definitions_all():
get_definitions(None)
- def_classnames = [pdef['id'] for pdef in get_definitions(None)['protocols']]
+ defs = get_definitions(None)
+ def_classnames = [pdef['id'] for pdef in defs['protocols']]
assert("IP" in def_classnames)
assert("Dot1Q" in def_classnames)
assert("TCP" in def_classnames)
+ fe_parameter_metas = defs['feInstructionParameters']
+ assert(fe_parameter_metas[0]['id'] == "max_value")
+ assert(fe_parameter_metas[1]['id'] == "min_value")
+ assert(fe_parameter_metas[2]['id'] == "step")
+ assert(fe_parameter_metas[3]['id'] == "op")
+ assert(fe_parameter_metas[4]['id'] == "fv_name")
+ assert(fe_parameter_metas[5]['id'] == "pkt_offset")
+ assert(fe_parameter_metas[6]['id'] == "offset")
+ assert(fe_parameter_metas[7]['id'] == "l3_offset")
+ assert(fe_parameter_metas[8]['id'] == "l4_offset")
+ assert(fe_parameter_metas[9]['id'] == "l4_type")
+
+ # All instructions should have a help description.
+ fe_instructions = defs['feInstructions']
+ for instruction in fe_instructions:
+ print(instruction['help'])
+ assert("help" in instruction)
+
def test_get_definitions_ether():
res = get_definitions(["Ether"])
protocols = res['protocols']
layer_def("Ether"),
layer_def("IP", src="16.0.0.1", dst="48.0.0.1")
]
- ip_instructions_model = {"field_engine":{'global_parameters': {}, 'instructions': [{'id': "IP", 'fields': [{"fieldId": "src", 'parameters': {'min_value': "0", 'max_value': "16.0.0.255", 'step': "1", 'op': "inc"}},
- {"fieldId": "ttl", 'parameters': {'min_value': "1", 'max_value': "17", 'step': "1", 'op': "inc"}}]}]}}
+ ip_instructions_model = {"field_engine": {'global_parameters': {}, 'instructions': [{'id': "IP", 'fields': [
+ {"fieldId": "src", 'parameters': {'min_value': "0", 'max_value': "16.0.0.255", 'step': "1", 'op': "inc"}},
+ {"fieldId": "ttl", 'parameters': {'min_value': "1", 'max_value': "17", 'step': "1", 'op': "inc"}}]}]}}
res = build_pkt_ex(ip_pkt_model, ip_instructions_model)
src_instruction = res['vm_instructions']['instructions'][0]
assert(src_instruction['min_value'] == 0)