tests: changes for scapy 2.4.3 migration
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
10
11 from framework import VppTestCase, VppTestRunner
12 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
13 from vpp_papi import VppEnum
14
15
16 class IPsecIPv4Params(object):
17
18     addr_type = socket.AF_INET
19     addr_any = "0.0.0.0"
20     addr_bcast = "255.255.255.255"
21     addr_len = 32
22     is_ipv6 = 0
23
24     def __init__(self):
25         self.remote_tun_if_host = '1.1.1.1'
26         self.remote_tun_if_host6 = '1111::1'
27
28         self.scapy_tun_sa_id = 10
29         self.scapy_tun_spi = 1001
30         self.vpp_tun_sa_id = 20
31         self.vpp_tun_spi = 1000
32
33         self.scapy_tra_sa_id = 30
34         self.scapy_tra_spi = 2001
35         self.vpp_tra_sa_id = 40
36         self.vpp_tra_spi = 2000
37
38         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
39                                  IPSEC_API_INTEG_ALG_SHA1_96)
40         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
41         self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
42
43         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
44                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
45         self.crypt_algo = 'AES-CBC'  # scapy name
46         self.crypt_key = b'JPjyOWBeVEQiMe7h'
47         self.salt = 0
48         self.flags = 0
49         self.nat_header = None
50
51
52 class IPsecIPv6Params(object):
53
54     addr_type = socket.AF_INET6
55     addr_any = "0::0"
56     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
57     addr_len = 128
58     is_ipv6 = 1
59
60     def __init__(self):
61         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
62         self.remote_tun_if_host4 = '1.1.1.1'
63
64         self.scapy_tun_sa_id = 50
65         self.scapy_tun_spi = 3001
66         self.vpp_tun_sa_id = 60
67         self.vpp_tun_spi = 3000
68
69         self.scapy_tra_sa_id = 70
70         self.scapy_tra_spi = 4001
71         self.vpp_tra_sa_id = 80
72         self.vpp_tra_spi = 4000
73
74         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
75                                  IPSEC_API_INTEG_ALG_SHA1_96)
76         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
77         self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
78
79         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
80                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
81         self.crypt_algo = 'AES-CBC'  # scapy name
82         self.crypt_key = b'JPjyOWBeVEQiMe7h'
83         self.salt = 0
84         self.flags = 0
85         self.nat_header = None
86
87
88 def mk_scapy_crypt_key(p):
89     if p.crypt_algo == "AES-GCM":
90         return p.crypt_key + struct.pack("!I", p.salt)
91     else:
92         return p.crypt_key
93
94
95 def config_tun_params(p, encryption_type, tun_if):
96     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
97     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
98                              IPSEC_API_SAD_FLAG_USE_ESN))
99     crypt_key = mk_scapy_crypt_key(p)
100     p.scapy_tun_sa = SecurityAssociation(
101         encryption_type, spi=p.vpp_tun_spi,
102         crypt_algo=p.crypt_algo,
103         crypt_key=crypt_key,
104         auth_algo=p.auth_algo, auth_key=p.auth_key,
105         tunnel_header=ip_class_by_addr_type[p.addr_type](
106             src=tun_if.remote_addr[p.addr_type],
107             dst=tun_if.local_addr[p.addr_type]),
108         nat_t_header=p.nat_header,
109         esn_en=esn_en)
110     p.vpp_tun_sa = SecurityAssociation(
111         encryption_type, spi=p.scapy_tun_spi,
112         crypt_algo=p.crypt_algo,
113         crypt_key=crypt_key,
114         auth_algo=p.auth_algo, auth_key=p.auth_key,
115         tunnel_header=ip_class_by_addr_type[p.addr_type](
116             dst=tun_if.remote_addr[p.addr_type],
117             src=tun_if.local_addr[p.addr_type]),
118         nat_t_header=p.nat_header,
119         esn_en=esn_en)
120
121
122 def config_tra_params(p, encryption_type):
123     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
124                              IPSEC_API_SAD_FLAG_USE_ESN))
125     crypt_key = mk_scapy_crypt_key(p)
126     p.scapy_tra_sa = SecurityAssociation(
127         encryption_type,
128         spi=p.vpp_tra_spi,
129         crypt_algo=p.crypt_algo,
130         crypt_key=crypt_key,
131         auth_algo=p.auth_algo,
132         auth_key=p.auth_key,
133         nat_t_header=p.nat_header,
134         esn_en=esn_en)
135     p.vpp_tra_sa = SecurityAssociation(
136         encryption_type,
137         spi=p.scapy_tra_spi,
138         crypt_algo=p.crypt_algo,
139         crypt_key=crypt_key,
140         auth_algo=p.auth_algo,
141         auth_key=p.auth_key,
142         nat_t_header=p.nat_header,
143         esn_en=esn_en)
144
145
146 class TemplateIpsec(VppTestCase):
147     """
148     TRANSPORT MODE:
149
150      ------   encrypt   ---
151     |tra_if| <-------> |VPP|
152      ------   decrypt   ---
153
154     TUNNEL MODE:
155
156      ------   encrypt   ---   plain   ---
157     |tun_if| <-------  |VPP| <------ |pg1|
158      ------             ---           ---
159
160      ------   decrypt   ---   plain   ---
161     |tun_if| ------->  |VPP| ------> |pg1|
162      ------             ---           ---
163     """
164     tun_spd_id = 1
165     tra_spd_id = 2
166
167     def ipsec_select_backend(self):
168         """ empty method to be overloaded when necessary """
169         pass
170
171     @classmethod
172     def setUpClass(cls):
173         super(TemplateIpsec, cls).setUpClass()
174
175     @classmethod
176     def tearDownClass(cls):
177         super(TemplateIpsec, cls).tearDownClass()
178
179     def setup_params(self):
180         self.ipv4_params = IPsecIPv4Params()
181         self.ipv6_params = IPsecIPv6Params()
182         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
183                        self.ipv6_params.addr_type: self.ipv6_params}
184
185     def config_interfaces(self):
186         self.create_pg_interfaces(range(3))
187         self.interfaces = list(self.pg_interfaces)
188         for i in self.interfaces:
189             i.admin_up()
190             i.config_ip4()
191             i.resolve_arp()
192             i.config_ip6()
193             i.resolve_ndp()
194
195     def setUp(self):
196         super(TemplateIpsec, self).setUp()
197
198         self.setup_params()
199
200         self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
201                                  IPSEC_API_PROTO_ESP)
202         self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
203                                 IPSEC_API_PROTO_AH)
204
205         self.config_interfaces()
206
207         self.ipsec_select_backend()
208
209     def unconfig_interfaces(self):
210         for i in self.interfaces:
211             i.admin_down()
212             i.unconfig_ip4()
213             i.unconfig_ip6()
214
215     def tearDown(self):
216         super(TemplateIpsec, self).tearDown()
217
218         self.unconfig_interfaces()
219
220     def show_commands_at_teardown(self):
221         self.logger.info(self.vapi.cli("show hardware"))
222
223     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
224                          payload_size=54):
225         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
226                 sa.encrypt(IP(src=src, dst=dst) /
227                            ICMP() / Raw(b'X' * payload_size))
228                 for i in range(count)]
229
230     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
231                           payload_size=54):
232         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
233                 sa.encrypt(IPv6(src=src, dst=dst) /
234                            ICMPv6EchoRequest(id=0, seq=1,
235                                              data='X' * payload_size))
236                 for i in range(count)]
237
238     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
239         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
240                 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
241                 for i in range(count)]
242
243     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
244         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
245                 IPv6(src=src, dst=dst) /
246                 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
247                 for i in range(count)]
248
249
250 class IpsecTcp(object):
251     def verify_tcp_checksum(self):
252         self.vapi.cli("test http server")
253         p = self.params[socket.AF_INET]
254         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
255                 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
256                                           dst=self.tun_if.local_ip4) /
257                                        TCP(flags='S', dport=80)))
258         self.logger.debug(ppp("Sending packet:", send))
259         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
260         recv = recv[0]
261         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
262         self.assert_packet_checksums_valid(decrypted)
263
264
265 class IpsecTcpTests(IpsecTcp):
266     def test_tcp_checksum(self):
267         """ verify checksum correctness for vpp generated packets """
268         self.verify_tcp_checksum()
269
270
271 class IpsecTra4(object):
272     """ verify methods for Transport v4 """
273     def verify_tra_anti_replay(self):
274         p = self.params[socket.AF_INET]
275         esn_en = p.vpp_tra_sa.esn_en
276
277         seq_cycle_node_name = ('/err/%s/sequence number cycled' %
278                                self.tra4_encrypt_node_name)
279         replay_node_name = ('/err/%s/SA replayed packet' %
280                             self.tra4_decrypt_node_name)
281         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
282             hash_failed_node_name = ('/err/%s/ESP decryption failed' %
283                                      self.tra4_decrypt_node_name)
284         else:
285             hash_failed_node_name = ('/err/%s/Integrity check failed' %
286                                      self.tra4_decrypt_node_name)
287         replay_count = self.statistics.get_err_counter(replay_node_name)
288         hash_failed_count = self.statistics.get_err_counter(
289             hash_failed_node_name)
290         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
291
292         if ESP == self.encryption_type:
293             undersize_node_name = ('/err/%s/undersized packet' %
294                                    self.tra4_decrypt_node_name)
295             undersize_count = self.statistics.get_err_counter(
296                 undersize_node_name)
297
298         #
299         # send packets with seq numbers 1->34
300         # this means the window size is still in Case B (see RFC4303
301         # Appendix A)
302         #
303         # for reasons i haven't investigated Scapy won't create a packet with
304         # seq_num=0
305         #
306         pkts = [(Ether(src=self.tra_if.remote_mac,
307                        dst=self.tra_if.local_mac) /
308                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
309                                            dst=self.tra_if.local_ip4) /
310                                         ICMP(),
311                                         seq_num=seq))
312                 for seq in range(1, 34)]
313         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
314
315         # replayed packets are dropped
316         self.send_and_assert_no_replies(self.tra_if, pkts)
317         replay_count += len(pkts)
318         self.assert_error_counter_equal(replay_node_name, replay_count)
319
320         #
321         # now send a batch of packets all with the same sequence number
322         # the first packet in the batch is legitimate, the rest bogus
323         #
324         pkts = (Ether(src=self.tra_if.remote_mac,
325                       dst=self.tra_if.local_mac) /
326                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
327                                           dst=self.tra_if.local_ip4) /
328                                        ICMP(),
329                                        seq_num=35))
330         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
331                                          self.tra_if, n_rx=1)
332         replay_count += 7
333         self.assert_error_counter_equal(replay_node_name, replay_count)
334
335         #
336         # now move the window over to 257 (more than one byte) and into Case A
337         #
338         pkt = (Ether(src=self.tra_if.remote_mac,
339                      dst=self.tra_if.local_mac) /
340                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
341                                          dst=self.tra_if.local_ip4) /
342                                       ICMP(),
343                                       seq_num=257))
344         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
345
346         # replayed packets are dropped
347         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
348         replay_count += 3
349         self.assert_error_counter_equal(replay_node_name, replay_count)
350
351         # the window size is 64 packets
352         # in window are still accepted
353         pkt = (Ether(src=self.tra_if.remote_mac,
354                      dst=self.tra_if.local_mac) /
355                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
356                                          dst=self.tra_if.local_ip4) /
357                                       ICMP(),
358                                       seq_num=200))
359         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
360
361         # a packet that does not decrypt does not move the window forward
362         bogus_sa = SecurityAssociation(self.encryption_type,
363                                        p.vpp_tra_spi,
364                                        crypt_algo=p.crypt_algo,
365                                        crypt_key=mk_scapy_crypt_key(p)[::-1],
366                                        auth_algo=p.auth_algo,
367                                        auth_key=p.auth_key[::-1])
368         pkt = (Ether(src=self.tra_if.remote_mac,
369                      dst=self.tra_if.local_mac) /
370                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
371                                    dst=self.tra_if.local_ip4) /
372                                 ICMP(),
373                                 seq_num=350))
374         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
375
376         hash_failed_count += 17
377         self.assert_error_counter_equal(hash_failed_node_name,
378                                         hash_failed_count)
379
380         # a malformed 'runt' packet
381         #  created by a mis-constructed SA
382         if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
383             bogus_sa = SecurityAssociation(self.encryption_type,
384                                            p.vpp_tra_spi)
385             pkt = (Ether(src=self.tra_if.remote_mac,
386                          dst=self.tra_if.local_mac) /
387                    bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
388                                        dst=self.tra_if.local_ip4) /
389                                     ICMP(),
390                                     seq_num=350))
391             self.send_and_assert_no_replies(self.tra_if, pkt * 17)
392
393             undersize_count += 17
394             self.assert_error_counter_equal(undersize_node_name,
395                                             undersize_count)
396
397         # which we can determine since this packet is still in the window
398         pkt = (Ether(src=self.tra_if.remote_mac,
399                      dst=self.tra_if.local_mac) /
400                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
401                                          dst=self.tra_if.local_ip4) /
402                                       ICMP(),
403                                       seq_num=234))
404         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
405
406         #
407         # out of window are dropped
408         #  this is Case B. So VPP will consider this to be a high seq num wrap
409         #  and so the decrypt attempt will fail
410         #
411         pkt = (Ether(src=self.tra_if.remote_mac,
412                      dst=self.tra_if.local_mac) /
413                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
414                                          dst=self.tra_if.local_ip4) /
415                                       ICMP(),
416                                       seq_num=17))
417         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
418
419         if esn_en:
420             # an out of window error with ESN looks like a high sequence
421             # wrap. but since it isn't then the verify will fail.
422             hash_failed_count += 17
423             self.assert_error_counter_equal(hash_failed_node_name,
424                                             hash_failed_count)
425
426         else:
427             replay_count += 17
428             self.assert_error_counter_equal(replay_node_name,
429                                             replay_count)
430
431         # valid packet moves the window over to 258
432         pkt = (Ether(src=self.tra_if.remote_mac,
433                      dst=self.tra_if.local_mac) /
434                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
435                                          dst=self.tra_if.local_ip4) /
436                                       ICMP(),
437                                       seq_num=258))
438         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
439         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
440
441         #
442         # move VPP's SA TX seq-num to just before the seq-number wrap.
443         # then fire in a packet that VPP should drop on TX because it
444         # causes the TX seq number to wrap; unless we're using extened sequence
445         # numbers.
446         #
447         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
448         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
449         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
450
451         pkts = [(Ether(src=self.tra_if.remote_mac,
452                        dst=self.tra_if.local_mac) /
453                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
454                                            dst=self.tra_if.local_ip4) /
455                                         ICMP(),
456                                         seq_num=seq))
457                 for seq in range(259, 280)]
458
459         if esn_en:
460             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
461
462             #
463             # in order for scapy to decrypt its SA's high order number needs
464             # to wrap
465             #
466             p.vpp_tra_sa.seq_num = 0x100000000
467             for rx in rxs:
468                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
469
470             #
471             # wrap scapy's TX high sequence number. VPP is in case B, so it
472             # will consider this a high seq wrap also.
473             # The low seq num we set it to will place VPP's RX window in Case A
474             #
475             p.scapy_tra_sa.seq_num = 0x100000005
476             pkt = (Ether(src=self.tra_if.remote_mac,
477                          dst=self.tra_if.local_mac) /
478                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
479                                              dst=self.tra_if.local_ip4) /
480                                           ICMP(),
481                                           seq_num=0x100000005))
482             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
483             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
484
485             #
486             # A packet that has seq num between (2^32-64) and 5 is within
487             # the window
488             #
489             p.scapy_tra_sa.seq_num = 0xfffffffd
490             pkt = (Ether(src=self.tra_if.remote_mac,
491                          dst=self.tra_if.local_mac) /
492                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
493                                              dst=self.tra_if.local_ip4) /
494                                           ICMP(),
495                                           seq_num=0xfffffffd))
496             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
497             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
498
499             #
500             # While in case A we cannot wrap the high sequence number again
501             # becuase VPP will consider this packet to be one that moves the
502             # window forward
503             #
504             pkt = (Ether(src=self.tra_if.remote_mac,
505                          dst=self.tra_if.local_mac) /
506                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
507                                              dst=self.tra_if.local_ip4) /
508                                           ICMP(),
509                                           seq_num=0x200000999))
510             self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
511
512             hash_failed_count += 1
513             self.assert_error_counter_equal(hash_failed_node_name,
514                                             hash_failed_count)
515
516             #
517             # but if we move the wondow forward to case B, then we can wrap
518             # again
519             #
520             p.scapy_tra_sa.seq_num = 0x100000555
521             pkt = (Ether(src=self.tra_if.remote_mac,
522                          dst=self.tra_if.local_mac) /
523                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
524                                              dst=self.tra_if.local_ip4) /
525                                           ICMP(),
526                                           seq_num=0x100000555))
527             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
528             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
529
530             p.scapy_tra_sa.seq_num = 0x200000444
531             pkt = (Ether(src=self.tra_if.remote_mac,
532                          dst=self.tra_if.local_mac) /
533                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
534                                              dst=self.tra_if.local_ip4) /
535                                           ICMP(),
536                                           seq_num=0x200000444))
537             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
538             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
539
540         else:
541             #
542             # without ESN TX sequence numbers can't wrap and packets are
543             # dropped from here on out.
544             #
545             self.send_and_assert_no_replies(self.tra_if, pkts)
546             seq_cycle_count += len(pkts)
547             self.assert_error_counter_equal(seq_cycle_node_name,
548                                             seq_cycle_count)
549
550         # move the security-associations seq number on to the last we used
551         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
552         p.scapy_tra_sa.seq_num = 351
553         p.vpp_tra_sa.seq_num = 351
554
555     def verify_tra_basic4(self, count=1):
556         """ ipsec v4 transport basic test """
557         self.vapi.cli("clear errors")
558         self.vapi.cli("clear ipsec sa")
559         try:
560             p = self.params[socket.AF_INET]
561             send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
562                                               src=self.tra_if.remote_ip4,
563                                               dst=self.tra_if.local_ip4,
564                                               count=count)
565             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
566                                              self.tra_if)
567             for rx in recv_pkts:
568                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
569                 self.assert_packet_checksums_valid(rx)
570                 try:
571                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
572                     self.assert_packet_checksums_valid(decrypted)
573                 except:
574                     self.logger.debug(ppp("Unexpected packet:", rx))
575                     raise
576         finally:
577             self.logger.info(self.vapi.ppcli("show error"))
578             self.logger.info(self.vapi.ppcli("show ipsec all"))
579
580         pkts = p.tra_sa_in.get_stats()['packets']
581         self.assertEqual(pkts, count,
582                          "incorrect SA in counts: expected %d != %d" %
583                          (count, pkts))
584         pkts = p.tra_sa_out.get_stats()['packets']
585         self.assertEqual(pkts, count,
586                          "incorrect SA out counts: expected %d != %d" %
587                          (count, pkts))
588
589         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
590         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
591
592
593 class IpsecTra4Tests(IpsecTra4):
594     """ UT test methods for Transport v4 """
595     def test_tra_anti_replay(self):
596         """ ipsec v4 transport anti-reply test """
597         self.verify_tra_anti_replay()
598
599     def test_tra_basic(self, count=1):
600         """ ipsec v4 transport basic test """
601         self.verify_tra_basic4(count=1)
602
603     def test_tra_burst(self):
604         """ ipsec v4 transport burst test """
605         self.verify_tra_basic4(count=257)
606
607
608 class IpsecTra6(object):
609     """ verify methods for Transport v6 """
610     def verify_tra_basic6(self, count=1):
611         self.vapi.cli("clear errors")
612         try:
613             p = self.params[socket.AF_INET6]
614             send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
615                                                src=self.tra_if.remote_ip6,
616                                                dst=self.tra_if.local_ip6,
617                                                count=count)
618             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
619                                              self.tra_if)
620             for rx in recv_pkts:
621                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
622                                  rx[IPv6].plen)
623                 try:
624                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
625                     self.assert_packet_checksums_valid(decrypted)
626                 except:
627                     self.logger.debug(ppp("Unexpected packet:", rx))
628                     raise
629         finally:
630             self.logger.info(self.vapi.ppcli("show error"))
631             self.logger.info(self.vapi.ppcli("show ipsec all"))
632
633         pkts = p.tra_sa_in.get_stats()['packets']
634         self.assertEqual(pkts, count,
635                          "incorrect SA in counts: expected %d != %d" %
636                          (count, pkts))
637         pkts = p.tra_sa_out.get_stats()['packets']
638         self.assertEqual(pkts, count,
639                          "incorrect SA out counts: expected %d != %d" %
640                          (count, pkts))
641         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
642         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
643
644
645 class IpsecTra6Tests(IpsecTra6):
646     """ UT test methods for Transport v6 """
647     def test_tra_basic6(self):
648         """ ipsec v6 transport basic test """
649         self.verify_tra_basic6(count=1)
650
651     def test_tra_burst6(self):
652         """ ipsec v6 transport burst test """
653         self.verify_tra_basic6(count=257)
654
655
656 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
657     """ UT test methods for Transport v6 and v4"""
658     pass
659
660
661 class IpsecTun4(object):
662     """ verify methods for Tunnel v4 """
663     def verify_counters4(self, p, count, n_frags=None):
664         if not n_frags:
665             n_frags = count
666         if (hasattr(p, "spd_policy_in_any")):
667             pkts = p.spd_policy_in_any.get_stats()['packets']
668             self.assertEqual(pkts, count,
669                              "incorrect SPD any policy: expected %d != %d" %
670                              (count, pkts))
671
672         if (hasattr(p, "tun_sa_in")):
673             pkts = p.tun_sa_in.get_stats()['packets']
674             self.assertEqual(pkts, count,
675                              "incorrect SA in counts: expected %d != %d" %
676                              (count, pkts))
677             pkts = p.tun_sa_out.get_stats()['packets']
678             self.assertEqual(pkts, count,
679                              "incorrect SA out counts: expected %d != %d" %
680                              (count, pkts))
681
682         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
683         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
684
685     def verify_decrypted(self, p, rxs):
686         for rx in rxs:
687             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
688             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
689             self.assert_packet_checksums_valid(rx)
690
691     def verify_encrypted(self, p, sa, rxs):
692         decrypt_pkts = []
693         for rx in rxs:
694             if p.nat_header:
695                 self.assertEqual(rx[UDP].dport, 4500)
696             self.assert_packet_checksums_valid(rx)
697             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
698             try:
699                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
700                 if not decrypt_pkt.haslayer(IP):
701                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
702                 decrypt_pkts.append(decrypt_pkt)
703                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
704                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
705             except:
706                 self.logger.debug(ppp("Unexpected packet:", rx))
707                 try:
708                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
709                 except:
710                     pass
711                 raise
712         pkts = reassemble4(decrypt_pkts)
713         for pkt in pkts:
714             self.assert_packet_checksums_valid(pkt)
715
716     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
717         self.vapi.cli("clear errors")
718         if not n_rx:
719             n_rx = count
720         try:
721             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
722                                               src=p.remote_tun_if_host,
723                                               dst=self.pg1.remote_ip4,
724                                               count=count)
725             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
726             self.verify_decrypted(p, recv_pkts)
727
728             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
729                                       dst=p.remote_tun_if_host, count=count,
730                                       payload_size=payload_size)
731             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
732                                              self.tun_if, n_rx)
733             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
734
735         finally:
736             self.logger.info(self.vapi.ppcli("show error"))
737             self.logger.info(self.vapi.ppcli("show ipsec all"))
738
739         self.verify_counters4(p, count, n_rx)
740
741     def verify_tun_reass_44(self, p):
742         self.vapi.cli("clear errors")
743         self.vapi.ip_reassembly_enable_disable(
744             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
745
746         try:
747             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
748                                               src=p.remote_tun_if_host,
749                                               dst=self.pg1.remote_ip4,
750                                               payload_size=1900,
751                                               count=1)
752             send_pkts = fragment_rfc791(send_pkts[0], 1400)
753             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
754                                              self.pg1, n_rx=1)
755             self.verify_decrypted(p, recv_pkts)
756
757             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
758                                       dst=p.remote_tun_if_host, count=1)
759             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
760                                              self.tun_if)
761             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
762
763         finally:
764             self.logger.info(self.vapi.ppcli("show error"))
765             self.logger.info(self.vapi.ppcli("show ipsec all"))
766
767         self.verify_counters4(p, 1, 1)
768         self.vapi.ip_reassembly_enable_disable(
769             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
770
771     def verify_tun_64(self, p, count=1):
772         self.vapi.cli("clear errors")
773         try:
774             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
775                                                src=p.remote_tun_if_host6,
776                                                dst=self.pg1.remote_ip6,
777                                                count=count)
778             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
779             for recv_pkt in recv_pkts:
780                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
781                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
782                 self.assert_packet_checksums_valid(recv_pkt)
783             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
784                                        dst=p.remote_tun_if_host6, count=count)
785             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
786             for recv_pkt in recv_pkts:
787                 try:
788                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
789                     if not decrypt_pkt.haslayer(IPv6):
790                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
791                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
792                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
793                     self.assert_packet_checksums_valid(decrypt_pkt)
794                 except:
795                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
796                     try:
797                         self.logger.debug(
798                             ppp("Decrypted packet:", decrypt_pkt))
799                     except:
800                         pass
801                     raise
802         finally:
803             self.logger.info(self.vapi.ppcli("show error"))
804             self.logger.info(self.vapi.ppcli("show ipsec all"))
805
806         self.verify_counters4(p, count)
807
808     def verify_keepalive(self, p):
809         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
810                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
811                UDP(sport=333, dport=4500) /
812                Raw(b'\xff'))
813         self.send_and_assert_no_replies(self.tun_if, pkt*31)
814         self.assert_error_counter_equal(
815             '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
816
817         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
818                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
819                UDP(sport=333, dport=4500) /
820                Raw(b'\xfe'))
821         self.send_and_assert_no_replies(self.tun_if, pkt*31)
822         self.assert_error_counter_equal(
823             '/err/%s/Too Short' % self.tun4_input_node, 31)
824
825
826 class IpsecTun4Tests(IpsecTun4):
827     """ UT test methods for Tunnel v4 """
828     def test_tun_basic44(self):
829         """ ipsec 4o4 tunnel basic test """
830         self.verify_tun_44(self.params[socket.AF_INET], count=1)
831
832     def test_tun_reass_basic44(self):
833         """ ipsec 4o4 tunnel basic reassembly test """
834         self.verify_tun_reass_44(self.params[socket.AF_INET])
835
836     def test_tun_burst44(self):
837         """ ipsec 4o4 tunnel burst test """
838         self.verify_tun_44(self.params[socket.AF_INET], count=257)
839
840
841 class IpsecTun6(object):
842     """ verify methods for Tunnel v6 """
843     def verify_counters6(self, p_in, p_out, count):
844         if (hasattr(p_in, "tun_sa_in")):
845             pkts = p_in.tun_sa_in.get_stats()['packets']
846             self.assertEqual(pkts, count,
847                              "incorrect SA in counts: expected %d != %d" %
848                              (count, pkts))
849         if (hasattr(p_out, "tun_sa_out")):
850             pkts = p_out.tun_sa_out.get_stats()['packets']
851             self.assertEqual(pkts, count,
852                              "incorrect SA out counts: expected %d != %d" %
853                              (count, pkts))
854         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
855         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
856
857     def verify_decrypted6(self, p, rxs):
858         for rx in rxs:
859             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
860             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
861             self.assert_packet_checksums_valid(rx)
862
863     def verify_encrypted6(self, p, sa, rxs):
864         for rx in rxs:
865             self.assert_packet_checksums_valid(rx)
866             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
867                              rx[IPv6].plen)
868             try:
869                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
870                 if not decrypt_pkt.haslayer(IPv6):
871                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
872                 self.assert_packet_checksums_valid(decrypt_pkt)
873                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
874                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
875             except:
876                 self.logger.debug(ppp("Unexpected packet:", rx))
877                 try:
878                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
879                 except:
880                     pass
881                 raise
882
883     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
884         self.vapi.cli("clear errors")
885         self.vapi.cli("clear ipsec sa")
886
887         send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
888                                            src=p_in.remote_tun_if_host,
889                                            dst=self.pg1.remote_ip6,
890                                            count=count)
891         self.send_and_assert_no_replies(self.tun_if, send_pkts)
892         self.logger.info(self.vapi.cli("sh punt stats"))
893
894     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
895         self.vapi.cli("clear errors")
896         self.vapi.cli("clear ipsec sa")
897         if not p_out:
898             p_out = p_in
899         try:
900             send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
901                                                src=p_in.remote_tun_if_host,
902                                                dst=self.pg1.remote_ip6,
903                                                count=count)
904             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
905             self.verify_decrypted6(p_in, recv_pkts)
906
907             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
908                                        dst=p_out.remote_tun_if_host,
909                                        count=count,
910                                        payload_size=payload_size)
911             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
912                                              self.tun_if)
913             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
914
915         finally:
916             self.logger.info(self.vapi.ppcli("show error"))
917             self.logger.info(self.vapi.ppcli("show ipsec all"))
918         self.verify_counters6(p_in, p_out, count)
919
920     def verify_tun_reass_66(self, p):
921         self.vapi.cli("clear errors")
922         self.vapi.ip_reassembly_enable_disable(
923             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
924
925         try:
926             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
927                                                src=p.remote_tun_if_host,
928                                                dst=self.pg1.remote_ip6,
929                                                count=1,
930                                                payload_size=1900)
931             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
932             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
933                                              self.pg1, n_rx=1)
934             self.verify_decrypted6(p, recv_pkts)
935
936             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
937                                        dst=p.remote_tun_if_host,
938                                        count=1,
939                                        payload_size=64)
940             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
941                                              self.tun_if)
942             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
943         finally:
944             self.logger.info(self.vapi.ppcli("show error"))
945             self.logger.info(self.vapi.ppcli("show ipsec all"))
946         self.verify_counters6(p, p, 1)
947         self.vapi.ip_reassembly_enable_disable(
948             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
949
950     def verify_tun_46(self, p, count=1):
951         """ ipsec 4o6 tunnel basic test """
952         self.vapi.cli("clear errors")
953         try:
954             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
955                                               src=p.remote_tun_if_host4,
956                                               dst=self.pg1.remote_ip4,
957                                               count=count)
958             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
959             for recv_pkt in recv_pkts:
960                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
961                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
962                 self.assert_packet_checksums_valid(recv_pkt)
963             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
964                                       dst=p.remote_tun_if_host4,
965                                       count=count)
966             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
967             for recv_pkt in recv_pkts:
968                 try:
969                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
970                     if not decrypt_pkt.haslayer(IP):
971                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
972                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
973                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
974                     self.assert_packet_checksums_valid(decrypt_pkt)
975                 except:
976                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
977                     try:
978                         self.logger.debug(ppp("Decrypted packet:",
979                                               decrypt_pkt))
980                     except:
981                         pass
982                     raise
983         finally:
984             self.logger.info(self.vapi.ppcli("show error"))
985             self.logger.info(self.vapi.ppcli("show ipsec all"))
986         self.verify_counters6(p, p, count)
987
988
989 class IpsecTun6Tests(IpsecTun6):
990     """ UT test methods for Tunnel v6 """
991
992     def test_tun_basic66(self):
993         """ ipsec 6o6 tunnel basic test """
994         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
995
996     def test_tun_reass_basic66(self):
997         """ ipsec 6o6 tunnel basic reassembly test """
998         self.verify_tun_reass_66(self.params[socket.AF_INET6])
999
1000     def test_tun_burst66(self):
1001         """ ipsec 6o6 tunnel burst test """
1002         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1003
1004
1005 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1006     """ UT test methods for Tunnel v6 & v4 """
1007     pass
1008
1009
1010 if __name__ == '__main__':
1011     unittest.main(testRunner=VppTestRunner)