ipsec: Tunnel SA DSCP behaviour
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
13     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
22 from util import ppp
23 from vpp_papi import VppEnum
24 from vpp_papi_provider import CliFailedCommandError
25 from vpp_acl import AclRule, VppAcl, VppAclInterface
26
27
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
    """Attach a pair of tunnel-mode scapy SecurityAssociations to *p*.

    :param p: IPsec parameter object. Its flags, SPIs, algorithms, keys,
        address type and NAT header are read; ``tun_src``, ``tun_dst``,
        ``scapy_tun_sa`` and ``vpp_tun_sa`` are written.
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation``.
    :param tun_if: object providing ``local_ip``/``remote_ip`` for the
        tunnel endpoints; when falsy, *src* and *dst* are used instead.
    :param src: tunnel source, only consulted when *tun_if* is falsy.
    :param dst: tunnel destination, only consulted when *tun_if* is falsy.
    """
    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_ESN))
    scapy_key = mk_scapy_crypt_key(p)

    if not tun_if:
        p.tun_dst, p.tun_src = dst, src
    else:
        p.tun_dst, p.tun_src = tun_if.remote_ip, tun_if.local_ip

    # outer header class follows the address family of the parameters
    header_cls = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # everything the two SAs have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=scapy_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=use_esn)

    # the two SAs swap SPI and the direction of the outer header
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=header_cls(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=header_cls(dst=p.tun_dst, src=p.tun_src),
        **shared)
60
61
def config_tra_params(p, encryption_type, tun_if):
    """Attach a pair of scapy SecurityAssociations without outer header.

    Both SAs are built with identical algorithms, keys, ESN setting and
    NAT header; they differ only in SPI. No ``tunnel_header`` is passed
    to ``SecurityAssociation``.

    :param p: IPsec parameter object. Its flags, SPIs, algorithms, keys
        and NAT header are read; ``tun_src``, ``tun_dst``,
        ``scapy_tun_sa`` and ``vpp_tun_sa`` are written.
    :param encryption_type: first positional argument handed to
        ``SecurityAssociation``.
    :param tun_if: object providing ``local_ip``/``remote_ip``, recorded
        on *p* as ``tun_src``/``tun_dst``.
    """
    esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                             IPSEC_API_SAD_FLAG_USE_ESN))
    crypt_key = mk_scapy_crypt_key(p)
    p.tun_dst = tun_if.remote_ip
    p.tun_src = tun_if.local_ip

    # arguments common to both directions
    sa_args = dict(crypt_algo=p.crypt_algo,
                   crypt_key=crypt_key,
                   auth_algo=p.auth_algo,
                   auth_key=p.auth_key,
                   esn_en=esn_en,
                   nat_t_header=p.nat_header)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi, **sa_args)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi, **sa_args)
83
84
class TemplateIpsec4TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEsp, cls).tearDownClass()

    def setUp(self):
        """Create the IPsec tunnel interface on pg0 and route to it."""
        super(TemplateIpsec4TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv4_params

        # same key material is used for both directions
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()
        config_tun_params(params, self.encryption_type, tunnel)

        # host routes (v4 and v6) for the remote hosts, via the tunnel
        VppIpRoute(self, params.remote_tun_if_host, 32,
                   [VppRoutePath(tunnel.remote_ip4,
                                 0xffffffff)]).add_vpp_config()
        VppIpRoute(self, params.remote_tun_if_host6, 128,
                   [VppRoutePath(tunnel.remote_ip6,
                                 0xffffffff,
                                 proto=DpoProto.DPO_PROTO_IP6)]
                   ).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEsp, self).tearDown()
128
129
class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
    """ IPsec UDP tunnel interface tests """

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()

    def verify_encrypted(self, p, sa, rxs):
        """Check each received packet is UDP-encapsulated and decrypts.

        :param p: IPsec parameter object (not read here).
        :param sa: SecurityAssociation used to decrypt the packets.
        :param rxs: iterable of received packets.
        :raises IndexError, AssertionError: re-raised after the offending
            packet has been logged at debug level.
        """
        for rx in rxs:
            try:
                # ensure the UDP ports are correct before we decrypt
                # which strips them
                self.assertTrue(rx.haslayer(UDP))
                self.assert_equal(rx[UDP].sport, 4500)
                self.assert_equal(rx[UDP].dport, 4500)

                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)

                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, "1.1.1.1")
                self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best effort only: 'pkt' is unbound when the failure
                    # happened before decryption. Unlike a bare except,
                    # this lets KeyboardInterrupt/SystemExit propagate.
                    pass
                raise

    def setUp(self):
        """Flag the IPv4 parameters for UDP encapsulation."""
        super(TemplateIpsec4TunIfEspUdp, self).setUp()

        p = self.ipv4_params
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4500)

    def config_network(self):
        """Create the UDP-encap tunnel interface on pg0 and its routes.

        Not called from this class's setUp; callers invoke it themselves
        once the parameters are in their final state.
        """

        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                        p.scapy_tun_spi, p.crypt_algo_vpp_id,
                                        p.crypt_key, p.crypt_key,
                                        p.auth_algo_vpp_id, p.auth_key,
                                        p.auth_key, udp_encap=True)
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # host routes (v4 and v6) for the remote hosts, via the tunnel
        r = VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)])
        r.add_vpp_config()
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec4TunIfEspUdp, self).tearDown()
204
205
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
    """ Ipsec ESP - TUN tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def test_tun_basic64(self):
        """ ipsec 6o4 tunnel basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=1)

    def test_tun_burst64(self):
        """ ipsec 6o4 tunnel burst test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_64(self.params[socket.AF_INET], count=257)

    def test_tun_basic_frag44(self):
        """ ipsec 4o4 tunnel frag basic test """
        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        p = self.ipv4_params

        # an 1800 byte payload over a 1500 MTU: two packets are expected
        self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                       [1500, 0, 0, 0])
        try:
            self.verify_tun_44(self.params[socket.AF_INET],
                               count=1, payload_size=1800, n_rx=2)
        finally:
            # put the MTU back even when the verification fails
            self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
                                           [9000, 0, 0, 0])
235
236
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        # Start the super() lookup from this class so that
        # TemplateIpsec4TunIfEspUdp.setUp runs: it sets the UDP-encap
        # flag and the NAT header that config_network() relies on.
        super(TestIpsec4TunIfEspUdp, self).setUp()
        self.config_network()

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
249
250
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP GCM tests """

    tun4_input_node = "ipsec4-tun-input"

    def setUp(self):
        # Start the super() lookup from this class so that
        # TemplateIpsec4TunIfEspUdp.setUp runs: it sets the UDP-encap
        # flag and the NAT header that config_network() relies on.
        super(TestIpsec4TunIfEspUdpGCM, self).setUp()

        # switch the parameters to AES-GCM-256 with no integrity
        # algorithm before the tunnel is created
        p = self.ipv4_params
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_AES_GCM_256)
        p.crypt_algo = "AES-GCM"
        p.auth_algo = "NULL"
        p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
        p.salt = 0
        self.config_network()
268
269
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # no body needed: everything is inherited from the two base classes
273
274
class TemplateIpsec6TunIfEsp(TemplateIpsec):
    """ IPsec tunnel interface tests """

    encryption_type = ESP

    def setUp(self):
        """Create the IPv6 IPsec tunnel interface on pg0 and its routes."""
        super(TemplateIpsec6TunIfEsp, self).setUp()

        self.tun_if = self.pg0
        params = self.ipv6_params

        # same key material is used for both directions
        tunnel = VppIpsecTunInterface(
            self, self.pg0,
            params.vpp_tun_spi, params.scapy_tun_spi,
            params.crypt_algo_vpp_id,
            params.crypt_key, params.crypt_key,
            params.auth_algo_vpp_id,
            params.auth_key, params.auth_key,
            is_ip6=True)
        params.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()
        config_tun_params(params, self.encryption_type, tunnel)

        # host routes (v6 and v4) for the remote hosts, via the tunnel
        VppIpRoute(self, params.remote_tun_if_host, 128,
                   [VppRoutePath(tunnel.remote_ip6,
                                 0xffffffff,
                                 proto=DpoProto.DPO_PROTO_IP6)]
                   ).add_vpp_config()
        VppIpRoute(self, params.remote_tun_if_host4, 32,
                   [VppRoutePath(tunnel.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TemplateIpsec6TunIfEsp, self).tearDown()
309
310
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a single packet
        self.verify_tun_46(v6_params, count=1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        v6_params = self.params[socket.AF_INET6]
        # a burst of 257 packets
        self.verify_tun_46(v6_params, count=257)
326
327
class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # graph node names used by the inherited tests
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
333
334
class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # graph node names used by the inherited tests
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
340
341
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Multi Tunnel interface """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        """Create ten tunnel interfaces, one per generated pg0 host."""
        super(TestIpsec4MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        n_tunnels = 10
        self.multi_params = []
        self.pg0.generate_remote_hosts(n_tunnels)
        self.pg0.configure_ipv4_neighbors()

        for ii in range(n_tunnels):
            # each tunnel gets its own copy of the parameters with
            # the SA ids and SPIs offset by the tunnel index
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id += ii
            p.scapy_tun_spi += ii
            p.vpp_tun_sa_id += ii
            p.vpp_tun_spi += ii

            p.scapy_tra_sa_id += ii
            p.scapy_tra_spi += ii
            p.vpp_tra_sa_id += ii
            p.vpp_tra_spi += ii
            p.tun_dst = self.pg0.remote_hosts[ii].ip4

            p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
                                            p.scapy_tun_spi,
                                            p.crypt_algo_vpp_id,
                                            p.crypt_key, p.crypt_key,
                                            p.auth_algo_vpp_id, p.auth_key,
                                            p.auth_key,
                                            dst=p.tun_dst)
            p.tun_if.add_vpp_config()
            p.tun_if.admin_up()
            p.tun_if.config_ip4()
            config_tun_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)]).add_vpp_config()

    def tearDown(self):
        super(TestIpsec4MultiTunIfEsp, self).tearDown()

    def test_tun_44(self):
        """Multiple IPSEC tunnel interfaces """
        for p in self.multi_params:
            self.verify_tun_44(p, count=127)
            c = p.tun_if.get_rx_stats()
            self.assertEqual(c['packets'], 127)
            c = p.tun_if.get_tx_stats()
            self.assertEqual(c['packets'], 127)

    def test_tun_rr_44(self):
        """ Round-robin packets across multiple interfaces """
        # one encrypted packet per tunnel, in tunnel order
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                            src=p.remote_tun_if_host,
                                            dst=self.pg1.remote_ip4))
        rxs = self.send_and_expect(self.tun_if, tx, self.pg1)

        # zip() would silently drop the tail on a length mismatch
        self.assertEqual(len(rxs), len(self.multi_params))
        for rx, p in zip(rxs, self.multi_params):
            self.verify_decrypted(p, [rx])

        # and the reverse direction: one plain packet per tunnel
        tx = []
        for p in self.multi_params:
            tx.extend(self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                    dst=p.remote_tun_if_host))
        rxs = self.send_and_expect(self.pg1, tx, self.tun_if)

        self.assertEqual(len(rxs), len(self.multi_params))
        for rx, p in zip(rxs, self.multi_params):
            self.verify_encrypted(p, p.vpp_tun_sa, [rx])
422
423
class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface all Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Create the tunnel interface and host route described by *p*."""
        tunnel = VppIpsecTunInterface(self, self.pg0,
                                      p.vpp_tun_spi, p.scapy_tun_spi,
                                      p.crypt_algo_vpp_id,
                                      p.crypt_key, p.crypt_key,
                                      p.auth_algo_vpp_id,
                                      p.auth_key, p.auth_key,
                                      salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(tunnel.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspAll, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspAll, self).tearDown()

    def rekey(self, p):
        """Give the tunnel of *p* a new key and new SPIs via fresh SAs."""
        #
        # change the key and the SPI
        #
        p.crypt_key = b'X' + p.crypt_key[1:]
        p.scapy_tun_spi += 1
        p.scapy_tun_sa_id += 1
        p.vpp_tun_spi += 1
        p.vpp_tun_sa_id += 1
        p.tun_if.local_spi = p.vpp_tun_spi
        p.tun_if.remote_spi = p.scapy_tun_spi

        config_tun_params(p, self.encryption_type, p.tun_if)

        def new_sa(sa_id, spi):
            # both SAs differ only in id and SPI
            return VppIpsecSA(self, sa_id, spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              flags=p.flags,
                              salt=p.salt)

        p.tun_sa_in = new_sa(p.scapy_tun_sa_id, p.scapy_tun_spi)
        p.tun_sa_out = new_sa(p.vpp_tun_sa_id, p.vpp_tun_spi)
        p.tun_sa_in.add_vpp_config()
        p.tun_sa_out.add_vpp_config()

        sw_if_index = p.tun_if.sw_if_index
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=sw_if_index,
                                         sa_id=p.tun_sa_in.id,
                                         is_outbound=1)
        self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=sw_if_index,
                                         sa_id=p.tun_sa_out.id,
                                         is_outbound=0)
        self.logger.info(self.vapi.cli("sh ipsec sa"))

    def test_tun_44(self):
        """IPSEC tunnel all algos """

        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        integ = VppEnum.vl_api_ipsec_integ_alg_t

        # one row per algorithm combination:
        # (vpp-crypto, vpp-integ, scapy-crypto, scapy-integ, key, salt)
        combos = (
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL",
             b"JPjyOWBeVEQiMe7h", 3333),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL",
             b"JPjyOWBeVEQiMe7hJPjyOWBe", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
             integ.IPSEC_API_INTEG_ALG_NONE,
             "AES-GCM", "NULL",
             b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 9999),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "AES-CBC", "HMAC-SHA1-96",
             b"JPjyOWBeVEQiMe7h", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
             integ.IPSEC_API_INTEG_ALG_SHA_512_256,
             "AES-CBC", "SHA2-512-256",
             b"JPjyOWBeVEQiMe7hJPjyOWBe", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
             integ.IPSEC_API_INTEG_ALG_SHA_256_128,
             "AES-CBC", "SHA2-256-128",
             b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 0),
            (crypto.IPSEC_API_CRYPTO_ALG_NONE,
             integ.IPSEC_API_INTEG_ALG_SHA1_96,
             "NULL", "HMAC-SHA1-96",
             b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h", 0),
        )

        # every combination is run once per VPP crypto engine
        for engine in ("ia32", "ipsecmb", "openssl"):
            self.vapi.cli("set crypto handler all %s" % engine)

            for (vpp_crypto, vpp_integ,
                 scapy_crypto, scapy_integ, key, salt) in combos:
                p = copy.copy(self.ipv4_params)
                p.auth_algo_vpp_id = vpp_integ
                p.crypt_algo_vpp_id = vpp_crypto
                p.crypt_algo = scapy_crypto
                p.auth_algo = scapy_integ
                p.crypt_key = key
                p.salt = salt

                self.config_network(p)

                self.verify_tun_44(p, count=127)
                rx_stats = p.tun_if.get_rx_stats()
                self.assertEqual(rx_stats['packets'], 127)
                tx_stats = p.tun_if.get_tx_stats()
                self.assertEqual(tx_stats['packets'], 127)

                #
                # rekey the tunnel
                #
                self.rekey(p)
                self.verify_tun_44(p, count=127)

                # tear down, including the SAs added by rekey()
                self.unconfig_network(p)
                p.tun_sa_out.remove_vpp_config()
                p.tun_sa_in.remove_vpp_config()
608
609
class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
    """ IPsec IPv4 Tunnel interface no Algos """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def config_network(self, p):
        """Switch *p* to NULL algorithms, then create tunnel and route."""
        # no integrity algorithm, empty key
        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.auth_algo = 'NULL'
        p.auth_key = []

        # no crypto algorithm, empty key
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = 'NULL'
        p.crypt_key = []

        tunnel = VppIpsecTunInterface(self, self.pg0,
                                      p.vpp_tun_spi, p.scapy_tun_spi,
                                      p.crypt_algo_vpp_id,
                                      p.crypt_key, p.crypt_key,
                                      p.auth_algo_vpp_id,
                                      p.auth_key, p.auth_key,
                                      salt=p.salt)
        p.tun_if = tunnel
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        config_tun_params(p, self.encryption_type, tunnel)
        for show_cmd in ("sh ipsec sa 0", "sh ipsec sa 1"):
            self.logger.info(self.vapi.cli(show_cmd))

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(tunnel.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove what config_network() created for *p*."""
        tunnel = p.tun_if
        tunnel.unconfig_ip4()
        tunnel.remove_vpp_config()
        p.route.remove_vpp_config()

    def setUp(self):
        super(TestIpsec4TunIfEspNoAlgo, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunIfEspNoAlgo, self).tearDown()

    def test_tun_44(self):
        """ IPSec SA with NULL algos """
        params = self.ipv4_params

        self.config_network(params)

        # nothing is expected back for traffic sent towards the tunnel
        pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                             dst=params.remote_tun_if_host)
        self.send_and_assert_no_replies(self.pg1, pkts)

        self.unconfig_network(params)
672
673
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
    """ IPsec IPv6 Multi Tunnel interface """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Create ten IPv6 tunnel interfaces, one per generated pg0 host."""
        super(TestIpsec6MultiTunIfEsp, self).setUp()

        self.tun_if = self.pg0

        self.multi_params = []
        self.pg0.generate_remote_hosts(10)
        self.pg0.configure_ipv6_neighbors()

        for idx in range(10):
            # each tunnel gets its own copy of the parameters with
            # the SA ids and SPIs offset by the tunnel index
            params = copy.copy(self.ipv6_params)

            params.remote_tun_if_host = "1111::%d" % (idx + 1)
            params.scapy_tun_sa_id += idx
            params.scapy_tun_spi += idx
            params.vpp_tun_sa_id += idx
            params.vpp_tun_spi += idx

            params.scapy_tra_sa_id += idx
            params.scapy_tra_spi += idx
            params.vpp_tra_sa_id += idx
            params.vpp_tra_spi += idx

            tunnel = VppIpsecTunInterface(
                self, self.pg0,
                params.vpp_tun_spi, params.scapy_tun_spi,
                params.crypt_algo_vpp_id,
                params.crypt_key, params.crypt_key,
                params.auth_algo_vpp_id,
                params.auth_key, params.auth_key,
                is_ip6=True,
                dst=self.pg0.remote_hosts[idx].ip6)
            params.tun_if = tunnel
            tunnel.add_vpp_config()
            tunnel.admin_up()
            tunnel.config_ip6()
            config_tun_params(params, self.encryption_type, tunnel)
            self.multi_params.append(params)

            VppIpRoute(self, params.remote_tun_if_host, 128,
                       [VppRoutePath(tunnel.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)]
                       ).add_vpp_config()

    def tearDown(self):
        super(TestIpsec6MultiTunIfEsp, self).tearDown()

    def test_tun_66(self):
        """Multiple IPSEC tunnel interfaces """
        for params in self.multi_params:
            self.verify_tun_66(params, count=127)
            rx_stats = params.tun_if.get_rx_stats()
            self.assertEqual(rx_stats['packets'], 127)
            tx_stats = params.tun_if.get_tx_stats()
            self.assertEqual(tx_stats['packets'], 127)
734
735
class TestIpsecGreTebIfEsp(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # Node names and encapsulation type; presumably consumed by the
    # IpsecTun4Tests mix-in (not visible here) -- TODO confirm.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the Ethernet frame carried inside GRE.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw),
        with the outer IP going from pg0's remote to pg0's local address.
        `p`, `src` and `dst` are part of the signature but are not used
        by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` clear-text Ether / IP / UDP / Raw frames.

        `sw_intf`, `src` and `dst` are part of the signature but are not
        used by this override; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame keeps the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        The decrypted packet must have valid checksums, go from pg0's
        local to pg0's remote address, contain GRE, and carry the
        expected inner Ethernet/IP destinations.  On failure both the
        received and (when available) the decrypted packet are logged
        before the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA is given pg0 local -> remote, inbound the reverse
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        # record the show output instead of discarding it
        self.logger.info(self.vapi.cli("sh adj"))
        self.logger.info(self.vapi.cli("sh ipsec tun"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEsp, self).tearDown()
845
846
class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
                               IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - TUN tests """
    # Node names and encapsulation type; presumably consumed by the
    # IpsecTun4Tests mix-in (not visible here) -- TODO confirm.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the Ethernet frame carried inside GRE.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw);
        the inner Ethernet frame carries no VLAN tag.  `p`, `src` and
        `dst` are part of the signature but are not used by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` clear-text frames tagged with VLAN 11.

        `sw_intf`, `src` and `dst` are part of the signature but are not
        used by this override; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                Dot1Q(vlan=11) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame carries VLAN 11 and the inner dsts."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[Dot1Q].vlan, 11)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        Besides the address, checksum and GRE checks, the inner Ethernet
        frame must no longer carry a Dot1Q tag.  On failure both the
        received and (when available) the decrypted packet are logged
        before the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertFalse(e.haslayer(Dot1Q))
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the VLAN sub-interface, SAs, GRE TEB tunnel and bridge."""
        super(TestIpsecGreTebVlanIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # dot1q sub-interface 11 on pg1 with a pop-1 tag rewrite
        self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
        self.vapi.l2_interface_vlan_tag_rewrite(
            sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
            push_dot1q=11)
        self.pg1_11.admin_up()

        # outbound SA is given pg0 local -> remote, inbound the reverse
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and the VLAN sub-interface are the bridge ports
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Undo the tunnel IPv4 config, run the parent tearDown, then
        bring down and remove the VLAN sub-interface."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebVlanIfEsp, self).tearDown()
        self.pg1_11.admin_down()
        self.pg1_11.remove_vpp_config()
965
966
class TestIpsecGreTebIfEspTra(TemplateIpsec,
                              IpsecTun4Tests):
    """ Ipsec GRE TEB ESP - Tra tests """
    # Node names and encapsulation type; presumably consumed by the
    # IpsecTun4Tests mix-in (not visible here) -- TODO confirm.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the Ethernet frame carried inside GRE.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw),
        with the outer IP going from pg0's remote to pg0's local address.
        `p`, `src` and `dst` are part of the signature but are not used
        by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` clear-text Ether / IP / UDP / Raw frames.

        `sw_intf`, `src` and `dst` are part of the signature but are not
        used by this override; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame keeps the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        The decrypted packet must have valid checksums, go from pg0's
        local to pg0's remote address, contain GRE, and carry the
        expected inner Ethernet/IP destinations.  On failure both the
        received and (when available) the decrypted packet are logged
        before the error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE TEB tunnel and the bridge."""
        super(TestIpsecGreTebIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # unlike the TUN variants, the SAs are created without addresses
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebIfEspTra, self).tearDown()
1070
1071
class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
                                 IpsecTun4Tests):
    """ Ipsec GRE TEB UDP ESP - Tra tests """
    # Node names and encapsulation type; presumably consumed by the
    # IpsecTun4Tests mix-in (not visible here) -- TODO confirm.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP
    # Destination MAC of the Ethernet frame carried inside GRE.
    omac = "00:11:22:33:44:55"

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / Ether / IP / UDP / Raw),
        with the outer IP going from pg0's remote to pg0's local address.
        `p`, `src` and `dst` are part of the signature but are not used
        by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           Ether(dst=self.omac) /
                           IP(src="1.1.1.1", dst="1.1.1.2") /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` clear-text Ether / IP / UDP / Raw frames.

        `sw_intf`, `src` and `dst` are part of the signature but are not
        used by this override; the addresses are fixed.
        """
        return [Ether(dst=self.omac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame keeps the inner MAC and IP dst."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.omac)
            self.assert_equal(rx[IP].dst, "1.1.1.2")

    def verify_encrypted(self, p, sa, rxs):
        """Check the UDP header, then decrypt and check each packet.

        Every received packet must have a UDP layer with sport 5454 and
        dport 4545 (the ports configured on the SAs in setUp).  The
        decrypted packet is then checked as in the non-UDP variant.  On
        a decrypt-stage failure both the received and (when available)
        the decrypted packet are logged before the error is re-raised.
        """
        for rx in rxs:
            self.assertTrue(rx.haslayer(UDP))
            self.assertEqual(rx[UDP].dport, 4545)
            self.assertEqual(rx[UDP].sport, 5454)
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[Ether]
                self.assertEqual(e[Ether].dst, self.omac)
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create UDP-encap SAs, the protected GRE TEB tunnel and bridge."""
        super(TestIpsecGreTebUdpIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params
        # request UDP encapsulation and record the matching scapy header
        p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
                   IPSEC_API_SAD_FLAG_UDP_ENCAP)
        p.nat_header = UDP(sport=5454, dport=4545)

        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  flags=p.flags,
                                  udp_src=5454,
                                  udp_dst=4545)
        p.tun_sa_out.add_vpp_config()

        # the inbound SA additionally carries the IS_INBOUND flag
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 flags=(p.flags |
                                        VppEnum.vl_api_ipsec_sad_flags_t.
                                        IPSEC_API_SAD_FLAG_IS_INBOUND),
                                 udp_src=5454,
                                 udp_dst=4545)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4,
                                   type=(VppEnum.vl_api_gre_tunnel_type_t.
                                         GRE_API_TUNNEL_TYPE_TEB))
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])

        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # the GRE tunnel and pg1 are the two ports of bridge domain 1
        VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
        VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()

        self.vapi.cli("clear ipsec sa")
        self.logger.info(self.vapi.cli("sh ipsec sa 0"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1191
1192
class TestIpsecGreIfEsp(TemplateIpsec,
                        IpsecTun4Tests):
    """ Ipsec GRE ESP - TUN tests """
    # Node names and encapsulation type; presumably consumed by the
    # IpsecTun4Tests mix-in (not visible here) -- TODO confirm.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw); the
        inner IP goes from pg1's local to pg1's remote address.  `p`,
        `src` and `dst` are part of the signature but are not used by
        this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` clear-text frames addressed to 1.1.1.2.

        `src` and `dst` are part of the signature but are not used by
        this override; the IP addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        The decrypted packet must have valid checksums, go from pg0's
        local to pg0's remote address, and contain GRE carrying an IP
        packet for 1.1.1.2.  On failure both the received and (when
        available) the decrypted packet are logged before the error is
        re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and a host route."""
        super(TestIpsecGreIfEsp, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # NOTE(review): no port is ever added to this bridge domain in
        # this class; it looks like a leftover -- confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # outbound SA is given pg0 local -> remote, inbound the reverse
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  self.pg0.local_ip4,
                                  self.pg0.remote_ip4)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 self.pg0.remote_ip4,
                                 self.pg0.local_ip4)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tun_params(p, self.encryption_type, p.tun_if)

        # host route for the clear-text destination via the tunnel's
        # remote address
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEsp, self).tearDown()
1294
1295
class TestIpsecGreIfEspTra(TemplateIpsec,
                           IpsecTun4Tests):
    """ Ipsec GRE ESP - TRA tests """
    # Node names and encapsulation type; the decrypt node name is also
    # used below to build an error-counter path.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IP / GRE / IP / UDP / Raw); the
        inner IP goes from pg1's local to pg1's remote address.  `p`,
        `src` and `dst` are part of the signature but are not used by
        this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
                                payload_size=100):
        """Return `count` encrypted frames whose GRE payload is not IP.

        GRE is followed directly by UDP / Raw, with no inner IP header.
        `src` and `dst` are part of the signature but are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg0.remote_ip4,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` clear-text frames addressed to 1.1.1.2.

        `src` and `dst` are part of the signature but are not used by
        this override; the IP addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst="1.1.1.2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check every received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        The decrypted packet must have valid checksums and contain GRE
        carrying an IP packet for 1.1.1.2.  On failure both the received
        and (when available) the decrypted packet are logged before the
        error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, "1.1.1.2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected GRE tunnel and a host route."""
        super(TestIpsecGreIfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv4_params

        # unlike the TUN variant, the SAs are created without addresses
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   self.pg0.remote_ip4)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # host route for the clear-text destination via the tunnel's
        # remote address
        VppIpRoute(self, "1.1.1.2", 32,
                   [VppRoutePath(p.tun_if.remote_ip4,
                                 0xffffffff)]).add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the parent tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecGreIfEspTra, self).tearDown()

    def test_gre_non_ip(self):
        # Send one encrypted GRE packet with a non-IP payload: nothing
        # may come back and the decrypt node's "unsupported payload"
        # error counter must read exactly 1.
        p = self.ipv4_params
        # src/dst are ignored by gen_encrypt_non_ip_pkts
        tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                          src=p.remote_tun_if_host,
                                          dst=self.pg1.remote_ip6)
        self.send_and_assert_no_replies(self.tun_if, tx)
        node_name = ('/err/%s/unsupported payload' %
                     self.tun4_decrypt_node_name)
        self.assertEqual(1, self.statistics.get_err_counter(node_name))
1408
1409
class TestIpsecGre6IfEspTra(TemplateIpsec,
                            IpsecTun6Tests):
    """ Ipsec GRE ESP - TRA tests """
    # Node names and encapsulation type; presumably consumed by the
    # IpsecTun6Tests mix-in (not visible here) -- TODO confirm.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` encrypted IPv6 frames to inject on `sw_intf`.

        Each frame is Ether / sa.encrypt(IPv6 / GRE / IPv6 / UDP / Raw);
        the inner IPv6 goes from pg1's local to pg1's remote address.
        `p`, `src` and `dst` are part of the signature but are not used
        by this override.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=self.pg0.remote_ip6,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return `count` clear-text IPv6 frames addressed to 1::2.

        `src` and `dst` are part of the signature but are not used by
        this override; the IPv6 addresses are fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst="1::2") /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check every received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check its contents.

        The decrypted packet must have valid checksums and contain GRE
        carrying an IPv6 packet for 1::2.  On failure both the received
        and (when available) the decrypted packet are logged before the
        error is re-raised.
        """
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            # left over from a previous iteration
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, "1::2")
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", pkt))
                    except Exception:
                        # best-effort diagnostics only; the original
                        # failure is re-raised below
                        pass
                raise

    def setUp(self):
        """Create the SAs, the protected IPv6 GRE tunnel and a host route."""
        super(TestIpsecGre6IfEspTra, self).setUp()

        self.tun_if = self.pg0

        p = self.ipv6_params

        # NOTE(review): no port is ever added to this bridge domain in
        # this class; it looks like a leftover -- confirm before removing.
        bd1 = VppBridgeDomain(self, 1)
        bd1.add_vpp_config()

        # the SAs are created without addresses
        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   self.pg0.remote_ip6)
        p.tun_if.add_vpp_config()

        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        config_tra_params(p, self.encryption_type, p.tun_if)

        # host route for the clear-text destination via the tunnel's
        # remote address
        r = VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the parent tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecGre6IfEspTra, self).tearDown()
1507
1508
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
    """ Ipsec mGRE ESP v4 TRA tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` frames of ESP-encrypted IP/GRE/IP/UDP.

        The outer IP header runs from p.tun_dst to pg0's local address;
        the inner one from pg1's local to pg1's remote address.
        `src` and `dst` are ignored here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=p.tun_dst,
                              dst=self.pg0.local_ip4) /
                           GRE() /
                           IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IP/UDP frames from 1.1.1.1 to `dst`.

        `src` is ignored here; the source address is fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame with `sa` and check the GRE payload.

        The decrypted packet must have valid checksums, contain a GRE
        layer, and carry an inner IP destination of p.remote_tun_if_host.
        On failure both packets are logged and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: `pkt` is unbound when the
                    # decrypt itself failed.  Exception (rather than a
                    # bare except) lets KeyboardInterrupt/SystemExit
                    # propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel with N_NHS protected peers."""
        super(TestIpsecMGreIfEspTra4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        # multipoint GRE: no fixed remote, hence the 0.0.0.0 destination
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip4,
                                   "0.0.0.0",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-peer object shares the p.tun_if
            # that was set on self.ipv4_params above
            p = copy.copy(self.ipv4_params)

            # offset every id/spi by ii so each peer gets distinct values
            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # protection is per next-hop on the shared tunnel interface
            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            # in this v4 variant add the teibs after the protect
            p.teib = VppTeib(self, p.tun_if,
                             p.tun_if.remote_hosts[ii].ip4,
                             self.pg0.remote_hosts[ii].ip4).add_vpp_config()
            p.tun_dst = self.pg0.remote_hosts[ii].ip4
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))

    def tearDown(self):
        """Remove the tunnel's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMGreIfEspTra4, self).tearDown()

    def test_tun_44(self):
        """mGRE IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
            # traffic must be dropped while the peer's TEIB entry is
            # removed and pass again once it is restored
            p.teib.remove_vpp_config()
            self.verify_tun_dropped_44(p, count=N_PKTS)
            p.teib.add_vpp_config()
            self.verify_tun_44(p, count=N_PKTS)
1639
1640
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
    """ Ipsec mGRE ESP v6 TRA tests """
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` frames of ESP-encrypted IPv6/GRE/IPv6/UDP.

        The outer IPv6 header runs from p.tun_dst to pg0's local address;
        the inner one from pg1's local to pg1's remote address.
        `src` and `dst` are ignored here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=p.tun_dst,
                                dst=self.pg0.local_ip6) /
                           GRE() /
                           IPv6(src=self.pg1.local_ip6,
                                dst=self.pg1.remote_ip6) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` plain IPv6/UDP frames from 1::1 to `dst`.

        `src` is ignored here; the source address is fixed.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src="1::1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check each received frame is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each frame with `sa` and check the GRE payload.

        The decrypted packet must have valid checksums, contain a GRE
        layer, and carry an inner IPv6 destination of
        p.remote_tun_if_host.  On failure both packets are logged and the
        error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertTrue(pkt.haslayer(GRE))
                e = pkt[GRE]
                self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: `pkt` is unbound when the
                    # decrypt itself failed.  Exception (rather than a
                    # bare except) lets KeyboardInterrupt/SystemExit
                    # propagate.
                    pass
                raise

    def setUp(self):
        """Create one multipoint GRE tunnel with N_NHS protected peers."""
        super(TestIpsecMGreIfEspTra6, self).setUp()

        self.vapi.cli("set logging class ipsec level debug")

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv6_params
        # multipoint GRE: no fixed remote, hence the "::" destination
        p.tun_if = VppGreInterface(self,
                                   self.pg0.local_ip6,
                                   "::",
                                   mode=(VppEnum.vl_api_tunnel_mode_t.
                                         TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip6()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv6_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-peer object shares the p.tun_if
            # that was set on self.ipv6_params above
            p = copy.copy(self.ipv6_params)

            # offset every id/spi by ii so each peer gets distinct values
            p.remote_tun_if_host = "1::%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                      p.auth_algo_vpp_id, p.auth_key,
                                      p.crypt_algo_vpp_id, p.crypt_key,
                                      self.vpp_esp_protocol)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                     p.auth_algo_vpp_id, p.auth_key,
                                     p.crypt_algo_vpp_id, p.crypt_key,
                                     self.vpp_esp_protocol)
            p.tun_sa_in.add_vpp_config()

            # in this v6 variant add the teibs first then the protection
            p.tun_dst = self.pg0.remote_hosts[ii].ip6
            VppTeib(self, p.tun_if,
                    p.tun_if.remote_hosts[ii].ip6,
                    p.tun_dst).add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip6)
            p.tun_protect.add_vpp_config()
            config_tra_params(p, self.encryption_type, p.tun_if)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 128,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
                                     p.tun_if.sw_if_index)]).add_vpp_config()
            # set again after config_tra_params(); presumably that helper
            # overwrites tun_dst from the tunnel interface - verify
            p.tun_dst = self.pg0.remote_hosts[ii].ip6

        self.logger.info(self.vapi.cli("sh log"))
        self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
        # NOTE(review): hard-coded adjacency index 41 looks like a
        # debugging leftover - confirm it is still meaningful
        self.logger.info(self.vapi.cli("sh adj 41"))

    def tearDown(self):
        """Remove the tunnel's IPv6 config, then run the base tearDown."""
        p = self.ipv6_params
        p.tun_if.unconfig_ip6()
        super(TestIpsecMGreIfEspTra6, self).tearDown()

    def test_tun_66(self):
        """mGRE IPSec 66"""
        for p in self.multi_params:
            self.verify_tun_66(p, count=63)
1772
1773
class TemplateIpsec4TunProtect(object):
    """ IPsec IPv4 Tunnel protect """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tra(self, p):
        """Create p.tun_sa_out and p.tun_sa_in without tunnel addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # built from the scapy_* id/spi
        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        # built from the vpp_* id/spi
        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create p.tun_sa_out and p.tun_sa_in with tunnel addresses.

        The addresses are self.tun_if's local/remote address for
        p.addr_type; the second SA gets them in the opposite order.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            local_end, remote_end,
            flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            remote_end, local_end,
            flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and [p.tun_sa_in]."""
        p.tun_protect = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0's IPv4 addresses and routes.

        The IPv4 host route is kept on p.route; the IPv6 host route is
        added but not stored.
        """
        tunnel = p.tun_if = VppIpIpTunInterface(
            self, self.pg0, self.pg0.local_ip4, self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        p.route = VppIpRoute(
            self, p.remote_tun_if_host, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
        p.route.add_vpp_config()

        route6 = VppIpRoute(
            self, p.remote_tun_if_host6, 128,
            [VppRoutePath(tunnel.remote_ip6,
                          0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)])
        route6.add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route, then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection held on p."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove p.tun_sa_out, then p.tun_sa_in."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
1856
1857
class TestIpsec4TunProtect(TemplateIpsec,
                           TemplateIpsec4TunProtect,
                           IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtect, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtect, self).tearDown()

    def test_tun_44(self):
        """IPSEC tunnel protect"""
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tra(p)
        self.config_protect(p)

        # first batch of v4 traffic: 127 packets each way
        self.verify_tun_44(p, count=127)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 127)

        # second batch via verify_tun_64 after clearing the SA state;
        # interface counters keep accumulating
        self.vapi.cli("clear ipsec sa")
        self.verify_tun_64(p, count=127)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 254)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 254)

        # rekey - new SAs with fresh key/spi/id, then re-protect.
        # The copy is shallow, so rekeyed.tun_if is the same interface.
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tra(rekeyed)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        self.verify_tun_44(rekeyed, count=127)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 381)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 381)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)
1917
1918
class TestIpsec4TunProtectUdp(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - transport mode"""

    def setUp(self):
        super(TestIpsec4TunProtectUdp, self).setUp()
        self.tun_if = self.pg0

        # request UDP encapsulation on the SAs and describe the matching
        # UDP header on the params before anything is configured
        params = self.ipv4_params
        sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
        params.flags = sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP
        params.nat_header = UDP(sport=4500, dport=4500)

        self.config_network(params)
        self.config_sa_tra(params)
        self.config_protect(params)

    def tearDown(self):
        # undo setUp's configuration in reverse order
        params = self.ipv4_params
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
        super(TestIpsec4TunProtectUdp, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        # ensure encrypted packets are received with the default UDP ports
        for rx in rxs:
            udp = rx[UDP]
            self.assertEqual(udp.sport, 4500)
            self.assertEqual(udp.dport, 4500)
        super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)

    def test_tun_44(self):
        """IPSEC UDP tunnel protect"""
        params = self.ipv4_params

        self.verify_tun_44(params, count=127)
        self.assertEqual(params.tun_if.get_rx_stats()['packets'], 127)
        self.assertEqual(params.tun_if.get_tx_stats()['packets'], 127)

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        self.verify_keepalive(self.ipv4_params)
1965
1966
class TestIpsec4TunProtectTun(TemplateIpsec,
                              TemplateIpsec4TunProtect,
                              IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTun, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` frames of ESP-encrypted IP-in-IP/UDP.

        The outer IP header runs from sw_intf's remote to its local
        address; the inner one from `src` to `dst`.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=sw_intf.remote_ip4,
                              dst=sw_intf.local_ip4) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` plain IP/UDP frames from `src` to `dst`."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of each decrypted frame."""
        for rx in rxs:
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each frame with `sa` and check both IP headers.

        The outer header must run from pg0's local to pg0's remote
        address and the inner destination must be p.remote_tun_if_host.
        On failure both packets are logged and the error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
                self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
                inner = pkt[IP].payload
                self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: `pkt` is unbound when the
                    # decrypt itself failed.  Exception (rather than a
                    # bare except) lets KeyboardInterrupt/SystemExit
                    # propagate.
                    pass
                raise

    def test_tun_44(self):
        """IPSEC tunnel protect """

        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        # also add an output features on the tunnel and physical interface
        # so we test they still work
        r_all = AclRule(True,
                        src_prefix="0.0.0.0/0",
                        dst_prefix="0.0.0.0/0",
                        proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        self.verify_tun_44(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: np.tun_if is the same interface object as
        # p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2079
2080
class TestIpsec4TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec4TunProtect,
                                  IpsecTun4):
    """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"

    def setUp(self):
        super(TestIpsec4TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec4TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        # Same layering as a tunnel-mode packet, but the outer header is
        # addressed to 5.5.5.5 instead of the interface's local address.
        frames = []
        for _ in range(count):
            encrypted = sa.encrypt(IP(src=sw_intf.remote_ip4,
                                      dst="5.5.5.5") /
                                   IP(src=src, dst=dst) /
                                   UDP(sport=1144, dport=2233) /
                                   Raw(b'X' * payload_size))
            frames.append(Ether(src=sw_intf.remote_mac,
                                dst=sw_intf.local_mac) /
                          encrypted)
        return frames

    def test_tun_drop_44(self):
        """IPSEC tunnel protect bogus tunnel header """
        params = self.ipv4_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # nothing may come back for packets with the bogus outer header
        bogus = self.gen_encrypt_pkts(params, params.scapy_tun_sa,
                                      self.tun_if,
                                      src=params.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        # teardown
        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2127
2128
class TemplateIpsec6TunProtect(object):
    """ IPsec IPv6 Tunnel protect """

    def config_sa_tra(self, p):
        """Create p.tun_sa_out and p.tun_sa_in without tunnel addresses."""
        config_tun_params(p, self.encryption_type, p.tun_if)

        # built from the scapy_* id/spi
        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol)
        p.tun_sa_out.add_vpp_config()

        # built from the vpp_* id/spi
        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol)
        p.tun_sa_in.add_vpp_config()

    def config_sa_tun(self, p):
        """Create p.tun_sa_out and p.tun_sa_in with tunnel addresses.

        The addresses are self.tun_if's local/remote address for
        p.addr_type; the second SA gets them in the opposite order.
        """
        config_tun_params(p, self.encryption_type, p.tun_if)

        local_end = self.tun_if.local_addr[p.addr_type]
        remote_end = self.tun_if.remote_addr[p.addr_type]

        p.tun_sa_out = VppIpsecSA(
            self, p.scapy_tun_sa_id, p.scapy_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            local_end, remote_end)
        p.tun_sa_out.add_vpp_config()

        p.tun_sa_in = VppIpsecSA(
            self, p.vpp_tun_sa_id, p.vpp_tun_spi,
            p.auth_algo_vpp_id, p.auth_key,
            p.crypt_algo_vpp_id, p.crypt_key,
            self.vpp_esp_protocol,
            remote_end, local_end)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with p.tun_sa_out and [p.tun_sa_in]."""
        p.tun_protect = VppIpsecTunProtect(
            self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IP-in-IP tunnel over pg0's IPv6 addresses and routes.

        The IPv6 host route is kept on p.route; the IPv4 host route is
        added but not stored.
        """
        tunnel = p.tun_if = VppIpIpTunInterface(
            self, self.pg0, self.pg0.local_ip6, self.pg0.remote_ip6)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()
        tunnel.config_ip4()

        p.route = VppIpRoute(
            self, p.remote_tun_if_host, 128,
            [VppRoutePath(tunnel.remote_ip6,
                          0xffffffff,
                          proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()

        route4 = VppIpRoute(
            self, p.remote_tun_if_host4, 32,
            [VppRoutePath(tunnel.remote_ip4, 0xffffffff)])
        route4.add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route, then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection held on p."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove p.tun_sa_out, then p.tun_sa_in."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
2202
2203
2204 class TestIpsec6TunProtect(TemplateIpsec,
2205                            TemplateIpsec6TunProtect,
2206                            IpsecTun6):
2207     """ IPsec IPv6 Tunnel protect - transport mode"""
2208
2209     encryption_type = ESP
2210     tun6_encrypt_node_name = "esp6-encrypt-tun"
2211     tun6_decrypt_node_name = "esp6-decrypt-tun"
2212
    def setUp(self):
        """Run the inherited setUp, then record pg0 as self.tun_if."""
        super(TestIpsec6TunProtect, self).setUp()

        self.tun_if = self.pg0
2217
    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec6TunProtect, self).tearDown()
2220
2221     def test_tun_66(self):
2222         """IPSEC tunnel protect 6o6"""
2223
2224         p = self.ipv6_params
2225
2226         self.config_network(p)
2227         self.config_sa_tra(p)
2228         self.config_protect(p)
2229
2230         self.verify_tun_66(p, count=127)
2231         c = p.tun_if.get_rx_stats()
2232         self.assertEqual(c['packets'], 127)
2233         c = p.tun_if.get_tx_stats()
2234         self.assertEqual(c['packets'], 127)
2235
2236         # rekey - create new SAs and update the tunnel protection
2237         np = copy.copy(p)
2238         np.crypt_key = b'X' + p.crypt_key[1:]
2239         np.scapy_tun_spi += 100
2240         np.scapy_tun_sa_id += 1
2241         np.vpp_tun_spi += 100
2242         np.vpp_tun_sa_id += 1
2243         np.tun_if.local_spi = p.vpp_tun_spi
2244         np.tun_if.remote_spi = p.scapy_tun_spi
2245
2246         self.config_sa_tra(np)
2247         self.config_protect(np)
2248         self.unconfig_sa(p)
2249
2250         self.verify_tun_66(np, count=127)
2251         c = p.tun_if.get_rx_stats()
2252         self.assertEqual(c['packets'], 254)
2253         c = p.tun_if.get_tx_stats()
2254         self.assertEqual(c['packets'], 254)
2255
2256         # bounce the interface state
2257         p.tun_if.admin_down()
2258         self.verify_drop_tun_66(np, count=127)
2259         node = ('/err/ipsec6-tun-input/%s' %
2260                 'ipsec packets received on disabled interface')
2261         self.assertEqual(127, self.statistics.get_err_counter(node))
2262         p.tun_if.admin_up()
2263         self.verify_tun_66(np, count=127)
2264
2265         # 3 phase rekey
2266         #  1) add two input SAs [old, new]
2267         #  2) swap output SA to [new]
2268         #  3) use only [new] input SA
2269         np3 = copy.copy(np)
2270         np3.crypt_key = b'Z' + p.crypt_key[1:]
2271         np3.scapy_tun_spi += 100
2272         np3.scapy_tun_sa_id += 1
2273         np3.vpp_tun_spi += 100
2274         np3.vpp_tun_sa_id += 1
2275         np3.tun_if.local_spi = p.vpp_tun_spi
2276         np3.tun_if.remote_spi = p.scapy_tun_spi
2277
2278         self.config_sa_tra(np3)
2279
2280         # step 1;
2281         p.tun_protect.update_vpp_config(np.tun_sa_out,
2282                                         [np.tun_sa_in, np3.tun_sa_in])
2283         self.verify_tun_66(np, np, count=127)
2284         self.verify_tun_66(np3, np, count=127)
2285
2286         # step 2;
2287         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2288                                         [np.tun_sa_in, np3.tun_sa_in])
2289         self.verify_tun_66(np, np3, count=127)
2290         self.verify_tun_66(np3, np3, count=127)
2291
2292         # step 1;
2293         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2294                                         [np3.tun_sa_in])
2295         self.verify_tun_66(np3, np3, count=127)
2296         self.verify_drop_tun_66(np, count=127)
2297
2298         c = p.tun_if.get_rx_stats()
2299         self.assertEqual(c['packets'], 127*9)
2300         c = p.tun_if.get_tx_stats()
2301         self.assertEqual(c['packets'], 127*8)
2302         self.unconfig_sa(np)
2303
2304         # teardown
2305         self.unconfig_protect(np3)
2306         self.unconfig_sa(np3)
2307         self.unconfig_network(p)
2308
2309     def test_tun_46(self):
2310         """IPSEC tunnel protect 4o6"""
2311
2312         p = self.ipv6_params
2313
2314         self.config_network(p)
2315         self.config_sa_tra(p)
2316         self.config_protect(p)
2317
2318         self.verify_tun_46(p, count=127)
2319         c = p.tun_if.get_rx_stats()
2320         self.assertEqual(c['packets'], 127)
2321         c = p.tun_if.get_tx_stats()
2322         self.assertEqual(c['packets'], 127)
2323
2324         # teardown
2325         self.unconfig_protect(p)
2326         self.unconfig_sa(p)
2327         self.unconfig_network(p)
2328
2329
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted packets to send on ``sw_intf``.

        The plain text handed to ``sa.encrypt`` is
        IPv6(sw_intf remote -> local) / IPv6(src -> dst) / UDP / payload.
        ``p`` is accepted for signature compatibility and not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build ``count`` clear text IPv6/UDP packets from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of the decrypted packets."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with ``sa`` and check its headers.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure inside decrypt() can not
            # log the previous iteration's packet as this one's plain text
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        n_pkts = 127
        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy() is shallow, so np.tun_if is p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 2 * n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 2 * n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
2431
2432
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    encryption_type = ESP

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsec6TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build ``count`` encrypted packets with a bogus tunnel header."""
        pkts = []
        for _ in range(count):
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            clear = (IPv6(src=sw_intf.remote_ip6,
                          dst="5::5") /
                     IPv6(src=src, dst=dst) /
                     UDP(sport=1144, dport=2233) /
                     Raw(b'X' * payload_size))
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / sa.encrypt(clear))
        return pkts

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # none of the 63 injected packets may produce a reply
        bogus = self.gen_encrypt_pkts6(params, params.scapy_tun_sa,
                                       self.tun_if,
                                       src=params.remote_tun_if_host,
                                       dst=self.pg1.remote_ip6,
                                       count=63)
        self.send_and_assert_no_replies(self.tun_if, bogus)

        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
2480
2481
class TemplateIpsecItf4(object):
    """ IPsec Interface IPv4 """

    tun4_input_node = "ipsec4-tun-input"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    encryption_type = ESP

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound SAs for ``p`` and add them to VPP.

        The scapy side is prepared first through config_tun_params().  The
        outbound SA is built with (src, dst), the inbound SA with the two
        addresses swapped; both are stored on ``p``.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        outbound = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              src, dst,
                              flags=p.flags)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                             p.auth_algo_vpp_id, p.auth_key,
                             p.crypt_algo_vpp_id, p.crypt_key,
                             self.vpp_esp_protocol,
                             dst, src,
                             flags=p.flags)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair stored on ``p``."""
        protect = VppIpsecTunProtect(self,
                                     p.tun_if,
                                     p.tun_sa_out,
                                     [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p, instance=0xffffffff):
        """Create the IPsec interface and the host routes through it.

        The interface is brought up with IPv4 and IPv6 configured.  Only
        the IPv4 /32 route is kept on ``p`` (as ``p.route``); the IPv6 /128
        route is added without keeping a reference to it.
        """
        tun_if = VppIpsecInterface(self, instance=instance)
        p.tun_if = tun_if

        tun_if.add_vpp_config()
        tun_if.admin_up()
        tun_if.config_ip4()
        tun_if.config_ip6()

        # 0xffffffff is presumably "no interface" for the path - confirm
        # against VppRoutePath
        v4_path = VppRoutePath(tun_if.remote_ip4,
                               0xffffffff)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32, [v4_path])
        p.route.add_vpp_config()

        v6_path = VppRoutePath(tun_if.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128, [v6_path])
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound, then the inbound SA stored on ``p``."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
2544
2545
class TestIpsecItf4(TemplateIpsec,
                    TemplateIpsecItf4,
                    IpsecTun4):
    """ IPsec Interface IPv4 """

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsecItf4, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsecItf4, self).tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 - explicit instance number"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # only instance 3 was requested, so ipsec0 must not exist
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        # assertNotIn reports the CLI output when the check fails
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # traffic must be dropped while the interface is admin down and
        # flow again once it is back up
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 3*n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 2*n_pkts)

        # it's a v6 packet when it's encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 4*n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 3*n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # the counter checks after the rekey below expect n_pkts, i.e. they
        # count from this point
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # (copy.copy() is shallow, so np.tun_if is p.tun_if)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the NULL algorithms do not leak into
        # self.ipv4_params
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
2664
2665
class TemplateIpsecItf6(object):
    """ IPsec Interface IPv6 """

    tun6_input_node = "ipsec6-tun-input"
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    encryption_type = ESP

    def config_sa_tun(self, p, src, dst):
        """Create the outbound and inbound SAs for ``p`` and add them to VPP.

        The scapy side is prepared first through config_tun_params().  The
        outbound SA is built with (src, dst), the inbound SA with the two
        addresses swapped; both are stored on ``p``.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        outbound = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              src, dst,
                              flags=p.flags)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                             p.auth_algo_vpp_id, p.auth_key,
                             p.crypt_algo_vpp_id, p.crypt_key,
                             self.vpp_esp_protocol,
                             dst, src,
                             flags=p.flags)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect ``p.tun_if`` with the SA pair stored on ``p``."""
        protect = VppIpsecTunProtect(self,
                                     p.tun_if,
                                     p.tun_sa_out,
                                     [p.tun_sa_in])
        p.tun_protect = protect
        protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPsec interface and the host routes through it.

        The interface is brought up with IPv4 and IPv6 configured.  The
        IPv4 /32 route is added without keeping a reference; the IPv6 /128
        route is stored as ``p.route``.
        """
        tun_if = VppIpsecInterface(self)
        p.tun_if = tun_if

        tun_if.add_vpp_config()
        tun_if.admin_up()
        tun_if.config_ip4()
        tun_if.config_ip6()

        # 0xffffffff is presumably "no interface" for the path - confirm
        # against VppRoutePath
        v4_path = VppRoutePath(tun_if.remote_ip4,
                               0xffffffff)
        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32, [v4_path])
        v4_route.add_vpp_config()

        v6_path = VppRoutePath(tun_if.remote_ip6,
                               0xffffffff,
                               proto=DpoProto.DPO_PROTO_IP6)
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128, [v6_path])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection stored on ``p``."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound, then the inbound SA stored on ``p``."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
2729
2730
class TestIpsecItf6(TemplateIpsec,
                    TemplateIpsecItf6,
                    IpsecTun6):
    """ IPsec Interface IPv6 """

    def setUp(self):
        """Run the inherited setUp, then use pg0 as ``self.tun_if``."""
        super(TestIpsecItf6, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        """Defer entirely to the inherited tearDown."""
        super(TestIpsecItf6, self).tearDown()

    def test_tun_44(self):
        """IPSEC interface IPv6"""

        n_pkts = 127
        p = self.ipv6_params
        local6 = self.pg0.local_ip6
        remote6 = self.pg0.remote_ip6

        self.config_network(p)
        self.config_sa_tun(p, local6, remote6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], n_pkts)

        # traffic must be dropped while the interface is admin down and
        # flow again once it is back up
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 3*n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 2*n_pkts)

        # it's a v4 packet when it's encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], 4*n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], 3*n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # the counter checks after the rekey below expect n_pkts, i.e. they
        # count from this point
        self.vapi.cli("clear interfaces")

        # rekey - a shallow copy of the parameters gets a new key, new SPIs
        # and new SA ids; new SAs are created and the protection updated
        rekey = copy.copy(p)
        rekey.crypt_key = b'X' + p.crypt_key[1:]
        rekey.scapy_tun_spi += 100
        rekey.scapy_tun_sa_id += 1
        rekey.vpp_tun_spi += 100
        rekey.vpp_tun_sa_id += 1
        # copy.copy() is shallow: rekey.tun_if is the same object as p.tun_if
        rekey.tun_if.local_spi = p.vpp_tun_spi
        rekey.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(rekey, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(rekey)
        self.unconfig_sa(p)

        self.verify_tun_66(rekey, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats()['packets'], n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats()['packets'], n_pkts)

        # teardown
        self.unconfig_protect(rekey)
        self.unconfig_sa(rekey)
        self.unconfig_network(p)
2811
2812
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """ Ipsec P2MP ESP v4 tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build ``count`` encrypted packets to send on ``sw_intf``.

        The plain text is always addressed from pg1's local to pg1's
        remote IPv4 address; ``p``, ``src`` and ``dst`` are not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build ``count`` clear text packets towards ``dst``.

        The source address is fixed to 1.1.1.1; ``src`` is not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check that decrypted packets are addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer TOS, then decrypt and check each packet.

        On an IndexError or AssertionError the received packet (and the
        decrypted one, when decryption got that far) is logged at debug
        level before the exception is re-raised.
        """
        for rx in rxs:
            # reset per packet so that a failure before/inside decrypt()
            # can not log the previous iteration's packet as this one's
            pkt = None
            try:
                # the outbound SAs are created with dscp=EF in setUp(); the
                # DSCP value sits in the upper six bits of the TOS byte
                self.assertEqual(rx[IP].tos,
                                 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                e = pkt[IP]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        """Create one multipoint IPsec interface with 16 protected next-hops.

        Each next-hop gets its own copy of the IPv4 parameters, its own SA
        pair, a tunnel protection keyed on the next-hop and a /32 route to
        a distinct remote host; the copies are kept in ``multi_params``.
        """
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(self,
                                     mode=(VppEnum.vl_api_tunnel_mode_t.
                                           TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the tun_if created above
            p = copy.copy(self.ipv4_params)
            peer_ip4 = self.pg0.remote_hosts[ii].ip4

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.tun_sa_out = VppIpsecSA(
                self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                peer_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                peer_ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tun_params(p, self.encryption_type, None,
                              self.pg0.local_ip4,
                              peer_ip4)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            p.tun_dst = peer_ip4

    def tearDown(self):
        """Remove the interface's IPv4 config, then run the base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
2940
2941
# Allow this file to be run directly; unittest is driven by VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)