ipsec: Redo the anti-replay check post decrypt
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether, Raw
8 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
9
10 from framework import VppTestCase, VppTestRunner
11 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
12 from vpp_papi import VppEnum
13
14
class IPsecIPv4Params(object):
    """Default IPsec test parameters for the IPv4 address family.

    The class attributes describe the address family itself; the instance
    attributes set in ``__init__`` hold the SA ids, SPIs, algorithms and
    keys used for the tunnel ("tun") and transport ("tra") SAs.
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses reachable behind the remote tunnel endpoint
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode: (SA id, SPI) for each direction
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode: (SA id, SPI) for each direction
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API enum value and the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value and the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # salt is only appended to the key for AES-GCM (mk_scapy_crpyt_key)
        self.salt = 0
        self.flags = 0
        self.nat_header = None
49
50
class IPsecIPv6Params(object):
    """Default IPsec test parameters for the IPv6 address family.

    The class attributes describe the address family itself; the instance
    attributes set in ``__init__`` hold the SA ids, SPIs, algorithms and
    keys used for the tunnel ("tun") and transport ("tra") SAs.
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses reachable behind the remote tunnel endpoint
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode: (SA id, SPI) for each direction
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode: (SA id, SPI) for each direction
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API enum value and the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value and the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # salt is only appended to the key for AES-GCM (mk_scapy_crpyt_key)
        self.salt = 0
        self.flags = 0
        self.nat_header = None
85
86
def mk_scapy_crpyt_key(p):
    """Return the encryption key in the form scapy expects for ``p``.

    For "AES-GCM" the 32-bit salt (network byte order) is appended to the
    key; for every other algorithm the key is returned unchanged.
    """
    if p.crypt_algo != "AES-GCM":
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
92
93
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy tunnel-mode SAs to the parameter object ``p``.

    Sets ``p.scapy_tun_sa`` (keyed with ``p.vpp_tun_spi``, outer header
    remote -> local) and ``p.vpp_tun_sa`` (keyed with ``p.scapy_tun_spi``,
    outer header local -> remote). Both share the same algorithms, keys,
    NAT-T header and ESN setting taken from ``p``.
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)
    use_esn = bool(p.flags & esn_flag)

    # settings common to both directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crpyt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    # outer header class follows the address family of the parameters
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=tun_if.remote_addr[p.addr_type],
                                dst=tun_if.local_addr[p.addr_type]),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=tun_if.remote_addr[p.addr_type],
                                src=tun_if.local_addr[p.addr_type]),
        **shared)
119
120
def config_tra_params(p, encryption_type):
    """Attach the scapy transport-mode SAs to the parameter object ``p``.

    Sets ``p.scapy_tra_sa`` (keyed with ``p.vpp_tra_spi``) and
    ``p.vpp_tra_sa`` (keyed with ``p.scapy_tra_spi``); apart from the SPI
    the two SAs are built from identical settings taken from ``p``.
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)
    use_esn = bool(p.flags & esn_flag)

    # settings common to both directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crpyt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
143
144
class TemplateIpsec(VppTestCase):
    """
    Common fixture for the IPsec test cases: parameters, pg interfaces
    and packet generators.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create the v4/v6 parameter objects, indexed by address family """
        self.ipv4_params = v4 = IPsecIPv4Params()
        self.ipv6_params = v6 = IPsecIPv6Params()
        self.params = {v4.addr_type: v4, v6.addr_type: v6}

    def config_interfaces(self):
        """ create three pg interfaces and bring them up with v4 and v6 """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ take the interfaces down and remove their addresses """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        self.logger.info(self.vapi.cli("show hardware"))

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ build `count` IPv4 ICMP packets, each encrypted with `sa` """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
            pkts.append(l2 / sa.encrypt(plain))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ build `count` IPv6 echo requests, each encrypted with `sa` """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            pkts.append(l2 / sa.encrypt(plain))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build `count` plain IPv4 ICMP packets """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IP(src=src, dst=dst) / ICMP() /
                        Raw('X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build `count` plain IPv6 echo requests """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IPv6(src=src, dst=dst) /
                        ICMPv6EchoRequest(id=0, seq=1,
                                          data='X' * payload_size))
        return pkts
247
248
class IpsecTcp(object):
    """ verify methods for TCP packets generated by VPP over the tunnel """
    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to VPP's own tunnel address (port 80)
        and check the checksums of the decrypted reply."""
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first received packet is checked
        reply = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        self.assert_packet_checksums_valid(p.vpp_tun_sa.decrypt(reply[IP]))
263
264
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksum verification """
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # the whole scenario lives in IpsecTcp.verify_tcp_checksum
        self.verify_tcp_checksum()
269
270
class IpsecTra4(object):
    """ verify methods for Transport v4 """
    def verify_tra_anti_replay(self):
        """Exercise the RX anti-replay window and the TX sequence-number
        wrap on the v4 transport SAs.

        The scenario is strictly sequential: every step relies on the
        window position and error-counter values left by the previous
        one, so the expected counters are tracked in local variables and
        compared after each batch.
        """
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # names of the per-node error counters checked below
        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        # ESP with AES-GCM reports a failed packet under a different counter
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)
        # snapshot the counters; expectations are relative to these values
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        # the undersize counter is only read (and later checked) for ESP
        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33 (range(1, 34))
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(1, 34)]
        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkts = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                          dst=self.tra_if.local_ip4) /
                                       ICMP(),
                                       seq_num=35))
        # 8 copies sent, 1 reply expected, so 7 count as replays
        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
                                         self.tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=257))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # in window are still accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=200))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (the bogus SA uses the same SPI but reversed crypto and auth keys)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crpyt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                   dst=self.tra_if.local_ip4) /
                                ICMP(),
                                seq_num=350))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        #  (no crypto or auth algorithm is passed to this SA)
        if (ESP == self.encryption_type):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                       dst=self.tra_if.local_ip4) /
                                    ICMP(),
                                    seq_num=350))
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # which we can determine since this packet is still in the window
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=234))
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=17))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            # without ESN the drops are counted as replays instead
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=258))
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(259, 280)]

        if use_esn:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000005))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0xfffffffd))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000999))
            # NOTE(review): every other send_and_assert_no_replies call in
            # this method passes only (intf, pkts); confirm what the extra
            # third positional argument is bound to here.
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000555))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000444))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351, matching the values written to the scapy SAs below)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """ ipsec v4 transport basic test """
        # start from clean error and SA counters so the checks below can
        # compare against `count` directly
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IP length field must match the bytes after Ethernet
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the failure propagate
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, whether or not the checks passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        # per-SA packet counters, both directions
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        # per-node packet counters
        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
591
592
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller-supplied count; the default of 1 is what is
        # used when the method is invoked with no arguments
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        # 257 packets: more than fit in a single 256-packet batch
        self.verify_tra_basic4(count=257)
606
607
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1):
        """Send `count` encrypted v6 echo requests over the transport SA,
        check every reply decrypts with valid checksums, then verify the
        SA and node packet counters equal `count`."""
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                        src=self.tra_if.remote_ip6,
                                        dst=self.tra_if.local_ip6,
                                        count=count)
            for rx in self.send_and_expect(self.tra_if, tx, self.tra_if):
                # payload length excludes the Ethernet and fixed IPv6 headers
                payload_len = len(rx) - len(Ether()) - len(IPv6())
                self.assertEqual(payload_len, rx[IPv6].plen)
                try:
                    self.assert_packet_checksums_valid(
                        p.vpp_tra_sa.decrypt(rx[IPv6]))
                except:
                    # log the offending packet, then let the failure propagate
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, whether or not the checks passed
            for show_cmd in ("show error", "show ipsec all"):
                self.logger.info(self.vapi.ppcli(show_cmd))

        # per-SA packet counters, inbound first and then outbound
        sa_checks = (
            ("tra_sa_in", "incorrect SA in counts: expected %d != %d"),
            ("tra_sa_out", "incorrect SA out counts: expected %d != %d"),
        )
        for sa_attr, fmt in sa_checks:
            pkts = getattr(p, sa_attr).get_stats()['packets']
            self.assertEqual(pkts, count, fmt % (count, pkts))

        # per-node packet counters
        for node_name in (self.tra6_encrypt_node_name,
                          self.tra6_decrypt_node_name):
            self.assert_packet_counter_equal(node_name, count)
643
644
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # single packet through IpsecTra6.verify_tra_basic6
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # same check with a burst of 257 packets
        self.verify_tra_basic6(count=257)
652         """ ipsec v6 transport burst test """
653         self.verify_tra_basic6(count=257)
654
655
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # adds nothing itself: it only combines the v4 and v6 test methods
    pass
659
660
661 class IpsecTun4(object):
662     """ verify methods for Tunnel v4 """
663     def verify_counters4(self, p, count, n_frags=None):
664         if not n_frags:
665             n_frags = count
666         if (hasattr(p, "spd_policy_in_any")):
667             pkts = p.spd_policy_in_any.get_stats()['packets']
668             self.assertEqual(pkts, count,
669                              "incorrect SPD any policy: expected %d != %d" %
670                              (count, pkts))
671
672         if (hasattr(p, "tun_sa_in")):
673             pkts = p.tun_sa_in.get_stats()['packets']
674             self.assertEqual(pkts, count,
675                              "incorrect SA in counts: expected %d != %d" %
676                              (count, pkts))
677             pkts = p.tun_sa_out.get_stats()['packets']
678             self.assertEqual(pkts, count,
679                              "incorrect SA out counts: expected %d != %d" %
680                              (count, pkts))
681
682         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
683         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
684
685     def verify_decrypted(self, p, rxs):
686         for rx in rxs:
687             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
688             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
689             self.assert_packet_checksums_valid(rx)
690
691     def verify_encrypted(self, p, sa, rxs):
692         decrypt_pkts = []
693         for rx in rxs:
694             if p.nat_header:
695                 self.assertEqual(rx[UDP].dport, 4500)
696             self.assert_packet_checksums_valid(rx)
697             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
698             try:
699                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
700                 if not decrypt_pkt.haslayer(IP):
701                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
702                 decrypt_pkts.append(decrypt_pkt)
703                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
704                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
705             except:
706                 self.logger.debug(ppp("Unexpected packet:", rx))
707                 try:
708                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
709                 except:
710                     pass
711                 raise
712         pkts = reassemble4(decrypt_pkts)
713         for pkt in pkts:
714             self.assert_packet_checksums_valid(pkt)
715
def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
    """Send traffic both ways through the 4o4 tunnel and verify it.

    :param p: tunnel parameters.
    :param count: number of packets generated for each direction.
    :param payload_size: payload size passed to ``gen_pkts`` for the
        plain-text packets sent from pg1.
    :param n_rx: number of packets expected on the tunnel interface;
        any falsy value (None, 0) means ``count``.
    """
    self.vapi.cli("clear errors")
    n_rx = n_rx or count
    try:
        config_tun_params(p, self.encryption_type, self.tun_if)

        # tunnel -> pg1: encrypted in, plain text out
        esp_tx = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                       src=p.remote_tun_if_host,
                                       dst=self.pg1.remote_ip4,
                                       count=count)
        plain_rx = self.send_and_expect(self.tun_if, esp_tx, self.pg1)
        self.verify_decrypted(p, plain_rx)

        # pg1 -> tunnel: plain text in, encrypted out
        plain_tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                 dst=p.remote_tun_if_host, count=count,
                                 payload_size=payload_size)
        esp_rx = self.send_and_expect(self.pg1, plain_tx,
                                      self.tun_if, n_rx)
        self.verify_encrypted(p, p.vpp_tun_sa, esp_rx)
    finally:
        for show_cmd in ("show error", "show ipsec all"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    self.verify_counters4(p, count, n_rx)
741
def verify_tun_reass_44(self, p):
    """Verify the 4o4 tunnel with IPv4 reassembly enabled on it.

    One encrypted packet with a 1900 byte payload is split with
    ``fragment_rfc791(..., 1400)`` and sent to the tunnel interface;
    exactly one packet is expected on pg1. A single packet is then sent
    in the opposite direction and its encrypted form verified.

    Reassembly is switched off again on the tunnel interface even when a
    check fails, so the setting cannot leak into whatever runs next.

    :param p: tunnel parameters.
    """
    self.vapi.cli("clear errors")
    self.vapi.ip_reassembly_enable_disable(
        sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

    try:
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              payload_size=1900,
                                              count=1)
            send_pkts = fragment_rfc791(send_pkts[0], 1400)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                             self.pg1, n_rx=1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=1)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, 1, 1)
    finally:
        # Undo the enable above on every exit path, not only on success.
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
772
def verify_tun_64(self, p, count=1):
    """Verify IPv6 traffic carried through the tunnel in both directions.

    Encrypted packets with IPv6 inner addresses are sent to the tunnel
    interface and the forwarded IPv6 packets are checked on pg1. IPv6
    packets are then sent from pg1; each captured packet is decrypted
    from its ``IP`` layer with ``p.vpp_tun_sa`` and the inner IPv6
    addresses and checksums are checked.

    :param p: tunnel parameters (``remote_tun_if_host6``, ``scapy_tun_sa``
        and ``vpp_tun_sa`` are read).
    :param count: number of packets sent in each direction.
    :raises: whatever the failing assertion or decrypt step raised; the
        offending packet is logged first.
    """
    self.vapi.cli("clear errors")
    try:
        config_tun_params(p, self.encryption_type, self.tun_if)
        send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                           src=p.remote_tun_if_host6,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
        for recv_pkt in recv_pkts:
            self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
            self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(recv_pkt)
        send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                   dst=p.remote_tun_if_host6, count=count)
        recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
        for recv_pkt in recv_pkts:
            # Reset for each packet so that the failure log below can
            # never show the decrypted form of an earlier packet.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                self.assert_packet_checksums_valid(decrypt_pkt)
            except Exception:
                self.logger.error(ppp("Unexpected packet:", recv_pkt))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # Best-effort dump only: a formatting problem
                        # must not replace the error re-raised below.
                        pass
                raise
    finally:
        self.logger.info(self.vapi.ppcli("show error"))
        self.logger.info(self.vapi.ppcli("show ipsec all"))

    self.verify_counters4(p, count)
810
def verify_keepalive(self, p):
    """Check how one-octet UDP/4500 payloads are counted on the tunnel.

    31 copies of a UDP packet (source port 333, destination port 4500)
    are sent to the tunnel interface for each payload; no replies are
    expected. A payload of 0xff must be counted under "NAT Keepalive"
    and a payload of 0xfe under "Too Short" on ``self.tun4_input_node``.

    :param p: tunnel parameters; ``p.remote_tun_if_host`` is used as the
        source address.
    """
    # The payload is given as an explicit one-octet bytes literal: Raw()
    # takes the payload bytes, and an int argument does not reliably turn
    # into a single octet of that value.
    cases = ((b'\xff', '/err/%s/NAT Keepalive'),
             (b'\xfe', '/err/%s/Too Short'))
    for payload, counter in cases:
        pkt = (Ether(src=self.tun_if.remote_mac,
                     dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(payload))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            counter % self.tun4_input_node, 31)
827
828
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """
    # Each test looks up the IPv4 parameter set and delegates to one of
    # the verify_* helpers inherited from IpsecTun4.

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(v4_params)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
842
843
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """
    # This class derives from ``object`` only: the assertion helpers and
    # the ``vapi``, ``logger``, ``tun_if``, ``pg1``, ``encryption_type``
    # and ``tun6_*_node_name`` attributes used below have to be supplied
    # by the class it is combined with.

    def _log_unexpected6(self, rx, decrypt_pkt):
        """Log a packet that failed verification (debug level).

        :param rx: the captured packet.
        :param decrypt_pkt: its decrypted form, or None when decryption
            did not get that far.
        """
        self.logger.debug(ppp("Unexpected packet:", rx))
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # Best-effort dump only: a formatting problem must not
            # replace the error the caller is about to re-raise.
            pass

    def verify_counters6(self, p_in, p_out, count):
        """Check SA and node packet counters against ``count``.

        SA counters are only checked when the parameter object carries
        the corresponding ``tun_sa_in`` / ``tun_sa_out`` attribute.

        :param p_in: parameters whose ``tun_sa_in`` stats are checked.
        :param p_out: parameters whose ``tun_sa_out`` stats are checked.
        :param count: expected packet count for every counter.
        """
        if hasattr(p_in, "tun_sa_in"):
            pkts = p_in.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if hasattr(p_out, "tun_sa_out"):
            pkts = p_out.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """Check the plain IPv6 packets forwarded after decryption.

        :param p: tunnel parameters; ``p.remote_tun_if_host`` is the
            expected source address.
        :param rxs: iterable of captured packets.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Verify the encrypted IPv6 packets captured on the tunnel side.

        Each packet must have valid checksums and an IPv6 payload length
        equal to the frame length minus the Ethernet and IPv6 headers.
        It is then decrypted with ``p.vpp_tun_sa`` and the inner
        addresses and checksums are checked.

        :param p: tunnel parameters.
        :param sa: not used by this method (decryption always goes
            through ``p.vpp_tun_sa``); kept for existing callers.
        :param rxs: iterable of captured packets.
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            # Reset for each packet so a failure never logs the
            # decrypted form of an earlier packet.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self._log_unexpected6(rx, decrypt_pkt)
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Send encrypted packets to the tunnel and expect no replies.

        :param p_in: tunnel parameters used to build the packets.
        :param count: number of packets to send.
        :param payload_size: accepted for signature compatibility; it is
            not used by this method.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        config_tun_params(p_in, self.encryption_type, self.tun_if)
        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Send traffic both ways through the 6o6 tunnel and verify it.

        :param p_in: parameters for the tunnel -> pg1 direction.
        :param p_out: parameters for the pg1 -> tunnel direction; a
            falsy value means ``p_in``.
        :param count: number of packets sent in each direction.
        :param payload_size: payload size passed to ``gen_pkts6`` for
            the plain-text packets sent from pg1.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            config_tun_params(p_in, self.encryption_type, self.tun_if)
            config_tun_params(p_out, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Verify the 6o6 tunnel with IPv6 reassembly enabled on it.

        One encrypted packet with a 1900 byte payload is split with
        ``fragment_rfc8200`` and sent to the tunnel interface; exactly
        one packet is expected on pg1. A single packet is then sent in
        the opposite direction and its encrypted form verified.

        Reassembly is switched off again on the tunnel interface even
        when a check fails, so the setting cannot leak into whatever
        runs next.

        :param p: tunnel parameters.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                config_tun_params(p, self.encryption_type, self.tun_if)
                send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa,
                                                   self.tun_if,
                                                   src=p.remote_tun_if_host,
                                                   dst=self.pg1.remote_ip6,
                                                   count=1,
                                                   payload_size=1900)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(self.pg1,
                                           src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # Undo the enable above on every exit path, not only on
            # success.
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        Encrypted packets with IPv4 inner addresses are sent to the
        tunnel interface and the forwarded IPv4 packets are checked on
        pg1. IPv4 packets are then sent from pg1; each captured packet
        is decrypted from its ``IPv6`` layer with ``p.vpp_tun_sa`` and
        the inner IPv4 addresses and checksums are checked.

        :param p: tunnel parameters (``remote_tun_if_host4`` is read).
        :param count: number of packets sent in each direction.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset for each packet so a failure never logs the
                # decrypted form of an earlier packet.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_unexpected6(recv_pkt, decrypt_pkt)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
995
996
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """
    # Each test looks up the IPv6 parameter set and delegates to one of
    # the verify_* helpers inherited from IpsecTun6.

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(v6_params)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
1011
1012
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # Adds nothing of its own: it only combines the v4 and v6 test sets.
1016
1017
# Run the tests with the VPP test runner when this module is executed
# directly rather than imported.
if "__main__" == __name__:
    unittest.main(testRunner=VppTestRunner)