pkts = []
for i in range(0, 257):
remote_dst_host = choice(dst_ip_if.remote_hosts)
- info = self.create_packet_info(
- src_ip_if.sw_if_index, dst_ip_if.sw_if_index)
+ info = self.create_packet_info(src_ip_if, dst_ip_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) /
IP(src=src_ip_if.remote_ip4,
packet_sizes):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(
- src_ip_if.sw_if_index, dst_ip_if.sw_if_index)
+ info = self.create_packet_info(src_ip_if, dst_ip_if)
payload = self.info_to_payload(info)
host = choice(src_l2_if.remote_hosts)
p = (Ether(src=host.mac,
- dst = src_ip_if.local_mac) /
+ dst=src_ip_if.local_mac) /
IP(src=host.ip4,
dst=dst_ip_if.remote_ip4) /
UDP(sport=1234, dport=1234) /
ip = packet[IP]
udp = packet[IP][UDP]
payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))
- packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rcvd1 = self.pg0.get_capture()
- rcvd2 = self.pg1.get_capture()
+ packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
+
+ rcvd1 = self.pg0.get_capture(packet_count)
+ rcvd2 = self.pg1.get_capture(packet_count)
self.verify_capture(self.loop0, self.pg2, rcvd1)
self.verify_capture(self.loop0, self.pg2, rcvd2)
rcvd = self.pg2.get_capture()
self.verify_capture_l2_to_ip(self.pg2, self.loop0, rcvd)
- self.assertEqual(len(stream1) + len(stream2), len(rcvd.res))
-
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)