4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
9 from framework import VppTestCase, VppTestRunner
11 from vpp_papi import VppEnum
14 class IPsecIPv4Params(object):
16 addr_type = socket.AF_INET
18 addr_bcast = "255.255.255.255"
23 self.remote_tun_if_host = '1.1.1.1'
25 self.scapy_tun_sa_id = 10
26 self.scapy_tun_spi = 1001
27 self.vpp_tun_sa_id = 20
28 self.vpp_tun_spi = 1000
30 self.scapy_tra_sa_id = 30
31 self.scapy_tra_spi = 2001
32 self.vpp_tra_sa_id = 40
33 self.vpp_tra_spi = 2000
35 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
36 IPSEC_API_INTEG_ALG_SHA1_96)
37 self.auth_algo = 'HMAC-SHA1-96' # scapy name
38 self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
40 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
41 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
42 self.crypt_algo = 'AES-CBC' # scapy name
43 self.crypt_key = 'JPjyOWBeVEQiMe7h'
45 self.nat_header = None
48 class IPsecIPv6Params(object):
50 addr_type = socket.AF_INET6
52 addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
57 self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
59 self.scapy_tun_sa_id = 50
60 self.scapy_tun_spi = 3001
61 self.vpp_tun_sa_id = 60
62 self.vpp_tun_spi = 3000
64 self.scapy_tra_sa_id = 70
65 self.scapy_tra_spi = 4001
66 self.vpp_tra_sa_id = 80
67 self.vpp_tra_spi = 4000
69 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
70 IPSEC_API_INTEG_ALG_SHA_256_128)
71 self.auth_algo = 'SHA2-256-128' # scapy name
72 self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
74 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
75 IPSEC_API_CRYPTO_ALG_AES_CBC_256)
76 self.crypt_algo = 'AES-CBC' # scapy name
77 self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
79 self.nat_header = None
82 class TemplateIpsec(VppTestCase):
87 |tra_if| <-------> |VPP|
92 ------ encrypt --- plain ---
93 |tun_if| <------- |VPP| <------ |pg1|
96 ------ decrypt --- plain ---
97 |tun_if| -------> |VPP| ------> |pg1|
101 def ipsec_select_backend(self):
102 """ empty method to be overloaded when necessary """
106 super(TemplateIpsec, self).setUp()
108 self.ipv4_params = IPsecIPv4Params()
109 self.ipv6_params = IPsecIPv6Params()
110 self.params = {self.ipv4_params.addr_type: self.ipv4_params,
111 self.ipv6_params.addr_type: self.ipv6_params}
113 self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
114 "XXXXXXXXXXXXXXXXXXXXX"
119 self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
121 self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
124 self.create_pg_interfaces(range(3))
125 self.interfaces = list(self.pg_interfaces)
126 for i in self.interfaces:
132 self.ipsec_select_backend()
135 super(TemplateIpsec, self).tearDown()
137 for i in self.interfaces:
142 if not self.vpp_dead:
143 self.vapi.cli("show hardware")
145 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
146 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
147 sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
148 for i in range(count)]
150 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
151 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
152 sa.encrypt(IPv6(src=src, dst=dst) /
153 ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
154 for i in range(count)]
156 def gen_pkts(self, sw_intf, src, dst, count=1):
157 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
158 IP(src=src, dst=dst) / ICMP() / self.payload
159 for i in range(count)]
161 def gen_pkts6(self, sw_intf, src, dst, count=1):
162 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
163 IPv6(src=src, dst=dst) /
164 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
165 for i in range(count)]
167 def configure_sa_tun(self, params):
168 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
169 scapy_tun_sa = SecurityAssociation(
170 self.encryption_type, spi=params.vpp_tun_spi,
171 crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
172 auth_algo=params.auth_algo, auth_key=params.auth_key,
173 tunnel_header=ip_class_by_addr_type[params.addr_type](
174 src=self.tun_if.remote_addr[params.addr_type],
175 dst=self.tun_if.local_addr[params.addr_type]),
176 nat_t_header=params.nat_header)
177 vpp_tun_sa = SecurityAssociation(
178 self.encryption_type, spi=params.scapy_tun_spi,
179 crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
180 auth_algo=params.auth_algo, auth_key=params.auth_key,
181 tunnel_header=ip_class_by_addr_type[params.addr_type](
182 dst=self.tun_if.remote_addr[params.addr_type],
183 src=self.tun_if.local_addr[params.addr_type]),
184 nat_t_header=params.nat_header)
185 return vpp_tun_sa, scapy_tun_sa
187 def configure_sa_tra(self, params):
188 params.scapy_tra_sa = SecurityAssociation(
189 self.encryption_type,
190 spi=params.vpp_tra_spi,
191 crypt_algo=params.crypt_algo,
192 crypt_key=params.crypt_key,
193 auth_algo=params.auth_algo,
194 auth_key=params.auth_key,
195 nat_t_header=params.nat_header)
196 params.vpp_tra_sa = SecurityAssociation(
197 self.encryption_type,
198 spi=params.scapy_tra_spi,
199 crypt_algo=params.crypt_algo,
200 crypt_key=params.crypt_key,
201 auth_algo=params.auth_algo,
202 auth_key=params.auth_key,
203 nat_t_header=params.nat_header)
206 class IpsecTcpTests(object):
207 def test_tcp_checksum(self):
208 """ verify checksum correctness for vpp generated packets """
209 self.vapi.cli("test http server")
210 p = self.params[socket.AF_INET]
211 vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
212 send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
213 scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
214 dst=self.tun_if.local_ip4) /
215 TCP(flags='S', dport=80)))
216 self.logger.debug(ppp("Sending packet:", send))
217 recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
219 decrypted = vpp_tun_sa.decrypt(recv[IP])
220 self.assert_packet_checksums_valid(decrypted)
223 class IpsecTra4Tests(object):
224 def test_tra_anti_replay(self, count=1):
225 """ ipsec v4 transport anti-reply test """
226 p = self.params[socket.AF_INET]
228 # fire in a packet with seq number 1
229 pkt = (Ether(src=self.tra_if.remote_mac,
230 dst=self.tra_if.local_mac) /
231 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
232 dst=self.tra_if.local_ip4) /
235 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
237 # now move the window over to 235
238 pkt = (Ether(src=self.tra_if.remote_mac,
239 dst=self.tra_if.local_mac) /
240 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
241 dst=self.tra_if.local_ip4) /
244 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
246 # the window size is 64 packets
247 # in window are still accepted
248 pkt = (Ether(src=self.tra_if.remote_mac,
249 dst=self.tra_if.local_mac) /
250 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
251 dst=self.tra_if.local_ip4) /
254 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
256 # out of window are dropped
257 pkt = (Ether(src=self.tra_if.remote_mac,
258 dst=self.tra_if.local_mac) /
259 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
260 dst=self.tra_if.local_ip4) /
263 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
265 self.assert_packet_counter_equal(
266 '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
268 # a packet that does not decrypt does not move the window forward
269 bogus_sa = SecurityAssociation(self.encryption_type,
271 pkt = (Ether(src=self.tra_if.remote_mac,
272 dst=self.tra_if.local_mac) /
273 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
274 dst=self.tra_if.local_ip4) /
277 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
279 self.assert_packet_counter_equal(
280 '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
282 # which we can determine since this packet is still in the window
283 pkt = (Ether(src=self.tra_if.remote_mac,
284 dst=self.tra_if.local_mac) /
285 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
286 dst=self.tra_if.local_ip4) /
289 self.send_and_expect(self.tra_if, [pkt], self.tra_if)
291 # move the security-associations seq number on to the last we used
292 p.scapy_tra_sa.seq_num = 351
293 p.vpp_tra_sa.seq_num = 351
295 def test_tra_basic(self, count=1):
296 """ ipsec v4 transport basic test """
297 self.vapi.cli("clear errors")
299 p = self.params[socket.AF_INET]
300 send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
301 src=self.tra_if.remote_ip4,
302 dst=self.tra_if.local_ip4,
304 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
308 decrypted = p.vpp_tra_sa.decrypt(rx[IP])
309 self.assert_packet_checksums_valid(decrypted)
311 self.logger.debug(ppp("Unexpected packet:", rx))
314 self.logger.info(self.vapi.ppcli("show error"))
315 self.logger.info(self.vapi.ppcli("show ipsec"))
317 pkts = p.tra_sa_in.get_stats()['packets']
318 self.assertEqual(pkts, count,
319 "incorrect SA in counts: expected %d != %d" %
321 pkts = p.tra_sa_out.get_stats()['packets']
322 self.assertEqual(pkts, count,
323 "incorrect SA out counts: expected %d != %d" %
326 self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
327 self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
329 def test_tra_burst(self):
330 """ ipsec v4 transport burst test """
331 self.test_tra_basic(count=257)
334 class IpsecTra6Tests(object):
335 def test_tra_basic6(self, count=1):
336 """ ipsec v6 transport basic test """
337 self.vapi.cli("clear errors")
339 p = self.params[socket.AF_INET6]
340 send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
341 src=self.tra_if.remote_ip6,
342 dst=self.tra_if.local_ip6,
344 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
348 decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
349 self.assert_packet_checksums_valid(decrypted)
351 self.logger.debug(ppp("Unexpected packet:", rx))
354 self.logger.info(self.vapi.ppcli("show error"))
355 self.logger.info(self.vapi.ppcli("show ipsec"))
357 pkts = p.tra_sa_in.get_stats()['packets']
358 self.assertEqual(pkts, count,
359 "incorrect SA in counts: expected %d != %d" %
361 pkts = p.tra_sa_out.get_stats()['packets']
362 self.assertEqual(pkts, count,
363 "incorrect SA out counts: expected %d != %d" %
365 self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
366 self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
368 def test_tra_burst6(self):
369 """ ipsec v6 transport burst test """
370 self.test_tra_basic6(count=257)
373 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
377 class IpsecTun4Tests(object):
378 def test_tun_basic44(self, count=1):
379 """ ipsec 4o4 tunnel basic test """
380 self.vapi.cli("clear errors")
382 p = self.params[socket.AF_INET]
383 vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
384 send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
385 src=p.remote_tun_if_host,
386 dst=self.pg1.remote_ip4,
388 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
389 for recv_pkt in recv_pkts:
390 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
391 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
392 self.assert_packet_checksums_valid(recv_pkt)
393 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
394 dst=p.remote_tun_if_host, count=count)
395 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
396 for recv_pkt in recv_pkts:
398 decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
399 if not decrypt_pkt.haslayer(IP):
400 decrypt_pkt = IP(decrypt_pkt[Raw].load)
401 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
402 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
403 self.assert_packet_checksums_valid(decrypt_pkt)
405 self.logger.debug(ppp("Unexpected packet:", recv_pkt))
408 ppp("Decrypted packet:", decrypt_pkt))
413 self.logger.info(self.vapi.ppcli("show error"))
414 self.logger.info(self.vapi.ppcli("show ipsec"))
416 if (hasattr(p, "spd_policy_in_any")):
417 pkts = p.spd_policy_in_any.get_stats()['packets']
418 self.assertEqual(pkts, count,
419 "incorrect SPD any policy: expected %d != %d" %
422 if (hasattr(p, "tun_sa_in")):
423 pkts = p.tun_sa_in.get_stats()['packets']
424 self.assertEqual(pkts, count,
425 "incorrect SA in counts: expected %d != %d" %
427 pkts = p.tun_sa_out.get_stats()['packets']
428 self.assertEqual(pkts, count,
429 "incorrect SA out counts: expected %d != %d" %
432 self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
433 self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
435 def test_tun_burst44(self):
436 """ ipsec 4o4 tunnel burst test """
437 self.test_tun_basic44(count=257)
440 class IpsecTun6Tests(object):
441 def test_tun_basic66(self, count=1):
442 """ ipsec 6o6 tunnel basic test """
443 self.vapi.cli("clear errors")
445 p = self.params[socket.AF_INET6]
446 vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
447 send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
448 src=p.remote_tun_if_host,
449 dst=self.pg1.remote_ip6,
451 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
452 for recv_pkt in recv_pkts:
453 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
454 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
455 self.assert_packet_checksums_valid(recv_pkt)
456 send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
457 dst=p.remote_tun_if_host,
459 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
460 for recv_pkt in recv_pkts:
462 decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
463 if not decrypt_pkt.haslayer(IPv6):
464 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
465 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
466 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
467 self.assert_packet_checksums_valid(decrypt_pkt)
469 self.logger.debug(ppp("Unexpected packet:", recv_pkt))
472 ppp("Decrypted packet:", decrypt_pkt))
477 self.logger.info(self.vapi.ppcli("show error"))
478 self.logger.info(self.vapi.ppcli("show ipsec"))
480 pkts = p.tun_sa_in.get_stats()['packets']
481 self.assertEqual(pkts, count,
482 "incorrect SA in counts: expected %d != %d" %
484 pkts = p.tun_sa_out.get_stats()['packets']
485 self.assertEqual(pkts, count,
486 "incorrect SA out counts: expected %d != %d" %
488 self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
489 self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
491 def test_tun_burst66(self):
492 """ ipsec 6o6 tunnel burst test """
493 self.test_tun_basic66(count=257)
496 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
500 if __name__ == '__main__':
501 unittest.main(testRunner=VppTestRunner)