4 from scapy.layers.inet import IP, ICMP
5 from scapy.layers.l2 import Ether
6 from scapy.layers.ipsec import SecurityAssociation, ESP
8 from framework import VppTestCase, VppTestRunner
11 class TestIpsecEsp(VppTestCase):
13 Basic test for ipsec esp sanity - tunnel and transport modes.
15 Below 4 cases are covered as part of this test
16 1) ipsec esp v4 transport basic test - IPv4 Transport mode
17 scenario using HMAC-SHA1-96 integrity algo
18 2) ipsec esp v4 transport burst test
19 Above test for 257 pkts
20 3) ipsec esp 4o4 tunnel basic test - IPv4 Tunnel mode
21 scenario using HMAC-SHA1-96 integrity algo
22 4) ipsec esp 4o4 tunnel burst test
23 Above test for 257 pkts
33 --- encrypt --- plain ---
34 |pg0| -------> |VPP| ------> |pg1|
37 --- decrypt --- plain ---
38 |pg0| <------- |VPP| <------ |pg1|
41 Note : IPv6 is not covered
# Inner (protected) endpoints for the tunnel-mode tests: configEspTun routes
# each /32 via pg0 / pg1 respectively, and test_ipsec_esp_tun_basic uses them
# as the src/dst of the inner IP header.
44 remote_pg0_lb_addr = '1.1.1.1'
45 remote_pg1_lb_addr = '2.2.2.2'
# setUpClass: its decorator and signature precede line 49 and are not part of
# this excerpt. Brings up three packet-generator interfaces (pg0..pg2); the
# per-interface setup inside the loop at line 53 is likewise not shown.
49 super(TestIpsecEsp, cls).setUpClass()
51 cls.create_pg_interfaces(range(3))
52 cls.interfaces = list(cls.pg_interfaces)
53 for i in cls.interfaces:
# Dump interface addressing and IPsec state into the test log for triage.
57 cls.logger.info(cls.vapi.ppcli("show int addr"))
59 cls.logger.info(cls.vapi.ppcli("show ipsec"))
# tearDownClass (its header is not shown): log the final IPsec state before
# the framework tears VPP down.
61 cls.logger.info(cls.vapi.ppcli("show ipsec"))
63 super(TestIpsecEsp, cls).tearDownClass()
# Programs VPP for 4o4 tunnel mode: routes to the inner endpoints, two SAD
# entries (SPI, algorithm and key arguments are on lines not shown here) and
# an SPD bound to pg0. Presumably a @classmethod -- the decorator would sit on
# line 66, which is not in this excerpt.
67 def configEspTun(cls):
# Route each inner endpoint out of the interface on the matching side.
74 src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
75 cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
76 dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
77 cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
# Two SAD entries, presumably one per direction. integrity_key_length=20
# matches the 20-character HMAC-SHA1 auth_key configScapySA gives scapy; the
# key bytes themselves (not shown) must be identical on both ends or every
# ICV check fails.
78 cls.vapi.ipsec_sad_add_del_entry(
83 integrity_key_length=20,
86 cls.vapi.ipsec_sad_add_del_entry(
91 integrity_key_length=20,
# Create the SPD and attach it to pg0 (spd_id is assigned on a line not shown).
94 cls.vapi.ipsec_spd_add_del(spd_id)
95 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
# Wildcard selectors (0.0.0.0 - 255.255.255.255) for the first two policy
# entries. NOTE(review): the action and protocol for these two entries are on
# lines not shown; if they are bypass policies they should be limited to the
# ESP protocol so the wildcard cannot pass unrelated plaintext -- confirm
# against the full file.
96 l_startaddr = r_startaddr = socket.inet_pton(
97 socket.AF_INET, "0.0.0.0")
98 l_stopaddr = r_stopaddr = socket.inet_pton(
99 socket.AF_INET, "255.255.255.255")
100 cls.vapi.ipsec_spd_add_del_entry(
107 cls.vapi.ipsec_spd_add_del_entry(
# Narrow the selectors to exactly the two inner endpoints for the remaining
# entries (presumably the protect policies for each direction).
115 l_startaddr = l_stopaddr = socket.inet_pton(
116 socket.AF_INET, cls.remote_pg0_lb_addr)
117 r_startaddr = r_stopaddr = socket.inet_pton(
118 socket.AF_INET, cls.remote_pg1_lb_addr)
119 cls.vapi.ipsec_spd_add_del_entry(
129 cls.vapi.ipsec_spd_add_del_entry(
# Programs VPP for transport mode on pg2: two SAD entries (key lengths shown
# here; SPIs and key bytes on lines not shown) and an SPD bound to pg2.
# Presumably a @classmethod like configEspTun.
142 def configEspTra(cls):
# SPI of the remote-side SA; the local SPI is assigned on a line not shown.
# Both must agree with the SPIs configScapySA uses (also not visible here).
147 remote_tra_spi = 2001
# crypto_key_length=16 is a 16-byte key (AES-128 for the AES-CBC algorithm
# configScapySA selects); integrity_key_length=20 is a full-size HMAC-SHA1
# key. Both match the 16- and 20-character key strings in configScapySA.
149 cls.vapi.ipsec_sad_add_del_entry(
152 integrity_key_length=20,
153 crypto_key_length=16,
156 cls.vapi.ipsec_sad_add_del_entry(
159 integrity_key_length=20,
160 crypto_key_length=16,
163 cls.vapi.ipsec_spd_add_del(spd_id)
164 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
# Same wildcard selectors as in configEspTun; the same review note about the
# hidden action/protocol applies to these two entries.
165 l_startaddr = r_startaddr = socket.inet_pton(
166 socket.AF_INET, "0.0.0.0")
167 l_stopaddr = r_stopaddr = socket.inet_pton(
168 socket.AF_INET, "255.255.255.255")
169 cls.vapi.ipsec_spd_add_del_entry(
176 cls.vapi.ipsec_spd_add_del_entry(
# Transport-mode selectors are pg2's own address pair; these attributes are
# already packed, so no inet_pton is needed.
184 l_startaddr = l_stopaddr = cls.pg2.local_ip4n
185 r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
186 cls.vapi.ipsec_spd_add_del_entry(
196 cls.vapi.ipsec_spd_add_del_entry(
# Builds the scapy-side SecurityAssociations that must mirror VPP's SAD
# entries. The two tunnel SAs carry an IP(...) tunnel header (src/dst on
# lines 218/219 and 228/229; the keyword and the is_tun branch are on lines
# not shown); the two transport SAs have none. SPIs are also on hidden lines.
# The key strings are fixed test-fixture values and must equal the key bytes
# passed to ipsec_sad_add_del_entry in configEspTun / configEspTra.
208 def configScapySA(self, is_tun=False):
# Tunnel SA whose outer header runs from the pg0 peer to VPP -- presumably the
# one used to encrypt what the test injects on pg0.
210 self.remote_tun_sa = SecurityAssociation(
213 crypt_algo='AES-CBC',
214 crypt_key='JPjyOWBeVEQiMe7h',
215 auth_algo='HMAC-SHA1-96',
216 auth_key='C91KUR9GYMm5GfkEvNjX',
218 src=self.pg0.remote_ip4,
219 dst=self.pg0.local_ip4))
# Tunnel SA whose outer header runs from VPP to the pg0 peer; used by
# test_ipsec_esp_tun_basic to decrypt what VPP encapsulated.
220 self.local_tun_sa = SecurityAssociation(
223 crypt_algo='AES-CBC',
224 crypt_key='JPjyOWBeVEQiMe7h',
225 auth_algo='HMAC-SHA1-96',
226 auth_key='C91KUR9GYMm5GfkEvNjX',
228 dst=self.pg0.remote_ip4,
229 src=self.pg0.local_ip4))
# Transport-mode SAs: same algorithms and keys, no tunnel header.
231 self.remote_tra_sa = SecurityAssociation(
234 crypt_algo='AES-CBC',
235 crypt_key='JPjyOWBeVEQiMe7h',
236 auth_algo='HMAC-SHA1-96',
237 auth_key='C91KUR9GYMm5GfkEvNjX')
238 self.local_tra_sa = SecurityAssociation(
241 crypt_algo='AES-CBC',
242 crypt_key='JPjyOWBeVEQiMe7h',
243 auth_algo='HMAC-SHA1-96',
244 auth_key='C91KUR9GYMm5GfkEvNjX')
# Per-test teardown (its header precedes line 247 and is not shown): run the
# framework's teardown, then dump hardware info to the log if VPP is still up.
247 super(TestIpsecEsp, self).tearDown()
248 if not self.vpp_dead:
249 self.vapi.cli("show hardware")
# Injects pkts on `input`, enables capture on every pg interface and collects
# `count` packets from `output`; the pg_start call and the return of rx are on
# lines not shown. `input` shadows the builtin of the same name -- kept so the
# signature stays stable for callers.
251 def send_and_expect(self, input, pkts, output, count=1):
252 input.add_stream(pkts)
253 self.pg_enable_capture(self.pg_interfaces)
# get_capture(count) presumably expects exactly `count` packets, so a packet
# VPP drops (for example on an ICV failure) surfaces as a capture error here.
255 rx = output.get_capture(count)
# Builds `count` frames whose IP/ICMP/padding payload is ESP-protected by `sa`
# (scapy performs the encryption and ICV computation); the comprehension's
# `for` clause is on line 262, not shown. Ethernet addresses point toward
# VPP: src is the pg peer's MAC, dst is VPP's own.
258 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
259 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
260 sa.encrypt(IP(src=src, dst=dst) / ICMP() /
261 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
# Plaintext counterpart of gen_encrypt_pkts: same frame layout without the
# ESP layer; the `for` clause is on line 268, not shown.
264 def gen_pkts(self, sw_intf, src, dst, count=1):
265 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
266 IP(src=src, dst=dst) / ICMP() /
267 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# Transport-mode round trip on pg2: ESP-protected ICMP goes in on pg2 and
# whatever VPP sends back out of pg2 must decrypt with the local transport
# SA. The sa/sw_intf arguments (lines 275-276) and the configScapySA() call
# that presumably precedes line 274 are not shown.
270 def test_ipsec_esp_tra_basic(self, count=1):
271 """ ipsec esp v4 transport basic test """
274 send_pkts = self.gen_encrypt_pkts(
277 src=self.pg2.remote_ip4,
278 dst=self.pg2.local_ip4,
280 recv_pkts = self.send_and_expect(
281 self.pg2, send_pkts, self.pg2, count=count)
282 # ESP TRA VPP encryption/decryption verification
# NOTE(review): decrypt() is the only check here -- scapy raises on a bad ICV
# or wrong key, but the decrypted packet is discarded, so the inner src/dst
# and ICMP type are never asserted. test_ipsec_esp_tun_basic asserts the
# decrypted addresses; the same would strengthen this case.
283 for Pkts in recv_pkts:
284 self.local_tra_sa.decrypt(Pkts[IP])
286 self.logger.info(self.vapi.ppcli("show error"))
287 self.logger.info(self.vapi.ppcli("show ipsec"))
# Re-runs the transport case with 257 packets (presumably chosen to exceed a
# single 256-packet VPP frame -- confirm).
289 def test_ipsec_esp_tra_burst(self):
290 """ ipsec esp v4 transport burst test """
292 self.test_ipsec_esp_tra_basic(count=257)
294 self.logger.info(self.vapi.ppcli("show error"))
295 self.logger.info(self.vapi.ppcli("show ipsec"))
# Tunnel-mode test in both directions: (1) ESP-protected packets injected on
# pg0 must emerge in the clear on pg1 with the inner addresses intact;
# (2) clear packets injected on pg1 must emerge on pg0 ESP-encapsulated and
# decrypt, with the local tunnel SA, to the original inner addresses. The
# sa/sw_intf arguments (lines 302-303 and 314) are not shown.
297 def test_ipsec_esp_tun_basic(self, count=1):
298 """ ipsec esp 4o4 tunnel basic test """
300 self.configScapySA(is_tun=True)
301 send_pkts = self.gen_encrypt_pkts(
304 src=self.remote_pg0_lb_addr,
305 dst=self.remote_pg1_lb_addr,
307 recv_pkts = self.send_and_expect(
308 self.pg0, send_pkts, self.pg1, count=count)
309 # ESP TUN VPP decryption verification
# VPP must have stripped the outer header and ESP; the inner addresses are
# checked on every packet received.
310 for recv_pkt in recv_pkts:
311 self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
312 self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
313 send_pkts = self.gen_pkts(
315 src=self.remote_pg1_lb_addr,
316 dst=self.remote_pg0_lb_addr,
318 recv_pkts = self.send_and_expect(
319 self.pg1, send_pkts, self.pg0, count=count)
320 # ESP TUN VPP encryption verification
# decrypt() verifies the ICV and returns the inner packet, whose addresses
# must be the ones gen_pkts put in the clear-text frame.
321 for recv_pkt in recv_pkts:
322 decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
323 self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
324 self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
326 self.logger.info(self.vapi.ppcli("show error"))
327 self.logger.info(self.vapi.ppcli("show ipsec"))
# Re-runs the tunnel case with 257 packets (see test_ipsec_esp_tra_burst).
329 def test_ipsec_esp_tun_burst(self):
330 """ ipsec esp 4o4 tunnel burst test """
332 self.test_ipsec_esp_tun_basic(count=257)
334 self.logger.info(self.vapi.ppcli("show error"))
335 self.logger.info(self.vapi.ppcli("show ipsec"))
if __name__ == "__main__":
    # Hand discovery and execution to VPP's own runner rather than the
    # stock unittest runner.
    unittest.main(testRunner=VppTestRunner)