--- /dev/null
+import stl_path
+from trex_stl_lib.api import *
+
+"""
+An example on how to use TRex for functional tests
+
+It can be used for various tasks and can replace simple Pagent/Scapy
+low rate tests
+"""
+
+def test_dot1q (c, rx_port, tx_port):
+
+ # activate service mode on RX code
+ c.set_service_mode(ports = rx_port)
+
+ # generate a simple Dot1Q
+ pkt = Ether() / Dot1Q(vlan = 100) / IP()
+
+ # start a capture
+ capture = c.start_capture(rx_ports = rx_port)
+
+ # push the Dot1Q packet to TX port... we need 'force' because this is under service mode
+ print('\nSending 1 Dot1Q packet(s) on port {}'.format(tx_port))
+
+ c.push_packets(ports = tx_port, pkts = pkt, force = True)
+ c.wait_on_traffic(ports = tx_port)
+
+ rx_pkts = []
+ c.stop_capture(capture_id = capture['id'], output = rx_pkts)
+
+ print('\nRecived {} packets on port {}:\n'.format(len(rx_pkts), rx_port))
+
+ c.set_service_mode(ports = rx_port, enabled = False)
+
+ # got back one packet
+ assert(len(rx_pkts) == 1)
+ rx_scapy_pkt = Ether(rx_pkts[0]['binary'])
+
+ # it's a Dot1Q with the same VLAN
+ assert('Dot1Q' in rx_scapy_pkt)
+ assert(rx_scapy_pkt.vlan == 100)
+
+
+ rx_scapy_pkt.show2()
+
+
+
+def main ():
+
+ # create a client
+ c = STLClient()
+
+ try:
+ # connect to the server
+ c.connect()
+
+ # there should be at least two ports connected
+ tx_port, rx_port = stl_map_ports(c)['bi'][0]
+ c.reset(ports = [tx_port, rx_port])
+
+ # call the test
+ test_dot1q(c, tx_port, rx_port)
+
+
+ except STLError as e:
+ print(e)
+
+ finally:
+ c.stop()
+ c.remove_all_captures()
+ c.set_service_mode(enabled = False)
+ c.disconnect()
+
+
+
+if __name__ == '__main__':
+ main()
+
from collections import namedtuple, defaultdict
from yaml import YAMLError
+from contextlib import contextmanager
import time
import datetime
import re
import random
import json
import traceback
+import tempfile
import os.path
############################ logger #############################
self.remove_all_streams(ports = ports)
id_list = self.add_streams(profile.get_streams(), ports)
-
- return self.start(ports = ports, duration = duration)
+
+ return self.start(ports = ports, duration = duration, force = force)
else:
if profile_b:
self.add_streams(profile_b.get_streams(), slave)
- return self.start(ports = all_ports, duration = duration)
+ return self.start(ports = all_ports, duration = duration, force = force)
+
+
+
+
+ @__api_check(True)
+ def push_packets (self,
+ pkts,
+ ports = None,
+ ipg_usec = 100,
+ count = 1,
+ duration = -1,
+ force = False,
+ vm = None):
+
+ """
+ Pushes a list of packets to the server
+ a 'packet' can be anything with a bytes representation
+ such as Scapy object, a simple string, a byte array and etc.
+
+ Total size, as for PCAP pushing is limited to 1MB
+ unless 'force' is specified
+
+ the list of packets will be saved to a temporary file
+ which will be deleted when the function exists
+
+ :parameters:
+ pkts : list or
+ PCAP filename (accessible locally)
+
+ ports : list
+ Ports on which to execute the command
+
+ ipg_usec : float
+ Inter-packet gap in microseconds.
+
+ count: int
+ How many times to transmit the list
+ duration: float
+ Limit runtime by duration in seconds
+ force: bool
+ Ignore size limit - push any size to the server
+ vm: list of VM instructions
+ VM instructions to apply for every packet
+ :raises:
+ + :exc:`STLError`
+ """
+ validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
+ if ipg_usec < 0:
+ raise STLError("'ipg_usec' should not be negative")
+
+ # create a temporary file while will be deleted when leaving scope
+ with tempfile.NamedTemporaryFile(delete = True) as f:
+
+ # write packets to file
+ writer = RawPcapWriter(f.name, linktype = 1)
+ writer._write_header(None)
+
+ # write to the file
+ for i, pkt in enumerate(listify(pkts)):
+ ts = (ipg_usec * i) / 1.0e6
+ ts_sec, ts_usec = sec_split_usec(ts)
+ writer._write_packet(bytes(pkt), sec = ts_sec, usec = ts_usec)
+
+ # close the writer
+ writer.close()
+
+ # now inject the file
+ self.push_pcap(f.name,
+ ports = ports,
+ ipg_usec = ipg_usec,
+ speedup = 1,
+ count = count,
+ duration = duration,
+ force = force,
+ vm = vm,
+ packet_hook = None,
+ is_dual = False,
+ min_ipg_usec = None)
+
+
@__api_check(True)
def validate (self, ports = None, mult = "1", duration = -1, total = False):
if not rc:
raise STLError(rc)
-
+ @contextmanager
+ def service_mode (self, ports):
+ self.set_service_mode(ports = ports)
+ try:
+ yield
+ finally:
+ self.set_service_mode(ports = ports, enabled = False)
+
+
@__api_check(True)
def resolve (self, ports = None, retries = 0, verbose = True):
"""
capture_id: int
an active capture ID to stop
- output: None/ str / list
+ output: None / str / list
if output is None - all the packets will be discarded
if output is a 'str' - it will be interpeted as output filename
if it is a list, the API will populate the list with packet objects
+ in case 'output' is a list, each element in the list is an object
+ containing:
+ 'binary' - binary bytes of the packet
+ 'origin' - RX or TX origin
+ 'ts' - timestamp relative to the start of the capture
+ 'index' - order index in the capture
+ 'port' - on which port did the packet arrive or was transmitted from
+
:raises:
+ :exe:'STLError'
def __str__ (self):
- fname = os.path.split(self.tb[-2][0])[1]
- lineno = self.tb[-2][1]
- func = self.tb[-2][2]
- src = self.tb[-2][3]
-
- s = "\n******\n"
- s += "Error at {0}:{1} - '{2}'\n\n".format(format_text(fname, 'bold'), format_text(lineno, 'bold'), format_text(src.strip(), 'bold'))
- s += "specific error:\n\n{0}\n".format(format_text(self.msg, 'bold'))
+ s = format_text("\n******\n", 'bold')
+ s += format_text('\nSummary error report:\n\n', 'underline')
+ s += format_text(self.msg + '\n', 'bold')
+
+ s += format_text("\nFull error report:\n\n", 'underline')
+
+ for line in reversed(self.tb):
+ fname, lineno, func, src = os.path.split(line[0])[1], line[1], line[2], line[3]
+ s += " {:}:{:<20} - '{}'\n".format(format_text(fname, 'bold'), format_text(lineno, 'bold'), format_text(src.strip(), 'bold'))
return s