ipsec: Use the new tunnel API types to add flow label and TTL copy
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
class IPsecIPv4Params:
    """ Default IPsec test parameters for the IPv4 address family.

    The class attributes describe the address family itself; the instance
    attributes hold the SA ids, SPIs, algorithms and keys that the test
    templates read when they build scapy and VPP security associations.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        tunnel_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # remote hosts used as inner addresses, one per address family
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        # hop-limit and flow-label values for outer and inner headers
        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        # integrity: VPP enum value plus the matching scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP enum value plus the matching scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # everything else starts out neutral / disabled
        self.salt = 0
        self.flags = 0
        self.nat_header = None
        self.tun_flags = tunnel_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
60
61
class IPsecIPv6Params:
    """ Default IPsec test parameters for the IPv6 address family.

    The class attributes describe the address family itself; the instance
    attributes hold the SA ids, SPIs, algorithms and keys that the test
    templates read when they build scapy and VPP security associations.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        tunnel_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # remote hosts used as inner addresses, one per address family
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        # hop-limit and flow-label values for outer and inner headers
        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        # integrity: VPP enum value plus the matching scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP enum value plus the matching scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # everything else starts out neutral / disabled
        self.salt = 0
        self.flags = 0
        self.nat_header = None
        self.tun_flags = tunnel_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
104
105
def mk_scapy_crypt_key(p):
    """ Return the encryption key to hand to scapy for parameter set p.

    For the "AES-GCM" and "AES-CTR" scapy algorithms the result is
    p.crypt_key with p.salt appended as a 4-byte network-order unsigned
    integer; for every other algorithm it is p.crypt_key unchanged.
    """
    if p.crypt_algo not in ("AES-GCM", "AES-CTR"):
        return p.crypt_key
    salt_bytes = struct.pack("!I", p.salt)
    return p.crypt_key + salt_bytes
111
112
def config_tun_params(p, encryption_type, tun_if):
    """ Attach the tunnel-mode scapy SAs and tunnel endpoints to p.

    Sets p.tun_dst / p.tun_src from tun_if's remote / local address for
    p.addr_type, then creates:
      - p.scapy_tun_sa: SPI p.vpp_tun_spi, tunnel header going from
        p.tun_dst to p.tun_src
      - p.vpp_tun_sa: SPI p.scapy_tun_spi, tunnel header going from
        p.tun_src to p.tun_dst
    Both SAs share p's algorithms, keys, NAT-T header and ESN setting.
    """
    use_esn = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    esn_en = bool(p.flags & use_esn)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # arguments that are identical for both directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    # scapy header class matching the tunnel's address family
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=p.tun_dst, src=p.tun_src),
        **shared)
140
141
def config_tra_params(p, encryption_type):
    """ Attach the transport-mode scapy SAs to p.

    Creates p.scapy_tra_sa (SPI p.vpp_tra_spi) and p.vpp_tra_sa
    (SPI p.scapy_tra_spi); both share p's algorithms, keys, NAT-T header
    and ESN setting.
    """
    use_esn = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # arguments that are identical for both SAs
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=bool(p.flags & use_esn))

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
164
165
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ Create default v4/v6 parameter sets unless the test already
        provided them, and index both by address family in self.params """
        defaults = (('ipv4_params', IPsecIPv4Params),
                    ('ipv6_params', IPsecIPv6Params))
        for attr_name, factory in defaults:
            if not hasattr(self, attr_name):
                setattr(self, attr_name, factory())
        self.params = {prm.addr_type: prm
                       for prm in (self.ipv4_params, self.ipv6_params)}

    def config_interfaces(self):
        """ Create three pg interfaces, bring them up and configure and
        resolve both IPv4 and IPv6 on each of them """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        proto = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = proto.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = proto.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ Bring every interface down and remove its IP configuration """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hw_output = self.vapi.cli("show hardware")
        self.logger.info(hw_output)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ Return count frames, each an IPv4/ICMP packet with a
        payload_size byte payload encrypted by sa and sent towards
        sw_intf's local MAC """
        frames = []
        for _ in range(count):
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) /
                           ICMP() / Raw(b'X' * payload_size)))
        return frames

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ Return count frames, each an IPv6 echo request (hop limit and
        flow label taken from p) encrypted by sa and sent towards
        sw_intf's local MAC """
        frames = []
        for _ in range(count):
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst,
                                hlim=p.inner_hop_limit,
                                fl=p.inner_flow_label) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data='X' * payload_size)))
        return frames

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ Return count plain IPv4/ICMP frames with a payload_size byte
        payload, sent towards sw_intf's local MAC """
        frames = []
        for _ in range(count):
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size))
        return frames

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """ Return count plain IPv6 echo request frames (hop limit and
        flow label taken from p), sent towards sw_intf's local MAC """
        frames = []
        for _ in range(count):
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst,
                     hlim=p.inner_hop_limit, fl=p.inner_flow_label) /
                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size))
        return frames
273
274
class IpsecTcp(object):
    """ verify methods for TCP over the tunnel interface """
    def verify_tcp_checksum(self):
        """ Send an encrypted TCP SYN to port 80 of the tunnel
        interface's local address and check that the first reply
        decrypts to a packet with valid checksums """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        ether = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = ether / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        self.assert_packet_checksums_valid(
            p.vpp_tun_sa.decrypt(first_reply[IP]))
288
289
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksum """
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # all of the work is done by the verify method of the base class
        self.verify_tcp_checksum()
294
295
class IpsecTra4(object):
    """ verify methods for Transport v4 """

    def _tra4_encrypt_pkt(self, sa, seq_num):
        """ Build one frame for the transport interface: an IPv4/ICMP
        packet from tra_if's remote address to its local address,
        encrypted by sa with the given sequence number """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def verify_tra_anti_replay(self):
        """ Exercise the anti-replay window of the v4 transport SA.

        Sends in-window, replayed, undecryptable, undersized and
        out-of-window packets and checks the matching error counters,
        then checks the sequence-number wrap behaviour with and without
        extended sequence numbers (ESN).
        """
        p = self.params[socket.AF_INET]
        esn_en = p.vpp_tra_sa.esn_en

        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)

        # the counters are cumulative, so remember where they start
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [self._tra4_encrypt_pkt(p.scapy_tra_sa, seq)
                for seq in range(1, 34)]
        self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 35)
        self.send_and_expect(self.tra_if, pkt * 8, self.tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 257)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 200)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = self._tra4_encrypt_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = self._tra4_encrypt_pkt(bogus_sa, 350)
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # which we can determine since this packet is still in the window
        pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if esn_en:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 258)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the result is not needed; the call checks the reply decrypts
        p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [self._tra4_encrypt_pkt(p.scapy_tra_sa, seq)
                for seq in range(259, 280)]

        if esn_en:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 0xfffffffd)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 0x200000999)
            # same two-argument form as every other no-replies check here
            self.send_and_assert_no_replies(self.tra_if, [pkt])

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 0x100000555)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = self._tra4_encrypt_pkt(p.scapy_tra_sa, 0x200000444)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1, payload_size=54):
        """ ipsec v4 transport basic test

        Sends count encrypted packets of payload_size bytes on the
        transport interface, checks length and checksums of every reply
        (before and after decryption), then checks the SA and node packet
        counters all equal count.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
617
618
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller's count rather than a hard-coded 1; the
        # default keeps the behaviour when run without arguments
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
632
633
634 class IpsecTra6(object):
635     """ verify methods for Transport v6 """
636     def verify_tra_basic6(self, count=1, payload_size=54):
637         self.vapi.cli("clear errors")
638         self.vapi.cli("clear ipsec sa")
639         try:
640             p = self.params[socket.AF_INET6]
641             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
642                                                src=self.tra_if.remote_ip6,
643                                                dst=self.tra_if.local_ip6,
644                                                count=count,
645                                                payload_size=payload_size)
646             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
647                                              self.tra_if)
648             for rx in recv_pkts:
649                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
650                                  rx[IPv6].plen)
651                 try:
652                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
653                     self.assert_packet_checksums_valid(decrypted)
654                 except:
655                     self.logger.debug(ppp("Unexpected packet:", rx))
656                     raise
657         finally:
658             self.logger.info(self.vapi.ppcli("show error"))
659             self.logger.info(self.vapi.ppcli("show ipsec all"))
660
661         pkts = p.tra_sa_in.get_stats()['packets']
662         self.assertEqual(pkts, count,
663                          "incorrect SA in counts: expected %d != %d" %
664                          (count, pkts))
665         pkts = p.tra_sa_out.get_stats()['packets']
666         self.assertEqual(pkts, count,
667                          "incorrect SA out counts: expected %d != %d" %
668                          (count, pkts))
669         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
670         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
671
672     def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
673                                    payload_size=54):
674         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
675                 sa.encrypt(IPv6(src=src, dst=dst) /
676                            ICMPv6EchoRequest(id=0, seq=1,
677                                              data='X' * payload_size))
678                 for i in range(count)]
679
680     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
681         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
682                 IPv6(src=src, dst=dst) /
683                 IPv6ExtHdrHopByHop() /
684                 IPv6ExtHdrFragment(id=2, offset=200) /
685                 Raw(b'\xff' * 200)
686                 for i in range(count)]
687
688     def verify_tra_encrypted6(self, p, sa, rxs):
689         decrypted = []
690         for rx in rxs:
691             self.assert_packet_checksums_valid(rx)
692             try:
693                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
694                 decrypted.append(decrypt_pkt)
695                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
696                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
697             except:
698                 self.logger.debug(ppp("Unexpected packet:", rx))
699                 try:
700                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
701                 except:
702                     pass
703                 raise
704         return decrypted
705
706     def verify_tra_66_ext_hdrs(self, p):
707         count = 63
708
709         #
710         # check we can decrypt with options
711         #
712         tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
713                                              src=self.tra_if.remote_ip6,
714                                              dst=self.tra_if.local_ip6,
715                                              count=count)
716         self.send_and_expect(self.tra_if, tx, self.tra_if)
717
718         #
719         # injecting a packet from ourselves to be routed of box is a hack
720         # but it matches an outbout policy, alors je ne regrette rien
721         #
722
723         # one extension before ESP
724         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
725               IPv6(src=self.tra_if.local_ip6,
726                    dst=self.tra_if.remote_ip6) /
727               IPv6ExtHdrFragment(id=2, offset=200) /
728               Raw(b'\xff' * 200))
729
730         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
731         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
732
733         for dc in dcs:
734             # for reasons i'm not going to investigate scapy does not
735             # created the correct headers after decrypt. but reparsing
736             # the ipv6 packet fixes it
737             dc = IPv6(raw(dc[IPv6]))
738             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
739
740         # two extensions before ESP
741         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
742               IPv6(src=self.tra_if.local_ip6,
743                    dst=self.tra_if.remote_ip6) /
744               IPv6ExtHdrHopByHop() /
745               IPv6ExtHdrFragment(id=2, offset=200) /
746               Raw(b'\xff' * 200))
747
748         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
749         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
750
751         for dc in dcs:
752             dc = IPv6(raw(dc[IPv6]))
753             self.assertTrue(dc[IPv6ExtHdrHopByHop])
754             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
755
756         # two extensions before ESP, one after
757         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
758               IPv6(src=self.tra_if.local_ip6,
759                    dst=self.tra_if.remote_ip6) /
760               IPv6ExtHdrHopByHop() /
761               IPv6ExtHdrFragment(id=2, offset=200) /
762               IPv6ExtHdrDestOpt() /
763               Raw(b'\xff' * 200))
764
765         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
766         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
767
768         for dc in dcs:
769             dc = IPv6(raw(dc[IPv6]))
770             self.assertTrue(dc[IPv6ExtHdrDestOpt])
771             self.assertTrue(dc[IPv6ExtHdrHopByHop])
772             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
773
774
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 (single packet and burst) """

    # The test docstrings below are reported by unittest as the test
    # description, so their text is kept as is.

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        n_pkts = 1
        self.verify_tra_basic6(count=n_pkts)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        n_pkts = 257
        self.verify_tra_basic6(count=n_pkts)
784
785
class IpsecTra6ExtTests(IpsecTra6):
    """ UT test methods for Transport v6 with extension headers """

    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(params6)
790
791
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4 (union of both sets) """
795
796
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """

    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """Check SPD, SA and node packet counters after a tunnel run.

        :param p: parameter object; ``spd_policy_in_any`` is checked only
            when present, and ``tun_sa_in``/``tun_sa_out`` are both checked
            when ``tun_sa_in`` is present
        :param count: packets expected on the inbound (decrypt) side
        :param n_frags: packets expected on the outbound (encrypt) side;
            ``count`` is used when this is None or 0
        :param worker: passed through to ``get_stats()``
        """
        if not n_frags:
            n_frags = count
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats(worker)['packets']
            # the outbound SA is compared against n_frags, so that is the
            # expected value that has to appear in the failure message
            self.assertEqual(pkts, n_frags,
                             "incorrect SA out counts: expected %d != %d" %
                             (n_frags, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets VPP has decrypted.

        Every packet must go from ``p.remote_tun_if_host`` to
        ``pg1.remote_ip4``.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """Check the length of an ESP payload against the plaintext length.

        The expected length is the plaintext plus 2 bytes, rounded up to
        the cipher block size (at least 4), plus the IV and the ICV. The
        ICV size comes from the cipher when it is non-zero, otherwise from
        the authentication algorithm.
        """
        align = max(sa.crypt_algo.block_size, 4)
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt packets VPP has encrypted and validate the result.

        :param p: parameter object providing ``vpp_tun_sa``, ``nat_header``
            and ``remote_tun_if_host``
        :param sa: SA whose algorithms are used for the ESP padding check
        :param rxs: received (encrypted) packets
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset for every packet so that a failure is never reported
            # together with the plaintext of a previously handled packet
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # dumping the plaintext is best effort only; the
                        # original failure is what gets re-raised below
                        self.logger.debug("Decrypted packet not printable")
                raise
        # the decrypted packets may be fragments; check the reassembled ones
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Send IPv4 traffic through the tunnel in both directions.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        :param payload_size: payload size passed to the packet generators
        :param n_rx: number of encrypted packets expected on ``tun_if``;
            ``count`` is used when this is None or 0
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            # encrypted in, plaintext out
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # plaintext in, encrypted out
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """Check that IPv4 tunnel traffic is dropped in both directions.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plaintext packets sent on
            ``pg1``; the encrypted packets use the generator's default
        :param n_rx: unused, nothing is expected to be received
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """Check that a fragmented encrypted IPv4 packet is decrypted.

        IPv4 reassembly is enabled on ``tun_if`` for the duration of the
        check and is disabled again afterwards, also when a check fails.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(
                    p, p.scapy_tun_sa, self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    payload_size=1900,
                    count=1)
                # split the encrypted packet; one packet is expected on pg1
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                          dst=p.remote_tun_if_host, count=1)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # do not leave reassembly enabled on the interface when one of
            # the checks above fails
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)

    def verify_tun_64(self, p, count=1):
        """Send IPv6 traffic through the IPv4 tunnel in both directions.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure is never reported
                # together with the plaintext of a previous packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(
                                ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # best effort only; the original failure is
                            # what gets re-raised below
                            self.logger.debug(
                                "Decrypted packet not printable")
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Check the error counters for UDP port 4500 one-byte packets.

        31 packets with payload 0xff must be counted as 'NAT Keepalive' and
        31 packets with payload 0xfe as 'Too Short'; none may be answered.
        """
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
1002
1003
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 (basic, reassembly and burst) """

    # The test docstrings below are reported by unittest as the test
    # description, so their text is kept as is.

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        af = socket.AF_INET
        self.verify_tun_44(self.params[af], count=1)
        # bounce the tunnel interface and check traffic still passes
        self.tun_if.admin_down()
        self.tun_if.resolve_arp()
        self.tun_if.admin_up()
        self.verify_tun_44(self.params[af], count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        params4 = self.params[socket.AF_INET]
        self.verify_tun_reass_44(params4)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        params4 = self.params[socket.AF_INET]
        self.verify_tun_44(params4, count=127)
1021
1022
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """

    def verify_counters6(self, p_in, p_out, count, worker=None):
        """Check SA and node packet counters after a tunnel run.

        :param p_in: parameter object whose ``tun_sa_in`` counter is
            checked, when it has one
        :param p_out: parameter object whose ``tun_sa_out`` counter is
            checked, when it has one
        :param count: packets expected in each direction
        :param worker: passed through to ``get_stats()``
        """
        if hasattr(p_in, "tun_sa_in"):
            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if hasattr(p_out, "tun_sa_out"):
            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of packets VPP has decrypted.

        Every packet must go from ``p.remote_tun_if_host`` to
        ``pg1.remote_ip6``.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt packets VPP has encrypted and validate both headers.

        The outer header must carry ``p.outer_hop_limit`` and, when
        ``p.outer_flow_label`` is non-zero, that flow label. The inner
        packet must carry ``p.inner_hop_limit - 1`` and
        ``p.inner_flow_label``.

        :param p: parameter object providing ``vpp_tun_sa`` and the
            expected header values
        :param sa: unused here; decryption always uses ``p.vpp_tun_sa``
        :param rxs: received (encrypted) packets
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
            if p.outer_flow_label:
                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
            # reset for every packet so that a failure is never reported
            # together with the plaintext of a previously handled packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # dumping the plaintext is best effort only; the
                        # original failure is what gets re-raised below
                        self.logger.debug("Decrypted packet not printable")
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Check that encrypted IPv6 packets sent on ``tun_if`` are dropped.

        :param p_in: parameter object used to build the encrypted packets
        :param count: number of packets to send
        :param payload_size: unused; the packets are built with the
            generator's default payload size
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                           self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Send IPv6 traffic through the tunnel in both directions.

        :param p_in: parameter object for the encrypted-in direction
        :param p_out: parameter object for the encrypted-out direction;
            ``p_in`` is used when not given
        :param count: number of packets sent in each direction
        :param payload_size: payload size passed to the packet generators
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            # encrypted in, plaintext out
            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                               self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            # plaintext in, encrypted out
            send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Check that a fragmented encrypted IPv6 packet is decrypted.

        IPv6 reassembly is enabled on ``tun_if`` for the duration of the
        check and is disabled again afterwards, also when a check fails.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(
                    p, p.scapy_tun_sa, self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=1,
                    payload_size=1850)
                # split the encrypted packet; one packet is expected on pg1
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(p, self.pg1,
                                           src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # do not leave reassembly enabled on the interface when one of
            # the checks above fails
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """Send IPv4 traffic through the IPv6 tunnel in both directions.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure is never reported
                # together with the plaintext of a previous packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(ppp("Decrypted packet:",
                                                  decrypt_pkt))
                        except Exception:
                            # best effort only; the original failure is
                            # what gets re-raised below
                            self.logger.debug(
                                "Decrypted packet not printable")
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
1181
1182
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 (basic, reassembly and burst) """

    # The test docstrings below are reported by unittest as the test
    # description, so their text is kept as is.

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(params6)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=257)
1197
1198
class IpsecTun6HandoffTests(IpsecTun6):
    """ UT test methods for Tunnel v6 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handoff_66(self):
        """ ipsec 6o6 tunnel worker hand-off test """
        n_pkts = 15
        params = self.params[socket.AF_INET6]
        # workers the traffic is injected on, in order
        worker_sequence = (0, 1, 0, 1)

        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        for worker in worker_sequence:
            # encrypted in, plaintext out
            encrypted = self.gen_encrypt_pkts6(
                params, params.scapy_tun_sa, self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=n_pkts)
            rxs = self.send_and_expect(self.tun_if, encrypted,
                                       self.pg1, worker=worker)
            self.verify_decrypted6(params, rxs)

            # plaintext in, encrypted out
            plain = self.gen_pkts6(params, self.pg1,
                                   src=self.pg1.remote_ip6,
                                   dst=params.remote_tun_if_host,
                                   count=n_pkts)
            rxs = self.send_and_expect(self.pg1, plain,
                                       self.tun_if, worker=worker)
            self.verify_encrypted6(params, params.vpp_tun_sa, rxs)

        # all counts against the first worker that was used
        total = len(worker_sequence) * n_pkts
        self.verify_counters6(params, params, total, worker=0)
1228
1229
class IpsecTun4HandoffTests(IpsecTun4):
    """ UT test methods for Tunnel v4 with multiple workers """
    worker_config = "workers 2"

    # NOTE(review): "handooff" in the method name looks like a typo; it is
    # kept because the method name is the test's identifier.
    def test_tun_handooff_44(self):
        """ ipsec 4o4 tunnel worker hand-off test """
        n_pkts = 15
        params = self.params[socket.AF_INET]
        # workers the traffic is injected on, in order
        worker_sequence = (0, 1, 0, 1)

        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        for worker in worker_sequence:
            # encrypted in, plaintext out
            encrypted = self.gen_encrypt_pkts(
                params, params.scapy_tun_sa, self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=n_pkts)
            rxs = self.send_and_expect(self.tun_if, encrypted,
                                       self.pg1, worker=worker)
            self.verify_decrypted(params, rxs)

            # plaintext in, encrypted out
            plain = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                  dst=params.remote_tun_if_host,
                                  count=n_pkts)
            rxs = self.send_and_expect(self.pg1, plain,
                                       self.tun_if, worker=worker)
            self.verify_encrypted(params, params.vpp_tun_sa, rxs)

        # all counts against the first worker that was used
        total = len(worker_sequence) * n_pkts
        self.verify_counters4(params, total, worker=0)
1259
1260
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 (union of both sets) """
1264
1265
if __name__ == "__main__":
    # when executed as a script, run the tests with the VPP test runner
    unittest.main(testRunner=VppTestRunner)