vpp_esp_protocol = 1
vpp_ah_protocol = 0
+ @classmethod
+ def ipsec_select_backend(cls):
+ """ empty method to be overloaded when necessary """
+ pass
+
@classmethod
def setUpClass(cls):
super(TemplateIpsec, cls).setUpClass()
i.resolve_arp()
i.config_ip6()
i.resolve_ndp()
+ cls.ipsec_select_backend()
def tearDown(self):
super(TemplateIpsec, self).tearDown()
src=self.tun_if.local_addr[params.addr_type]))
return vpp_tun_sa, scapy_tun_sa
- def configure_sa_tra(self, params):
- scapy_tra_sa = SecurityAssociation(self.encryption_type,
- spi=params.vpp_tra_spi,
- crypt_algo=params.crypt_algo,
- crypt_key=params.crypt_key,
- auth_algo=params.auth_algo,
- auth_key=params.auth_key)
- vpp_tra_sa = SecurityAssociation(self.encryption_type,
- spi=params.scapy_tra_spi,
- crypt_algo=params.crypt_algo,
- crypt_key=params.crypt_key,
- auth_algo=params.auth_algo,
- auth_key=params.auth_key)
- return vpp_tra_sa, scapy_tra_sa
+ @classmethod
+ def configure_sa_tra(cls, params):
+ params.scapy_tra_sa = SecurityAssociation(cls.encryption_type,
+ spi=params.vpp_tra_spi,
+ crypt_algo=params.crypt_algo,
+ crypt_key=params.crypt_key,
+ auth_algo=params.auth_algo,
+ auth_key=params.auth_key)
+ params.vpp_tra_sa = SecurityAssociation(cls.encryption_type,
+ spi=params.scapy_tra_spi,
+ crypt_algo=params.crypt_algo,
+ crypt_key=params.crypt_key,
+ auth_algo=params.auth_algo,
+ auth_key=params.auth_key)
class IpsecTcpTests(object):
class IpsecTraTests(object):
+ def test_tra_anti_replay(self, count=1):
+ """ ipsec v4 transport anti-reply test """
+ p = self.params[socket.AF_INET]
+
+ # fire in a packet with seq number 1
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=1))
+ recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+ # now move the window over to 235
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=235))
+ recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+ # the window size is 64 packets
+ # in window are still accepted
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=172))
+ recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+ # out of window are dropped
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=17))
+ self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+ self.assert_packet_counter_equal(
+ '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
+
+ # a packet that does not decrypt does not move the window forward
+ bogus_sa = SecurityAssociation(self.encryption_type,
+ p.vpp_tra_spi)
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=350))
+ self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+ self.assert_packet_counter_equal(
+ '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
+
+ # which we can determine since this packet is still in the window
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=234))
+ self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+
+ # move the security-associations seq number on to the last we used
+ p.scapy_tra_sa.seq_num = 351
+ p.vpp_tra_sa.seq_num = 351
+
def test_tra_basic(self, count=1):
""" ipsec v4 transport basic test """
+ self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET]
- vpp_tra_sa, scapy_tra_sa = self.configure_sa_tra(p)
- send_pkts = self.gen_encrypt_pkts(scapy_tra_sa, self.tra_if,
+ send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip4,
dst=self.tra_if.local_ip4,
count=count)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
- for p in recv_pkts:
+ for rx in recv_pkts:
try:
- decrypted = vpp_tra_sa.decrypt(p[IP])
+ decrypted = p.vpp_tra_sa.decrypt(rx[IP])
self.assert_packet_checksums_valid(decrypted)
except:
- self.logger.debug(ppp("Unexpected packet:", p))
+ self.logger.debug(ppp("Unexpected packet:", rx))
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
+
def test_tra_burst(self):
""" ipsec v4 transport burst test """
self.test_tra_basic(count=257)
def test_tra_basic6(self, count=1):
""" ipsec v6 transport basic test """
+ self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET6]
- vpp_tra_sa, scapy_tra_sa = self.configure_sa_tra(p)
- send_pkts = self.gen_encrypt_pkts6(scapy_tra_sa, self.tra_if,
+ send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip6,
dst=self.tra_if.local_ip6,
count=count)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
- for p in recv_pkts:
+ for rx in recv_pkts:
try:
- decrypted = vpp_tra_sa.decrypt(p[IPv6])
+ decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
self.assert_packet_checksums_valid(decrypted)
except:
- self.logger.debug(ppp("Unexpected packet:", p))
+ self.logger.debug(ppp("Unexpected packet:", rx))
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
+
def test_tra_burst6(self):
""" ipsec v6 transport burst test """
self.test_tra_basic6(count=257)
class IpsecTun4Tests(object):
def test_tun_basic44(self, count=1):
""" ipsec 4o4 tunnel basic test """
+ self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET]
vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
def test_tun_burst44(self):
""" ipsec 4o4 tunnel burst test """
self.test_tun_basic44(count=257)
class IpsecTun6Tests(object):
def test_tun_basic66(self, count=1):
""" ipsec 6o6 tunnel basic test """
+ self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET6]
vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
def test_tun_burst66(self):
""" ipsec 6o6 tunnel burst test """
self.test_tun_basic66(count=257)