RX services - general API to allow addition of new features 93/5193/1
authorimarom <[email protected]>
Wed, 21 Dec 2016 15:49:38 +0000 (17:49 +0200)
committerimarom <[email protected]>
Wed, 21 Dec 2016 15:52:14 +0000 (17:52 +0200)
see trex_stl_lib/rx_services/trex_stl_rx_service_api.py

Signed-off-by: imarom <[email protected]>
scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/__init__.py [new file with mode: 0644]
scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_api.py [new file with mode: 0644]
scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_arp.py [new file with mode: 0644]
scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py [new file with mode: 0644]
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py [deleted file]
scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py

diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/__init__.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_api.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_api.py
new file mode 100644 (file)
index 0000000..b090438
--- /dev/null
@@ -0,0 +1,199 @@
+
+import time
+
+# a generic abstract class for implementing RX services using the server
+class RXServiceAPI(object):
+
+    # specify for which layer this service is
+    LAYER_MODE_ANY = 0
+    LAYER_MODE_L2  = 1
+    LAYER_MODE_L3  = 2
+    
+    def __init__ (self, port, layer_mode = LAYER_MODE_ANY, queue_size = 100):
+        self.port = port
+        self.queue_size = queue_size
+        self.layer_mode = layer_mode
+
+    ################### virtual methods ######################
+    
+    def get_name (self):
+        """
+            returns the name of the service
+
+            :returns:
+                str
+
+        """
+
+        raise NotImplementedError()
+        
+    def pre_execute (self):
+        """
+            specific class code called before executing
+
+            :returns:
+                RC object
+
+        """
+        raise NotImplementedError()
+        
+    def generate_request (self):
+        """
+            generate a request to be sent to the server
+
+            :returns:
+                list of streams
+
+        """
+        raise NotImplementedError()
+
+    def on_pkt_rx (self, pkt, start_ts):
+        """
+            called for each packet arriving on RX
+
+            :parameters:
+                'pkt' - the packet received
+                'start_ts' - the time recorded when 'start' was called 
+                
+            :returns:
+                None for fetching more packets
+                RC object for terminating
+                
+           
+
+        """
+        raise NotImplementedError()
+
+        
+    def on_timeout_err (self, retries):
+        """
+            called when a timeout occurs
+
+            :parameters:
+                retries - how many times was the service retring before failing
+                
+            :returns:
+                RC object
+
+        """
+        raise NotImplementedError()
+
+        
+    ##################### API ######################
+    def execute (self, retries = 0):
+        
+        # sanity check
+        rc = self.__sanity()
+        if not rc:
+            return rc
+                                 
+        # first cleanup
+        rc = self.port.remove_all_streams()
+        if not rc:
+            return rc
+
+
+        # start the iteration
+        try:
+
+            # add the stream(s)
+            self.port.add_streams(self.generate_request())
+            rc = self.port.set_rx_queue(size = self.queue_size)
+            if not rc:
+                return rc
+
+            return self.__execute_internal(retries)
+
+        finally:
+            # best effort restore
+            self.port.remove_rx_queue()
+            self.port.remove_all_streams()
+
+
+    ##################### Internal ######################
+    def __sanity (self):
+        if not self.port.is_service_mode_on():
+            return self.port.err('port service mode must be enabled for performing {0}. Please enable service mode'.format(self.get_name()))
+
+        if self.layer_mode == RXServiceAPI.LAYER_MODE_L2:
+            if not self.port.is_l2_mode():
+                return self.port.err('{0} - requires L2 mode configuration'.format(self.get_name()))
+
+        elif self.layer_mode == RXServiceAPI.LAYER_MODE_L3:
+            if not self.port.is_l3_mode():
+                return self.port.err('{0} - requires L3 mode configuration'.format(self.get_name()))
+
+
+        # sanity
+        if self.port.is_active():
+            return self.port.err('{0} - port is active, please stop the port before executing command'.format(self.get_name()))
+
+        # call the specific class implementation
+        rc = self.pre_execute()
+        if not rc:
+            return rc
+            
+        return True
+        
+
+    # main resolve function
+    def __execute_internal (self, retries):
+
+        # retry for 'retries'
+        index = 0
+        while True:
+            rc = self.execute_iteration()
+            if rc is not None:
+                return rc
+
+            if index >= retries:
+                return self.on_timeout_err(retries)
+
+            index += 1
+            time.sleep(0.1)
+
+
+
+    def execute_iteration (self):
+
+        mult = {'op': 'abs', 'type' : 'percentage', 'value': 100}
+        rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff)
+        if not rc:
+            return rc
+
+        # save the start timestamp
+        self.start_ts = rc.data()['ts']
+
+        # block until traffic finishes
+        while self.port.is_active():
+            time.sleep(0.01)
+
+        return self.wait_for_rx_response()
+
+
+    def wait_for_rx_response (self):
+
+        # we try to fetch response for 5 times
+        polling = 5
+
+        while polling > 0:
+
+            # fetch the queue
+            rx_pkts = self.port.get_rx_queue_pkts()
+
+            # might be an error
+            if not rx_pkts:
+                return rx_pkts
+
+            # for each packet - examine it
+            for pkt in rx_pkts.data():
+                rc = self.on_pkt_rx(pkt, self.start_ts)
+                if rc is not None:
+                    return rc
+
+            if polling == 0:
+                return None
+
+            polling -= 1
+            time.sleep(0.1)
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_arp.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_arp.py
new file mode 100644 (file)
index 0000000..2c15931
--- /dev/null
@@ -0,0 +1,57 @@
+from .trex_stl_rx_service_api import RXServiceAPI
+
+from ..trex_stl_streams import STLStream, STLTXSingleBurst
+from ..trex_stl_packet_builder_scapy import STLPktBuilder
+
+from scapy.layers.l2 import Ether, ARP
+
+
+class RXServiceARP(RXServiceAPI):
+    
+    def __init__ (self, port_id):
+        super(RXServiceARP, self).__init__(port_id, layer_mode = RXServiceAPI.LAYER_MODE_L3)
+
+    def get_name (self):
+        return "ARP"
+        
+    def pre_execute (self):
+
+        self.dst = self.port.get_dst_addr()
+        self.src = self.port.get_src_addr()
+
+        return self.port.ok()
+
+    # return a list of streams for request
+    def generate_request (self):
+
+        base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = self.src['ipv4'], pdst = self.dst['ipv4'], hwsrc = self.src['mac'])
+        s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
+
+        return [s1]
+
+
+    def on_pkt_rx (self, pkt, start_ts):
+        # convert to scapy
+        scapy_pkt = Ether(pkt['binary'])
+        
+        # if not ARP wait for the next one
+        if not 'ARP' in scapy_pkt:
+            return None
+
+        arp = scapy_pkt['ARP']
+
+        # check this is the right ARP (ARP reply with the address)
+        if (arp.op != 2) or (arp.psrc != self.dst['ipv4']):
+            return None
+
+
+        return self.port.ok({'psrc' : arp.psrc, 'hwsrc': arp.hwsrc})
+        
+        #return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc))
+
+
+    def on_timeout_err (self, retries):
+        return self.port.err('failed to receive ARP response ({0} retries)'.format(retries))
+
+
+
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py
new file mode 100644 (file)
index 0000000..486cd45
--- /dev/null
@@ -0,0 +1,85 @@
+from .trex_stl_rx_service_api import RXServiceAPI
+
+from ..trex_stl_streams import STLStream, STLTXSingleBurst
+from ..trex_stl_packet_builder_scapy import STLPktBuilder
+
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, ICMP
+
+
+class RXServiceICMP(RXServiceAPI):
+    
+    def __init__ (self, port, ping_ip, pkt_size):
+        
+        super(RXServiceICMP, self).__init__(port, layer_mode = RXServiceAPI.LAYER_MODE_L3)
+        self.ping_ip  = ping_ip
+        self.pkt_size = pkt_size
+
+    def get_name (self):
+        return "PING"
+        
+    def pre_execute (self):
+
+        if not self.port.is_resolved():
+            return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address')
+
+        self.src = self.port.get_src_addr()
+        self.dst = self.port.get_dst_addr()
+
+
+        return self.port.ok()
+
+
+    # return a list of streams for request
+    def generate_request (self):
+
+        base_pkt = Ether(dst = self.dst['mac'])/IP(src = self.src['ipv4'], dst = self.ping_ip)/ICMP(type = 8)
+        pad = max(0, self.pkt_size - len(base_pkt))
+
+        base_pkt = base_pkt / (pad * 'x')
+
+        s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
+
+        self.base_pkt = base_pkt
+
+        return [s1]
+
+    def on_pkt_rx (self, pkt, start_ts):
+        
+        scapy_pkt = Ether(pkt['binary'])
+        if not 'ICMP' in scapy_pkt:
+            return None
+
+        ip = scapy_pkt['IP']
+        if ip.dst != self.src['ipv4']:
+            return None
+
+        icmp = scapy_pkt['ICMP']
+
+        dt = pkt['ts'] - start_ts
+
+        # echo reply
+        if icmp.type == 0:
+            # check seq
+            if icmp.seq != self.base_pkt['ICMP'].seq:
+                return None
+            return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl))
+
+        # unreachable
+        elif icmp.type == 3:
+            # check seq
+            if icmp.payload.seq != self.base_pkt['ICMP'].seq:
+                return None
+            return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src))
+
+        else:
+            # skip any other types
+            #scapy_pkt.show2()
+            return None
+
+
+
+    # return the str of a timeout err
+    def on_timeout_err (self, retries):
+        return self.port.ok('Request timed out.')
+
index 946c79d..f86fff2 100755 (executable)
@@ -2991,19 +2991,20 @@ class STLClient(object):
         ports = self._validate_port_list(ports)
             
         self.logger.pre_cmd('Resolving destination on port(s) {0}:'.format(ports))
-        with self.logger.supress():
+        
+        with self.logger.supress(level = LoggerApi.VERBOSE_REGULAR_SYNC):
             rc = self.__resolve(ports, retries)
             
         self.logger.post_cmd(rc)
-        
+
+        if verbose:
+            for x in filter(bool, rc.data()):
+                self.logger.log(format_text("{0}".format(x), 'bold'))
+                
         if not rc:
             raise STLError(rc)
 
-        # print the ARP transaction
-        if verbose:
-            self.logger.log(rc)
-            self.logger.log('')
-            
+
             
         
     @__api_check(True)
index 3fe4c19..d4275cb 100644 (file)
@@ -4,7 +4,10 @@ from collections import namedtuple, OrderedDict
 from .trex_stl_packet_builder_scapy import STLPktBuilder
 from .trex_stl_streams import STLStream
 from .trex_stl_types import *
-from .trex_stl_rx_features import ARPResolver, PingResolver
+
+from .rx_services.trex_stl_rx_service_arp import RXServiceARP
+from .rx_services.trex_stl_rx_service_icmp import RXServiceICMP
+
 from . import trex_stl_stats
 from .utils.constants import FLOW_CTRL_DICT_REVERSED
 
@@ -953,17 +956,32 @@ class Port(object):
         
     @writeable
     def arp_resolve (self, retries):
-        if not self.is_service_mode_on():
-            return self.err('port service mode must be enabled for performing ARP resolution. Please enable service mode')
+        
+        # execute the ARP service
+        rc = RXServiceARP(self).execute(retries)
+        if not rc:
+            return rc
             
-        return ARPResolver(self).resolve(retries)
+        # fetch the data returned
+        arp_rc = rc.data()
+        
+        # first invalidate current ARP if exists
+        rc = self.invalidate_arp()
+        if not rc:
+            return rc
+
+        # update the port with L3 full configuration
+        rc = self.set_l3_mode(self.get_src_addr()['ipv4'], self.get_dst_addr()['ipv4'], arp_rc['hwsrc'])
+        if not rc:
+            return rc
+            
+        return self.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port_id, arp_rc['psrc'], arp_rc['hwsrc']))
+            
+        
 
     @writeable
     def ping (self, ping_ipv4, pkt_size):
-        if not self.is_service_mode_on():
-            return self.err('port service mode must be enabled for performing ping. Please enable service mode')
-            
-        return PingResolver(self, ping_ipv4, pkt_size).resolve()
+        return RXServiceICMP(self, ping_ipv4, pkt_size).execute()
 
         
     ################# stats handler ######################
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py
deleted file mode 100644 (file)
index 727451e..0000000
+++ /dev/null
@@ -1,251 +0,0 @@
-
-from .trex_stl_streams import STLStream, STLTXSingleBurst
-from .trex_stl_packet_builder_scapy import STLPktBuilder
-
-from scapy.layers.l2 import Ether, ARP
-from scapy.layers.inet import IP, ICMP
-
-import time
-# a generic abstract class for resolving using the server
-class Resolver(object):
-    def __init__ (self, port, queue_size = 100):
-        self.port = port
-     
-    # code to execute before sending any request - return RC object
-    def pre_send (self):
-        raise NotImplementedError()
-        
-    # return a list of streams for request
-    def generate_request (self):
-        raise NotImplementedError()
-        
-    # return None for more packets otherwise RC object
-    def on_pkt_rx (self, pkt):
-        raise NotImplementedError()
-    
-    # return value in case of timeout
-    def on_timeout_err (self, retries):
-        raise NotImplementedError()
-    
-    ##################### API ######################
-    def resolve (self, retries = 0):
-
-        # first cleanup
-        rc = self.port.remove_all_streams()
-        if not rc:
-            return rc
-
-        # call the specific class implementation
-        rc = self.pre_send()
-        if not rc:
-            return rc
-
-        # start the iteration
-        try:
-
-            # add the stream(s)
-            self.port.add_streams(self.generate_request())
-            rc = self.port.set_rx_queue(size = 100)
-            if not rc:
-                return rc
-            
-            return self.resolve_wrapper(retries)
-            
-        finally:
-            # best effort restore
-            self.port.remove_rx_queue()
-            self.port.remove_all_streams()
-                
-    
-    # main resolve function
-    def resolve_wrapper (self, retries):
-            
-        # retry for 'retries'
-        index = 0
-        while True:
-            rc = self.resolve_iteration()
-            if rc is not None:
-                return rc
-            
-            if index >= retries:
-                return self.on_timeout_err(retries)
-                
-            index += 1
-            time.sleep(0.1)
-            
-            
-
-    def resolve_iteration (self):
-        
-        mult = {'op': 'abs', 'type' : 'percentage', 'value': 100}
-        rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff)
-        if not rc:
-            return rc
-
-        # save the start timestamp
-        self.start_ts = rc.data()['ts']
-        
-        # block until traffic finishes
-        while self.port.is_active():
-            time.sleep(0.01)
-
-        return self.wait_for_rx_response()
-        
-             
-    def wait_for_rx_response (self):
-
-        # we try to fetch response for 5 times
-        polling = 5
-        
-        while polling > 0:
-            
-            # fetch the queue
-            rx_pkts = self.port.get_rx_queue_pkts()
-            
-            # might be an error
-            if not rx_pkts:
-                return rx_pkts
-                
-            # for each packet - examine it
-            for pkt in rx_pkts.data():
-                rc = self.on_pkt_rx(pkt)
-                if rc is not None:
-                    return rc
-                
-            if polling == 0:
-                return None
-                
-            polling -= 1
-            time.sleep(0.1)
-          
-        
-        
-        
-class ARPResolver(Resolver):
-    def __init__ (self, port_id):
-        super(ARPResolver, self).__init__(port_id)
-        
-    # before resolve
-    def pre_send (self):
-        
-        if not self.port.is_l3_mode():
-            return self.port.err("arp - port is not configured as L3 layer")
-            
-        self.dst = self.port.get_dst_addr()
-        self.src = self.port.get_src_addr()
-            
-        return self.port.invalidate_arp()
-        
-
-    # return a list of streams for request
-    def generate_request (self):
-                
-        base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = self.src['ipv4'], pdst = self.dst['ipv4'], hwsrc = self.src['mac'])
-        s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
-
-        return [s1]
-
-
-    # return None in case more packets are needed else the status rc
-    def on_pkt_rx (self, pkt):
-        scapy_pkt = Ether(pkt['binary'])
-        if not 'ARP' in scapy_pkt:
-            return None
-
-        arp = scapy_pkt['ARP']
-        
-        # check this is the right ARP (ARP reply with the address)
-        if (arp.op != 2) or (arp.psrc != self.dst['ipv4']):
-            return None
-
-        
-        # update the port with L3 full configuration
-        rc = self.port.set_l3_mode(self.src['ipv4'], self.dst['ipv4'], arp.hwsrc)
-        if not rc:
-            return rc
-            
-        return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc))
-        
-
-    def on_timeout_err (self, retries):
-        return self.port.err('failed to receive ARP response ({0} retries)'.format(retries))
-
-
-        
-    #################### ping resolver ####################
-           
-class PingResolver(Resolver):
-    def __init__ (self, port, ping_ip, pkt_size):
-        super(PingResolver, self).__init__(port)
-        self.ping_ip = ping_ip
-        self.pkt_size = pkt_size
-                
-    def pre_send (self):
-        if not self.port.is_l3_mode():
-            return self.port.err('ping - port is not configured as L3 layer')
-        
-        if not self.port.is_resolved():
-            return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address')
-            
-        self.src = self.port.get_src_addr()
-        self.dst = self.port.get_dst_addr()
-        
-        
-        return self.port.ok()
-            
-        
-    # return a list of streams for request
-    def generate_request (self):
-                    
-        base_pkt = Ether(dst = self.dst['mac'])/IP(src = self.src['ipv4'], dst = self.ping_ip)/ICMP(type = 8)
-        pad = max(0, self.pkt_size - len(base_pkt))
-        
-        base_pkt = base_pkt / (pad * 'x')
-        
-        s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) )
-
-        self.base_pkt = base_pkt
-        
-        return [s1]
-        
-    # return None for more packets otherwise RC object
-    def on_pkt_rx (self, pkt):
-        scapy_pkt = Ether(pkt['binary'])
-        if not 'ICMP' in scapy_pkt:
-            return None
-        
-        ip = scapy_pkt['IP']
-        if ip.dst != self.src['ipv4']:
-            return None
-            
-        icmp = scapy_pkt['ICMP']
-        
-        dt = pkt['ts'] - self.start_ts
-        
-        # echo reply
-        if icmp.type == 0:
-            # check seq
-            if icmp.seq != self.base_pkt['ICMP'].seq:
-                return None
-            return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl))
-            
-        # unreachable
-        elif icmp.type == 3:
-            # check seq
-            if icmp.payload.seq != self.base_pkt['ICMP'].seq:
-                return None
-            return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src))
-            
-        else:
-            # skip any other types
-            #scapy_pkt.show2()
-            return None
-            
-            
-    
-    # return the str of a timeout err
-    def on_timeout_err (self, retries):
-        return self.port.ok('Request timed out.')
index a60a7ed..0230db2 100644 (file)
@@ -64,7 +64,7 @@ class RC():
                 err_count += 1
                 if len(err_list) < show_count:
                     err_list.append(format_text(x, 'bold'))
-            s = '\n' if len(err_list) > 1 else ''
+            s = '\n'
             if err_count > show_count:
                 s += format_text('Occurred %s errors, showing first %s:\n' % (err_count, show_count), 'bold')
             s += '\n'.join(err_list)