ipsec: Dedicated IPSec interface type
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24 from vpp_acl import AclRule, VppAcl, VppAclInterface
25
26
27 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
28     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
29     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
30                              IPSEC_API_SAD_FLAG_USE_ESN))
31     crypt_key = mk_scapy_crypt_key(p)
32     if tun_if:
33         p.tun_dst = tun_if.remote_ip
34         p.tun_src = tun_if.local_ip
35     else:
36         p.tun_dst = dst
37         p.tun_src = src
38
39     p.scapy_tun_sa = SecurityAssociation(
40         encryption_type, spi=p.vpp_tun_spi,
41         crypt_algo=p.crypt_algo,
42         crypt_key=crypt_key,
43         auth_algo=p.auth_algo, auth_key=p.auth_key,
44         tunnel_header=ip_class_by_addr_type[p.addr_type](
45             src=p.tun_dst,
46             dst=p.tun_src),
47         nat_t_header=p.nat_header,
48         esn_en=esn_en)
49     p.vpp_tun_sa = SecurityAssociation(
50         encryption_type, spi=p.scapy_tun_spi,
51         crypt_algo=p.crypt_algo,
52         crypt_key=crypt_key,
53         auth_algo=p.auth_algo, auth_key=p.auth_key,
54         tunnel_header=ip_class_by_addr_type[p.addr_type](
55             dst=p.tun_dst,
56             src=p.tun_src),
57         nat_t_header=p.nat_header,
58         esn_en=esn_en)
59
60
61 def config_tra_params(p, encryption_type, tun_if):
62     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
63     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
64                              IPSEC_API_SAD_FLAG_USE_ESN))
65     crypt_key = mk_scapy_crypt_key(p)
66     p.tun_dst = tun_if.remote_ip
67     p.tun_src = tun_if.local_ip
68     p.scapy_tun_sa = SecurityAssociation(
69         encryption_type, spi=p.vpp_tun_spi,
70         crypt_algo=p.crypt_algo,
71         crypt_key=crypt_key,
72         auth_algo=p.auth_algo, auth_key=p.auth_key,
73         esn_en=esn_en,
74         nat_t_header=p.nat_header)
75     p.vpp_tun_sa = SecurityAssociation(
76         encryption_type, spi=p.scapy_tun_spi,
77         crypt_algo=p.crypt_algo,
78         crypt_key=crypt_key,
79         auth_algo=p.auth_algo, auth_key=p.auth_key,
80         esn_en=esn_en,
81         nat_t_header=p.nat_header)
82
83
84 class TemplateIpsec4TunIfEsp(TemplateIpsec):
85     """ IPsec tunnel interface tests """
86
87     encryption_type = ESP
88
89     @classmethod
90     def setUpClass(cls):
91         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
92
93     @classmethod
94     def tearDownClass(cls):
95         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
96
97     def setUp(self):
98         super(TemplateIpsec4TunIfEsp, self).setUp()
99
100         self.tun_if = self.pg0
101
102         p = self.ipv4_params
103
104         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
105                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
106                                         p.crypt_key, p.crypt_key,
107                                         p.auth_algo_vpp_id, p.auth_key,
108                                         p.auth_key)
109         p.tun_if.add_vpp_config()
110         p.tun_if.admin_up()
111         p.tun_if.config_ip4()
112         p.tun_if.config_ip6()
113         config_tun_params(p, self.encryption_type, p.tun_if)
114
115         r = VppIpRoute(self, p.remote_tun_if_host, 32,
116                        [VppRoutePath(p.tun_if.remote_ip4,
117                                      0xffffffff)])
118         r.add_vpp_config()
119         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
120                        [VppRoutePath(p.tun_if.remote_ip6,
121                                      0xffffffff,
122                                      proto=DpoProto.DPO_PROTO_IP6)])
123         r.add_vpp_config()
124
125     def tearDown(self):
126         super(TemplateIpsec4TunIfEsp, self).tearDown()
127
128
129 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
130     """ IPsec UDP tunnel interface tests """
131
132     tun4_encrypt_node_name = "esp4-encrypt-tun"
133     tun4_decrypt_node_name = "esp4-decrypt-tun"
134     encryption_type = ESP
135
136     @classmethod
137     def setUpClass(cls):
138         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
139
140     @classmethod
141     def tearDownClass(cls):
142         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
143
144     def verify_encrypted(self, p, sa, rxs):
145         for rx in rxs:
146             try:
147                 # ensure the UDP ports are correct before we decrypt
148                 # which strips them
149                 self.assertTrue(rx.haslayer(UDP))
150                 self.assert_equal(rx[UDP].sport, 4500)
151                 self.assert_equal(rx[UDP].dport, 4500)
152
153                 pkt = sa.decrypt(rx[IP])
154                 if not pkt.haslayer(IP):
155                     pkt = IP(pkt[Raw].load)
156
157                 self.assert_packet_checksums_valid(pkt)
158                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
159                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
160             except (IndexError, AssertionError):
161                 self.logger.debug(ppp("Unexpected packet:", rx))
162                 try:
163                     self.logger.debug(ppp("Decrypted packet:", pkt))
164                 except:
165                     pass
166                 raise
167
168     def setUp(self):
169         super(TemplateIpsec4TunIfEspUdp, self).setUp()
170
171         p = self.ipv4_params
172         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
173                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
174         p.nat_header = UDP(sport=5454, dport=4500)
175
176     def config_network(self):
177
178         self.tun_if = self.pg0
179         p = self.ipv4_params
180         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
181                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
182                                         p.crypt_key, p.crypt_key,
183                                         p.auth_algo_vpp_id, p.auth_key,
184                                         p.auth_key, udp_encap=True)
185         p.tun_if.add_vpp_config()
186         p.tun_if.admin_up()
187         p.tun_if.config_ip4()
188         p.tun_if.config_ip6()
189         config_tun_params(p, self.encryption_type, p.tun_if)
190
191         r = VppIpRoute(self, p.remote_tun_if_host, 32,
192                        [VppRoutePath(p.tun_if.remote_ip4,
193                                      0xffffffff)])
194         r.add_vpp_config()
195         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
196                        [VppRoutePath(p.tun_if.remote_ip6,
197                                      0xffffffff,
198                                      proto=DpoProto.DPO_PROTO_IP6)])
199         r.add_vpp_config()
200
201     def tearDown(self):
202         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
203
204
205 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
206     """ Ipsec ESP - TUN tests """
207     tun4_encrypt_node_name = "esp4-encrypt-tun"
208     tun4_decrypt_node_name = "esp4-decrypt-tun"
209
210     def test_tun_basic64(self):
211         """ ipsec 6o4 tunnel basic test """
212         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
213
214         self.verify_tun_64(self.params[socket.AF_INET], count=1)
215
216     def test_tun_burst64(self):
217         """ ipsec 6o4 tunnel basic test """
218         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
219
220         self.verify_tun_64(self.params[socket.AF_INET], count=257)
221
222     def test_tun_basic_frag44(self):
223         """ ipsec 4o4 tunnel frag basic test """
224         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
225
226         p = self.ipv4_params
227
228         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
229                                        [1500, 0, 0, 0])
230         self.verify_tun_44(self.params[socket.AF_INET],
231                            count=1, payload_size=1800, n_rx=2)
232         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
233                                        [9000, 0, 0, 0])
234
235
236 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
237     """ Ipsec ESP UDP tests """
238
239     tun4_input_node = "ipsec4-tun-input"
240
241     def setUp(self):
242         super(TemplateIpsec4TunIfEspUdp, self).setUp()
243         self.config_network()
244
245     def test_keepalive(self):
246         """ IPSEC NAT Keepalive """
247         self.verify_keepalive(self.ipv4_params)
248
249
250 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
251     """ Ipsec ESP UDP GCM tests """
252
253     tun4_input_node = "ipsec4-tun-input"
254
255     def setUp(self):
256         super(TemplateIpsec4TunIfEspUdp, self).setUp()
257         p = self.ipv4_params
258         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
259                               IPSEC_API_INTEG_ALG_NONE)
260         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
261                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
262         p.crypt_algo = "AES-GCM"
263         p.auth_algo = "NULL"
264         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
265         p.salt = 0
266         self.config_network()
267
268
269 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
270     """ Ipsec ESP - TCP tests """
271     pass
272
273
274 class TemplateIpsec6TunIfEsp(TemplateIpsec):
275     """ IPsec tunnel interface tests """
276
277     encryption_type = ESP
278
279     def setUp(self):
280         super(TemplateIpsec6TunIfEsp, self).setUp()
281
282         self.tun_if = self.pg0
283
284         p = self.ipv6_params
285         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
286                                         p.scapy_tun_spi, p.crypt_algo_vpp_id,
287                                         p.crypt_key, p.crypt_key,
288                                         p.auth_algo_vpp_id, p.auth_key,
289                                         p.auth_key, is_ip6=True)
290         p.tun_if.add_vpp_config()
291         p.tun_if.admin_up()
292         p.tun_if.config_ip6()
293         p.tun_if.config_ip4()
294         config_tun_params(p, self.encryption_type, p.tun_if)
295
296         r = VppIpRoute(self, p.remote_tun_if_host, 128,
297                        [VppRoutePath(p.tun_if.remote_ip6,
298                                      0xffffffff,
299                                      proto=DpoProto.DPO_PROTO_IP6)])
300         r.add_vpp_config()
301         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
302                        [VppRoutePath(p.tun_if.remote_ip4,
303                                      0xffffffff)])
304         r.add_vpp_config()
305
306     def tearDown(self):
307         super(TemplateIpsec6TunIfEsp, self).tearDown()
308
309
310 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
311                           IpsecTun6Tests):
312     """ Ipsec ESP - TUN tests """
313     tun6_encrypt_node_name = "esp6-encrypt-tun"
314     tun6_decrypt_node_name = "esp6-decrypt-tun"
315
316     def test_tun_basic46(self):
317         """ ipsec 4o6 tunnel basic test """
318         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
319         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
320
321     def test_tun_burst46(self):
322         """ ipsec 4o6 tunnel burst test """
323         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
324         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
325
326
327 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
328                                 IpsecTun6HandoffTests):
329     """ Ipsec ESP 6 Handoff tests """
330     tun6_encrypt_node_name = "esp6-encrypt-tun"
331     tun6_decrypt_node_name = "esp6-decrypt-tun"
332
333
334 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
335                                 IpsecTun4HandoffTests):
336     """ Ipsec ESP 4 Handoff tests """
337     tun4_encrypt_node_name = "esp4-encrypt-tun"
338     tun4_decrypt_node_name = "esp4-decrypt-tun"
339
340
341 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
342     """ IPsec IPv4 Multi Tunnel interface """
343
344     encryption_type = ESP
345     tun4_encrypt_node_name = "esp4-encrypt-tun"
346     tun4_decrypt_node_name = "esp4-decrypt-tun"
347
348     def setUp(self):
349         super(TestIpsec4MultiTunIfEsp, self).setUp()
350
351         self.tun_if = self.pg0
352
353         self.multi_params = []
354         self.pg0.generate_remote_hosts(10)
355         self.pg0.configure_ipv4_neighbors()
356
357         for ii in range(10):
358             p = copy.copy(self.ipv4_params)
359
360             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
361             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
362             p.scapy_tun_spi = p.scapy_tun_spi + ii
363             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
364             p.vpp_tun_spi = p.vpp_tun_spi + ii
365
366             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
367             p.scapy_tra_spi = p.scapy_tra_spi + ii
368             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
369             p.vpp_tra_spi = p.vpp_tra_spi + ii
370             p.tun_dst = self.pg0.remote_hosts[ii].ip4
371
372             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
373                                             p.scapy_tun_spi,
374                                             p.crypt_algo_vpp_id,
375                                             p.crypt_key, p.crypt_key,
376                                             p.auth_algo_vpp_id, p.auth_key,
377                                             p.auth_key,
378                                             dst=p.tun_dst)
379             p.tun_if.add_vpp_config()
380             p.tun_if.admin_up()
381             p.tun_if.config_ip4()
382             config_tun_params(p, self.encryption_type, p.tun_if)
383             self.multi_params.append(p)
384
385             VppIpRoute(self, p.remote_tun_if_host, 32,
386                        [VppRoutePath(p.tun_if.remote_ip4,
387                                      0xffffffff)]).add_vpp_config()
388
389     def tearDown(self):
390         super(TestIpsec4MultiTunIfEsp, self).tearDown()
391
392     def test_tun_44(self):
393         """Multiple IPSEC tunnel interfaces """
394         for p in self.multi_params:
395             self.verify_tun_44(p, count=127)
396             c = p.tun_if.get_rx_stats()
397             self.assertEqual(c['packets'], 127)
398             c = p.tun_if.get_tx_stats()
399             self.assertEqual(c['packets'], 127)
400
401     def test_tun_rr_44(self):
402         """ Round-robin packets acrros multiple interface """
403         tx = []
404         for p in self.multi_params:
405             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
406                                             src=p.remote_tun_if_host,
407                                             dst=self.pg1.remote_ip4)
408         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
409
410         for rx, p in zip(rxs, self.multi_params):
411             self.verify_decrypted(p, [rx])
412
413         tx = []
414         for p in self.multi_params:
415             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
416                                     dst=p.remote_tun_if_host)
417         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
418
419         for rx, p in zip(rxs, self.multi_params):
420             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
421
422
423 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
424     """ IPsec IPv4 Tunnel interface all Algos """
425
426     encryption_type = ESP
427     tun4_encrypt_node_name = "esp4-encrypt-tun"
428     tun4_decrypt_node_name = "esp4-decrypt-tun"
429
430     def config_network(self, p):
431
432         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
433                                         p.scapy_tun_spi,
434                                         p.crypt_algo_vpp_id,
435                                         p.crypt_key, p.crypt_key,
436                                         p.auth_algo_vpp_id, p.auth_key,
437                                         p.auth_key,
438                                         salt=p.salt)
439         p.tun_if.add_vpp_config()
440         p.tun_if.admin_up()
441         p.tun_if.config_ip4()
442         config_tun_params(p, self.encryption_type, p.tun_if)
443         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
444         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
445
446         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
447                              [VppRoutePath(p.tun_if.remote_ip4,
448                                            0xffffffff)])
449         p.route.add_vpp_config()
450
451     def unconfig_network(self, p):
452         p.tun_if.unconfig_ip4()
453         p.tun_if.remove_vpp_config()
454         p.route.remove_vpp_config()
455
456     def setUp(self):
457         super(TestIpsec4TunIfEspAll, self).setUp()
458
459         self.tun_if = self.pg0
460
461     def tearDown(self):
462         super(TestIpsec4TunIfEspAll, self).tearDown()
463
464     def rekey(self, p):
465         #
466         # change the key and the SPI
467         #
468         p.crypt_key = b'X' + p.crypt_key[1:]
469         p.scapy_tun_spi += 1
470         p.scapy_tun_sa_id += 1
471         p.vpp_tun_spi += 1
472         p.vpp_tun_sa_id += 1
473         p.tun_if.local_spi = p.vpp_tun_spi
474         p.tun_if.remote_spi = p.scapy_tun_spi
475
476         config_tun_params(p, self.encryption_type, p.tun_if)
477
478         p.tun_sa_in = VppIpsecSA(self,
479                                  p.scapy_tun_sa_id,
480                                  p.scapy_tun_spi,
481                                  p.auth_algo_vpp_id,
482                                  p.auth_key,
483                                  p.crypt_algo_vpp_id,
484                                  p.crypt_key,
485                                  self.vpp_esp_protocol,
486                                  flags=p.flags,
487                                  salt=p.salt)
488         p.tun_sa_out = VppIpsecSA(self,
489                                   p.vpp_tun_sa_id,
490                                   p.vpp_tun_spi,
491                                   p.auth_algo_vpp_id,
492                                   p.auth_key,
493                                   p.crypt_algo_vpp_id,
494                                   p.crypt_key,
495                                   self.vpp_esp_protocol,
496                                   flags=p.flags,
497                                   salt=p.salt)
498         p.tun_sa_in.add_vpp_config()
499         p.tun_sa_out.add_vpp_config()
500
501         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
502                                          sa_id=p.tun_sa_in.id,
503                                          is_outbound=1)
504         self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
505                                          sa_id=p.tun_sa_out.id,
506                                          is_outbound=0)
507         self.logger.info(self.vapi.cli("sh ipsec sa"))
508
509     def test_tun_44(self):
510         """IPSEC tunnel all algos """
511
512         # foreach VPP crypto engine
513         engines = ["ia32", "ipsecmb", "openssl"]
514
515         # foreach crypto algorithm
516         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
517                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
518                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
519                                 IPSEC_API_INTEG_ALG_NONE),
520                   'scapy-crypto': "AES-GCM",
521                   'scapy-integ': "NULL",
522                   'key': b"JPjyOWBeVEQiMe7h",
523                   'salt': 3333},
524                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
525                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
526                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
527                                 IPSEC_API_INTEG_ALG_NONE),
528                   'scapy-crypto': "AES-GCM",
529                   'scapy-integ': "NULL",
530                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
531                   'salt': 0},
532                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
533                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
534                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
535                                 IPSEC_API_INTEG_ALG_NONE),
536                   'scapy-crypto': "AES-GCM",
537                   'scapy-integ': "NULL",
538                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
539                   'salt': 9999},
540                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
541                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
542                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
543                                 IPSEC_API_INTEG_ALG_SHA1_96),
544                   'scapy-crypto': "AES-CBC",
545                   'scapy-integ': "HMAC-SHA1-96",
546                   'salt': 0,
547                   'key': b"JPjyOWBeVEQiMe7h"},
548                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
549                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
550                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
551                                 IPSEC_API_INTEG_ALG_SHA1_96),
552                   'scapy-crypto': "AES-CBC",
553                   'scapy-integ': "HMAC-SHA1-96",
554                   'salt': 0,
555                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
556                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
557                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
558                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
559                                 IPSEC_API_INTEG_ALG_SHA1_96),
560                   'scapy-crypto': "AES-CBC",
561                   'scapy-integ': "HMAC-SHA1-96",
562                   'salt': 0,
563                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
564                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
565                                  IPSEC_API_CRYPTO_ALG_NONE),
566                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
567                                 IPSEC_API_INTEG_ALG_SHA1_96),
568                   'scapy-crypto': "NULL",
569                   'scapy-integ': "HMAC-SHA1-96",
570                   'salt': 0,
571                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
572
573         for engine in engines:
574             self.vapi.cli("set crypto handler all %s" % engine)
575
576             #
577             # loop through each of the algorithms
578             #
579             for algo in algos:
580                 # with self.subTest(algo=algo['scapy']):
581
582                 p = copy.copy(self.ipv4_params)
583                 p.auth_algo_vpp_id = algo['vpp-integ']
584                 p.crypt_algo_vpp_id = algo['vpp-crypto']
585                 p.crypt_algo = algo['scapy-crypto']
586                 p.auth_algo = algo['scapy-integ']
587                 p.crypt_key = algo['key']
588                 p.salt = algo['salt']
589
590                 self.config_network(p)
591
592                 self.verify_tun_44(p, count=127)
593                 c = p.tun_if.get_rx_stats()
594                 self.assertEqual(c['packets'], 127)
595                 c = p.tun_if.get_tx_stats()
596                 self.assertEqual(c['packets'], 127)
597
598                 #
599                 # rekey the tunnel
600                 #
601                 self.rekey(p)
602                 self.verify_tun_44(p, count=127)
603
604                 self.unconfig_network(p)
605                 p.tun_sa_out.remove_vpp_config()
606                 p.tun_sa_in.remove_vpp_config()
607
608
609 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
610     """ IPsec IPv4 Tunnel interface no Algos """
611
612     encryption_type = ESP
613     tun4_encrypt_node_name = "esp4-encrypt-tun"
614     tun4_decrypt_node_name = "esp4-decrypt-tun"
615
616     def config_network(self, p):
617
618         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
619                               IPSEC_API_INTEG_ALG_NONE)
620         p.auth_algo = 'NULL'
621         p.auth_key = []
622
623         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
624                                IPSEC_API_CRYPTO_ALG_NONE)
625         p.crypt_algo = 'NULL'
626         p.crypt_key = []
627
628         p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
629                                         p.scapy_tun_spi,
630                                         p.crypt_algo_vpp_id,
631                                         p.crypt_key, p.crypt_key,
632                                         p.auth_algo_vpp_id, p.auth_key,
633                                         p.auth_key,
634                                         salt=p.salt)
635         p.tun_if.add_vpp_config()
636         p.tun_if.admin_up()
637         p.tun_if.config_ip4()
638         config_tun_params(p, self.encryption_type, p.tun_if)
639         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
640         self.logger.info(self.vapi.cli("sh ipsec sa 1"))
641
642         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
643                              [VppRoutePath(p.tun_if.remote_ip4,
644                                            0xffffffff)])
645         p.route.add_vpp_config()
646
647     def unconfig_network(self, p):
648         p.tun_if.unconfig_ip4()
649         p.tun_if.remove_vpp_config()
650         p.route.remove_vpp_config()
651
652     def setUp(self):
653         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
654
655         self.tun_if = self.pg0
656
657     def tearDown(self):
658         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
659
660     def test_tun_44(self):
661         """ IPSec SA with NULL algos """
662         p = self.ipv4_params
663
664         self.config_network(p)
665
666         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
667                            dst=p.remote_tun_if_host)
668         self.send_and_assert_no_replies(self.pg1, tx)
669
670         self.unconfig_network(p)
671
672
673 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
674     """ IPsec IPv6 Multi Tunnel interface """
675
676     encryption_type = ESP
677     tun6_encrypt_node_name = "esp6-encrypt-tun"
678     tun6_decrypt_node_name = "esp6-decrypt-tun"
679
680     def setUp(self):
681         super(TestIpsec6MultiTunIfEsp, self).setUp()
682
683         self.tun_if = self.pg0
684
685         self.multi_params = []
686         self.pg0.generate_remote_hosts(10)
687         self.pg0.configure_ipv6_neighbors()
688
689         for ii in range(10):
690             p = copy.copy(self.ipv6_params)
691
692             p.remote_tun_if_host = "1111::%d" % (ii + 1)
693             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
694             p.scapy_tun_spi = p.scapy_tun_spi + ii
695             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
696             p.vpp_tun_spi = p.vpp_tun_spi + ii
697
698             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
699             p.scapy_tra_spi = p.scapy_tra_spi + ii
700             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
701             p.vpp_tra_spi = p.vpp_tra_spi + ii
702
703             p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
704                                             p.scapy_tun_spi,
705                                             p.crypt_algo_vpp_id,
706                                             p.crypt_key, p.crypt_key,
707                                             p.auth_algo_vpp_id, p.auth_key,
708                                             p.auth_key, is_ip6=True,
709                                             dst=self.pg0.remote_hosts[ii].ip6)
710             p.tun_if.add_vpp_config()
711             p.tun_if.admin_up()
712             p.tun_if.config_ip6()
713             config_tun_params(p, self.encryption_type, p.tun_if)
714             self.multi_params.append(p)
715
716             r = VppIpRoute(self, p.remote_tun_if_host, 128,
717                            [VppRoutePath(p.tun_if.remote_ip6,
718                                          0xffffffff,
719                                          proto=DpoProto.DPO_PROTO_IP6)])
720             r.add_vpp_config()
721
722     def tearDown(self):
723         super(TestIpsec6MultiTunIfEsp, self).tearDown()
724
725     def test_tun_66(self):
726         """Multiple IPSEC tunnel interfaces """
727         for p in self.multi_params:
728             self.verify_tun_66(p, count=127)
729             c = p.tun_if.get_rx_stats()
730             self.assertEqual(c['packets'], 127)
731             c = p.tun_if.get_tx_stats()
732             self.assertEqual(c['packets'], 127)
733
734
735 class TestIpsecGreTebIfEsp(TemplateIpsec,
736                            IpsecTun4Tests):
737     """ Ipsec GRE TEB ESP - TUN tests """
738     tun4_encrypt_node_name = "esp4-encrypt-tun"
739     tun4_decrypt_node_name = "esp4-decrypt-tun"
740     encryption_type = ESP
741     omac = "00:11:22:33:44:55"
742
743     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
744                          payload_size=100):
745         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
746                 sa.encrypt(IP(src=self.pg0.remote_ip4,
747                               dst=self.pg0.local_ip4) /
748                            GRE() /
749                            Ether(dst=self.omac) /
750                            IP(src="1.1.1.1", dst="1.1.1.2") /
751                            UDP(sport=1144, dport=2233) /
752                            Raw(b'X' * payload_size))
753                 for i in range(count)]
754
755     def gen_pkts(self, sw_intf, src, dst, count=1,
756                  payload_size=100):
757         return [Ether(dst=self.omac) /
758                 IP(src="1.1.1.1", dst="1.1.1.2") /
759                 UDP(sport=1144, dport=2233) /
760                 Raw(b'X' * payload_size)
761                 for i in range(count)]
762
763     def verify_decrypted(self, p, rxs):
764         for rx in rxs:
765             self.assert_equal(rx[Ether].dst, self.omac)
766             self.assert_equal(rx[IP].dst, "1.1.1.2")
767
768     def verify_encrypted(self, p, sa, rxs):
769         for rx in rxs:
770             try:
771                 pkt = sa.decrypt(rx[IP])
772                 if not pkt.haslayer(IP):
773                     pkt = IP(pkt[Raw].load)
774                 self.assert_packet_checksums_valid(pkt)
775                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
776                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
777                 self.assertTrue(pkt.haslayer(GRE))
778                 e = pkt[Ether]
779                 self.assertEqual(e[Ether].dst, self.omac)
780                 self.assertEqual(e[IP].dst, "1.1.1.2")
781             except (IndexError, AssertionError):
782                 self.logger.debug(ppp("Unexpected packet:", rx))
783                 try:
784                     self.logger.debug(ppp("Decrypted packet:", pkt))
785                 except:
786                     pass
787                 raise
788
789     def setUp(self):
790         super(TestIpsecGreTebIfEsp, self).setUp()
791
792         self.tun_if = self.pg0
793
794         p = self.ipv4_params
795
796         bd1 = VppBridgeDomain(self, 1)
797         bd1.add_vpp_config()
798
799         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
800                                   p.auth_algo_vpp_id, p.auth_key,
801                                   p.crypt_algo_vpp_id, p.crypt_key,
802                                   self.vpp_esp_protocol,
803                                   self.pg0.local_ip4,
804                                   self.pg0.remote_ip4)
805         p.tun_sa_out.add_vpp_config()
806
807         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
808                                  p.auth_algo_vpp_id, p.auth_key,
809                                  p.crypt_algo_vpp_id, p.crypt_key,
810                                  self.vpp_esp_protocol,
811                                  self.pg0.remote_ip4,
812                                  self.pg0.local_ip4)
813         p.tun_sa_in.add_vpp_config()
814
815         p.tun_if = VppGreInterface(self,
816                                    self.pg0.local_ip4,
817                                    self.pg0.remote_ip4,
818                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
819                                          GRE_API_TUNNEL_TYPE_TEB))
820         p.tun_if.add_vpp_config()
821
822         p.tun_protect = VppIpsecTunProtect(self,
823                                            p.tun_if,
824                                            p.tun_sa_out,
825                                            [p.tun_sa_in])
826
827         p.tun_protect.add_vpp_config()
828
829         p.tun_if.admin_up()
830         p.tun_if.config_ip4()
831         config_tun_params(p, self.encryption_type, p.tun_if)
832
833         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
834         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
835
836         self.vapi.cli("clear ipsec sa")
837         self.vapi.cli("sh adj")
838         self.vapi.cli("sh ipsec tun")
839
840     def tearDown(self):
841         p = self.ipv4_params
842         p.tun_if.unconfig_ip4()
843         super(TestIpsecGreTebIfEsp, self).tearDown()
844
845
846 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
847                                IpsecTun4Tests):
848     """ Ipsec GRE TEB ESP - TUN tests """
849     tun4_encrypt_node_name = "esp4-encrypt-tun"
850     tun4_decrypt_node_name = "esp4-decrypt-tun"
851     encryption_type = ESP
852     omac = "00:11:22:33:44:55"
853
854     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
855                          payload_size=100):
856         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
857                 sa.encrypt(IP(src=self.pg0.remote_ip4,
858                               dst=self.pg0.local_ip4) /
859                            GRE() /
860                            Ether(dst=self.omac) /
861                            IP(src="1.1.1.1", dst="1.1.1.2") /
862                            UDP(sport=1144, dport=2233) /
863                            Raw(b'X' * payload_size))
864                 for i in range(count)]
865
866     def gen_pkts(self, sw_intf, src, dst, count=1,
867                  payload_size=100):
868         return [Ether(dst=self.omac) /
869                 Dot1Q(vlan=11) /
870                 IP(src="1.1.1.1", dst="1.1.1.2") /
871                 UDP(sport=1144, dport=2233) /
872                 Raw(b'X' * payload_size)
873                 for i in range(count)]
874
875     def verify_decrypted(self, p, rxs):
876         for rx in rxs:
877             self.assert_equal(rx[Ether].dst, self.omac)
878             self.assert_equal(rx[Dot1Q].vlan, 11)
879             self.assert_equal(rx[IP].dst, "1.1.1.2")
880
881     def verify_encrypted(self, p, sa, rxs):
882         for rx in rxs:
883             try:
884                 pkt = sa.decrypt(rx[IP])
885                 if not pkt.haslayer(IP):
886                     pkt = IP(pkt[Raw].load)
887                 self.assert_packet_checksums_valid(pkt)
888                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
889                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
890                 self.assertTrue(pkt.haslayer(GRE))
891                 e = pkt[Ether]
892                 self.assertEqual(e[Ether].dst, self.omac)
893                 self.assertFalse(e.haslayer(Dot1Q))
894                 self.assertEqual(e[IP].dst, "1.1.1.2")
895             except (IndexError, AssertionError):
896                 self.logger.debug(ppp("Unexpected packet:", rx))
897                 try:
898                     self.logger.debug(ppp("Decrypted packet:", pkt))
899                 except:
900                     pass
901                 raise
902
903     def setUp(self):
904         super(TestIpsecGreTebVlanIfEsp, self).setUp()
905
906         self.tun_if = self.pg0
907
908         p = self.ipv4_params
909
910         bd1 = VppBridgeDomain(self, 1)
911         bd1.add_vpp_config()
912
913         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
914         self.vapi.l2_interface_vlan_tag_rewrite(
915             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
916             push_dot1q=11)
917         self.pg1_11.admin_up()
918
919         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
920                                   p.auth_algo_vpp_id, p.auth_key,
921                                   p.crypt_algo_vpp_id, p.crypt_key,
922                                   self.vpp_esp_protocol,
923                                   self.pg0.local_ip4,
924                                   self.pg0.remote_ip4)
925         p.tun_sa_out.add_vpp_config()
926
927         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
928                                  p.auth_algo_vpp_id, p.auth_key,
929                                  p.crypt_algo_vpp_id, p.crypt_key,
930                                  self.vpp_esp_protocol,
931                                  self.pg0.remote_ip4,
932                                  self.pg0.local_ip4)
933         p.tun_sa_in.add_vpp_config()
934
935         p.tun_if = VppGreInterface(self,
936                                    self.pg0.local_ip4,
937                                    self.pg0.remote_ip4,
938                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
939                                          GRE_API_TUNNEL_TYPE_TEB))
940         p.tun_if.add_vpp_config()
941
942         p.tun_protect = VppIpsecTunProtect(self,
943                                            p.tun_if,
944                                            p.tun_sa_out,
945                                            [p.tun_sa_in])
946
947         p.tun_protect.add_vpp_config()
948
949         p.tun_if.admin_up()
950         p.tun_if.config_ip4()
951         config_tun_params(p, self.encryption_type, p.tun_if)
952
953         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
954         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
955
956         self.vapi.cli("clear ipsec sa")
957
958     def tearDown(self):
959         p = self.ipv4_params
960         p.tun_if.unconfig_ip4()
961         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
962         self.pg1_11.admin_down()
963         self.pg1_11.remove_vpp_config()
964
965
966 class TestIpsecGreTebIfEspTra(TemplateIpsec,
967                               IpsecTun4Tests):
968     """ Ipsec GRE TEB ESP - Tra tests """
969     tun4_encrypt_node_name = "esp4-encrypt-tun"
970     tun4_decrypt_node_name = "esp4-decrypt-tun"
971     encryption_type = ESP
972     omac = "00:11:22:33:44:55"
973
974     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
975                          payload_size=100):
976         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
977                 sa.encrypt(IP(src=self.pg0.remote_ip4,
978                               dst=self.pg0.local_ip4) /
979                            GRE() /
980                            Ether(dst=self.omac) /
981                            IP(src="1.1.1.1", dst="1.1.1.2") /
982                            UDP(sport=1144, dport=2233) /
983                            Raw(b'X' * payload_size))
984                 for i in range(count)]
985
986     def gen_pkts(self, sw_intf, src, dst, count=1,
987                  payload_size=100):
988         return [Ether(dst=self.omac) /
989                 IP(src="1.1.1.1", dst="1.1.1.2") /
990                 UDP(sport=1144, dport=2233) /
991                 Raw(b'X' * payload_size)
992                 for i in range(count)]
993
994     def verify_decrypted(self, p, rxs):
995         for rx in rxs:
996             self.assert_equal(rx[Ether].dst, self.omac)
997             self.assert_equal(rx[IP].dst, "1.1.1.2")
998
999     def verify_encrypted(self, p, sa, rxs):
1000         for rx in rxs:
1001             try:
1002                 pkt = sa.decrypt(rx[IP])
1003                 if not pkt.haslayer(IP):
1004                     pkt = IP(pkt[Raw].load)
1005                 self.assert_packet_checksums_valid(pkt)
1006                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1007                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1008                 self.assertTrue(pkt.haslayer(GRE))
1009                 e = pkt[Ether]
1010                 self.assertEqual(e[Ether].dst, self.omac)
1011                 self.assertEqual(e[IP].dst, "1.1.1.2")
1012             except (IndexError, AssertionError):
1013                 self.logger.debug(ppp("Unexpected packet:", rx))
1014                 try:
1015                     self.logger.debug(ppp("Decrypted packet:", pkt))
1016                 except:
1017                     pass
1018                 raise
1019
1020     def setUp(self):
1021         super(TestIpsecGreTebIfEspTra, self).setUp()
1022
1023         self.tun_if = self.pg0
1024
1025         p = self.ipv4_params
1026
1027         bd1 = VppBridgeDomain(self, 1)
1028         bd1.add_vpp_config()
1029
1030         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1031                                   p.auth_algo_vpp_id, p.auth_key,
1032                                   p.crypt_algo_vpp_id, p.crypt_key,
1033                                   self.vpp_esp_protocol)
1034         p.tun_sa_out.add_vpp_config()
1035
1036         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1037                                  p.auth_algo_vpp_id, p.auth_key,
1038                                  p.crypt_algo_vpp_id, p.crypt_key,
1039                                  self.vpp_esp_protocol)
1040         p.tun_sa_in.add_vpp_config()
1041
1042         p.tun_if = VppGreInterface(self,
1043                                    self.pg0.local_ip4,
1044                                    self.pg0.remote_ip4,
1045                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1046                                          GRE_API_TUNNEL_TYPE_TEB))
1047         p.tun_if.add_vpp_config()
1048
1049         p.tun_protect = VppIpsecTunProtect(self,
1050                                            p.tun_if,
1051                                            p.tun_sa_out,
1052                                            [p.tun_sa_in])
1053
1054         p.tun_protect.add_vpp_config()
1055
1056         p.tun_if.admin_up()
1057         p.tun_if.config_ip4()
1058         config_tra_params(p, self.encryption_type, p.tun_if)
1059
1060         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1061         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1062
1063         self.vapi.cli("clear ipsec sa")
1064
1065     def tearDown(self):
1066         p = self.ipv4_params
1067         p.tun_if.unconfig_ip4()
1068         super(TestIpsecGreTebIfEspTra, self).tearDown()
1069
1070
1071 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1072                                  IpsecTun4Tests):
1073     """ Ipsec GRE TEB UDP ESP - Tra tests """
1074     tun4_encrypt_node_name = "esp4-encrypt-tun"
1075     tun4_decrypt_node_name = "esp4-decrypt-tun"
1076     encryption_type = ESP
1077     omac = "00:11:22:33:44:55"
1078
1079     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1080                          payload_size=100):
1081         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1082                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1083                               dst=self.pg0.local_ip4) /
1084                            GRE() /
1085                            Ether(dst=self.omac) /
1086                            IP(src="1.1.1.1", dst="1.1.1.2") /
1087                            UDP(sport=1144, dport=2233) /
1088                            Raw(b'X' * payload_size))
1089                 for i in range(count)]
1090
1091     def gen_pkts(self, sw_intf, src, dst, count=1,
1092                  payload_size=100):
1093         return [Ether(dst=self.omac) /
1094                 IP(src="1.1.1.1", dst="1.1.1.2") /
1095                 UDP(sport=1144, dport=2233) /
1096                 Raw(b'X' * payload_size)
1097                 for i in range(count)]
1098
1099     def verify_decrypted(self, p, rxs):
1100         for rx in rxs:
1101             self.assert_equal(rx[Ether].dst, self.omac)
1102             self.assert_equal(rx[IP].dst, "1.1.1.2")
1103
1104     def verify_encrypted(self, p, sa, rxs):
1105         for rx in rxs:
1106             self.assertTrue(rx.haslayer(UDP))
1107             self.assertEqual(rx[UDP].dport, 4545)
1108             self.assertEqual(rx[UDP].sport, 5454)
1109             try:
1110                 pkt = sa.decrypt(rx[IP])
1111                 if not pkt.haslayer(IP):
1112                     pkt = IP(pkt[Raw].load)
1113                 self.assert_packet_checksums_valid(pkt)
1114                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1115                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1116                 self.assertTrue(pkt.haslayer(GRE))
1117                 e = pkt[Ether]
1118                 self.assertEqual(e[Ether].dst, self.omac)
1119                 self.assertEqual(e[IP].dst, "1.1.1.2")
1120             except (IndexError, AssertionError):
1121                 self.logger.debug(ppp("Unexpected packet:", rx))
1122                 try:
1123                     self.logger.debug(ppp("Decrypted packet:", pkt))
1124                 except:
1125                     pass
1126                 raise
1127
1128     def setUp(self):
1129         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1130
1131         self.tun_if = self.pg0
1132
1133         p = self.ipv4_params
1134         p = self.ipv4_params
1135         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1136                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1137         p.nat_header = UDP(sport=5454, dport=4545)
1138
1139         bd1 = VppBridgeDomain(self, 1)
1140         bd1.add_vpp_config()
1141
1142         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1143                                   p.auth_algo_vpp_id, p.auth_key,
1144                                   p.crypt_algo_vpp_id, p.crypt_key,
1145                                   self.vpp_esp_protocol,
1146                                   flags=p.flags,
1147                                   udp_src=5454,
1148                                   udp_dst=4545)
1149         p.tun_sa_out.add_vpp_config()
1150
1151         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1152                                  p.auth_algo_vpp_id, p.auth_key,
1153                                  p.crypt_algo_vpp_id, p.crypt_key,
1154                                  self.vpp_esp_protocol,
1155                                  flags=(p.flags |
1156                                         VppEnum.vl_api_ipsec_sad_flags_t.
1157                                         IPSEC_API_SAD_FLAG_IS_INBOUND),
1158                                  udp_src=5454,
1159                                  udp_dst=4545)
1160         p.tun_sa_in.add_vpp_config()
1161
1162         p.tun_if = VppGreInterface(self,
1163                                    self.pg0.local_ip4,
1164                                    self.pg0.remote_ip4,
1165                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1166                                          GRE_API_TUNNEL_TYPE_TEB))
1167         p.tun_if.add_vpp_config()
1168
1169         p.tun_protect = VppIpsecTunProtect(self,
1170                                            p.tun_if,
1171                                            p.tun_sa_out,
1172                                            [p.tun_sa_in])
1173
1174         p.tun_protect.add_vpp_config()
1175
1176         p.tun_if.admin_up()
1177         p.tun_if.config_ip4()
1178         config_tra_params(p, self.encryption_type, p.tun_if)
1179
1180         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1181         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1182
1183         self.vapi.cli("clear ipsec sa")
1184         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1185
1186     def tearDown(self):
1187         p = self.ipv4_params
1188         p.tun_if.unconfig_ip4()
1189         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1190
1191
1192 class TestIpsecGreIfEsp(TemplateIpsec,
1193                         IpsecTun4Tests):
1194     """ Ipsec GRE ESP - TUN tests """
1195     tun4_encrypt_node_name = "esp4-encrypt-tun"
1196     tun4_decrypt_node_name = "esp4-decrypt-tun"
1197     encryption_type = ESP
1198
1199     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1200                          payload_size=100):
1201         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1202                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1203                               dst=self.pg0.local_ip4) /
1204                            GRE() /
1205                            IP(src=self.pg1.local_ip4,
1206                               dst=self.pg1.remote_ip4) /
1207                            UDP(sport=1144, dport=2233) /
1208                            Raw(b'X' * payload_size))
1209                 for i in range(count)]
1210
1211     def gen_pkts(self, sw_intf, src, dst, count=1,
1212                  payload_size=100):
1213         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1214                 IP(src="1.1.1.1", dst="1.1.1.2") /
1215                 UDP(sport=1144, dport=2233) /
1216                 Raw(b'X' * payload_size)
1217                 for i in range(count)]
1218
1219     def verify_decrypted(self, p, rxs):
1220         for rx in rxs:
1221             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1222             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1223
1224     def verify_encrypted(self, p, sa, rxs):
1225         for rx in rxs:
1226             try:
1227                 pkt = sa.decrypt(rx[IP])
1228                 if not pkt.haslayer(IP):
1229                     pkt = IP(pkt[Raw].load)
1230                 self.assert_packet_checksums_valid(pkt)
1231                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1232                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1233                 self.assertTrue(pkt.haslayer(GRE))
1234                 e = pkt[GRE]
1235                 self.assertEqual(e[IP].dst, "1.1.1.2")
1236             except (IndexError, AssertionError):
1237                 self.logger.debug(ppp("Unexpected packet:", rx))
1238                 try:
1239                     self.logger.debug(ppp("Decrypted packet:", pkt))
1240                 except:
1241                     pass
1242                 raise
1243
1244     def setUp(self):
1245         super(TestIpsecGreIfEsp, self).setUp()
1246
1247         self.tun_if = self.pg0
1248
1249         p = self.ipv4_params
1250
1251         bd1 = VppBridgeDomain(self, 1)
1252         bd1.add_vpp_config()
1253
1254         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1255                                   p.auth_algo_vpp_id, p.auth_key,
1256                                   p.crypt_algo_vpp_id, p.crypt_key,
1257                                   self.vpp_esp_protocol,
1258                                   self.pg0.local_ip4,
1259                                   self.pg0.remote_ip4)
1260         p.tun_sa_out.add_vpp_config()
1261
1262         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1263                                  p.auth_algo_vpp_id, p.auth_key,
1264                                  p.crypt_algo_vpp_id, p.crypt_key,
1265                                  self.vpp_esp_protocol,
1266                                  self.pg0.remote_ip4,
1267                                  self.pg0.local_ip4)
1268         p.tun_sa_in.add_vpp_config()
1269
1270         p.tun_if = VppGreInterface(self,
1271                                    self.pg0.local_ip4,
1272                                    self.pg0.remote_ip4)
1273         p.tun_if.add_vpp_config()
1274
1275         p.tun_protect = VppIpsecTunProtect(self,
1276                                            p.tun_if,
1277                                            p.tun_sa_out,
1278                                            [p.tun_sa_in])
1279         p.tun_protect.add_vpp_config()
1280
1281         p.tun_if.admin_up()
1282         p.tun_if.config_ip4()
1283         config_tun_params(p, self.encryption_type, p.tun_if)
1284
1285         VppIpRoute(self, "1.1.1.2", 32,
1286                    [VppRoutePath(p.tun_if.remote_ip4,
1287                                  0xffffffff)]).add_vpp_config()
1288
1289     def tearDown(self):
1290         p = self.ipv4_params
1291         p.tun_if.unconfig_ip4()
1292         super(TestIpsecGreIfEsp, self).tearDown()
1293
1294
1295 class TestIpsecGreIfEspTra(TemplateIpsec,
1296                            IpsecTun4Tests):
1297     """ Ipsec GRE ESP - TRA tests """
1298     tun4_encrypt_node_name = "esp4-encrypt-tun"
1299     tun4_decrypt_node_name = "esp4-decrypt-tun"
1300     encryption_type = ESP
1301
1302     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1303                          payload_size=100):
1304         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1305                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1306                               dst=self.pg0.local_ip4) /
1307                            GRE() /
1308                            IP(src=self.pg1.local_ip4,
1309                               dst=self.pg1.remote_ip4) /
1310                            UDP(sport=1144, dport=2233) /
1311                            Raw(b'X' * payload_size))
1312                 for i in range(count)]
1313
1314     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1315                                 payload_size=100):
1316         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1317                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1318                               dst=self.pg0.local_ip4) /
1319                            GRE() /
1320                            UDP(sport=1144, dport=2233) /
1321                            Raw(b'X' * payload_size))
1322                 for i in range(count)]
1323
1324     def gen_pkts(self, sw_intf, src, dst, count=1,
1325                  payload_size=100):
1326         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1327                 IP(src="1.1.1.1", dst="1.1.1.2") /
1328                 UDP(sport=1144, dport=2233) /
1329                 Raw(b'X' * payload_size)
1330                 for i in range(count)]
1331
1332     def verify_decrypted(self, p, rxs):
1333         for rx in rxs:
1334             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1335             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1336
1337     def verify_encrypted(self, p, sa, rxs):
1338         for rx in rxs:
1339             try:
1340                 pkt = sa.decrypt(rx[IP])
1341                 if not pkt.haslayer(IP):
1342                     pkt = IP(pkt[Raw].load)
1343                 self.assert_packet_checksums_valid(pkt)
1344                 self.assertTrue(pkt.haslayer(GRE))
1345                 e = pkt[GRE]
1346                 self.assertEqual(e[IP].dst, "1.1.1.2")
1347             except (IndexError, AssertionError):
1348                 self.logger.debug(ppp("Unexpected packet:", rx))
1349                 try:
1350                     self.logger.debug(ppp("Decrypted packet:", pkt))
1351                 except:
1352                     pass
1353                 raise
1354
1355     def setUp(self):
1356         super(TestIpsecGreIfEspTra, self).setUp()
1357
1358         self.tun_if = self.pg0
1359
1360         p = self.ipv4_params
1361
1362         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1363                                   p.auth_algo_vpp_id, p.auth_key,
1364                                   p.crypt_algo_vpp_id, p.crypt_key,
1365                                   self.vpp_esp_protocol)
1366         p.tun_sa_out.add_vpp_config()
1367
1368         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1369                                  p.auth_algo_vpp_id, p.auth_key,
1370                                  p.crypt_algo_vpp_id, p.crypt_key,
1371                                  self.vpp_esp_protocol)
1372         p.tun_sa_in.add_vpp_config()
1373
1374         p.tun_if = VppGreInterface(self,
1375                                    self.pg0.local_ip4,
1376                                    self.pg0.remote_ip4)
1377         p.tun_if.add_vpp_config()
1378
1379         p.tun_protect = VppIpsecTunProtect(self,
1380                                            p.tun_if,
1381                                            p.tun_sa_out,
1382                                            [p.tun_sa_in])
1383         p.tun_protect.add_vpp_config()
1384
1385         p.tun_if.admin_up()
1386         p.tun_if.config_ip4()
1387         config_tra_params(p, self.encryption_type, p.tun_if)
1388
1389         VppIpRoute(self, "1.1.1.2", 32,
1390                    [VppRoutePath(p.tun_if.remote_ip4,
1391                                  0xffffffff)]).add_vpp_config()
1392
1393     def tearDown(self):
1394         p = self.ipv4_params
1395         p.tun_if.unconfig_ip4()
1396         super(TestIpsecGreIfEspTra, self).tearDown()
1397
1398     def test_gre_non_ip(self):
1399         p = self.ipv4_params
1400         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1401                                           src=p.remote_tun_if_host,
1402                                           dst=self.pg1.remote_ip6)
1403         self.send_and_assert_no_replies(self.tun_if, tx)
1404         node_name = ('/err/%s/unsupported payload' %
1405                      self.tun4_decrypt_node_name)
1406         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1407
1408
1409 class TestIpsecGre6IfEspTra(TemplateIpsec,
1410                             IpsecTun6Tests):
1411     """ Ipsec GRE ESP - TRA tests """
1412     tun6_encrypt_node_name = "esp6-encrypt-tun"
1413     tun6_decrypt_node_name = "esp6-decrypt-tun"
1414     encryption_type = ESP
1415
1416     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1417                           payload_size=100):
1418         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1419                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1420                                 dst=self.pg0.local_ip6) /
1421                            GRE() /
1422                            IPv6(src=self.pg1.local_ip6,
1423                                 dst=self.pg1.remote_ip6) /
1424                            UDP(sport=1144, dport=2233) /
1425                            Raw(b'X' * payload_size))
1426                 for i in range(count)]
1427
1428     def gen_pkts6(self, sw_intf, src, dst, count=1,
1429                   payload_size=100):
1430         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1431                 IPv6(src="1::1", dst="1::2") /
1432                 UDP(sport=1144, dport=2233) /
1433                 Raw(b'X' * payload_size)
1434                 for i in range(count)]
1435
1436     def verify_decrypted6(self, p, rxs):
1437         for rx in rxs:
1438             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1439             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1440
1441     def verify_encrypted6(self, p, sa, rxs):
1442         for rx in rxs:
1443             try:
1444                 pkt = sa.decrypt(rx[IPv6])
1445                 if not pkt.haslayer(IPv6):
1446                     pkt = IPv6(pkt[Raw].load)
1447                 self.assert_packet_checksums_valid(pkt)
1448                 self.assertTrue(pkt.haslayer(GRE))
1449                 e = pkt[GRE]
1450                 self.assertEqual(e[IPv6].dst, "1::2")
1451             except (IndexError, AssertionError):
1452                 self.logger.debug(ppp("Unexpected packet:", rx))
1453                 try:
1454                     self.logger.debug(ppp("Decrypted packet:", pkt))
1455                 except:
1456                     pass
1457                 raise
1458
1459     def setUp(self):
1460         super(TestIpsecGre6IfEspTra, self).setUp()
1461
1462         self.tun_if = self.pg0
1463
1464         p = self.ipv6_params
1465
1466         bd1 = VppBridgeDomain(self, 1)
1467         bd1.add_vpp_config()
1468
1469         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1470                                   p.auth_algo_vpp_id, p.auth_key,
1471                                   p.crypt_algo_vpp_id, p.crypt_key,
1472                                   self.vpp_esp_protocol)
1473         p.tun_sa_out.add_vpp_config()
1474
1475         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1476                                  p.auth_algo_vpp_id, p.auth_key,
1477                                  p.crypt_algo_vpp_id, p.crypt_key,
1478                                  self.vpp_esp_protocol)
1479         p.tun_sa_in.add_vpp_config()
1480
1481         p.tun_if = VppGreInterface(self,
1482                                    self.pg0.local_ip6,
1483                                    self.pg0.remote_ip6)
1484         p.tun_if.add_vpp_config()
1485
1486         p.tun_protect = VppIpsecTunProtect(self,
1487                                            p.tun_if,
1488                                            p.tun_sa_out,
1489                                            [p.tun_sa_in])
1490         p.tun_protect.add_vpp_config()
1491
1492         p.tun_if.admin_up()
1493         p.tun_if.config_ip6()
1494         config_tra_params(p, self.encryption_type, p.tun_if)
1495
1496         r = VppIpRoute(self, "1::2", 128,
1497                        [VppRoutePath(p.tun_if.remote_ip6,
1498                                      0xffffffff,
1499                                      proto=DpoProto.DPO_PROTO_IP6)])
1500         r.add_vpp_config()
1501
1502     def tearDown(self):
1503         p = self.ipv6_params
1504         p.tun_if.unconfig_ip6()
1505         super(TestIpsecGre6IfEspTra, self).tearDown()
1506
1507
1508 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1509     """ Ipsec mGRE ESP v4 TRA tests """
1510     tun4_encrypt_node_name = "esp4-encrypt-tun"
1511     tun4_decrypt_node_name = "esp4-decrypt-tun"
1512     encryption_type = ESP
1513
1514     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1515                          payload_size=100):
1516         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1517                 sa.encrypt(IP(src=p.tun_dst,
1518                               dst=self.pg0.local_ip4) /
1519                            GRE() /
1520                            IP(src=self.pg1.local_ip4,
1521                               dst=self.pg1.remote_ip4) /
1522                            UDP(sport=1144, dport=2233) /
1523                            Raw(b'X' * payload_size))
1524                 for i in range(count)]
1525
1526     def gen_pkts(self, sw_intf, src, dst, count=1,
1527                  payload_size=100):
1528         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1529                 IP(src="1.1.1.1", dst=dst) /
1530                 UDP(sport=1144, dport=2233) /
1531                 Raw(b'X' * payload_size)
1532                 for i in range(count)]
1533
1534     def verify_decrypted(self, p, rxs):
1535         for rx in rxs:
1536             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1537             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1538
1539     def verify_encrypted(self, p, sa, rxs):
1540         for rx in rxs:
1541             try:
1542                 pkt = sa.decrypt(rx[IP])
1543                 if not pkt.haslayer(IP):
1544                     pkt = IP(pkt[Raw].load)
1545                 self.assert_packet_checksums_valid(pkt)
1546                 self.assertTrue(pkt.haslayer(GRE))
1547                 e = pkt[GRE]
1548                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1549             except (IndexError, AssertionError):
1550                 self.logger.debug(ppp("Unexpected packet:", rx))
1551                 try:
1552                     self.logger.debug(ppp("Decrypted packet:", pkt))
1553                 except:
1554                     pass
1555                 raise
1556
1557     def setUp(self):
1558         super(TestIpsecMGreIfEspTra4, self).setUp()
1559
1560         N_NHS = 16
1561         self.tun_if = self.pg0
1562         p = self.ipv4_params
1563         p.tun_if = VppGreInterface(self,
1564                                    self.pg0.local_ip4,
1565                                    "0.0.0.0",
1566                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1567                                          TUNNEL_API_MODE_MP))
1568         p.tun_if.add_vpp_config()
1569         p.tun_if.admin_up()
1570         p.tun_if.config_ip4()
1571         p.tun_if.generate_remote_hosts(N_NHS)
1572         self.pg0.generate_remote_hosts(N_NHS)
1573         self.pg0.configure_ipv4_neighbors()
1574
1575         # setup some SAs for several next-hops on the interface
1576         self.multi_params = []
1577
1578         for ii in range(1):
1579             p = copy.copy(self.ipv4_params)
1580
1581             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1582             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1583             p.scapy_tun_spi = p.scapy_tun_spi + ii
1584             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1585             p.vpp_tun_spi = p.vpp_tun_spi + ii
1586
1587             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1588             p.scapy_tra_spi = p.scapy_tra_spi + ii
1589             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1590             p.vpp_tra_spi = p.vpp_tra_spi + ii
1591             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1592                                       p.auth_algo_vpp_id, p.auth_key,
1593                                       p.crypt_algo_vpp_id, p.crypt_key,
1594                                       self.vpp_esp_protocol)
1595             p.tun_sa_out.add_vpp_config()
1596
1597             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1598                                      p.auth_algo_vpp_id, p.auth_key,
1599                                      p.crypt_algo_vpp_id, p.crypt_key,
1600                                      self.vpp_esp_protocol)
1601             p.tun_sa_in.add_vpp_config()
1602
1603             p.tun_protect = VppIpsecTunProtect(
1604                 self,
1605                 p.tun_if,
1606                 p.tun_sa_out,
1607                 [p.tun_sa_in],
1608                 nh=p.tun_if.remote_hosts[ii].ip4)
1609             p.tun_protect.add_vpp_config()
1610             config_tra_params(p, self.encryption_type, p.tun_if)
1611             self.multi_params.append(p)
1612
1613             VppIpRoute(self, p.remote_tun_if_host, 32,
1614                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1615                                      p.tun_if.sw_if_index)]).add_vpp_config()
1616
1617             # in this v4 variant add the teibs after the protect
1618             p.teib = VppTeib(self, p.tun_if,
1619                              p.tun_if.remote_hosts[ii].ip4,
1620                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1621             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1622         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1623
1624     def tearDown(self):
1625         p = self.ipv4_params
1626         p.tun_if.unconfig_ip4()
1627         super(TestIpsecMGreIfEspTra4, self).tearDown()
1628
1629     def test_tun_44(self):
1630         """mGRE IPSEC 44"""
1631         N_PKTS = 63
1632         for p in self.multi_params:
1633             self.verify_tun_44(p, count=N_PKTS)
1634             p.teib.remove_vpp_config()
1635             self.verify_tun_dropped_44(p, count=N_PKTS)
1636             p.teib.add_vpp_config()
1637             self.verify_tun_44(p, count=N_PKTS)
1638
1639
1640 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1641     """ Ipsec mGRE ESP v6 TRA tests """
1642     tun6_encrypt_node_name = "esp6-encrypt-tun"
1643     tun6_decrypt_node_name = "esp6-decrypt-tun"
1644     encryption_type = ESP
1645
1646     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1647                           payload_size=100):
1648         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1649                 sa.encrypt(IPv6(src=p.tun_dst,
1650                                 dst=self.pg0.local_ip6) /
1651                            GRE() /
1652                            IPv6(src=self.pg1.local_ip6,
1653                                 dst=self.pg1.remote_ip6) /
1654                            UDP(sport=1144, dport=2233) /
1655                            Raw(b'X' * payload_size))
1656                 for i in range(count)]
1657
1658     def gen_pkts6(self, sw_intf, src, dst, count=1,
1659                   payload_size=100):
1660         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1661                 IPv6(src="1::1", dst=dst) /
1662                 UDP(sport=1144, dport=2233) /
1663                 Raw(b'X' * payload_size)
1664                 for i in range(count)]
1665
1666     def verify_decrypted6(self, p, rxs):
1667         for rx in rxs:
1668             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1669             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1670
1671     def verify_encrypted6(self, p, sa, rxs):
1672         for rx in rxs:
1673             try:
1674                 pkt = sa.decrypt(rx[IPv6])
1675                 if not pkt.haslayer(IPv6):
1676                     pkt = IPv6(pkt[Raw].load)
1677                 self.assert_packet_checksums_valid(pkt)
1678                 self.assertTrue(pkt.haslayer(GRE))
1679                 e = pkt[GRE]
1680                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1681             except (IndexError, AssertionError):
1682                 self.logger.debug(ppp("Unexpected packet:", rx))
1683                 try:
1684                     self.logger.debug(ppp("Decrypted packet:", pkt))
1685                 except:
1686                     pass
1687                 raise
1688
1689     def setUp(self):
1690         super(TestIpsecMGreIfEspTra6, self).setUp()
1691
1692         self.vapi.cli("set logging class ipsec level debug")
1693
1694         N_NHS = 16
1695         self.tun_if = self.pg0
1696         p = self.ipv6_params
1697         p.tun_if = VppGreInterface(self,
1698                                    self.pg0.local_ip6,
1699                                    "::",
1700                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1701                                          TUNNEL_API_MODE_MP))
1702         p.tun_if.add_vpp_config()
1703         p.tun_if.admin_up()
1704         p.tun_if.config_ip6()
1705         p.tun_if.generate_remote_hosts(N_NHS)
1706         self.pg0.generate_remote_hosts(N_NHS)
1707         self.pg0.configure_ipv6_neighbors()
1708
1709         # setup some SAs for several next-hops on the interface
1710         self.multi_params = []
1711
1712         for ii in range(N_NHS):
1713             p = copy.copy(self.ipv6_params)
1714
1715             p.remote_tun_if_host = "1::%d" % (ii + 1)
1716             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1717             p.scapy_tun_spi = p.scapy_tun_spi + ii
1718             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1719             p.vpp_tun_spi = p.vpp_tun_spi + ii
1720
1721             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1722             p.scapy_tra_spi = p.scapy_tra_spi + ii
1723             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1724             p.vpp_tra_spi = p.vpp_tra_spi + ii
1725             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1726                                       p.auth_algo_vpp_id, p.auth_key,
1727                                       p.crypt_algo_vpp_id, p.crypt_key,
1728                                       self.vpp_esp_protocol)
1729             p.tun_sa_out.add_vpp_config()
1730
1731             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1732                                      p.auth_algo_vpp_id, p.auth_key,
1733                                      p.crypt_algo_vpp_id, p.crypt_key,
1734                                      self.vpp_esp_protocol)
1735             p.tun_sa_in.add_vpp_config()
1736
1737             # in this v6 variant add the teibs first then the protection
1738             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1739             VppTeib(self, p.tun_if,
1740                     p.tun_if.remote_hosts[ii].ip6,
1741                     p.tun_dst).add_vpp_config()
1742
1743             p.tun_protect = VppIpsecTunProtect(
1744                 self,
1745                 p.tun_if,
1746                 p.tun_sa_out,
1747                 [p.tun_sa_in],
1748                 nh=p.tun_if.remote_hosts[ii].ip6)
1749             p.tun_protect.add_vpp_config()
1750             config_tra_params(p, self.encryption_type, p.tun_if)
1751             self.multi_params.append(p)
1752
1753             VppIpRoute(self, p.remote_tun_if_host, 128,
1754                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1755                                      p.tun_if.sw_if_index)]).add_vpp_config()
1756             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1757
1758         self.logger.info(self.vapi.cli("sh log"))
1759         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1760         self.logger.info(self.vapi.cli("sh adj 41"))
1761
1762     def tearDown(self):
1763         p = self.ipv6_params
1764         p.tun_if.unconfig_ip6()
1765         super(TestIpsecMGreIfEspTra6, self).tearDown()
1766
1767     def test_tun_66(self):
1768         """mGRE IPSec 66"""
1769         for p in self.multi_params:
1770             self.verify_tun_66(p, count=63)
1771
1772
1773 class TemplateIpsec4TunProtect(object):
1774     """ IPsec IPv4 Tunnel protect """
1775
1776     encryption_type = ESP
1777     tun4_encrypt_node_name = "esp4-encrypt-tun"
1778     tun4_decrypt_node_name = "esp4-decrypt-tun"
1779     tun4_input_node = "ipsec4-tun-input"
1780
1781     def config_sa_tra(self, p):
1782         config_tun_params(p, self.encryption_type, p.tun_if)
1783
1784         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1785                                   p.auth_algo_vpp_id, p.auth_key,
1786                                   p.crypt_algo_vpp_id, p.crypt_key,
1787                                   self.vpp_esp_protocol,
1788                                   flags=p.flags)
1789         p.tun_sa_out.add_vpp_config()
1790
1791         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1792                                  p.auth_algo_vpp_id, p.auth_key,
1793                                  p.crypt_algo_vpp_id, p.crypt_key,
1794                                  self.vpp_esp_protocol,
1795                                  flags=p.flags)
1796         p.tun_sa_in.add_vpp_config()
1797
1798     def config_sa_tun(self, p):
1799         config_tun_params(p, self.encryption_type, p.tun_if)
1800
1801         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1802                                   p.auth_algo_vpp_id, p.auth_key,
1803                                   p.crypt_algo_vpp_id, p.crypt_key,
1804                                   self.vpp_esp_protocol,
1805                                   self.tun_if.local_addr[p.addr_type],
1806                                   self.tun_if.remote_addr[p.addr_type],
1807                                   flags=p.flags)
1808         p.tun_sa_out.add_vpp_config()
1809
1810         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1811                                  p.auth_algo_vpp_id, p.auth_key,
1812                                  p.crypt_algo_vpp_id, p.crypt_key,
1813                                  self.vpp_esp_protocol,
1814                                  self.tun_if.remote_addr[p.addr_type],
1815                                  self.tun_if.local_addr[p.addr_type],
1816                                  flags=p.flags)
1817         p.tun_sa_in.add_vpp_config()
1818
1819     def config_protect(self, p):
1820         p.tun_protect = VppIpsecTunProtect(self,
1821                                            p.tun_if,
1822                                            p.tun_sa_out,
1823                                            [p.tun_sa_in])
1824         p.tun_protect.add_vpp_config()
1825
1826     def config_network(self, p):
1827         p.tun_if = VppIpIpTunInterface(self, self.pg0,
1828                                        self.pg0.local_ip4,
1829                                        self.pg0.remote_ip4)
1830         p.tun_if.add_vpp_config()
1831         p.tun_if.admin_up()
1832         p.tun_if.config_ip4()
1833         p.tun_if.config_ip6()
1834
1835         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1836                              [VppRoutePath(p.tun_if.remote_ip4,
1837                                            0xffffffff)])
1838         p.route.add_vpp_config()
1839         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1840                        [VppRoutePath(p.tun_if.remote_ip6,
1841                                      0xffffffff,
1842                                      proto=DpoProto.DPO_PROTO_IP6)])
1843         r.add_vpp_config()
1844
1845     def unconfig_network(self, p):
1846         p.route.remove_vpp_config()
1847         p.tun_if.remove_vpp_config()
1848
1849     def unconfig_protect(self, p):
1850         p.tun_protect.remove_vpp_config()
1851
1852     def unconfig_sa(self, p):
1853         p.tun_sa_out.remove_vpp_config()
1854         p.tun_sa_in.remove_vpp_config()
1855
1856
1857 class TestIpsec4TunProtect(TemplateIpsec,
1858                            TemplateIpsec4TunProtect,
1859                            IpsecTun4):
1860     """ IPsec IPv4 Tunnel protect - transport mode"""
1861
1862     def setUp(self):
1863         super(TestIpsec4TunProtect, self).setUp()
1864
1865         self.tun_if = self.pg0
1866
1867     def tearDown(self):
1868         super(TestIpsec4TunProtect, self).tearDown()
1869
1870     def test_tun_44(self):
1871         """IPSEC tunnel protect"""
1872
1873         p = self.ipv4_params
1874
1875         self.config_network(p)
1876         self.config_sa_tra(p)
1877         self.config_protect(p)
1878
1879         self.verify_tun_44(p, count=127)
1880         c = p.tun_if.get_rx_stats()
1881         self.assertEqual(c['packets'], 127)
1882         c = p.tun_if.get_tx_stats()
1883         self.assertEqual(c['packets'], 127)
1884
1885         self.vapi.cli("clear ipsec sa")
1886         self.verify_tun_64(p, count=127)
1887         c = p.tun_if.get_rx_stats()
1888         self.assertEqual(c['packets'], 254)
1889         c = p.tun_if.get_tx_stats()
1890         self.assertEqual(c['packets'], 254)
1891
1892         # rekey - create new SAs and update the tunnel protection
1893         np = copy.copy(p)
1894         np.crypt_key = b'X' + p.crypt_key[1:]
1895         np.scapy_tun_spi += 100
1896         np.scapy_tun_sa_id += 1
1897         np.vpp_tun_spi += 100
1898         np.vpp_tun_sa_id += 1
1899         np.tun_if.local_spi = p.vpp_tun_spi
1900         np.tun_if.remote_spi = p.scapy_tun_spi
1901
1902         self.config_sa_tra(np)
1903         self.config_protect(np)
1904         self.unconfig_sa(p)
1905
1906         self.verify_tun_44(np, count=127)
1907         c = p.tun_if.get_rx_stats()
1908         self.assertEqual(c['packets'], 381)
1909         c = p.tun_if.get_tx_stats()
1910         self.assertEqual(c['packets'], 381)
1911
1912         # teardown
1913         self.unconfig_protect(np)
1914         self.unconfig_sa(np)
1915         self.unconfig_network(p)
1916
1917
1918 class TestIpsec4TunProtectUdp(TemplateIpsec,
1919                               TemplateIpsec4TunProtect,
1920                               IpsecTun4):
1921     """ IPsec IPv4 Tunnel protect - transport mode"""
1922
1923     def setUp(self):
1924         super(TestIpsec4TunProtectUdp, self).setUp()
1925
1926         self.tun_if = self.pg0
1927
1928         p = self.ipv4_params
1929         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1930                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1931         p.nat_header = UDP(sport=4500, dport=4500)
1932         self.config_network(p)
1933         self.config_sa_tra(p)
1934         self.config_protect(p)
1935
1936     def tearDown(self):
1937         p = self.ipv4_params
1938         self.unconfig_protect(p)
1939         self.unconfig_sa(p)
1940         self.unconfig_network(p)
1941         super(TestIpsec4TunProtectUdp, self).tearDown()
1942
1943     def verify_encrypted(self, p, sa, rxs):
1944         # ensure encrypted packets are recieved with the default UDP ports
1945         for rx in rxs:
1946             self.assertEqual(rx[UDP].sport, 4500)
1947             self.assertEqual(rx[UDP].dport, 4500)
1948         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1949
1950     def test_tun_44(self):
1951         """IPSEC UDP tunnel protect"""
1952
1953         p = self.ipv4_params
1954
1955         self.verify_tun_44(p, count=127)
1956         c = p.tun_if.get_rx_stats()
1957         self.assertEqual(c['packets'], 127)
1958         c = p.tun_if.get_tx_stats()
1959         self.assertEqual(c['packets'], 127)
1960
1961     def test_keepalive(self):
1962         """ IPSEC NAT Keepalive """
1963         self.verify_keepalive(self.ipv4_params)
1964
1965
1966 class TestIpsec4TunProtectTun(TemplateIpsec,
1967                               TemplateIpsec4TunProtect,
1968                               IpsecTun4):
1969     """ IPsec IPv4 Tunnel protect - tunnel mode"""
1970
1971     encryption_type = ESP
1972     tun4_encrypt_node_name = "esp4-encrypt-tun"
1973     tun4_decrypt_node_name = "esp4-decrypt-tun"
1974
1975     def setUp(self):
1976         super(TestIpsec4TunProtectTun, self).setUp()
1977
1978         self.tun_if = self.pg0
1979
1980     def tearDown(self):
1981         super(TestIpsec4TunProtectTun, self).tearDown()
1982
1983     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1984                          payload_size=100):
1985         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1986                 sa.encrypt(IP(src=sw_intf.remote_ip4,
1987                               dst=sw_intf.local_ip4) /
1988                            IP(src=src, dst=dst) /
1989                            UDP(sport=1144, dport=2233) /
1990                            Raw(b'X' * payload_size))
1991                 for i in range(count)]
1992
1993     def gen_pkts(self, sw_intf, src, dst, count=1,
1994                  payload_size=100):
1995         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1996                 IP(src=src, dst=dst) /
1997                 UDP(sport=1144, dport=2233) /
1998                 Raw(b'X' * payload_size)
1999                 for i in range(count)]
2000
2001     def verify_decrypted(self, p, rxs):
2002         for rx in rxs:
2003             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2004             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2005             self.assert_packet_checksums_valid(rx)
2006
2007     def verify_encrypted(self, p, sa, rxs):
2008         for rx in rxs:
2009             try:
2010                 pkt = sa.decrypt(rx[IP])
2011                 if not pkt.haslayer(IP):
2012                     pkt = IP(pkt[Raw].load)
2013                 self.assert_packet_checksums_valid(pkt)
2014                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2015                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2016                 inner = pkt[IP].payload
2017                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2018
2019             except (IndexError, AssertionError):
2020                 self.logger.debug(ppp("Unexpected packet:", rx))
2021                 try:
2022                     self.logger.debug(ppp("Decrypted packet:", pkt))
2023                 except:
2024                     pass
2025                 raise
2026
2027     def test_tun_44(self):
2028         """IPSEC tunnel protect """
2029
2030         p = self.ipv4_params
2031
2032         self.config_network(p)
2033         self.config_sa_tun(p)
2034         self.config_protect(p)
2035
2036         # also add an output features on the tunnel and physical interface
2037         # so we test they still work
2038         r_all = AclRule(True,
2039                         src_prefix="0.0.0.0/0",
2040                         dst_prefix="0.0.0.0/0",
2041                         proto=0)
2042         a = VppAcl(self, [r_all]).add_vpp_config()
2043
2044         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2045         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2046
2047         self.verify_tun_44(p, count=127)
2048
2049         c = p.tun_if.get_rx_stats()
2050         self.assertEqual(c['packets'], 127)
2051         c = p.tun_if.get_tx_stats()
2052         self.assertEqual(c['packets'], 127)
2053
2054         # rekey - create new SAs and update the tunnel protection
2055         np = copy.copy(p)
2056         np.crypt_key = b'X' + p.crypt_key[1:]
2057         np.scapy_tun_spi += 100
2058         np.scapy_tun_sa_id += 1
2059         np.vpp_tun_spi += 100
2060         np.vpp_tun_sa_id += 1
2061         np.tun_if.local_spi = p.vpp_tun_spi
2062         np.tun_if.remote_spi = p.scapy_tun_spi
2063
2064         self.config_sa_tun(np)
2065         self.config_protect(np)
2066         self.unconfig_sa(p)
2067
2068         self.verify_tun_44(np, count=127)
2069         c = p.tun_if.get_rx_stats()
2070         self.assertEqual(c['packets'], 254)
2071         c = p.tun_if.get_tx_stats()
2072         self.assertEqual(c['packets'], 254)
2073
2074         # teardown
2075         self.unconfig_protect(np)
2076         self.unconfig_sa(np)
2077         self.unconfig_network(p)
2078
2079
2080 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2081                                   TemplateIpsec4TunProtect,
2082                                   IpsecTun4):
2083     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2084
2085     encryption_type = ESP
2086     tun4_encrypt_node_name = "esp4-encrypt-tun"
2087     tun4_decrypt_node_name = "esp4-decrypt-tun"
2088
2089     def setUp(self):
2090         super(TestIpsec4TunProtectTunDrop, self).setUp()
2091
2092         self.tun_if = self.pg0
2093
2094     def tearDown(self):
2095         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2096
2097     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2098                          payload_size=100):
2099         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2100                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2101                               dst="5.5.5.5") /
2102                            IP(src=src, dst=dst) /
2103                            UDP(sport=1144, dport=2233) /
2104                            Raw(b'X' * payload_size))
2105                 for i in range(count)]
2106
2107     def test_tun_drop_44(self):
2108         """IPSEC tunnel protect bogus tunnel header """
2109
2110         p = self.ipv4_params
2111
2112         self.config_network(p)
2113         self.config_sa_tun(p)
2114         self.config_protect(p)
2115
2116         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2117                                    src=p.remote_tun_if_host,
2118                                    dst=self.pg1.remote_ip4,
2119                                    count=63)
2120         self.send_and_assert_no_replies(self.tun_if, tx)
2121
2122         # teardown
2123         self.unconfig_protect(p)
2124         self.unconfig_sa(p)
2125         self.unconfig_network(p)
2126
2127
2128 class TemplateIpsec6TunProtect(object):
2129     """ IPsec IPv6 Tunnel protect """
2130
2131     def config_sa_tra(self, p):
2132         config_tun_params(p, self.encryption_type, p.tun_if)
2133
2134         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2135                                   p.auth_algo_vpp_id, p.auth_key,
2136                                   p.crypt_algo_vpp_id, p.crypt_key,
2137                                   self.vpp_esp_protocol)
2138         p.tun_sa_out.add_vpp_config()
2139
2140         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2141                                  p.auth_algo_vpp_id, p.auth_key,
2142                                  p.crypt_algo_vpp_id, p.crypt_key,
2143                                  self.vpp_esp_protocol)
2144         p.tun_sa_in.add_vpp_config()
2145
2146     def config_sa_tun(self, p):
2147         config_tun_params(p, self.encryption_type, p.tun_if)
2148
2149         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2150                                   p.auth_algo_vpp_id, p.auth_key,
2151                                   p.crypt_algo_vpp_id, p.crypt_key,
2152                                   self.vpp_esp_protocol,
2153                                   self.tun_if.local_addr[p.addr_type],
2154                                   self.tun_if.remote_addr[p.addr_type])
2155         p.tun_sa_out.add_vpp_config()
2156
2157         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2158                                  p.auth_algo_vpp_id, p.auth_key,
2159                                  p.crypt_algo_vpp_id, p.crypt_key,
2160                                  self.vpp_esp_protocol,
2161                                  self.tun_if.remote_addr[p.addr_type],
2162                                  self.tun_if.local_addr[p.addr_type])
2163         p.tun_sa_in.add_vpp_config()
2164
2165     def config_protect(self, p):
2166         p.tun_protect = VppIpsecTunProtect(self,
2167                                            p.tun_if,
2168                                            p.tun_sa_out,
2169                                            [p.tun_sa_in])
2170         p.tun_protect.add_vpp_config()
2171
2172     def config_network(self, p):
2173         p.tun_if = VppIpIpTunInterface(self, self.pg0,
2174                                        self.pg0.local_ip6,
2175                                        self.pg0.remote_ip6)
2176         p.tun_if.add_vpp_config()
2177         p.tun_if.admin_up()
2178         p.tun_if.config_ip6()
2179         p.tun_if.config_ip4()
2180
2181         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2182                              [VppRoutePath(p.tun_if.remote_ip6,
2183                                            0xffffffff,
2184                                            proto=DpoProto.DPO_PROTO_IP6)])
2185         p.route.add_vpp_config()
2186         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2187                        [VppRoutePath(p.tun_if.remote_ip4,
2188                                      0xffffffff)])
2189         r.add_vpp_config()
2190
2191     def unconfig_network(self, p):
2192         p.route.remove_vpp_config()
2193         p.tun_if.remove_vpp_config()
2194
2195     def unconfig_protect(self, p):
2196         p.tun_protect.remove_vpp_config()
2197
2198     def unconfig_sa(self, p):
2199         p.tun_sa_out.remove_vpp_config()
2200         p.tun_sa_in.remove_vpp_config()
2201
2202
2203 class TestIpsec6TunProtect(TemplateIpsec,
2204                            TemplateIpsec6TunProtect,
2205                            IpsecTun6):
2206     """ IPsec IPv6 Tunnel protect - transport mode"""
2207
2208     encryption_type = ESP
2209     tun6_encrypt_node_name = "esp6-encrypt-tun"
2210     tun6_decrypt_node_name = "esp6-decrypt-tun"
2211
2212     def setUp(self):
2213         super(TestIpsec6TunProtect, self).setUp()
2214
2215         self.tun_if = self.pg0
2216
2217     def tearDown(self):
2218         super(TestIpsec6TunProtect, self).tearDown()
2219
2220     def test_tun_66(self):
2221         """IPSEC tunnel protect 6o6"""
2222
2223         p = self.ipv6_params
2224
2225         self.config_network(p)
2226         self.config_sa_tra(p)
2227         self.config_protect(p)
2228
2229         self.verify_tun_66(p, count=127)
2230         c = p.tun_if.get_rx_stats()
2231         self.assertEqual(c['packets'], 127)
2232         c = p.tun_if.get_tx_stats()
2233         self.assertEqual(c['packets'], 127)
2234
2235         # rekey - create new SAs and update the tunnel protection
2236         np = copy.copy(p)
2237         np.crypt_key = b'X' + p.crypt_key[1:]
2238         np.scapy_tun_spi += 100
2239         np.scapy_tun_sa_id += 1
2240         np.vpp_tun_spi += 100
2241         np.vpp_tun_sa_id += 1
2242         np.tun_if.local_spi = p.vpp_tun_spi
2243         np.tun_if.remote_spi = p.scapy_tun_spi
2244
2245         self.config_sa_tra(np)
2246         self.config_protect(np)
2247         self.unconfig_sa(p)
2248
2249         self.verify_tun_66(np, count=127)
2250         c = p.tun_if.get_rx_stats()
2251         self.assertEqual(c['packets'], 254)
2252         c = p.tun_if.get_tx_stats()
2253         self.assertEqual(c['packets'], 254)
2254
2255         # bounce the interface state
2256         p.tun_if.admin_down()
2257         self.verify_drop_tun_66(np, count=127)
2258         node = ('/err/ipsec6-tun-input/%s' %
2259                 'ipsec packets received on disabled interface')
2260         self.assertEqual(127, self.statistics.get_err_counter(node))
2261         p.tun_if.admin_up()
2262         self.verify_tun_66(np, count=127)
2263
2264         # 3 phase rekey
2265         #  1) add two input SAs [old, new]
2266         #  2) swap output SA to [new]
2267         #  3) use only [new] input SA
2268         np3 = copy.copy(np)
2269         np3.crypt_key = b'Z' + p.crypt_key[1:]
2270         np3.scapy_tun_spi += 100
2271         np3.scapy_tun_sa_id += 1
2272         np3.vpp_tun_spi += 100
2273         np3.vpp_tun_sa_id += 1
2274         np3.tun_if.local_spi = p.vpp_tun_spi
2275         np3.tun_if.remote_spi = p.scapy_tun_spi
2276
2277         self.config_sa_tra(np3)
2278
2279         # step 1;
2280         p.tun_protect.update_vpp_config(np.tun_sa_out,
2281                                         [np.tun_sa_in, np3.tun_sa_in])
2282         self.verify_tun_66(np, np, count=127)
2283         self.verify_tun_66(np3, np, count=127)
2284
2285         # step 2;
2286         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2287                                         [np.tun_sa_in, np3.tun_sa_in])
2288         self.verify_tun_66(np, np3, count=127)
2289         self.verify_tun_66(np3, np3, count=127)
2290
2291         # step 1;
2292         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2293                                         [np3.tun_sa_in])
2294         self.verify_tun_66(np3, np3, count=127)
2295         self.verify_drop_tun_66(np, count=127)
2296
2297         c = p.tun_if.get_rx_stats()
2298         self.assertEqual(c['packets'], 127*9)
2299         c = p.tun_if.get_tx_stats()
2300         self.assertEqual(c['packets'], 127*8)
2301         self.unconfig_sa(np)
2302
2303         # teardown
2304         self.unconfig_protect(np3)
2305         self.unconfig_sa(np3)
2306         self.unconfig_network(p)
2307
2308     def test_tun_46(self):
2309         """IPSEC tunnel protect 4o6"""
2310
2311         p = self.ipv6_params
2312
2313         self.config_network(p)
2314         self.config_sa_tra(p)
2315         self.config_protect(p)
2316
2317         self.verify_tun_46(p, count=127)
2318         c = p.tun_if.get_rx_stats()
2319         self.assertEqual(c['packets'], 127)
2320         c = p.tun_if.get_tx_stats()
2321         self.assertEqual(c['packets'], 127)
2322
2323         # teardown
2324         self.unconfig_protect(p)
2325         self.unconfig_sa(p)
2326         self.unconfig_network(p)
2327
2328
2329 class TestIpsec6TunProtectTun(TemplateIpsec,
2330                               TemplateIpsec6TunProtect,
2331                               IpsecTun6):
2332     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2333
2334     encryption_type = ESP
2335     tun6_encrypt_node_name = "esp6-encrypt-tun"
2336     tun6_decrypt_node_name = "esp6-decrypt-tun"
2337
2338     def setUp(self):
2339         super(TestIpsec6TunProtectTun, self).setUp()
2340
2341         self.tun_if = self.pg0
2342
2343     def tearDown(self):
2344         super(TestIpsec6TunProtectTun, self).tearDown()
2345
2346     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2347                           payload_size=100):
2348         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2349                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2350                                 dst=sw_intf.local_ip6) /
2351                            IPv6(src=src, dst=dst) /
2352                            UDP(sport=1166, dport=2233) /
2353                            Raw(b'X' * payload_size))
2354                 for i in range(count)]
2355
2356     def gen_pkts6(self, sw_intf, src, dst, count=1,
2357                   payload_size=100):
2358         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2359                 IPv6(src=src, dst=dst) /
2360                 UDP(sport=1166, dport=2233) /
2361                 Raw(b'X' * payload_size)
2362                 for i in range(count)]
2363
2364     def verify_decrypted6(self, p, rxs):
2365         for rx in rxs:
2366             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2367             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2368             self.assert_packet_checksums_valid(rx)
2369
2370     def verify_encrypted6(self, p, sa, rxs):
2371         for rx in rxs:
2372             try:
2373                 pkt = sa.decrypt(rx[IPv6])
2374                 if not pkt.haslayer(IPv6):
2375                     pkt = IPv6(pkt[Raw].load)
2376                 self.assert_packet_checksums_valid(pkt)
2377                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2378                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2379                 inner = pkt[IPv6].payload
2380                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2381
2382             except (IndexError, AssertionError):
2383                 self.logger.debug(ppp("Unexpected packet:", rx))
2384                 try:
2385                     self.logger.debug(ppp("Decrypted packet:", pkt))
2386                 except:
2387                     pass
2388                 raise
2389
2390     def test_tun_66(self):
2391         """IPSEC tunnel protect """
2392
2393         p = self.ipv6_params
2394
2395         self.config_network(p)
2396         self.config_sa_tun(p)
2397         self.config_protect(p)
2398
2399         self.verify_tun_66(p, count=127)
2400
2401         c = p.tun_if.get_rx_stats()
2402         self.assertEqual(c['packets'], 127)
2403         c = p.tun_if.get_tx_stats()
2404         self.assertEqual(c['packets'], 127)
2405
2406         # rekey - create new SAs and update the tunnel protection
2407         np = copy.copy(p)
2408         np.crypt_key = b'X' + p.crypt_key[1:]
2409         np.scapy_tun_spi += 100
2410         np.scapy_tun_sa_id += 1
2411         np.vpp_tun_spi += 100
2412         np.vpp_tun_sa_id += 1
2413         np.tun_if.local_spi = p.vpp_tun_spi
2414         np.tun_if.remote_spi = p.scapy_tun_spi
2415
2416         self.config_sa_tun(np)
2417         self.config_protect(np)
2418         self.unconfig_sa(p)
2419
2420         self.verify_tun_66(np, count=127)
2421         c = p.tun_if.get_rx_stats()
2422         self.assertEqual(c['packets'], 254)
2423         c = p.tun_if.get_tx_stats()
2424         self.assertEqual(c['packets'], 254)
2425
2426         # teardown
2427         self.unconfig_protect(np)
2428         self.unconfig_sa(np)
2429         self.unconfig_network(p)
2430
2431
2432 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2433                                   TemplateIpsec6TunProtect,
2434                                   IpsecTun6):
2435     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2436
2437     encryption_type = ESP
2438     tun6_encrypt_node_name = "esp6-encrypt-tun"
2439     tun6_decrypt_node_name = "esp6-decrypt-tun"
2440
2441     def setUp(self):
2442         super(TestIpsec6TunProtectTunDrop, self).setUp()
2443
2444         self.tun_if = self.pg0
2445
2446     def tearDown(self):
2447         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2448
2449     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2450                           payload_size=100):
2451         # the IP destination of the revelaed packet does not match
2452         # that assigned to the tunnel
2453         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2454                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2455                                 dst="5::5") /
2456                            IPv6(src=src, dst=dst) /
2457                            UDP(sport=1144, dport=2233) /
2458                            Raw(b'X' * payload_size))
2459                 for i in range(count)]
2460
2461     def test_tun_drop_66(self):
2462         """IPSEC 6 tunnel protect bogus tunnel header """
2463
2464         p = self.ipv6_params
2465
2466         self.config_network(p)
2467         self.config_sa_tun(p)
2468         self.config_protect(p)
2469
2470         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2471                                     src=p.remote_tun_if_host,
2472                                     dst=self.pg1.remote_ip6,
2473                                     count=63)
2474         self.send_and_assert_no_replies(self.tun_if, tx)
2475
2476         self.unconfig_protect(p)
2477         self.unconfig_sa(p)
2478         self.unconfig_network(p)
2479
2480
2481 class TemplateIpsecItf4(object):
2482     """ IPsec Interface IPv4 """
2483
2484     encryption_type = ESP
2485     tun4_encrypt_node_name = "esp4-encrypt-tun"
2486     tun4_decrypt_node_name = "esp4-decrypt-tun"
2487     tun4_input_node = "ipsec4-tun-input"
2488
2489     def config_sa_tun(self, p, src, dst):
2490         config_tun_params(p, self.encryption_type, None, src, dst)
2491
2492         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2493                                   p.auth_algo_vpp_id, p.auth_key,
2494                                   p.crypt_algo_vpp_id, p.crypt_key,
2495                                   self.vpp_esp_protocol,
2496                                   src, dst,
2497                                   flags=p.flags)
2498         p.tun_sa_out.add_vpp_config()
2499
2500         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2501                                  p.auth_algo_vpp_id, p.auth_key,
2502                                  p.crypt_algo_vpp_id, p.crypt_key,
2503                                  self.vpp_esp_protocol,
2504                                  dst, src,
2505                                  flags=p.flags)
2506         p.tun_sa_in.add_vpp_config()
2507
2508     def config_protect(self, p):
2509         p.tun_protect = VppIpsecTunProtect(self,
2510                                            p.tun_if,
2511                                            p.tun_sa_out,
2512                                            [p.tun_sa_in])
2513         p.tun_protect.add_vpp_config()
2514
2515     def config_network(self, p):
2516         p.tun_if = VppIpsecInterface(self)
2517
2518         p.tun_if.add_vpp_config()
2519         p.tun_if.admin_up()
2520         p.tun_if.config_ip4()
2521         p.tun_if.config_ip6()
2522
2523         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2524                              [VppRoutePath(p.tun_if.remote_ip4,
2525                                            0xffffffff)])
2526         p.route.add_vpp_config()
2527         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2528                        [VppRoutePath(p.tun_if.remote_ip6,
2529                                      0xffffffff,
2530                                      proto=DpoProto.DPO_PROTO_IP6)])
2531         r.add_vpp_config()
2532
2533     def unconfig_network(self, p):
2534         p.route.remove_vpp_config()
2535         p.tun_if.remove_vpp_config()
2536
2537     def unconfig_protect(self, p):
2538         p.tun_protect.remove_vpp_config()
2539
2540     def unconfig_sa(self, p):
2541         p.tun_sa_out.remove_vpp_config()
2542         p.tun_sa_in.remove_vpp_config()
2543
2544
2545 class TestIpsecItf4(TemplateIpsec,
2546                     TemplateIpsecItf4,
2547                     IpsecTun4):
2548     """ IPsec Interface IPv4 """
2549
2550     def setUp(self):
2551         super(TestIpsecItf4, self).setUp()
2552
2553         self.tun_if = self.pg0
2554
2555     def tearDown(self):
2556         super(TestIpsecItf4, self).tearDown()
2557
2558     def test_tun_44(self):
2559         """IPSEC interface IPv4"""
2560
2561         n_pkts = 127
2562         p = self.ipv4_params
2563
2564         self.config_network(p)
2565         self.config_sa_tun(p,
2566                            self.pg0.local_ip4,
2567                            self.pg0.remote_ip4)
2568         self.config_protect(p)
2569
2570         self.verify_tun_44(p, count=n_pkts)
2571         c = p.tun_if.get_rx_stats()
2572         self.assertEqual(c['packets'], n_pkts)
2573         c = p.tun_if.get_tx_stats()
2574         self.assertEqual(c['packets'], n_pkts)
2575
2576         p.tun_if.admin_down()
2577         self.verify_tun_dropped_44(p, count=n_pkts)
2578         p.tun_if.admin_up()
2579         self.verify_tun_44(p, count=n_pkts)
2580
2581         c = p.tun_if.get_rx_stats()
2582         self.assertEqual(c['packets'], 3*n_pkts)
2583         c = p.tun_if.get_tx_stats()
2584         self.assertEqual(c['packets'], 2*n_pkts)
2585
2586         # it's a v6 packet when its encrypted
2587         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2588
2589         self.verify_tun_64(p, count=n_pkts)
2590         c = p.tun_if.get_rx_stats()
2591         self.assertEqual(c['packets'], 4*n_pkts)
2592         c = p.tun_if.get_tx_stats()
2593         self.assertEqual(c['packets'], 3*n_pkts)
2594
2595         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2596
2597         self.vapi.cli("clear interfaces")
2598
2599         # rekey - create new SAs and update the tunnel protection
2600         np = copy.copy(p)
2601         np.crypt_key = b'X' + p.crypt_key[1:]
2602         np.scapy_tun_spi += 100
2603         np.scapy_tun_sa_id += 1
2604         np.vpp_tun_spi += 100
2605         np.vpp_tun_sa_id += 1
2606         np.tun_if.local_spi = p.vpp_tun_spi
2607         np.tun_if.remote_spi = p.scapy_tun_spi
2608
2609         self.config_sa_tun(np,
2610                            self.pg0.local_ip4,
2611                            self.pg0.remote_ip4)
2612         self.config_protect(np)
2613         self.unconfig_sa(p)
2614
2615         self.verify_tun_44(np, count=n_pkts)
2616         c = p.tun_if.get_rx_stats()
2617         self.assertEqual(c['packets'], n_pkts)
2618         c = p.tun_if.get_tx_stats()
2619         self.assertEqual(c['packets'], n_pkts)
2620
2621         # teardown
2622         self.unconfig_protect(np)
2623         self.unconfig_sa(np)
2624         self.unconfig_network(p)
2625
2626
2627 class TemplateIpsecItf6(object):
2628     """ IPsec Interface IPv6 """
2629
2630     encryption_type = ESP
2631     tun6_encrypt_node_name = "esp6-encrypt-tun"
2632     tun6_decrypt_node_name = "esp6-decrypt-tun"
2633     tun6_input_node = "ipsec6-tun-input"
2634
2635     def config_sa_tun(self, p, src, dst):
2636         config_tun_params(p, self.encryption_type, None, src, dst)
2637
2638         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2639                                   p.auth_algo_vpp_id, p.auth_key,
2640                                   p.crypt_algo_vpp_id, p.crypt_key,
2641                                   self.vpp_esp_protocol,
2642                                   src, dst,
2643                                   flags=p.flags)
2644         p.tun_sa_out.add_vpp_config()
2645
2646         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2647                                  p.auth_algo_vpp_id, p.auth_key,
2648                                  p.crypt_algo_vpp_id, p.crypt_key,
2649                                  self.vpp_esp_protocol,
2650                                  dst, src,
2651                                  flags=p.flags)
2652         p.tun_sa_in.add_vpp_config()
2653
2654     def config_protect(self, p):
2655         p.tun_protect = VppIpsecTunProtect(self,
2656                                            p.tun_if,
2657                                            p.tun_sa_out,
2658                                            [p.tun_sa_in])
2659         p.tun_protect.add_vpp_config()
2660
2661     def config_network(self, p):
2662         p.tun_if = VppIpsecInterface(self)
2663
2664         p.tun_if.add_vpp_config()
2665         p.tun_if.admin_up()
2666         p.tun_if.config_ip4()
2667         p.tun_if.config_ip6()
2668
2669         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2670                        [VppRoutePath(p.tun_if.remote_ip4,
2671                                      0xffffffff)])
2672         r.add_vpp_config()
2673
2674         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2675                              [VppRoutePath(p.tun_if.remote_ip6,
2676                                            0xffffffff,
2677                                            proto=DpoProto.DPO_PROTO_IP6)])
2678         p.route.add_vpp_config()
2679
2680     def unconfig_network(self, p):
2681         p.route.remove_vpp_config()
2682         p.tun_if.remove_vpp_config()
2683
2684     def unconfig_protect(self, p):
2685         p.tun_protect.remove_vpp_config()
2686
2687     def unconfig_sa(self, p):
2688         p.tun_sa_out.remove_vpp_config()
2689         p.tun_sa_in.remove_vpp_config()
2690
2691
2692 class TestIpsecItf6(TemplateIpsec,
2693                     TemplateIpsecItf6,
2694                     IpsecTun6):
2695     """ IPsec Interface IPv6 """
2696
2697     def setUp(self):
2698         super(TestIpsecItf6, self).setUp()
2699
2700         self.tun_if = self.pg0
2701
2702     def tearDown(self):
2703         super(TestIpsecItf6, self).tearDown()
2704
2705     def test_tun_44(self):
2706         """IPSEC interface IPv6"""
2707
2708         n_pkts = 127
2709         p = self.ipv6_params
2710
2711         self.config_network(p)
2712         self.config_sa_tun(p,
2713                            self.pg0.local_ip6,
2714                            self.pg0.remote_ip6)
2715         self.config_protect(p)
2716
2717         self.verify_tun_66(p, count=n_pkts)
2718         c = p.tun_if.get_rx_stats()
2719         self.assertEqual(c['packets'], n_pkts)
2720         c = p.tun_if.get_tx_stats()
2721         self.assertEqual(c['packets'], n_pkts)
2722
2723         p.tun_if.admin_down()
2724         self.verify_drop_tun_66(p, count=n_pkts)
2725         p.tun_if.admin_up()
2726         self.verify_tun_66(p, count=n_pkts)
2727
2728         c = p.tun_if.get_rx_stats()
2729         self.assertEqual(c['packets'], 3*n_pkts)
2730         c = p.tun_if.get_tx_stats()
2731         self.assertEqual(c['packets'], 2*n_pkts)
2732
2733         # it's a v4 packet when its encrypted
2734         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2735
2736         self.verify_tun_46(p, count=n_pkts)
2737         c = p.tun_if.get_rx_stats()
2738         self.assertEqual(c['packets'], 4*n_pkts)
2739         c = p.tun_if.get_tx_stats()
2740         self.assertEqual(c['packets'], 3*n_pkts)
2741
2742         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2743
2744         self.vapi.cli("clear interfaces")
2745
2746         # rekey - create new SAs and update the tunnel protection
2747         np = copy.copy(p)
2748         np.crypt_key = b'X' + p.crypt_key[1:]
2749         np.scapy_tun_spi += 100
2750         np.scapy_tun_sa_id += 1
2751         np.vpp_tun_spi += 100
2752         np.vpp_tun_sa_id += 1
2753         np.tun_if.local_spi = p.vpp_tun_spi
2754         np.tun_if.remote_spi = p.scapy_tun_spi
2755
2756         self.config_sa_tun(np,
2757                            self.pg0.local_ip6,
2758                            self.pg0.remote_ip6)
2759         self.config_protect(np)
2760         self.unconfig_sa(p)
2761
2762         self.verify_tun_66(np, count=n_pkts)
2763         c = p.tun_if.get_rx_stats()
2764         self.assertEqual(c['packets'], n_pkts)
2765         c = p.tun_if.get_tx_stats()
2766         self.assertEqual(c['packets'], n_pkts)
2767
2768         # teardown
2769         self.unconfig_protect(np)
2770         self.unconfig_sa(np)
2771         self.unconfig_network(p)
2772
2773
2774 if __name__ == '__main__':
2775     unittest.main(testRunner=VppTestRunner)