1)added feature: running the entire test via RPC (over ZMQ) on a "remote" scapy server.
authoritraviv <[email protected]>
Wed, 10 Aug 2016 12:29:10 +0000 (15:29 +0300)
committeritraviv <[email protected]>
Wed, 10 Aug 2016 12:29:10 +0000 (15:29 +0300)
2) the mentioned above server is implemented as a thread running on the local machine,so in this commit a thread mechanism is added

scripts/automation/regression/functional_tests/scapy_server_test.py

index 9e64f1a..ccf0b75 100755 (executable)
@@ -25,14 +25,16 @@ from scapy_service import *
 from pprint import pprint\r
 import zmq\r
 import json\r
+import scapy_zmq_server\r
+import threading\r
 \r
 \r
 class Scapy_server_wrapper():\r
-    def __init__(self):\r
+    def __init__(self,dest_scapy_port=5555,server_ip_address='localhost'):\r
         self.context = zmq.Context()\r
         self.socket = self.context.socket(zmq.REQ)\r
-        self.dest_scapy_port =5555\r
-        self.socket.connect("tcp://10.56.216.133:"+str(self.dest_scapy_port)) #ip address of csi-trex-11\r
+        self.dest_scapy_port =dest_scapy_port\r
+        self.socket.connect("tcp://"+str(server_ip_address)+":"+str(self.dest_scapy_port)) #ip address of csi-trex-11\r
 \r
     def call_method(self,method_name,method_params):\r
         json_rpc_req = { "jsonrpc":"2.0","method": method_name ,"params": method_params, "id":"1"}\r
@@ -204,10 +206,27 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
 \r
 \r
 \r
+class scapy_server_thread(threading.Thread):\r
+    def __init__(self,thread_id,server_port=5555):\r
+        threading.Thread.__init__(self)\r
+        self.thread_id = thread_id\r
+        self.server_port = server_port\r
 \r
+    def run(self):\r
+        print '\nStarted scapy thread server'\r
+        scapy_zmq_server.main(self.server_port)\r
+        print 'Thread server closed'\r
+\r
+# Scapy_server_wrapper is the CLIENT for the scapy server, it wraps the CLIENT: its default port is set to 5555, default server ip set to localhost\r
 class scapy_server_tester(scapy_service_tester):\r
     def setUp(self):\r
-        self.s = Scapy_server_wrapper()\r
+        self.thread1 = scapy_server_thread(thread_id=1)\r
+        self.thread1.start()\r
+        self.s = Scapy_server_wrapper(dest_scapy_port=5555,server_ip_address='localhost') \r
+\r
+    def tearDown(self):\r
+        self.s.call_method('shut_down',[])\r
+        self.thread1.join()\r
 \r
 \r
 \r