+ def verify_decrypted6(self, p, rxs):
+ for rx in rxs:
+ self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
+ self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
+ self.assert_packet_checksums_valid(rx)
+
+ def verify_encrypted6(self, p, sa, rxs):
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
+ rx[IPv6].plen)
+ try:
+ decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
+ if not decrypt_pkt.haslayer(IPv6):
+ decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+ self.assert_packet_checksums_valid(decrypt_pkt)
+ self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+ self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ try:
+ self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+ except:
+ pass
+ raise
+
+ def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
+ self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
+
+ config_tun_params(p_in, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ src=p_in.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ count=count)
+ self.send_and_assert_no_replies(self.tun_if, send_pkts)
+ self.logger.info(self.vapi.cli("sh punt stats"))
+
+ def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):