# scapy server unit test\r
\r
import sys,os\r
-scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'examples'))\r
-print scapy_server_path\r
+scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'services','scapy_server'))\r
stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl'))\r
sys.path.append(scapy_server_path)\r
sys.path.append(stl_pathname)\r
\r
\r
-\r
#import stl_path\r
import trex_stl_lib\r
from trex_stl_lib.api import *\r
from copy import deepcopy\r
\r
import tempfile\r
-import md5\r
-\r
-import outer_packages\r
+import hashlib\r
from platform_cmd_link import *\r
import functional_general_test\r
from nose.tools import assert_equal\r
from nose.tools import assert_not_equal\r
from nose.tools import nottest\r
from nose.plugins.attrib import attr\r
-\r
+import binascii\r
from scapy_server import *\r
\r
\r
class scapy_server_tester(functional_general_test.CGeneralFunctional_Test):\r
def setUp(self):\r
- pass\r
+ self.s = scapy_server()\r
\r
def tearDown(self):\r
pass\r
test for db and field update - checking check_update_test()\r
'''\r
def test_check_update(self):\r
- allData = get_all()\r
- allDataParsed = json.loads(allData)\r
- dbMD5 = allDataParsed['DB_md5']\r
+ allData = self.s.get_all()\r
+ allDataParsed = allData\r
+ dbMD5 = allDataParsed['db_md5']\r
fieldMD5 = allDataParsed['fields_md5']\r
- result = check_update(dbMD5,fieldMD5)\r
- result = json.loads(result)\r
- '''\r
- if result[0][0] == 'Success' and result[1][0] == 'Success':\r
- print 'check_update_test [md5 comparison test]: Success'\r
- else:\r
- print 'check_update_test [md5 comparison test]: md5s of fields or db do not match the source'\r
- '''\r
- resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')\r
+ resT1 = self.s.check_update(dbMD5,fieldMD5)\r
assert_equal(resT1,True)\r
-\r
- result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))\r
- result = json.loads(result)\r
- '''\r
- if result[0][0] == 'Fail' and result[1][0] == 'Fail':\r
- print 'check_update_test [wrong md5s return failure]: Success'\r
- else:\r
- print 'check_update_test [wrong md5s return failure]: md5s of fields or db return Success for invalid value'\r
- '''\r
- resT2 = (result[0][0] == 'Fail' and result[1][0] == 'Fail')\r
+ try:\r
+ resT2 = False\r
+ resT2 = self.s.check_update('falseMD5','falseMD5')\r
+ except Exception as e:\r
+ if e.message == "Fields DB is not up to date":\r
+ resT2 = True\r
+ else:\r
+ raise\r
assert_equal(resT2,True)\r
-\r
- result = check_update(dbMD5,json.dumps('falseMD5'))\r
- result = json.loads(result)\r
- '''\r
- if result[0][0] == 'Fail' and result[1][0] == 'Success':\r
- print 'check_update_test [wrong field md5 returns error, correct db md5]: Success'\r
- else:\r
- print 'md5 of field return Success for invalid value'\r
- '''\r
- resT3 = (result[0][0] == 'Fail' and result[1][0] == 'Success')\r
+ try:\r
+ resT3 = False\r
+ resT3 = self.s.check_update(dbMD5,'falseMD5')\r
+ except Exception as e:\r
+ if e.message == "Fields DB is not up to date":\r
+ resT3 = True\r
+ else:\r
+ raise\r
assert_equal(resT3,True)\r
-\r
- result = check_update(json.dumps('falseMD5'),fieldMD5)\r
- result = json.loads(result)\r
- '''\r
- if result[0][0] == 'Success' and result[1][0] == 'Fail':\r
- print 'check_update_test [wrong db md5 returns error, correct field md5]: Success'\r
- else:\r
- print 'md5 of db return Success for invalid value'\r
- '''\r
- resT4 = (result[0][0] == 'Success' and result[1][0] == 'Fail')\r
+ try:\r
+ resT4 = False\r
+ resT4 = self.s.check_update('falseMD5',fieldMD5)\r
+ except Exception as e:\r
+ if e.message == "Protocol DB is not up to date":\r
+ resT4 = True\r
+ else:\r
+ raise\r
assert_equal(resT4,True)\r
\r
\r
def test_check_updating_db(self):\r
#assume i got old db\r
- result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))\r
- result = json.loads(result)\r
- if result[0][0] == 'Fail' or result[1][0] == 'Fail':\r
- newAllData = get_all()\r
- allDataParsed = json.loads(newAllData)\r
- dbMD5 = allDataParsed['DB_md5']\r
- fieldMD5 = allDataParsed['fields_md5']\r
- result = check_update(dbMD5,fieldMD5)\r
- result = json.loads(result)\r
- '''\r
- if result[0][0] == 'Success' and result[1][0] == 'Success':\r
- print 'check_updating_db [got old db and updated it]: Success'\r
- else:\r
- print'check_updating_db [got old db and updated it]: FAILED'\r
- '''\r
- resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')\r
- assert_equal(resT1,True)\r
+ try:\r
+ result = self.s.check_update('falseMD5','falseMD5')\r
+ except:\r
+ newAllData = self.s.get_all()\r
+ dbMD5 = newAllData['db_md5']\r
+ fieldMD5 = newAllData['fields_md5']\r
+ result = self.s.check_update(dbMD5,fieldMD5)\r
+ assert_equal(result,True)\r
else:\r
raise Exception("scapy_server_test: check_updating_db failed")\r
\r
\r
# testing pkt = Ether()/IP()/TCP()/"test" by defualt\r
- def test_build_packet(self,original_pkt = json.dumps('Ether()/IP()/TCP()/"test"')):\r
+ def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'):\r
test_pkt = original_pkt\r
- original_pkt = eval(json.loads(original_pkt))\r
- test_res = build_pkt(test_pkt)\r
- test_res = json.loads(test_res)\r
- test_pkt_buffer = json.loads(test_res[2])\r
- test_pkt_buffer = test_pkt_buffer.decode('base64')\r
- '''\r
- if test_pkt_buffer == str(original_pkt):\r
- print 'build_pkt test [scapy packet and string-defined packet comparison]: Success'\r
- else:\r
- print 'build_pkt test [scapy packet and string-defined packet comparison]: FAILED'\r
- '''\r
- resT1 = (test_pkt_buffer == str(original_pkt))\r
+ original_pkt = eval(original_pkt)\r
+ test_res = self.s.build_pkt(test_pkt)\r
+ test_pkt_buffer = test_res[1]\r
+ resT1 = (test_pkt_buffer == binascii.b2a_base64(str(original_pkt)))\r
assert_equal(resT1,True)\r
\r
\r
#testing offsets of packet IP() by default\r
- def test_get_all_offsets(self,original_pkt = json.dumps('IP()')):\r
+ def test_get_all_offsets(self,original_pkt = 'IP()'):\r
test_pkt = original_pkt\r
- original_pkt = eval(json.loads(original_pkt))\r
- tested_offsets_by_layers = get_all_pkt_offsets(test_pkt)\r
- tested_offsets_by_layers = json.loads(tested_offsets_by_layers)\r
- layers = json.loads(test_pkt).split('/')\r
+ original_pkt = eval(original_pkt)\r
+ tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt)\r
+ layers = (test_pkt).split('/')\r
offsets_by_layers = {}\r
for layer in layers:\r
fields_list = []\r
size = len(original_pkt)\r
fields_list.append([f.name, f.offset, size])\r
original_pkt = original_pkt.payload\r
- offsets_by_layers[layer] = fields_list\r
- '''\r
- if tested_offsets_by_layers == offsets_by_layers:\r
- print 'Success'\r
- else:\r
- print 'test_get_all_offsets[comparison of offsets in given packet]: FAILED'\r
- '''\r
+ layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric\r
+ layer_name = re.sub(r'\W+', '',layer_name)\r
+ offsets_by_layers[layer_name] = fields_list\r
resT1 = (tested_offsets_by_layers == offsets_by_layers)\r
assert_equal(resT1,True)\r
\r
def test_multi_packet(self):\r
- e0 = json.dumps('Ether()')\r
- e1 = json.dumps('Ether()/IP()')\r
- e2 = json.dumps('TCP()')\r
- e3 = json.dumps('UDP()')\r
- e4 = json.dumps('Ether()/IP()/TCP()/"test"')\r
- e5 = json.dumps('Ether()/IP()/UDP()')\r
+ e0 = 'Ether()'\r
+ e1 = 'Ether()/IP()'\r
+ e2 = 'TCP()'\r
+ e3 = 'UDP()'\r
+ e4 = 'Ether()/IP()/TCP()/"test"'\r
+ e5 = 'Ether()/IP()/UDP()'\r
packets = [e0,e1,e2,e3,e4,e5]\r
-\r
for packet in packets:\r
self.test_get_all_offsets(packet)\r
\r