+ def verify_capture_no_translation(self, capture, ingress_if, egress_if):
+ """
+ Verify captured packet that don't have to be translated
+
+ :param capture: Captured packets
+ :param ingress_if: Ingress interface
+ :param egress_if: Egress interface
+ """
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
+ self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
+ raise
+
+ def verify_ipfix_nat44_ses(self, data):
+ """
+ Verify IPFIX NAT44 session create/delete event
+
+ :param data: Decoded IPFIX data records
+ """
+ nat44_ses_create_num = 0
+ nat44_ses_delete_num = 0
+ self.assertEqual(6, len(data))
+ for record in data:
+ # natEvent
+ self.assertIn(ord(record[230]), [4, 5])
+ if ord(record[230]) == 4:
+ nat44_ses_create_num += 1
+ else:
+ nat44_ses_delete_num += 1
+ # sourceIPv4Address
+ self.assertEqual(self.pg0.remote_ip4n, record[8])
+ # postNATSourceIPv4Address
+ self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
+ record[225])
+ # ingressVRFID
+ self.assertEqual(struct.pack("!I", 0), record[234])
+ # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
+ if IP_PROTOS.icmp == ord(record[4]):
+ self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
+ self.assertEqual(struct.pack("!H", self.icmp_id_out),
+ record[227])
+ elif IP_PROTOS.tcp == ord(record[4]):
+ self.assertEqual(struct.pack("!H", self.tcp_port_in),
+ record[7])
+ self.assertEqual(struct.pack("!H", self.tcp_port_out),
+ record[227])
+ elif IP_PROTOS.udp == ord(record[4]):
+ self.assertEqual(struct.pack("!H", self.udp_port_in),
+ record[7])
+ self.assertEqual(struct.pack("!H", self.udp_port_out),
+ record[227])
+ else:
+ self.fail("Invalid protocol")
+ self.assertEqual(3, nat44_ses_create_num)
+ self.assertEqual(3, nat44_ses_delete_num)
+
+ def verify_ipfix_addr_exhausted(self, data):
+ """
+ Verify IPFIX NAT addresses event
+
+ :param data: Decoded IPFIX data records
+ """
+ self.assertEqual(1, len(data))
+ record = data[0]
+ # natEvent
+ self.assertEqual(ord(record[230]), 3)
+ # natPoolID
+ self.assertEqual(struct.pack("!I", 0), record[283])
+