ipsec: remove dedicated IPSec tunnels
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether, Raw
8 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
9
10 from framework import VppTestCase, VppTestRunner
11 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
12 from vpp_papi import VppEnum
13
14
class IPsecIPv4Params(object):
    """ Parameter set used by the IPsec tests for the IPv4 address family

    Class attributes describe the address family; instance attributes hold
    the SA ids, SPIs, algorithms and keys used for the tunnel ('tun') and
    transport ('tra') security associations.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # hosts reachable behind the remote tunnel endpoint
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP enum value, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP enum value, scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # salt is appended to the key for AES-GCM (see mk_scapy_crypt_key)
        self.salt = 0
        self.flags = 0
        self.nat_header = None
49
50
class IPsecIPv6Params(object):
    """ Parameter set used by the IPsec tests for the IPv6 address family

    Class attributes describe the address family; instance attributes hold
    the SA ids, SPIs, algorithms and keys used for the tunnel ('tun') and
    transport ('tra') security associations.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # hosts reachable behind the remote tunnel endpoint
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP enum value, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP enum value, scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # salt is appended to the key for AES-GCM (see mk_scapy_crypt_key)
        self.salt = 0
        self.flags = 0
        self.nat_header = None
85
86
def mk_scapy_crypt_key(p):
    """ Return the encryption key in the form handed to scapy

    For "AES-GCM" the key is p.crypt_key followed by p.salt packed as a
    4 byte network-order unsigned integer; for every other algorithm
    p.crypt_key is returned unchanged.
    """
    if p.crypt_algo != "AES-GCM":
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
92
93
def config_tun_params(p, encryption_type, tun_if):
    """ Create the two scapy tunnel-mode SAs and store them on p

    p.scapy_tun_sa uses p.vpp_tun_spi and a tunnel header going from
    tun_if's remote address to its local address; p.vpp_tun_sa uses
    p.scapy_tun_spi and the reverse tunnel header.

    :param p: parameter object (algorithms, keys, SPIs, flags)
    :param encryption_type: first argument passed to SecurityAssociation
    :param tun_if: interface providing remote_addr/local_addr by family
    """
    header_cls = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)
    esn_enabled = bool(p.flags & esn_flag)

    # settings common to both directions
    sa_kwargs = dict(crypt_algo=p.crypt_algo,
                     crypt_key=mk_scapy_crypt_key(p),
                     auth_algo=p.auth_algo,
                     auth_key=p.auth_key,
                     nat_t_header=p.nat_header,
                     use_esn=esn_enabled)

    family = p.addr_type
    make_header = header_cls[family]
    remote = tun_if.remote_addr[family]
    local = tun_if.local_addr[family]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=make_header(src=remote, dst=local),
        **sa_kwargs)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=make_header(dst=remote, src=local),
        **sa_kwargs)
119
120
def config_tra_params(p, encryption_type):
    """ Create the two scapy transport-mode SAs and store them on p

    p.scapy_tra_sa uses p.vpp_tra_spi and p.vpp_tra_sa uses
    p.scapy_tra_spi; all other settings are identical.

    :param p: parameter object (algorithms, keys, SPIs, flags)
    :param encryption_type: first argument passed to SecurityAssociation
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)
    esn_enabled = bool(p.flags & esn_flag)

    # settings common to both directions
    sa_kwargs = dict(crypt_algo=p.crypt_algo,
                     crypt_key=mk_scapy_crypt_key(p),
                     auth_algo=p.auth_algo,
                     auth_key=p.auth_key,
                     nat_t_header=p.nat_header,
                     use_esn=esn_enabled)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **sa_kwargs)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **sa_kwargs)
143
144
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    # SPD ids for the tunnel and transport mode tests
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create the v4/v6 parameter objects, indexed by address family """
        v4_params = IPsecIPv4Params()
        self.ipv4_params = v4_params
        v6_params = IPsecIPv6Params()
        self.ipv6_params = v6_params
        self.params = {v4_params.addr_type: v4_params,
                       v6_params.addr_type: v6_params}

    def config_interfaces(self):
        """ create three pg interfaces, bring them up and address them """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        protocols = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protocols.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protocols.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ undo config_interfaces: admin down and remove addresses """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        self.logger.info(self.vapi.cli("show hardware"))

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ build 'count' IPv4 ICMP packets, each encrypted with 'sa' """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = (IP(src=src, dst=dst) /
                     ICMP() / Raw(b'X' * payload_size))
            # one encrypt call per packet, as each call produces a packet
            pkts.append(l2 / sa.encrypt(plain))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ build 'count' ICMPv6 echo requests, each encrypted with 'sa' """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            pkts.append(l2 / sa.encrypt(plain))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build 'count' unencrypted IPv4 ICMP packets """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IP(src=src, dst=dst) / ICMP() /
                        Raw(b'X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build 'count' unencrypted ICMPv6 echo requests """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IPv6(src=src, dst=dst) /
                        ICMPv6EchoRequest(id=0, seq=1,
                                          data='X' * payload_size))
        return pkts
247
248
class IpsecTcp(object):
    # verify methods for TCP packets generated by VPP
    def verify_tcp_checksum(self):
        """ send an encrypted TCP SYN to port 80 of VPP's tun_if address
        and check the checksums of the decrypted reply """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host,
                  dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        plain = p.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(plain)
262
263
# Test methods for IpsecTcp; each test only delegates to the matching
# verify method inherited from IpsecTcp.
class IpsecTcpTests(IpsecTcp):
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # all the work is done by the inherited verify method
        self.verify_tcp_checksum()
268
269
class IpsecTra4(object):
    """ verify methods for Transport v4 """
    def verify_tra_anti_replay(self):
        """ Exercise anti-replay and sequence number handling (transport v4)

        Sends encrypted ICMP packets with hand-picked sequence numbers on
        self.tra_if and checks, after each step, the error counters of the
        transport encrypt/decrypt nodes (replayed packet, integrity or
        decryption failure, undersized packet, sequence number cycled).

        The steps depend on each other: each one relies on the sequence
        number state left behind by the previous ones. On exit the SA
        sequence numbers on both the VPP and the scapy side are set to 351.
        """
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # names of the error counters inspected by this test
        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        # for ESP with AES-GCM the counter checked for a failed packet is
        # 'ESP decryption failed', otherwise 'Integrity check failed'
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)
        # snapshot the current counter values; the test then tracks the
        # expected values by adding to these
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        # the undersized packet counter is only looked at for ESP
        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33 (range(1, 34) excludes 34)
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(1, 34)]
        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkts = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                          dst=self.tra_if.local_ip4) /
                                       ICMP(),
                                       seq_num=35))
        # 8 copies are sent and 1 reply is expected; the other 7 are
        # counted as replays
        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
                                         self.tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=257))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # in window are still accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=200))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (the bogus SA uses the same SPI but reversed keys)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                   dst=self.tra_if.local_ip4) /
                                ICMP(),
                                seq_num=350))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        #  (an SA built with only the SPI, no algorithms or keys)
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                       dst=self.tra_if.local_ip4) /
                                    ICMP(),
                                    seq_num=350))
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # which we can determine since this packet is still in the window
        # (had seq 350 moved the window, 234 would no longer be accepted)
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=234))
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=17))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            # without ESN the 17 packets are counted as replays
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=258))
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(259, 280)]

        if use_esn:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000005))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0xfffffffd))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000999))
            # NOTE(review): self.tra_if is passed as a third positional
            # argument here, unlike the other send_and_assert_no_replies
            # calls in this method - confirm which parameter it binds to
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000555))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000444))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351, the same value set on both scapy SAs below)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """ ipsec v4 transport basic test

        Sends 'count' encrypted ICMP packets on self.tra_if, expects a
        reply for each, decrypts the replies and validates their lengths
        and checksums. Afterwards the in/out SA packet counters and the
        encrypt/decrypt node packet counters must all equal 'count'.
        """
        # start from clean counters so they can be compared to 'count'
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IP length field must match the bytes after Ethernet
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the error propagate
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the state, whether or not the checks passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
590
591
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-reply test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the 'count' argument; with the default of 1 this sends
        # a single packet, as before
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        # 257 packets in one burst
        self.verify_tra_basic4(count=257)
605
606
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1):
        """ Send 'count' encrypted ICMPv6 echo requests on self.tra_if,
        decrypt and validate each reply, then check that the SA and node
        packet counters all equal 'count' """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
            replies = self.send_and_expect(self.tra_if, tx_pkts,
                                           self.tra_if)
            for reply in replies:
                # the IPv6 payload length must match the bytes that
                # follow the Ethernet and IPv6 headers
                payload_len = len(reply) - len(Ether()) - len(IPv6())
                self.assertEqual(payload_len, reply[IPv6].plen)
                try:
                    plain = p.vpp_tra_sa.decrypt(reply[IPv6])
                    self.assert_packet_checksums_valid(plain)
                except BaseException:
                    # log the offending packet, then propagate the error
                    self.logger.debug(ppp("Unexpected packet:", reply))
                    raise
        finally:
            # always dump the state, whether or not the checks passed
            for show_cmd in ("show error", "show ipsec all"):
                self.logger.info(self.vapi.ppcli(show_cmd))

        n_in = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(n_in, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, n_in))
        n_out = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(n_out, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, n_out))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
642
643
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # a single packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # 257 packets in one burst
        self.verify_tra_basic6(count=257)
653
654
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # no members of its own: it only combines the test methods of the
    # v4 and v6 transport test classes
    pass
658
659
660 class IpsecTun4(object):
661     """ verify methods for Tunnel v4 """
662     def verify_counters4(self, p, count, n_frags=None):
663         if not n_frags:
664             n_frags = count
665         if (hasattr(p, "spd_policy_in_any")):
666             pkts = p.spd_policy_in_any.get_stats()['packets']
667             self.assertEqual(pkts, count,
668                              "incorrect SPD any policy: expected %d != %d" %
669                              (count, pkts))
670
671         if (hasattr(p, "tun_sa_in")):
672             pkts = p.tun_sa_in.get_stats()['packets']
673             self.assertEqual(pkts, count,
674                              "incorrect SA in counts: expected %d != %d" %
675                              (count, pkts))
676             pkts = p.tun_sa_out.get_stats()['packets']
677             self.assertEqual(pkts, count,
678                              "incorrect SA out counts: expected %d != %d" %
679                              (count, pkts))
680
681         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
682         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
683
684     def verify_decrypted(self, p, rxs):
685         for rx in rxs:
686             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
687             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
688             self.assert_packet_checksums_valid(rx)
689
690     def verify_encrypted(self, p, sa, rxs):
691         decrypt_pkts = []
692         for rx in rxs:
693             if p.nat_header:
694                 self.assertEqual(rx[UDP].dport, 4500)
695             self.assert_packet_checksums_valid(rx)
696             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
697             try:
698                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
699                 if not decrypt_pkt.haslayer(IP):
700                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
701                 decrypt_pkts.append(decrypt_pkt)
702                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
703                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
704             except:
705                 self.logger.debug(ppp("Unexpected packet:", rx))
706                 try:
707                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
708                 except:
709                     pass
710                 raise
711         pkts = reassemble4(decrypt_pkts)
712         for pkt in pkts:
713             self.assert_packet_checksums_valid(pkt)
714
715     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
716         self.vapi.cli("clear errors")
717         if not n_rx:
718             n_rx = count
719         try:
720             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
721                                               src=p.remote_tun_if_host,
722                                               dst=self.pg1.remote_ip4,
723                                               count=count)
724             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
725             self.verify_decrypted(p, recv_pkts)
726
727             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
728                                       dst=p.remote_tun_if_host, count=count,
729                                       payload_size=payload_size)
730             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
731                                              self.tun_if, n_rx)
732             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
733
734         finally:
735             self.logger.info(self.vapi.ppcli("show error"))
736             self.logger.info(self.vapi.ppcli("show ipsec all"))
737
738         self.verify_counters4(p, count, n_rx)
739
740     def verify_tun_reass_44(self, p):
741         self.vapi.cli("clear errors")
742         self.vapi.ip_reassembly_enable_disable(
743             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
744
745         try:
746             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
747                                               src=p.remote_tun_if_host,
748                                               dst=self.pg1.remote_ip4,
749                                               payload_size=1900,
750                                               count=1)
751             send_pkts = fragment_rfc791(send_pkts[0], 1400)
752             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
753                                              self.pg1, n_rx=1)
754             self.verify_decrypted(p, recv_pkts)
755
756             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
757                                       dst=p.remote_tun_if_host, count=1)
758             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
759                                              self.tun_if)
760             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
761
762         finally:
763             self.logger.info(self.vapi.ppcli("show error"))
764             self.logger.info(self.vapi.ppcli("show ipsec all"))
765
766         self.verify_counters4(p, 1, 1)
767         self.vapi.ip_reassembly_enable_disable(
768             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
769
770     def verify_tun_64(self, p, count=1):
771         self.vapi.cli("clear errors")
772         try:
773             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
774                                                src=p.remote_tun_if_host6,
775                                                dst=self.pg1.remote_ip6,
776                                                count=count)
777             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
778             for recv_pkt in recv_pkts:
779                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
780                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
781                 self.assert_packet_checksums_valid(recv_pkt)
782             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
783                                        dst=p.remote_tun_if_host6, count=count)
784             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
785             for recv_pkt in recv_pkts:
786                 try:
787                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
788                     if not decrypt_pkt.haslayer(IPv6):
789                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
790                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
791                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
792                     self.assert_packet_checksums_valid(decrypt_pkt)
793                 except:
794                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
795                     try:
796                         self.logger.debug(
797                             ppp("Decrypted packet:", decrypt_pkt))
798                     except:
799                         pass
800                     raise
801         finally:
802             self.logger.info(self.vapi.ppcli("show error"))
803             self.logger.info(self.vapi.ppcli("show ipsec all"))
804
805         self.verify_counters4(p, count)
806
807     def verify_keepalive(self, p):
808         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
809                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
810                UDP(sport=333, dport=4500) /
811                Raw(b'\xff'))
812         self.send_and_assert_no_replies(self.tun_if, pkt*31)
813         self.assert_error_counter_equal(
814             '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
815
816         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
817                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
818                UDP(sport=333, dport=4500) /
819                Raw(b'\xfe'))
820         self.send_and_assert_no_replies(self.tun_if, pkt*31)
821         self.assert_error_counter_equal(
822             '/err/%s/Too Short' % self.tun4_input_node, 31)
823
824
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """

    # Every test uses the parameter set stored under socket.AF_INET in
    # self.params; only the verify helper and the packet count differ.

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        ipv4_params = self.params[socket.AF_INET]
        self.verify_tun_44(ipv4_params, count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        ipv4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(ipv4_params)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        ipv4_params = self.params[socket.AF_INET]
        self.verify_tun_44(ipv4_params, count=257)
838
839
class IpsecTun6(object):
    """ verify methods for Tunnel v6

    Mixin of helpers; the methods rely on attributes and helpers supplied
    by the class they are mixed into (``vapi``, ``logger``, ``tun_if``,
    ``pg1``, ``gen_pkts*``, ``send_and_expect`` and the assert helpers).
    """

    def verify_counters6(self, p_in, p_out, count):
        """Check SA and node packet counters against ``count``.

        The inbound SA counter is checked only when ``p_in`` has a
        ``tun_sa_in`` attribute, the outbound one only when ``p_out`` has a
        ``tun_sa_out`` attribute.  The packet counters of
        ``self.tun6_encrypt_node_name`` and ``self.tun6_decrypt_node_name``
        are always checked.
        """
        if hasattr(p_in, "tun_sa_in"):
            pkts = p_in.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if hasattr(p_out, "tun_sa_out"):
            pkts = p_out.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """Check IPv6 addressing and checksums of every packet in ``rxs``."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt and validate every encrypted packet in ``rxs``.

        Each captured packet must have valid checksums and an IPv6 ``plen``
        equal to the frame length minus one Ethernet and one IPv6 header.
        It is then decrypted with ``p.vpp_tun_sa`` and the inner packet's
        checksums and addresses are checked.

        :param p: tunnel parameters (``vpp_tun_sa`` and
            ``remote_tun_if_host`` are read).
        :param sa: not used by this method; kept so callers keep working.
        :param rxs: iterable of captured packets.
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            # Reset for every packet so that a failure never logs the
            # previous iteration's plaintext as if it belonged to this one.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # Diagnostics are best effort only; the original
                        # failure re-raised below is what matters.
                        pass
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Send encrypted IPv6 packets and expect none to be forwarded.

        ``payload_size`` is accepted but not used by this method.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Exchange IPv6 traffic through the tunnel in both directions.

        :param p_in: parameters used for the tunnel -> pg1 direction.
        :param p_out: parameters used for the pg1 -> tunnel direction;
            a falsy value means ``p_in``.
        :param count: number of packets to send in each direction.
        :param payload_size: payload size of the pg1 -> tunnel packets.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Verify the IPv6 tunnel with IPv6 reassembly enabled on it.

        One encrypted packet with a 1900 byte payload is split by
        ``fragment_rfc8200`` and sent on the tunnel interface; exactly one
        packet is expected on ``self.pg1``.  One packet is then sent in the
        opposite direction and checked with ``verify_encrypted6``.

        Reassembly is switched off in a ``finally`` clause so that a failure
        anywhere in this method cannot leave it enabled for later tests.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(
                    p.scapy_tun_sa, self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=1,
                    payload_size=1900)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # Always restore the interface, even when the checks above fail.
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """Exchange IPv4 traffic through the tunnel and verify both ways.

        Packets captured on ``self.pg1`` must carry the expected IPv4
        addresses; packets captured on ``self.tun_if`` are decrypted from
        their IPv6 layer with ``p.vpp_tun_sa`` and the inner IPv4 addresses
        and checksums are checked.

        :param p: tunnel parameters (``scapy_tun_sa``, ``vpp_tun_sa`` and
            ``remote_tun_if_host4`` are read).
        :param count: number of packets to send in each direction.
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset for every packet so that a failure never logs the
                # previous iteration's plaintext as if it belonged here.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(ppp("Decrypted packet:",
                                                  decrypt_pkt))
                        except Exception:
                            # Diagnostics are best effort only; the original
                            # failure re-raised below is what matters.
                            pass
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
986
987
988 class IpsecTun6Tests(IpsecTun6):
989     """ UT test methods for Tunnel v6 """
990
991     def test_tun_basic66(self):
992         """ ipsec 6o6 tunnel basic test """
993         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
994
995     def test_tun_reass_basic66(self):
996         """ ipsec 6o6 tunnel basic reassembly test """
997         self.verify_tun_reass_66(self.params[socket.AF_INET6])
998
999     def test_tun_burst66(self):
1000         """ ipsec 6o6 tunnel burst test """
1001         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1002
1003
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # No body needed: all test methods come from the two base classes.
1007
1008
# When executed as a script, run unittest with the VppTestRunner imported
# from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)