import struct
from scapy.layers.inet import IP, ICMP, TCP, UDP
-from scapy.layers.ipsec import SecurityAssociation
+from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
# replayed packets are dropped
self.send_and_assert_no_replies(self.tra_if, pkt * 3)
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
# the window size is 64 packets
# a packet that does not decrypt does not move the window forward
bogus_sa = SecurityAssociation(self.encryption_type,
- p.vpp_tra_spi)
+ p.vpp_tra_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key[::-1],
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key[::-1])
pkt = (Ether(src=self.tra_if.remote_mac,
dst=self.tra_if.local_mac) /
bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
seq_num=350))
self.send_and_assert_no_replies(self.tra_if, pkt * 17)
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
+ # a malformed 'runt' packet
+ # created by a mis-constructed SA
+ if (ESP == self.encryption_type):
+ bogus_sa = SecurityAssociation(self.encryption_type,
+ p.vpp_tra_spi)
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=350))
+ self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+ self.assert_error_counter_equal(
+ '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17)
+
# which we can determine since this packet is still in the window
pkt = (Ether(src=self.tra_if.remote_mac,
dst=self.tra_if.local_mac) /
if use_esn:
# an out of window error with ESN looks like a high sequence
# wrap. but since it isn't then the verify will fail.
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/Integrity check failed' %
self.tra4_decrypt_node_name, 34)
else:
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/SA replayed packet' %
self.tra4_decrypt_node_name, 20)
decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
else:
self.send_and_assert_no_replies(self.tra_if, [pkt])
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/sequence number cycled' %
self.tra4_encrypt_node_name, 1)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+ self.assert_packet_checksums_valid(rx)
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IP])
self.assert_packet_checksums_valid(decrypted)
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
pkts = p.tra_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
pkts = p.tra_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
def verify_encrypted(self, p, sa, rxs):
decrypt_pkts = []
for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
if not decrypt_pkt.haslayer(IP):
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
self.verify_counters(p, count)
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
self.verify_counters(p, count)
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
self.verify_counters(p, count)
def verify_tun_46(self, p, count=1):
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
self.verify_counters(p, count)