3 from logging import error
4 from scapy.utils import wrpcap, rdpcap
5 from vpp_interface import VppInterface
8 class VppPGInterface(VppInterface):
10 VPP packet-generator interface
15 """packet-generator interface index assigned by VPP"""
20 """pcap file path - captured packets"""
25 """ pcap file path - injected packets"""
def capture_cli(self):
    """Return the CLI command string that starts capture on this interface.

    The value is whatever was stored in ``_capture_cli``.
    """
    command = self._capture_cli
    return command
35 """capture name for this interface"""
40 """CLI string to load the injected packets"""
41 return self._input_cli
44 def in_history_counter(self):
45 """Self-incrementing counter used when renaming old pcap files"""
46 v = self._in_history_counter
47 self._in_history_counter += 1
51 def out_history_counter(self):
52 """Self-incrementing counter used when renaming old pcap files"""
53 v = self._out_history_counter
54 self._out_history_counter += 1
def post_init_setup(self):
    """Run the superclass post-init setup, then derive pcap paths and CLI strings.

    After delegating to the base class, this populates:

    * ``_out_file`` / ``_out_path`` -- name and full path of the pcap file
      referenced by the capture command
    * ``_in_file`` / ``_in_path`` -- name and full path of the pcap file
      referenced by the input command
    * ``_capture_cli`` -- the "packet-generator capture" command
    * ``_cap_name`` -- the stream name used in the input command
    * ``_input_cli`` -- the "packet-generator new" command

    Both pcap files are placed in ``self.test.tempdir``. File and stream
    names are keyed by ``sw_if_index``; the CLI commands address the
    interface by ``pg_index``.
    """
    super(VppPGInterface, self).post_init_setup()

    tempdir = self.test.tempdir

    # Build paths with os.path.join rather than gluing "/" by hand, so a
    # tempdir that already ends in a separator does not produce "//".
    self._out_file = "pg%u_out.pcap" % self.sw_if_index
    self._out_path = os.path.join(tempdir, self._out_file)
    self._in_file = "pg%u_in.pcap" % self.sw_if_index
    self._in_path = os.path.join(tempdir, self._in_file)

    self._capture_cli = "packet-generator capture pg%u pcap %s" % (
        self.pg_index, self.out_path)
    self._cap_name = "pcap%u" % self.sw_if_index
    self._input_cli = "packet-generator new pcap %s source pg%u name %s" % (
        self.in_path, self.pg_index, self.cap_name)
70 def __init__(self, test, pg_index):
71 """ Create VPP packet-generator interface """
72 self._in_history_counter = 0
73 self._out_history_counter = 0
74 self._pg_index = pg_index
76 r = self.test.vapi.pg_create_interface(self.pg_index)
77 self._sw_if_index = r.sw_if_index
78 self.post_init_setup()
80 def enable_capture(self):
81 """ Enable capture on this packet-generator interface"""
83 if os.path.isfile(self.out_path):
84 os.rename(self.out_path,
85 "%s/history.[timestamp:%f].[%s-counter:%04d].%s" %
89 self.out_history_counter,
93 # FIXME this should be an API, but no such exists atm
94 self.test.vapi.cli(self.capture_cli)
96 def add_stream(self, pkts):
98 Add a stream of packets to this packet-generator
100 :param pkts: iterable packets
104 if os.path.isfile(self.in_path):
105 os.rename(self.in_path,
106 "%s/history.[timestamp:%f].[%s-counter:%04d].%s" %
110 self.in_history_counter,
114 wrpcap(self.in_path, pkts)
115 # FIXME this should be an API, but no such exists atm
116 self.test.vapi.cli(self.input_cli)
117 self.test.pg_streams.append(self.cap_name)
118 self.test.vapi.cli("trace add pg-input %d" % len(pkts))
120 def get_capture(self):
124 :returns: iterable packets
127 output = rdpcap(self.out_path)
128 except IOError: # TODO
129 error("File %s does not exist, probably because no"
130 " packets arrived" % self.out_path)