4 from scapy.layers.inet import IP, ICMP
5 from scapy.layers.l2 import Ether, Raw
6 from scapy.layers.ipsec import SecurityAssociation, AH
8 from framework import VppTestCase, VppTestRunner
class TestIpsecAh(VppTestCase):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    Below 4 cases are covered as part of this test
    1) ipsec ah v4 transport basic test - IPv4 Transport mode
    scenario using HMAC-SHA1-96 integrity algo
    2) ipsec ah v4 transport burst test
    Above test for 257 pkts
    3) ipsec ah 4o4 tunnel basic test - IPv4 Tunnel mode
    scenario using HMAC-SHA1-96 integrity algo
    4) ipsec ah 4o4 tunnel burst test
    Above test for 257 pkts

    --- encrypt --- plain ---
    |pg0| -------> |VPP| ------> |pg1|

    --- decrypt --- plain ---
    |pg0| <------- |VPP| <------ |pg1|

    Note : IPv6 is not covered

    NOTE(review): AH provides integrity and data-origin authentication
    only.  "encrypt"/"decrypt" in the diagram (and in the test bodies)
    are scapy's generic SecurityAssociation method names; with an AH SA
    they add / verify the AH header and ICV, the IP/ICMP payload is
    never made confidential.

    NOTE(review): all four cases exercise the accept path only.  There is
    no case that submits a packet with a corrupted ICV or an unknown SPI
    and asserts that VPP drops it, so a regression that disables ICV
    verification would not be caught here.
    """

    # Inner (tunnel-mode) endpoint addresses.  /32 routes for both are
    # installed elsewhere in this class with ip_add_del_route so that the
    # de-encapsulated packet is forwarded to pg1 and the reverse traffic
    # from pg1 is steered back towards pg0.
    remote_pg0_lb_addr = '1.1.1.1'
    remote_pg1_lb_addr = '2.2.2.2'
49 super(TestIpsecAh, cls).setUpClass()
51 cls.create_pg_interfaces(range(3))
52 cls.interfaces = list(cls.pg_interfaces)
53 for i in cls.interfaces:
57 cls.logger.info(cls.vapi.ppcli("show int addr"))
59 cls.logger.info(cls.vapi.ppcli("show ipsec"))
61 cls.logger.info(cls.vapi.ppcli("show ipsec"))
63 super(TestIpsecAh, cls).tearDownClass()
74 src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
75 cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
76 dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
77 cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
78 cls.vapi.ipsec_sad_add_del_entry(
83 integrity_key_length=20)
84 cls.vapi.ipsec_sad_add_del_entry(
89 integrity_key_length=20)
90 cls.vapi.ipsec_spd_add_del(spd_id)
91 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
92 l_startaddr = r_startaddr = socket.inet_pton(
93 socket.AF_INET, "0.0.0.0")
94 l_stopaddr = r_stopaddr = socket.inet_pton(
95 socket.AF_INET, "255.255.255.255")
96 cls.vapi.ipsec_spd_add_del_entry(
103 cls.vapi.ipsec_spd_add_del_entry(
111 l_startaddr = l_stopaddr = socket.inet_pton(
112 socket.AF_INET, cls.remote_pg0_lb_addr)
113 r_startaddr = r_stopaddr = socket.inet_pton(
114 socket.AF_INET, cls.remote_pg1_lb_addr)
115 cls.vapi.ipsec_spd_add_del_entry(
125 cls.vapi.ipsec_spd_add_del_entry(
def configAhTra(cls):
    """Configure VPP for the AH transport-mode cases on pg2.

    Installs two SAD entries for pg2, creates an SPD, binds it to pg2 and
    adds two pairs of SPD entries: one pair whose selector is the whole
    IPv4 range (0.0.0.0 - 255.255.255.255) and one pair whose selector
    is exactly the pg2 local/remote address pair.  Most keyword arguments
    of the SAD/SPD calls (key bytes, SPI, policy, protocol, direction)
    are on lines that are not part of this excerpt.

    NOTE(review): takes ``cls`` - presumably decorated with
    ``@classmethod`` on a line not shown.
    """
    remote_tra_spi = 2001
    # A 20-byte (160-bit) integrity key is the HMAC-SHA1 key size and
    # matches the 20-character auth_key used for the scapy SAs in
    # configScapySA.  The key bytes handed to VPP are on lines not shown
    # here; they must be identical to the scapy side or every ICV check
    # in the tests fails.
    cls.vapi.ipsec_sad_add_del_entry(
        integrity_key_length=20,
    cls.vapi.ipsec_sad_add_del_entry(
        integrity_key_length=20,
    cls.vapi.ipsec_spd_add_del(spd_id)
    cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
    # Catch-all selector: every IPv4 address on both sides.
    # NOTE(review): the policy/protocol arguments of the two entries that
    # use this selector are not visible.  Entries of this shape are
    # normally a BYPASS restricted to the AH protocol so the protected
    # packets reach the SA lookup; confirm they are not an unconditional
    # BYPASS, which would let arbitrary plaintext cross pg2 without
    # authentication.
    l_startaddr = r_startaddr = socket.inet_pton(
        socket.AF_INET, "0.0.0.0")
    l_stopaddr = r_stopaddr = socket.inet_pton(
        socket.AF_INET, "255.255.255.255")
    cls.vapi.ipsec_spd_add_del_entry(
    cls.vapi.ipsec_spd_add_del_entry(
    # Narrow selector: exactly pg2.local <-> pg2.remote.  Presumably the
    # PROTECT entries that bind that traffic to the SAs above - confirm
    # against the (not shown) policy arguments.
    l_startaddr = l_stopaddr = cls.pg2.local_ip4n
    r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
    cls.vapi.ipsec_spd_add_del_entry(
    cls.vapi.ipsec_spd_add_del_entry(
def configScapySA(self, is_tun: bool = False):
    """Build the scapy-side AH SecurityAssociations that mirror VPP's.

    remote_tun_sa / local_tun_sa are the tunnel-mode SAs, one per
    direction, built around pg0.remote_ip4 and pg0.local_ip4 (presumably
    as ``tunnel_header=IP(...)`` - the argument name and the SPI are on
    lines not shown).  remote_tra_sa / local_tra_sa are the transport-
    mode SAs with SPIs 0x7d0 and 0x7d1 (2000 and 2001).  Any branching
    on ``is_tun`` is likewise outside this excerpt.

    NOTE(review): "remote"/"local" are named from scapy's point of view.
    The VPP side sets ``remote_tra_spi = 2001``, which is the SPI given
    here to local_tra_sa - verify each SPI against the SAD entries
    rather than trusting the prefix.
    """
    # One fixed 20-byte key (the HMAC-SHA1 key size) is shared by all
    # four SAs.  Test fixture only - it is also configured on the VPP
    # side (lines not shown) and must stay identical there.
    # HMAC-SHA1-96 is a legacy but still-permitted IPsec integrity
    # algorithm; it gives integrity/origin authentication, not
    # confidentiality.
    # NOTE(review): scapy's HMAC construction expects ``auth_key`` as
    # bytes under Python 3; a str literal only works on Python 2 -
    # TODO confirm the interpreter this suite targets.
    self.remote_tun_sa = SecurityAssociation(
        auth_algo='HMAC-SHA1-96',
        auth_key='C91KUR9GYMm5GfkEvNjX',
            src=self.pg0.remote_ip4,
            dst=self.pg0.local_ip4))
    self.local_tun_sa = SecurityAssociation(
        auth_algo='HMAC-SHA1-96',
        auth_key='C91KUR9GYMm5GfkEvNjX',
            dst=self.pg0.remote_ip4,
            src=self.pg0.local_ip4))
    self.remote_tra_sa = SecurityAssociation(
        AH, spi=0x000007d0, auth_algo='HMAC-SHA1-96',
        auth_key='C91KUR9GYMm5GfkEvNjX')
    self.local_tra_sa = SecurityAssociation(
        AH, spi=0x000007d1, auth_algo='HMAC-SHA1-96',
        auth_key='C91KUR9GYMm5GfkEvNjX')
227 super(TestIpsecAh, self).tearDown()
228 if not self.vpp_dead:
229 self.vapi.cli("show hardware")
def send_and_expect(self, input, pkts, output, count: int = 1):
    """Queue ``pkts`` on ``input``, arm capture on every pg interface and
    collect ``count`` packets from ``output``.

    NOTE(review): the parameter name ``input`` shadows the builtin;
    ``in_intf`` would be clearer.  Callers in this class use the return
    value (``recv_pkts = self.send_and_expect(...)``) but no ``return``
    and no ``pg_start()`` appear in this excerpt - confirm ``rx`` is
    returned on a line not shown.
    """
    input.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    # get_capture(count) presumably fails the test when the number of
    # received packets differs from count - confirm in the framework.
    rx = output.get_capture(count)
def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count: int = 1):
    """Return ``count`` AH-protected ICMP packets addressed to ``sw_intf``.

    ``sa.encrypt`` is scapy's generic name; with an AH SecurityAssociation
    it only inserts the AH header and computes the ICV - the IP/ICMP
    payload below travels in the clear.  The list comprehension is
    presumably closed by ``for i in range(count)]`` on a line not shown.
    """
    return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
            sa.encrypt(IP(src=src, dst=dst) / ICMP() /
                       "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
def gen_pkts(self, sw_intf, src, dst, count: int = 1):
    """Return ``count`` plain (unprotected) ICMP packets for ``sw_intf``.

    Same frame layout as gen_encrypt_pkts but without the AH SA, used on
    the clear-text side of the tunnel tests.  The list comprehension is
    presumably closed by ``for i in range(count)]`` on a line not shown.
    """
    return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
            IP(src=src, dst=dst) / ICMP() /
            "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
def test_ipsec_ah_tra_basic(self, count: int = 1):
    """ ipsec ah v4 transport basic test

    Sends ``count`` AH transport-mode ICMP packets from pg2.remote_ip4 to
    pg2.local_ip4 (VPP's own address) into pg2 and expects ``count``
    packets back on pg2 - presumably the echo replies, AH-protected by
    VPP's outbound transport SA.  The scapy SA setup and the remaining
    gen_encrypt_pkts arguments are on lines not shown.

    NOTE(review): the only check on the returned packets is that
    ``local_tra_sa.decrypt`` does not raise.  That relies on scapy
    verifying the ICV inside decrypt (it raises on mismatch by default -
    confirm) and leaves the decrypted payload itself unasserted.
    """
    send_pkts = self.gen_encrypt_pkts(
        src=self.pg2.remote_ip4,
        dst=self.pg2.local_ip4,
    recv_pkts = self.send_and_expect(
        self.pg2, send_pkts, self.pg2, count=count)
    # AH transport-mode verification (this is AH, not ESP): a packet
    # coming back implies VPP accepted our inbound ICV, and decrypt()
    # checks the HMAC-SHA1-96 ICV VPP produced on egress.
    for Pkts in recv_pkts:
        # HMAC-SHA1-96 truncates the tag to 12 bytes.  Whatever scapy
        # dissected beyond that into the icv field (presumably alignment
        # padding in the emitted AH) is moved to padding so that decrypt()
        # compares exactly the 96-bit tag.
        Pkts[AH].padding = Pkts[AH].icv[12:]
        Pkts[AH].icv = Pkts[AH].icv[:12]
        self.local_tra_sa.decrypt(Pkts[IP])
    self.logger.info(self.vapi.ppcli("show error"))
    self.logger.info(self.vapi.ppcli("show ipsec"))
def test_ipsec_ah_tra_burst(self):
    """ ipsec ah v4 transport burst test

    Same flow as test_ipsec_ah_tra_basic but with 257 packets, so the
    per-packet AH ICV handling is exercised across a burst instead of a
    single packet.  VPP's error counters and IPsec state are logged
    afterwards for post-mortem inspection.
    """
    burst_size = 257
    self.test_ipsec_ah_tra_basic(count=burst_size)
    for show_cmd in ("show error", "show ipsec"):
        self.logger.info(self.vapi.ppcli(show_cmd))
def test_ipsec_ah_tun_basic(self, count: int = 1):
    """ ipsec ah 4o4 tunnel basic test

    Two directions are checked:

    1. De-encapsulation: AH tunnel-mode packets carrying an inner
       remote_pg0_lb_addr -> remote_pg1_lb_addr ICMP packet enter on
       pg0; the packets leaving pg1 must be the inner packet with those
       addresses intact.
    2. Encapsulation: plain remote_pg1_lb_addr -> remote_pg0_lb_addr
       ICMP packets enter on pg1; the packets leaving pg0 must verify
       under local_tun_sa and, once the AH/outer header is stripped,
       carry the original inner addresses.

    The remaining gen_encrypt_pkts / gen_pkts arguments are on lines not
    shown.

    NOTE(review): direction 1 only checks the inner addresses, so it
    passes whether or not VPP verified the AH ICV before forwarding; a
    companion case with a corrupted ICV expecting zero packets on pg1
    would close that gap.
    """
    self.configScapySA(is_tun=True)
    send_pkts = self.gen_encrypt_pkts(
        src=self.remote_pg0_lb_addr,
        dst=self.remote_pg1_lb_addr,
    recv_pkts = self.send_and_expect(
        self.pg0, send_pkts, self.pg1, count=count)
    # AH tunnel-mode de-encapsulation verification (AH, not ESP).
    for recv_pkt in recv_pkts:
        self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
        self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
    send_pkts = self.gen_pkts(
        src=self.remote_pg1_lb_addr,
        dst=self.remote_pg0_lb_addr,
    recv_pkts = self.send_and_expect(
        self.pg1, send_pkts, self.pg0, count=count)
    # AH tunnel-mode encapsulation verification: decrypt() checks the
    # ICV VPP produced (presumably raising on mismatch) and the inner
    # IP packet is re-parsed from the remaining raw bytes.
    for recv_pkt in recv_pkts:
        decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
        decrypt_pkt = IP(decrypt_pkt[Raw].load)
        self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
        self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
    self.logger.info(self.vapi.ppcli("show error"))
    self.logger.info(self.vapi.ppcli("show ipsec"))
def test_ipsec_ah_tun_burst(self):
    """ ipsec ah 4o4 tunnel burst test

    Same flow as test_ipsec_ah_tun_basic but with 257 packets in each
    direction, so encapsulation and de-encapsulation are exercised
    across a burst instead of a single packet.  VPP's error counters
    and IPsec state are logged afterwards for post-mortem inspection.
    """
    burst_size = 257
    self.test_ipsec_ah_tun_basic(count=burst_size)
    for show_cmd in ("show error", "show ipsec"):
        self.logger.info(self.vapi.ppcli(show_cmd))
# Run this module's tests directly with the VppTestRunner imported from
# framework as the unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)