X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=034bc8ecf45ed631029ab3772d38a6b160ee7868;hb=53131d2a2667a28b64d8a862d070b6402e7732bb;hp=773531fe038f78047fb77f67fd7b39f7026bb907;hpb=1404698df397bc4d3007daea41f52ad75ed4486c;p=vpp.git
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 773531fe038..034bc8ecf45 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -4,7 +4,8 @@ import struct
 from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
-from scapy.layers.l2 import Ether, Raw
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 from framework import VppTestCase, VppTestRunner
@@ -37,12 +38,12 @@ class IPsecIPv4Params(object):
 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
 IPSEC_API_INTEG_ALG_SHA1_96)
 self.auth_algo = 'HMAC-SHA1-96' # scapy name
- self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
+ self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
 self.crypt_algo = 'AES-CBC' # scapy name
- self.crypt_key = 'JPjyOWBeVEQiMe7h'
+ self.crypt_key = b'JPjyOWBeVEQiMe7h'
 self.salt = 0
 self.flags = 0
 self.nat_header = None
@@ -73,18 +74,18 @@ class IPsecIPv6Params(object):
 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
 IPSEC_API_INTEG_ALG_SHA1_96)
 self.auth_algo = 'HMAC-SHA1-96' # scapy name
- self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
+ self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
 self.crypt_algo = 'AES-CBC' # scapy name
- self.crypt_key = 'JPjyOWBeVEQiMe7h'
+ self.crypt_key = b'JPjyOWBeVEQiMe7h'
 self.salt = 0
 self.flags = 0
 self.nat_header = None
-def mk_scapy_crpyt_key(p):
+def mk_scapy_crypt_key(p):
 if p.crypt_algo == "AES-GCM":
 return p.crypt_key + struct.pack("!I", p.salt)
 else:
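Review bot: `mk_scapy_crypt_key` has no docstring, and its AES-GCM branch now depends on `p.crypt_key` being `bytes`: `p.crypt_key + struct.pack("!I", p.salt)` would raise TypeError under Python 3 for any parameter object that still carries a `str` key, which is the reason the IPsecIPv4Params and IPsecIPv6Params hunks switch to `b'...'` literals, but nothing says so at the function. A one-line docstring would make the contract explicit: `"""Return the key in the layout scapy expects: for AES-GCM the cipher key followed by the 4-byte big-endian salt."""` (the `else` branch continues past the end of the hunk, so its return is not visible here). The key and salt literals are repeated verbatim in both parameter classes; a shared module-level constant, or a common base class, would keep the two fixtures from drifting apart.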
@@ -93,9 +94,9 @@ def mk_scapy_crpyt_key(p):
 def config_tun_params(p, encryption_type, tun_if):
 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
- use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_ESN))
- crypt_key = mk_scapy_crpyt_key(p)
+ esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+ IPSEC_API_SAD_FLAG_USE_ESN))
+ crypt_key = mk_scapy_crypt_key(p)
 p.scapy_tun_sa = SecurityAssociation(
 encryption_type, spi=p.vpp_tun_spi,
 crypt_algo=p.crypt_algo,
@@ -105,7 +106,7 @@ def config_tun_params(p, encryption_type, tun_if):
 src=tun_if.remote_addr[p.addr_type],
 dst=tun_if.local_addr[p.addr_type]),
 nat_t_header=p.nat_header,
- use_esn=use_esn)
+ esn_en=esn_en)
 p.vpp_tun_sa = SecurityAssociation(
 encryption_type, spi=p.scapy_tun_spi,
 crypt_algo=p.crypt_algo,
@@ -115,13 +116,13 @@ def config_tun_params(p, encryption_type, tun_if):
 dst=tun_if.remote_addr[p.addr_type],
 src=tun_if.local_addr[p.addr_type]),
 nat_t_header=p.nat_header,
- use_esn=use_esn)
+ esn_en=esn_en)
 def config_tra_params(p, encryption_type):
- use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_ESN))
- crypt_key = mk_scapy_crpyt_key(p)
+ esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+ IPSEC_API_SAD_FLAG_USE_ESN))
+ crypt_key = mk_scapy_crypt_key(p)
 p.scapy_tra_sa = SecurityAssociation(
 encryption_type,
 spi=p.vpp_tra_spi,
@@ -130,7 +131,7 @@ def config_tra_params(p, encryption_type):
 auth_algo=p.auth_algo,
 auth_key=p.auth_key,
 nat_t_header=p.nat_header,
- use_esn=use_esn)
+ esn_en=esn_en)
 p.vpp_tra_sa = SecurityAssociation(
 encryption_type,
 spi=p.scapy_tra_spi,
@@ -139,7 +140,7 @@ def config_tra_params(p, encryption_type):
 auth_algo=p.auth_algo,
 auth_key=p.auth_key,
 nat_t_header=p.nat_header,
- use_esn=use_esn)
+ esn_en=esn_en)
 class TemplateIpsec(VppTestCase):
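Review bot: The `esn_en` computation at the top of `config_tun_params` is repeated verbatim in `config_tra_params` (hunk `@@ -115,13 +116,13 @@`), and the result is then handed to two SecurityAssociation calls in each function; a one-line helper removes the duplication and gives the flag a single home the next scapy keyword rename can be applied to: `def esn_enabled(p):` / `    return bool(p.flags & VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)`. Neither function has a docstring, and the pairing is worth a sentence: `scapy_tun_sa` is keyed on `p.vpp_tun_spi` with tunnel addresses remote→local while `vpp_tun_sa` is keyed on `p.scapy_tun_spi` with the addresses swapped, which reads as "the SA scapy encrypts with toward VPP, and a mirror of VPP's own SA used to decrypt VPP's output" — that direction is inferred from the SPI/address pairing and from `p.scapy_tun_sa.encrypt(...)` in later hunks, so confirm it before wording it as fact. The middle of both functions lies outside the visible hunks.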
@@ -223,7 +224,7 @@ class TemplateIpsec(VppTestCase):
 payload_size=54):
 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
 sa.encrypt(IP(src=src, dst=dst) /
- ICMP() / Raw('X' * payload_size))
+ ICMP() / Raw(b'X' * payload_size))
 for i in range(count)]
 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
@@ -236,7 +237,7 @@ class TemplateIpsec(VppTestCase):
 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
+ IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
 for i in range(count)]
 def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
@@ -250,7 +251,6 @@ class IpsecTcp(object):
 def verify_tcp_checksum(self):
 self.vapi.cli("test http server")
 p = self.params[socket.AF_INET]
- config_tun_params(p, self.encryption_type, self.tun_if)
 send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
 dst=self.tun_if.local_ip4) /
@@ -272,7 +272,7 @@ class IpsecTra4(object):
 """ verify methods for Transport v4 """
 def verify_tra_anti_replay(self):
 p = self.params[socket.AF_INET]
- use_esn = p.vpp_tra_sa.use_esn
+ esn_en = p.vpp_tra_sa.esn_en
 seq_cycle_node_name = ('/err/%s/sequence number cycled' %
 self.tra4_encrypt_node_name)
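Review bot: With the `config_tun_params` call removed from `verify_tcp_checksum`, the method now relies on `p.scapy_tun_sa` having been created somewhere outside this diff, and nothing in the method records that precondition; the same applies to the removals in the IpsecTun4 and IpsecTun6 hunks (`@@ -703`, `@@ -730`, `@@ -758`, `@@ -872`, `@@ -886`, `@@ -914`, `@@ -943`). Presumably the intent is that the scapy SAs, and with them their sequence counters, persist across verify calls instead of being rebuilt on every call — worth stating once at the top of the method: `# p.scapy_tun_sa is set up once by the test fixture; rebuilding it here would restart its sequence numbers`. If no caller configures the SA the method fails with an AttributeError on `p.scapy_tun_sa` rather than a clear message, so a `hasattr(p, 'scapy_tun_sa')` check with `self.fail(...)` at the start would make a setup mistake obvious. The rest of `verify_tcp_checksum` is outside the hunk.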
@@ -317,6 +317,21 @@ class IpsecTra4(object):
 replay_count += len(pkts)
 self.assert_error_counter_equal(replay_node_name, replay_count)
+ #
+ # now send a batch of packets all with the same sequence number
+ # the first packet in the batch is legitimate, the rest bogus
+ #
+ pkts = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=35))
+ recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
+ self.tra_if, n_rx=1)
+ replay_count += 7
+ self.assert_error_counter_equal(replay_node_name, replay_count)
+
 #
 # now move the window over to 257 (more than one byte) and into Case A
 #
@@ -347,7 +362,7 @@ class IpsecTra4(object):
 bogus_sa = SecurityAssociation(self.encryption_type,
 p.vpp_tra_spi,
 crypt_algo=p.crypt_algo,
- crypt_key=mk_scapy_crpyt_key(p)[::-1],
+ crypt_key=mk_scapy_crypt_key(p)[::-1],
 auth_algo=p.auth_algo,
 auth_key=p.auth_key[::-1])
 pkt = (Ether(src=self.tra_if.remote_mac,
@@ -364,7 +379,7 @@ class IpsecTra4(object):
 # a malformed 'runt' packet
 # created by a mis-constructed SA
- if (ESP == self.encryption_type):
+ if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
 bogus_sa = SecurityAssociation(self.encryption_type,
 p.vpp_tra_spi)
 pkt = (Ether(src=self.tra_if.remote_mac,
@@ -401,7 +416,7 @@ class IpsecTra4(object):
 seq_num=17))
 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
- if use_esn:
+ if esn_en:
 # an out of window error with ESN looks like a high sequence
 # wrap. but since it isn't then the verify will fail.
 hash_failed_count += 17
@@ -441,7 +456,7 @@ class IpsecTra4(object):
 seq_num=seq))
 for seq in range(259, 280)]
- if use_esn:
+ if esn_en:
 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
 #
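Review bot: In the new same-sequence-number batch, `recv_pkts` is assigned and never used, and the batch size is split across two magic numbers (`pkts * 8` and `replay_count += 7`) that have to be kept in step by hand; naming the size ties them together: `batch = 8`, `self.send_and_expect(self.tra_if, pkts * batch, self.tra_if, n_rx=1)`, `replay_count += batch - 1`. The comment block says what is sent but not why 35 is the legitimate one — presumably it is the next unseen number inside the window the earlier loop established (that loop starts outside this hunk), so a clause such as `# seq 35 is the next unseen number in the window; the 7 duplicates must all be counted as replays` would help. Whether the one packet that comes back is the first of the batch is not asserted; if that matters for the anti-replay check, test `recv_pkts` instead of discarding it.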
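Review bot: The new `p.crypt_algo != "NULL"` condition in hunk `@@ -364,7 +379,7 @@` skips the runt-packet case for the NULL cipher without saying why; since the bogus SA on the next lines is built from only `self.encryption_type` and the SPI, the likely reason is that with no cipher configured the default-constructed SA is not mis-constructed at all and so produces no runt to reject — confirm that and record it next to the condition: `# with a NULL cipher the default SA matches the real one, so there is no runt to produce`. The method continues outside this hunk, so the `hash_failed_count` bookkeeping that follows should be checked for the NULL-cipher path as well.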
@@ -703,7 +718,6 @@ class IpsecTun4(object):
 if not n_rx:
 n_rx = count
 try:
- config_tun_params(p, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
 src=p.remote_tun_if_host,
 dst=self.pg1.remote_ip4,
@@ -730,7 +744,6 @@ class IpsecTun4(object):
 sw_if_index=self.tun_if.sw_if_index,
 enable_ip4=True)
 try:
- config_tun_params(p, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
 src=p.remote_tun_if_host,
 dst=self.pg1.remote_ip4,
@@ -758,7 +771,6 @@ class IpsecTun4(object):
 def verify_tun_64(self, p, count=1):
 self.vapi.cli("clear errors")
 try:
- config_tun_params(p, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
 src=p.remote_tun_if_host6,
 dst=self.pg1.remote_ip6,
@@ -797,7 +809,7 @@ class IpsecTun4(object):
 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
 UDP(sport=333, dport=4500) /
- Raw(0xff))
+ Raw(b'\xff'))
 self.send_and_assert_no_replies(self.tun_if, pkt*31)
 self.assert_error_counter_equal(
 '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
@@ -805,7 +817,7 @@ class IpsecTun4(object):
 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
 UDP(sport=333, dport=4500) /
- Raw(0xfe))
+ Raw(b'\xfe'))
 self.send_and_assert_no_replies(self.tun_if, pkt*31)
 self.assert_error_counter_equal(
 '/err/%s/Too Short' % self.tun4_input_node, 31)
@@ -872,7 +884,6 @@ class IpsecTun6(object):
 self.vapi.cli("clear errors")
 self.vapi.cli("clear ipsec sa")
- config_tun_params(p_in, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
 src=p_in.remote_tun_if_host,
 dst=self.pg1.remote_ip6,
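Review bot: `Raw(b'\xff')` in hunk `@@ -797,7 +809,7 @@` is the right fix, since `Raw(0xff)` did not produce a one-byte payload, but the hunk never says what the packet models: a lone 0xff byte on UDP/4500 is the NAT-T keepalive of RFC 3948, which is why the packets land under the `NAT Keepalive` counter, while the `0xfe` variant in the next hunk (`@@ -805,7 +817,7 @@`) is a one-byte non-keepalive that has to be dropped as `Too Short`. A comment on each would make the pair self-explanatory: `# RFC 3948 NAT-T keepalive: a lone 0xff byte on UDP/4500, dropped and counted` and `# a one-byte payload that is not 0xff is not a keepalive and is too short for ESP`. The literal 31 doubles as send count and expected counter value in both hunks; a local such as `n = 31` would keep the two from diverging.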
@@ -886,8 +897,6 @@ class IpsecTun6(object):
 if not p_out:
 p_out = p_in
 try:
- config_tun_params(p_in, self.encryption_type, self.tun_if)
- config_tun_params(p_out, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
 src=p_in.remote_tun_if_host,
 dst=self.pg1.remote_ip6,
@@ -914,7 +923,6 @@ class IpsecTun6(object):
 sw_if_index=self.tun_if.sw_if_index,
 enable_ip6=True)
 try:
- config_tun_params(p, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
 src=p.remote_tun_if_host,
 dst=self.pg1.remote_ip6,
@@ -943,7 +951,6 @@ class IpsecTun6(object):
 """ ipsec 4o6 tunnel basic test """
 self.vapi.cli("clear errors")
 try:
- config_tun_params(p, self.encryption_type, self.tun_if)
 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
 src=p.remote_tun_if_host4,
 dst=self.pg1.remote_ip4,