- def verify_capture(self, dst_if, capture_pg1, capture_pg2):
- last_info = dict()
- for i in self.interfaces:
- last_info[i.sw_if_index] = None
- dst_sw_if_index = dst_if.sw_if_index
- self.assertEqual(
- len(capture_pg1),
- len(capture_pg2),
- "Different number of outgoing and mirrored packets : %u != %u" %
- (len(capture_pg1),
- len(capture_pg2)))
- for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
- try:
- ip1 = pkt_pg1[IP]
- udp1 = pkt_pg1[UDP]
- raw1 = pkt_pg1[Raw]
-
- if pkt_pg1[Ether] != pkt_pg2[Ether]:
- self.logger.error("Different ethernet header of "
- "outgoing and mirrored packet")
- raise
- if ip1 != pkt_pg2[IP]:
- self.logger.error(
- "Different ip header of outgoing and mirrored packet")
- raise
- if udp1 != pkt_pg2[UDP]:
- self.logger.error(
- "Different udp header of outgoing and mirrored packet")
- raise
- if raw1 != pkt_pg2[Raw]:
- self.logger.error(
- "Different raw data of outgoing and mirrored packet")
- raise
-
- payload_info = self.payload_to_info(str(raw1))
- packet_index = payload_info.index
- self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug(
- "Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
- next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
- last_info[payload_info.src] = next_info
- self.assertTrue(next_info is not None)
- self.assertEqual(packet_index, next_info.index)
- saved_packet = next_info.data
- # Check standard fields
- self.assertEqual(ip1.src, saved_packet[IP].src)
- self.assertEqual(ip1.dst, saved_packet[IP].dst)
- self.assertEqual(udp1.sport, saved_packet[UDP].sport)
- self.assertEqual(udp1.dport, saved_packet[UDP].dport)
- except:
- self.logger.error("Unexpected or invalid packets:")
- self.logger.error(ppp("pg1 packet:", pkt_pg1))
- self.logger.error(ppp("pg2 packet:", pkt_pg2))
- raise
- for i in self.interfaces:
- remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Port %u: Packet expected from source %u didn't"
- " arrive" % (dst_sw_if_index, i.sw_if_index))
+ def verify_capture(self, cap1, cap2):
+ self.assertEqual(len(cap1), len(cap2),
+ "Different number of sent and mirrored packets :"
+ "%u != %u" % (len(cap1), len(cap2)))
+
+ pkts1 = [(pkt[Ether] / pkt[IP] / pkt[UDP]) for pkt in cap1]
+ pkts2 = [(pkt[Ether] / pkt[IP] / pkt[UDP]) for pkt in cap2]
+
+ self.assertEqual(pkts1.sort(), pkts2.sort())