IPSEC: Multi-tunnel tests
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """IPv4 parameter set for the IPsec test templates.

    The class attributes describe the address family. Each instance carries
    the SA ids, SPIs, algorithm selections and keys for the tunnel ("tun")
    and transport ("tra") security associations, one pair for the scapy
    side and one pair for the VPP side.
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        self.remote_tun_if_host = '1.1.1.1'

        # tunnel SAs: (SA id, SPI) for the scapy side, then the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport SAs: (SA id, SPI) for the scapy side, then the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        self.flags = 0
        self.nat_header = None
46
47
class IPsecIPv6Params(object):
    """IPv6 parameter set for the IPsec test templates.

    The class attributes describe the address family. Each instance carries
    the SA ids, SPIs, algorithm selections and keys for the tunnel ("tun")
    and transport ("tra") security associations, one pair for the scapy
    side and one pair for the VPP side.
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'

        # tunnel SAs: (SA id, SPI) for the scapy side, then the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport SAs: (SA id, SPI) for the scapy side, then the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA_256_128
        self.auth_algo = 'SHA2-256-128'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'

        self.flags = 0
        self.nat_header = None
80
81
def config_tun_params(p, encryption_type, tun_if):
    """Build the scapy tunnel-mode security associations and store them on p.

    Two ``SecurityAssociation`` objects are created from the algorithms,
    keys and NAT header held by ``p`` and saved as ``p.scapy_tun_sa`` and
    ``p.vpp_tun_sa``.

    :param p: parameter object (e.g. ``IPsecIPv4Params``); modified in place.
    :param encryption_type: passed through as the first positional argument
        of ``SecurityAssociation``.
    :param tun_if: interface whose ``remote_addr`` / ``local_addr`` entries
        for ``p.addr_type`` are used as the outer tunnel header addresses.
    """
    family = p.addr_type
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[family]
    peer_addr = tun_if.remote_addr[family]
    vpp_addr = tun_if.local_addr[family]

    def make_sa(spi, src, dst):
        # everything except the SPI and the header direction is shared
        return SecurityAssociation(
            encryption_type,
            spi=spi,
            crypt_algo=p.crypt_algo,
            crypt_key=p.crypt_key,
            auth_algo=p.auth_algo,
            auth_key=p.auth_key,
            tunnel_header=outer_ip(src=src, dst=dst),
            nat_t_header=p.nat_header)

    # scapy-side SA: VPP's SPI, outer header from the remote peer to VPP
    p.scapy_tun_sa = make_sa(p.vpp_tun_spi, src=peer_addr, dst=vpp_addr)
    # VPP-side SA: scapy's SPI, outer header from VPP to the remote peer
    p.vpp_tun_sa = make_sa(p.scapy_tun_spi, src=vpp_addr, dst=peer_addr)
100
101
def config_tra_params(p, encryption_type):
    """Build the scapy transport-mode security associations and store them.

    Creates ``p.scapy_tra_sa`` (using ``p.vpp_tra_spi``) and ``p.vpp_tra_sa``
    (using ``p.scapy_tra_spi``); both share the algorithms, keys and NAT
    header held by ``p``.

    :param p: parameter object (e.g. ``IPsecIPv4Params``); modified in place.
    :param encryption_type: passed through as the first positional argument
        of ``SecurityAssociation``.
    """
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header)

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tra_spi, **shared)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **shared)
119
120
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """
        pass

    def setUp(self):
        """Create the parameter sets and bring up three pg interfaces.

        ``self.params`` maps each address family to its parameter object.
        Every interface is set admin-up and configured for IPv4 and IPv6
        (including ARP / NDP resolution) before ``ipsec_select_backend``
        is called.
        """
        super(TemplateIpsec, self).setUp()

        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                       self.ipv6_params.addr_type: self.ipv6_params}

        self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
                       "XXXXXXXXXXXXXXXXXXXXX"

        self.tun_spd_id = 1
        self.tra_spd_id = 2

        self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                 IPSEC_API_PROTO_ESP)
        self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                IPSEC_API_PROTO_AH)

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """Run the base tearDown, then take down the interfaces from setUp."""
        super(TemplateIpsec, self).tearDown()

        for i in self.interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """Return ``count`` IPv4 ICMP packets, each protected by ``sa``.

        Every frame goes from ``sw_intf.remote_mac`` to
        ``sw_intf.local_mac``; ``sa.encrypt`` is called once per packet.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
                for _ in range(count)]

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
        """Return ``count`` IPv6 echo requests, each protected by ``sa``.

        Every frame goes from ``sw_intf.remote_mac`` to
        ``sw_intf.local_mac``; ``sa.encrypt`` is called once per packet.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """Return ``count`` plain IPv4 ICMP packets carrying the payload."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() / self.payload
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1):
        """Return ``count`` plain IPv6 echo requests carrying the payload."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
                for _ in range(count)]

    def configure_sa_tra(self, params):
        """Attach the scapy transport-mode SAs to ``params``.

        Delegates to the module-level ``config_tra_params`` so the SA
        construction lives in one place. ``self.encryption_type`` is not
        set by this class; it is expected to come from the concrete test
        class.

        :param params: parameter object that receives ``scapy_tra_sa`` and
            ``vpp_tra_sa``.
        """
        config_tra_params(params, self.encryption_type)
223
224
class IpsecTcpTests(object):
    """Mixin providing a TCP checksum test over the IPv4 tunnel."""

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        params = self.params[socket.AF_INET]
        config_tun_params(params, self.encryption_type, self.tun_if)

        l2_hdr = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        # TCP SYN to port 80, addressed to the tunnel interface's local IPv4
        syn = (IP(src=params.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2_hdr / params.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        decrypted = params.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(decrypted)
240
241
class IpsecTra4Tests(object):
    """Mixin with the IPv4 transport-mode tests.

    Relies on attributes set up elsewhere: ``self.tra_if``,
    ``self.params``, ``self.encryption_type`` and the
    ``tra4_*_node_name`` strings.
    """

    def test_tra_anti_replay(self, count=1):
        """ ipsec v4 transport anti-replay test """
        p = self.params[socket.AF_INET]
        tra_if = self.tra_if

        def protect(sa, seq_num):
            # one ICMP packet from the peer to VPP, protected by ``sa``
            # with an explicitly chosen sequence number
            return (Ether(src=tra_if.remote_mac, dst=tra_if.local_mac) /
                    sa.encrypt(IP(src=tra_if.remote_ip4,
                                  dst=tra_if.local_ip4) /
                               ICMP(),
                               seq_num=seq_num))

        good_sa = p.scapy_tra_sa

        # fire in a packet with seq number 1
        self.send_and_expect(tra_if, [protect(good_sa, 1)], tra_if)

        # now move the window over to 235
        self.send_and_expect(tra_if, [protect(good_sa, 235)], tra_if)

        # the window size is 64 packets, so 172 is still inside it
        # and must be accepted
        self.send_and_expect(tra_if, [protect(good_sa, 172)], tra_if)

        # packets outside the window are dropped
        self.send_and_assert_no_replies(tra_if, protect(good_sa, 17) * 17)
        self.assert_packet_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        self.send_and_assert_no_replies(tra_if, protect(bogus_sa, 350) * 17)
        self.assert_packet_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        self.send_and_expect(tra_if, [protect(good_sa, 234)], tra_if)

        # move the security-associations seq number on to the last we used
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            tx_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                            src=self.tra_if.remote_ip4,
                                            dst=self.tra_if.local_ip4,
                                            count=count)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts, self.tra_if)
            for rx in rx_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # each SA must have counted exactly ``count`` packets
        for direction in ("in", "out"):
            sa_obj = getattr(p, "tra_sa_" + direction)
            pkts = sa_obj.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA %s counts: expected %d != %d" %
                             (direction, count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.test_tra_basic(count=257)
351
352
class IpsecTra6Tests(object):
    """Mixin with the IPv6 transport-mode tests.

    Relies on attributes set up elsewhere: ``self.tra_if``,
    ``self.params`` and the ``tra6_*_node_name`` strings.
    """

    def test_tra_basic6(self, count=1):
        """ ipsec v6 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts, self.tra_if)
            for rx in rx_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # each SA must have counted exactly ``count`` packets
        for direction in ("in", "out"):
            sa_obj = getattr(p, "tra_sa_" + direction)
            pkts = sa_obj.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA %s counts: expected %d != %d" %
                             (direction, count, pkts))

        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.test_tra_basic6(count=257)
390
391
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """Combined IPv4 and IPv6 transport-mode tests; adds nothing itself."""
394
395
class IpsecTun4(object):
    """Mixin with the IPv4-in-IPv4 tunnel verification helper.

    Relies on attributes set up elsewhere: ``self.tun_if``, ``self.pg1``,
    ``self.encryption_type`` and the ``tun4_*_node_name`` strings.
    """

    def verify_tun_44(self, p, count=1):
        """Send ``count`` packets each way through the tunnel and verify them.

        Encrypted packets sent on ``tun_if`` must arrive on ``pg1`` with the
        inner addresses intact. Plain packets sent on ``pg1`` must arrive on
        ``tun_if`` and decrypt to the original addresses. Afterwards the
        SPD policy / SA statistics (when ``p`` carries them) and the
        encrypt / decrypt node counters are compared with ``count``.

        :param p: parameter object for the tunnel; the scapy SAs are
            (re)built on it via ``config_tun_params``.
        :param count: number of packets sent in each direction.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset for every packet: otherwise a failing decrypt would
                # leave the name unbound (first packet) or still pointing at
                # the previous packet, and the log below would mislead.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(
                                ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # best effort only: a failed dump must not
                            # replace the error being reported
                            self.logger.debug(
                                "Could not dump decrypted packet")
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
452
453
class IpsecTun4Tests(IpsecTun4):
    """IPv4-over-IPv4 tunnel tests built on ``verify_tun_44``."""

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
463
464
class IpsecTun6(object):
    """Mixin with the IPv6-in-IPv6 tunnel verification helper.

    Relies on attributes set up elsewhere: ``self.tun_if``, ``self.pg1``,
    ``self.encryption_type`` and the ``tun6_*_node_name`` strings.
    """

    def verify_tun_66(self, p, count=1):
        """Send ``count`` packets each way through the tunnel and verify them.

        Encrypted packets sent on ``tun_if`` must arrive on ``pg1`` with the
        inner addresses intact. Plain packets sent on ``pg1`` must arrive on
        ``tun_if`` and decrypt to the original addresses. Afterwards the SA
        statistics (when ``p`` carries them) and the encrypt / decrypt node
        counters are compared with ``count``.

        :param p: parameter object for the tunnel; the scapy SAs are
            (re)built on it via ``config_tun_params``.
        :param count: number of packets sent in each direction.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset for every packet: otherwise a failing decrypt would
                # leave the name unbound (first packet) or still pointing at
                # the previous packet, and the log below would mislead.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(
                                ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # best effort only: a failed dump must not
                            # replace the error being reported
                            self.logger.debug(
                                "Could not dump decrypted packet")
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
516
517
class IpsecTun6Tests(IpsecTun6):
    """IPv6-over-IPv6 tunnel tests built on ``verify_tun_66``."""

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
527
528
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """Combined IPv4 and IPv6 tunnel-mode tests; adds nothing itself."""
531
532
# When the file is executed directly, run its tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)