X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=84b878440e97406f475ec80c9cf1042372b46163;hb=68b24e2c9;hp=6e42ac7f9f43474a9556df27a626a345f9974d86;hpb=47feb1146ec3b0e1cf2ebd83cd5211e1df261194;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 6e42ac7f9f4..84b878440e9 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -1,8 +1,9 @@ import unittest import socket +import struct from scapy.layers.inet import IP, ICMP, TCP, UDP -from scapy.layers.ipsec import SecurityAssociation +from scapy.layers.ipsec import SecurityAssociation, ESP from scapy.layers.l2 import Ether, Raw from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest @@ -42,7 +43,7 @@ class IPsecIPv4Params(object): IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name self.crypt_key = 'JPjyOWBeVEQiMe7h' - self.crypt_salt = '' + self.salt = 0 self.flags = 0 self.nat_header = None @@ -78,7 +79,7 @@ class IPsecIPv6Params(object): IPSEC_API_CRYPTO_ALG_AES_CBC_128) self.crypt_algo = 'AES-CBC' # scapy name self.crypt_key = 'JPjyOWBeVEQiMe7h' - self.crypt_salt = '' + self.salt = 0 self.flags = 0 self.nat_header = None @@ -87,9 +88,14 @@ def config_tun_params(p, encryption_type, tun_if): ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6} use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. IPSEC_API_SAD_FLAG_USE_ESN)) + if p.crypt_algo == "AES-GCM": + crypt_key = p.crypt_key + struct.pack("!I", p.salt) + else: + crypt_key = p.crypt_key p.scapy_tun_sa = SecurityAssociation( encryption_type, spi=p.vpp_tun_spi, - crypt_algo=p.crypt_algo, crypt_key=p.crypt_key + p.crypt_salt, + crypt_algo=p.crypt_algo, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, tunnel_header=ip_class_by_addr_type[p.addr_type]( src=tun_if.remote_addr[p.addr_type], @@ -98,7 +104,8 @@ def config_tun_params(p, encryption_type, tun_if): use_esn=use_esn) p.vpp_tun_sa = SecurityAssociation( encryption_type, spi=p.scapy_tun_spi, - crypt_algo=p.crypt_algo, crypt_key=p.crypt_key + p.crypt_salt, + crypt_algo=p.crypt_algo, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, tunnel_header=ip_class_by_addr_type[p.addr_type]( dst=tun_if.remote_addr[p.addr_type], @@ -110,11 +117,15 @@ def config_tun_params(p, encryption_type, tun_if): def config_tra_params(p, encryption_type): use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. IPSEC_API_SAD_FLAG_USE_ESN)) + if p.crypt_algo == "AES-GCM": + crypt_key = p.crypt_key + struct.pack("!I", p.salt) + else: + crypt_key = p.crypt_key p.scapy_tra_sa = SecurityAssociation( encryption_type, spi=p.vpp_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key + p.crypt_salt, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, @@ -123,7 +134,7 @@ def config_tra_params(p, encryption_type): encryption_type, spi=p.scapy_tra_spi, crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key + p.crypt_salt, + crypt_key=crypt_key, auth_algo=p.auth_algo, auth_key=p.auth_key, nat_t_header=p.nat_header, @@ -282,7 +293,7 @@ class IpsecTra4(object): # replayed packets are dropped self.send_and_assert_no_replies(self.tra_if, pkt * 3) - self.assert_packet_counter_equal( + self.assert_error_counter_equal( '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3) # the window size is 64 packets @@ -297,7 +308,11 @@ class IpsecTra4(object): # a packet that does not decrypt does not move the window forward bogus_sa = SecurityAssociation(self.encryption_type, - p.vpp_tra_spi) + p.vpp_tra_spi, + crypt_algo=p.crypt_algo, + crypt_key=p.crypt_key[::-1], + auth_algo=p.auth_algo, + auth_key=p.auth_key[::-1]) pkt = (Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) / bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4, @@ -306,9 +321,25 @@ class IpsecTra4(object): seq_num=350)) self.send_and_assert_no_replies(self.tra_if, pkt * 17) - self.assert_packet_counter_equal( + self.assert_error_counter_equal( '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17) + # a malformed 'runt' packet + # created by a mis-constructed SA + if (ESP == self.encryption_type): + bogus_sa = SecurityAssociation(self.encryption_type, + p.vpp_tra_spi) + pkt = (Ether(src=self.tra_if.remote_mac, + dst=self.tra_if.local_mac) / + bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4) / + ICMP(), + seq_num=350)) + self.send_and_assert_no_replies(self.tra_if, pkt * 17) + + self.assert_error_counter_equal( + '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17) + # which we can determine since this packet is still in the window pkt = (Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac) / @@ -330,12 +361,12 @@ class IpsecTra4(object): if use_esn: # an out of window error with ESN looks like a high sequence # wrap. but since it isn't then the verify will fail. - self.assert_packet_counter_equal( + self.assert_error_counter_equal( '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 34) else: - self.assert_packet_counter_equal( + self.assert_error_counter_equal( '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 20) @@ -380,7 +411,7 @@ class IpsecTra4(object): decrypted = p.vpp_tra_sa.decrypt(rx[0][IP]) else: self.send_and_assert_no_replies(self.tra_if, [pkt]) - self.assert_packet_counter_equal( + self.assert_error_counter_equal( '/err/%s/sequence number cycled' % self.tra4_encrypt_node_name, 1) @@ -401,6 +432,8 @@ class IpsecTra4(object): recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()), rx[IP].len) + self.assert_packet_checksums_valid(rx) try: decrypted = p.vpp_tra_sa.decrypt(rx[IP]) self.assert_packet_checksums_valid(decrypted) @@ -409,7 +442,7 @@ class IpsecTra4(object): raise finally: self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec")) + self.logger.info(self.vapi.ppcli("show ipsec all")) pkts = p.tra_sa_in.get_stats()['packets'] self.assertEqual(pkts, count, @@ -452,6 +485,8 @@ class IpsecTra6(object): recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) for rx in recv_pkts: + self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), + rx[IPv6].plen) try: decrypted = p.vpp_tra_sa.decrypt(rx[IPv6]) self.assert_packet_checksums_valid(decrypted) @@ -460,7 +495,7 @@ class IpsecTra6(object): raise finally: self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec")) + self.logger.info(self.vapi.ppcli("show ipsec all")) pkts = p.tra_sa_in.get_stats()['packets'] self.assertEqual(pkts, count, @@ -521,6 +556,8 @@ class IpsecTun4(object): def verify_encrypted(self, p, sa, rxs): decrypt_pkts = [] for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(len(rx) - len(Ether()), rx[IP].len) try: decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP]) if not decrypt_pkt.haslayer(IP): @@ -561,7 +598,7 @@ class IpsecTun4(object): finally: self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec")) + self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters(p, count) @@ -599,7 +636,7 @@ class IpsecTun4(object): raise finally: self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec")) + self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters(p, count) @@ -649,6 +686,8 @@ class IpsecTun6(object): count=count) recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if) for recv_pkt in recv_pkts: + self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()), + recv_pkt[IPv6].plen) try: decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6]) if not decrypt_pkt.haslayer(IPv6): @@ -666,7 +705,7 @@ class IpsecTun6(object): raise finally: self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec")) + self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters(p, count) def verify_tun_46(self, p, count=1): @@ -705,7 +744,7 @@ class IpsecTun6(object): raise finally: self.logger.info(self.vapi.ppcli("show error")) - self.logger.info(self.vapi.ppcli("show ipsec")) + self.logger.info(self.vapi.ppcli("show ipsec all")) self.verify_counters(p, count)