policer: output interface policer
[vpp.git] / test / test_ipsec_tun_if_esp.py
1 import unittest
2 import socket
3 import copy
4
5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14     IpsecTun4, IpsecTun6,  IpsecTcpTests, mk_scapy_crypt_key, \
15     IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19     VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
24 from util import ppp
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer, Dir
29
30
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
32     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
33     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34                              IPSEC_API_SAD_FLAG_USE_ESN))
35     crypt_key = mk_scapy_crypt_key(p)
36     if tun_if:
37         p.tun_dst = tun_if.remote_ip
38         p.tun_src = tun_if.local_ip
39     else:
40         p.tun_dst = dst
41         p.tun_src = src
42
43     if p.nat_header:
44         is_default_port = (p.nat_header.dport == 4500)
45     else:
46         is_default_port = True
47
48     if is_default_port:
49         outbound_nat_header = p.nat_header
50     else:
51         outbound_nat_header = UDP(sport=p.nat_header.dport,
52                                   dport=p.nat_header.sport)
53         bind_layers(UDP, ESP, dport=p.nat_header.dport)
54
55     p.scapy_tun_sa = SecurityAssociation(
56         encryption_type, spi=p.vpp_tun_spi,
57         crypt_algo=p.crypt_algo,
58         crypt_key=crypt_key,
59         auth_algo=p.auth_algo, auth_key=p.auth_key,
60         tunnel_header=ip_class_by_addr_type[p.addr_type](
61             src=p.tun_dst,
62             dst=p.tun_src),
63         nat_t_header=outbound_nat_header,
64         esn_en=esn_en)
65     p.vpp_tun_sa = SecurityAssociation(
66         encryption_type, spi=p.scapy_tun_spi,
67         crypt_algo=p.crypt_algo,
68         crypt_key=crypt_key,
69         auth_algo=p.auth_algo, auth_key=p.auth_key,
70         tunnel_header=ip_class_by_addr_type[p.addr_type](
71             dst=p.tun_dst,
72             src=p.tun_src),
73         nat_t_header=p.nat_header,
74         esn_en=esn_en)
75
76
77 def config_tra_params(p, encryption_type, tun_if):
78     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
79     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
80                              IPSEC_API_SAD_FLAG_USE_ESN))
81     crypt_key = mk_scapy_crypt_key(p)
82     p.tun_dst = tun_if.remote_ip
83     p.tun_src = tun_if.local_ip
84
85     if p.nat_header:
86         is_default_port = (p.nat_header.dport == 4500)
87     else:
88         is_default_port = True
89
90     if is_default_port:
91         outbound_nat_header = p.nat_header
92     else:
93         outbound_nat_header = UDP(sport=p.nat_header.dport,
94                                   dport=p.nat_header.sport)
95         bind_layers(UDP, ESP, dport=p.nat_header.dport)
96
97     p.scapy_tun_sa = SecurityAssociation(
98         encryption_type, spi=p.vpp_tun_spi,
99         crypt_algo=p.crypt_algo,
100         crypt_key=crypt_key,
101         auth_algo=p.auth_algo, auth_key=p.auth_key,
102         esn_en=esn_en,
103         nat_t_header=outbound_nat_header)
104     p.vpp_tun_sa = SecurityAssociation(
105         encryption_type, spi=p.scapy_tun_spi,
106         crypt_algo=p.crypt_algo,
107         crypt_key=crypt_key,
108         auth_algo=p.auth_algo, auth_key=p.auth_key,
109         esn_en=esn_en,
110         nat_t_header=p.nat_header)
111
112
113 class TemplateIpsec4TunProtect(object):
114     """ IPsec IPv4 Tunnel protect """
115
116     encryption_type = ESP
117     tun4_encrypt_node_name = "esp4-encrypt-tun"
118     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
119     tun4_input_node = "ipsec4-tun-input"
120
121     def config_sa_tra(self, p):
122         config_tun_params(p, self.encryption_type, p.tun_if)
123
124         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
125                                   p.auth_algo_vpp_id, p.auth_key,
126                                   p.crypt_algo_vpp_id, p.crypt_key,
127                                   self.vpp_esp_protocol,
128                                   flags=p.flags)
129         p.tun_sa_out.add_vpp_config()
130
131         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
132                                  p.auth_algo_vpp_id, p.auth_key,
133                                  p.crypt_algo_vpp_id, p.crypt_key,
134                                  self.vpp_esp_protocol,
135                                  flags=p.flags)
136         p.tun_sa_in.add_vpp_config()
137
138     def config_sa_tun(self, p):
139         config_tun_params(p, self.encryption_type, p.tun_if)
140
141         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
142                                   p.auth_algo_vpp_id, p.auth_key,
143                                   p.crypt_algo_vpp_id, p.crypt_key,
144                                   self.vpp_esp_protocol,
145                                   self.tun_if.local_addr[p.addr_type],
146                                   self.tun_if.remote_addr[p.addr_type],
147                                   flags=p.flags)
148         p.tun_sa_out.add_vpp_config()
149
150         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
151                                  p.auth_algo_vpp_id, p.auth_key,
152                                  p.crypt_algo_vpp_id, p.crypt_key,
153                                  self.vpp_esp_protocol,
154                                  self.tun_if.remote_addr[p.addr_type],
155                                  self.tun_if.local_addr[p.addr_type],
156                                  flags=p.flags)
157         p.tun_sa_in.add_vpp_config()
158
159     def config_protect(self, p):
160         p.tun_protect = VppIpsecTunProtect(self,
161                                            p.tun_if,
162                                            p.tun_sa_out,
163                                            [p.tun_sa_in])
164         p.tun_protect.add_vpp_config()
165
166     def config_network(self, p):
167         if hasattr(p, 'tun_dst'):
168             tun_dst = p.tun_dst
169         else:
170             tun_dst = self.pg0.remote_ip4
171         p.tun_if = VppIpIpTunInterface(self, self.pg0,
172                                        self.pg0.local_ip4,
173                                        tun_dst)
174         p.tun_if.add_vpp_config()
175         p.tun_if.admin_up()
176         p.tun_if.config_ip4()
177         p.tun_if.config_ip6()
178
179         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
180                              [VppRoutePath(p.tun_if.remote_ip4,
181                                            0xffffffff)])
182         p.route.add_vpp_config()
183         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
184                        [VppRoutePath(p.tun_if.remote_ip6,
185                                      0xffffffff,
186                                      proto=DpoProto.DPO_PROTO_IP6)])
187         r.add_vpp_config()
188
189     def unconfig_network(self, p):
190         p.route.remove_vpp_config()
191         p.tun_if.remove_vpp_config()
192
193     def unconfig_protect(self, p):
194         p.tun_protect.remove_vpp_config()
195
196     def unconfig_sa(self, p):
197         p.tun_sa_out.remove_vpp_config()
198         p.tun_sa_in.remove_vpp_config()
199
200
201 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
202                              TemplateIpsec):
203     """ IPsec tunnel interface tests """
204
205     encryption_type = ESP
206
207     @classmethod
208     def setUpClass(cls):
209         super(TemplateIpsec4TunIfEsp, cls).setUpClass()
210
211     @classmethod
212     def tearDownClass(cls):
213         super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
214
215     def setUp(self):
216         super(TemplateIpsec4TunIfEsp, self).setUp()
217
218         self.tun_if = self.pg0
219
220         p = self.ipv4_params
221
222         self.config_network(p)
223         self.config_sa_tra(p)
224         self.config_protect(p)
225
226     def tearDown(self):
227         super(TemplateIpsec4TunIfEsp, self).tearDown()
228
229
230 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
231                                 TemplateIpsec):
232     """ IPsec UDP tunnel interface tests """
233
234     tun4_encrypt_node_name = "esp4-encrypt-tun"
235     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
236     encryption_type = ESP
237
238     @classmethod
239     def setUpClass(cls):
240         super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
241
242     @classmethod
243     def tearDownClass(cls):
244         super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
245
246     def verify_encrypted(self, p, sa, rxs):
247         for rx in rxs:
248             try:
249                 # ensure the UDP ports are correct before we decrypt
250                 # which strips them
251                 self.assertTrue(rx.haslayer(UDP))
252                 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
253                 self.assert_equal(rx[UDP].dport, 4500)
254
255                 pkt = sa.decrypt(rx[IP])
256                 if not pkt.haslayer(IP):
257                     pkt = IP(pkt[Raw].load)
258
259                 self.assert_packet_checksums_valid(pkt)
260                 self.assert_equal(pkt[IP].dst, "1.1.1.1")
261                 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
262             except (IndexError, AssertionError):
263                 self.logger.debug(ppp("Unexpected packet:", rx))
264                 try:
265                     self.logger.debug(ppp("Decrypted packet:", pkt))
266                 except:
267                     pass
268                 raise
269
270     def config_sa_tra(self, p):
271         config_tun_params(p, self.encryption_type, p.tun_if)
272
273         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
274                                   p.auth_algo_vpp_id, p.auth_key,
275                                   p.crypt_algo_vpp_id, p.crypt_key,
276                                   self.vpp_esp_protocol,
277                                   flags=p.flags,
278                                   udp_src=p.nat_header.sport,
279                                   udp_dst=p.nat_header.dport)
280         p.tun_sa_out.add_vpp_config()
281
282         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
283                                  p.auth_algo_vpp_id, p.auth_key,
284                                  p.crypt_algo_vpp_id, p.crypt_key,
285                                  self.vpp_esp_protocol,
286                                  flags=p.flags,
287                                  udp_src=p.nat_header.sport,
288                                  udp_dst=p.nat_header.dport)
289         p.tun_sa_in.add_vpp_config()
290
291     def setUp(self):
292         super(TemplateIpsec4TunIfEspUdp, self).setUp()
293
294         p = self.ipv4_params
295         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
296                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
297         p.nat_header = UDP(sport=5454, dport=4500)
298
299         self.tun_if = self.pg0
300
301         self.config_network(p)
302         self.config_sa_tra(p)
303         self.config_protect(p)
304
305     def tearDown(self):
306         super(TemplateIpsec4TunIfEspUdp, self).tearDown()
307
308
309 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
310     """ Ipsec ESP - TUN tests """
311     tun4_encrypt_node_name = "esp4-encrypt-tun"
312     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
313
314     def test_tun_basic64(self):
315         """ ipsec 6o4 tunnel basic test """
316         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
317
318         self.verify_tun_64(self.params[socket.AF_INET], count=1)
319
320     def test_tun_burst64(self):
321         """ ipsec 6o4 tunnel basic test """
322         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
323
324         self.verify_tun_64(self.params[socket.AF_INET], count=257)
325
326     def test_tun_basic_frag44(self):
327         """ ipsec 4o4 tunnel frag basic test """
328         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
329
330         p = self.ipv4_params
331
332         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
333                                        [1500, 0, 0, 0])
334         self.verify_tun_44(self.params[socket.AF_INET],
335                            count=1, payload_size=1800, n_rx=2)
336         self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
337                                        [9000, 0, 0, 0])
338
339
340 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
341     """ Ipsec ESP UDP tests """
342
343     tun4_input_node = "ipsec4-tun-input"
344
345     def setUp(self):
346         super(TestIpsec4TunIfEspUdp, self).setUp()
347
348     def test_keepalive(self):
349         """ IPSEC NAT Keepalive """
350         self.verify_keepalive(self.ipv4_params)
351
352
353 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
354     """ Ipsec ESP UDP GCM tests """
355
356     tun4_input_node = "ipsec4-tun-input"
357
358     def setUp(self):
359         super(TestIpsec4TunIfEspUdpGCM, self).setUp()
360         p = self.ipv4_params
361         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
362                               IPSEC_API_INTEG_ALG_NONE)
363         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
364                                IPSEC_API_CRYPTO_ALG_AES_GCM_256)
365         p.crypt_algo = "AES-GCM"
366         p.auth_algo = "NULL"
367         p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
368         p.salt = 0
369
370
371 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
372     """ Ipsec ESP - TCP tests """
373     pass
374
375
376 class TemplateIpsec6TunProtect(object):
377     """ IPsec IPv6 Tunnel protect """
378
379     def config_sa_tra(self, p):
380         config_tun_params(p, self.encryption_type, p.tun_if)
381
382         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
383                                   p.auth_algo_vpp_id, p.auth_key,
384                                   p.crypt_algo_vpp_id, p.crypt_key,
385                                   self.vpp_esp_protocol)
386         p.tun_sa_out.add_vpp_config()
387
388         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
389                                  p.auth_algo_vpp_id, p.auth_key,
390                                  p.crypt_algo_vpp_id, p.crypt_key,
391                                  self.vpp_esp_protocol)
392         p.tun_sa_in.add_vpp_config()
393
394     def config_sa_tun(self, p):
395         config_tun_params(p, self.encryption_type, p.tun_if)
396
397         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
398                                   p.auth_algo_vpp_id, p.auth_key,
399                                   p.crypt_algo_vpp_id, p.crypt_key,
400                                   self.vpp_esp_protocol,
401                                   self.tun_if.local_addr[p.addr_type],
402                                   self.tun_if.remote_addr[p.addr_type])
403         p.tun_sa_out.add_vpp_config()
404
405         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
406                                  p.auth_algo_vpp_id, p.auth_key,
407                                  p.crypt_algo_vpp_id, p.crypt_key,
408                                  self.vpp_esp_protocol,
409                                  self.tun_if.remote_addr[p.addr_type],
410                                  self.tun_if.local_addr[p.addr_type])
411         p.tun_sa_in.add_vpp_config()
412
413     def config_protect(self, p):
414         p.tun_protect = VppIpsecTunProtect(self,
415                                            p.tun_if,
416                                            p.tun_sa_out,
417                                            [p.tun_sa_in])
418         p.tun_protect.add_vpp_config()
419
420     def config_network(self, p):
421         if hasattr(p, 'tun_dst'):
422             tun_dst = p.tun_dst
423         else:
424             tun_dst = self.pg0.remote_ip6
425         p.tun_if = VppIpIpTunInterface(self, self.pg0,
426                                        self.pg0.local_ip6,
427                                        tun_dst)
428         p.tun_if.add_vpp_config()
429         p.tun_if.admin_up()
430         p.tun_if.config_ip6()
431         p.tun_if.config_ip4()
432
433         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
434                              [VppRoutePath(p.tun_if.remote_ip6,
435                                            0xffffffff,
436                                            proto=DpoProto.DPO_PROTO_IP6)])
437         p.route.add_vpp_config()
438         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
439                        [VppRoutePath(p.tun_if.remote_ip4,
440                                      0xffffffff)])
441         r.add_vpp_config()
442
443     def unconfig_network(self, p):
444         p.route.remove_vpp_config()
445         p.tun_if.remove_vpp_config()
446
447     def unconfig_protect(self, p):
448         p.tun_protect.remove_vpp_config()
449
450     def unconfig_sa(self, p):
451         p.tun_sa_out.remove_vpp_config()
452         p.tun_sa_in.remove_vpp_config()
453
454
455 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
456                              TemplateIpsec):
457     """ IPsec tunnel interface tests """
458
459     encryption_type = ESP
460
461     def setUp(self):
462         super(TemplateIpsec6TunIfEsp, self).setUp()
463
464         self.tun_if = self.pg0
465
466         p = self.ipv6_params
467         self.config_network(p)
468         self.config_sa_tra(p)
469         self.config_protect(p)
470
471     def tearDown(self):
472         super(TemplateIpsec6TunIfEsp, self).tearDown()
473
474
475 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
476                           IpsecTun6Tests):
477     """ Ipsec ESP - TUN tests """
478     tun6_encrypt_node_name = "esp6-encrypt-tun"
479     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
480
481     def test_tun_basic46(self):
482         """ ipsec 4o6 tunnel basic test """
483         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
484         self.verify_tun_46(self.params[socket.AF_INET6], count=1)
485
486     def test_tun_burst46(self):
487         """ ipsec 4o6 tunnel burst test """
488         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
489         self.verify_tun_46(self.params[socket.AF_INET6], count=257)
490
491
492 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
493                                 IpsecTun6HandoffTests):
494     """ Ipsec ESP 6 Handoff tests """
495     tun6_encrypt_node_name = "esp6-encrypt-tun"
496     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
497
498     def test_tun_handoff_66_police(self):
499         """ ESP 6o6 tunnel with policer worker hand-off test """
500         self.vapi.cli("clear errors")
501         self.vapi.cli("clear ipsec sa")
502
503         N_PKTS = 15
504         p = self.params[socket.AF_INET6]
505
506         action_tx = PolicerAction(
507             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
508             0)
509         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
510                              conform_action=action_tx,
511                              exceed_action=action_tx,
512                              violate_action=action_tx)
513         policer.add_vpp_config()
514
515         # Start policing on tun
516         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
517
518         for pol_bind in [1, 0]:
519             policer.bind_vpp_config(pol_bind, True)
520
521             # inject alternately on worker 0 and 1.
522             for worker in [0, 1, 0, 1]:
523                 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
524                                                    self.tun_if,
525                                                    src=p.remote_tun_if_host,
526                                                    dst=self.pg1.remote_ip6,
527                                                    count=N_PKTS)
528                 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
529                                                  self.pg1, worker=worker)
530                 self.verify_decrypted6(p, recv_pkts)
531                 self.logger.debug(self.vapi.cli("show trace max 100"))
532
533             stats = policer.get_stats()
534             stats0 = policer.get_stats(worker=0)
535             stats1 = policer.get_stats(worker=1)
536
537             if pol_bind == 1:
538                 # First pass: Worker 1, should have done all the policing
539                 self.assertEqual(stats, stats1)
540
541                 # Worker 0, should have handed everything off
542                 self.assertEqual(stats0['conform_packets'], 0)
543                 self.assertEqual(stats0['exceed_packets'], 0)
544                 self.assertEqual(stats0['violate_packets'], 0)
545             else:
546                 # Second pass: both workers should have policed equal amounts
547                 self.assertGreater(stats1['conform_packets'], 0)
548                 self.assertEqual(stats1['exceed_packets'], 0)
549                 self.assertGreater(stats1['violate_packets'], 0)
550
551                 self.assertGreater(stats0['conform_packets'], 0)
552                 self.assertEqual(stats0['exceed_packets'], 0)
553                 self.assertGreater(stats0['violate_packets'], 0)
554
555                 self.assertEqual(stats0['conform_packets'] +
556                                  stats0['violate_packets'],
557                                  stats1['conform_packets'] +
558                                  stats1['violate_packets'])
559
560         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
561         policer.remove_vpp_config()
562
563
564 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
565                                 IpsecTun4HandoffTests):
566     """ Ipsec ESP 4 Handoff tests """
567     tun4_encrypt_node_name = "esp4-encrypt-tun"
568     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
569
570     def test_tun_handoff_44_police(self):
571         """ ESP 4o4 tunnel with policer worker hand-off test """
572         self.vapi.cli("clear errors")
573         self.vapi.cli("clear ipsec sa")
574
575         N_PKTS = 15
576         p = self.params[socket.AF_INET]
577
578         action_tx = PolicerAction(
579             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
580             0)
581         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
582                              conform_action=action_tx,
583                              exceed_action=action_tx,
584                              violate_action=action_tx)
585         policer.add_vpp_config()
586
587         # Start policing on tun
588         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
589
590         for pol_bind in [1, 0]:
591             policer.bind_vpp_config(pol_bind, True)
592
593             # inject alternately on worker 0 and 1.
594             for worker in [0, 1, 0, 1]:
595                 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
596                                                   self.tun_if,
597                                                   src=p.remote_tun_if_host,
598                                                   dst=self.pg1.remote_ip4,
599                                                   count=N_PKTS)
600                 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
601                                                  self.pg1, worker=worker)
602                 self.verify_decrypted(p, recv_pkts)
603                 self.logger.debug(self.vapi.cli("show trace max 100"))
604
605             stats = policer.get_stats()
606             stats0 = policer.get_stats(worker=0)
607             stats1 = policer.get_stats(worker=1)
608
609             if pol_bind == 1:
610                 # First pass: Worker 1, should have done all the policing
611                 self.assertEqual(stats, stats1)
612
613                 # Worker 0, should have handed everything off
614                 self.assertEqual(stats0['conform_packets'], 0)
615                 self.assertEqual(stats0['exceed_packets'], 0)
616                 self.assertEqual(stats0['violate_packets'], 0)
617             else:
618                 # Second pass: both workers should have policed equal amounts
619                 self.assertGreater(stats1['conform_packets'], 0)
620                 self.assertEqual(stats1['exceed_packets'], 0)
621                 self.assertGreater(stats1['violate_packets'], 0)
622
623                 self.assertGreater(stats0['conform_packets'], 0)
624                 self.assertEqual(stats0['exceed_packets'], 0)
625                 self.assertGreater(stats0['violate_packets'], 0)
626
627                 self.assertEqual(stats0['conform_packets'] +
628                                  stats0['violate_packets'],
629                                  stats1['conform_packets'] +
630                                  stats1['violate_packets'])
631
632         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
633         policer.remove_vpp_config()
634
635
636 @tag_fixme_vpp_workers
637 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
638                               TemplateIpsec,
639                               IpsecTun4):
640     """ IPsec IPv4 Multi Tunnel interface """
641
642     encryption_type = ESP
643     tun4_encrypt_node_name = "esp4-encrypt-tun"
644     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
645
646     def setUp(self):
647         super(TestIpsec4MultiTunIfEsp, self).setUp()
648
649         self.tun_if = self.pg0
650
651         self.multi_params = []
652         self.pg0.generate_remote_hosts(10)
653         self.pg0.configure_ipv4_neighbors()
654
655         for ii in range(10):
656             p = copy.copy(self.ipv4_params)
657
658             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
659             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
660             p.scapy_tun_spi = p.scapy_tun_spi + ii
661             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
662             p.vpp_tun_spi = p.vpp_tun_spi + ii
663
664             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
665             p.scapy_tra_spi = p.scapy_tra_spi + ii
666             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
667             p.vpp_tra_spi = p.vpp_tra_spi + ii
668             p.tun_dst = self.pg0.remote_hosts[ii].ip4
669
670             self.multi_params.append(p)
671             self.config_network(p)
672             self.config_sa_tra(p)
673             self.config_protect(p)
674
675     def tearDown(self):
676         super(TestIpsec4MultiTunIfEsp, self).tearDown()
677
678     def test_tun_44(self):
679         """Multiple IPSEC tunnel interfaces """
680         for p in self.multi_params:
681             self.verify_tun_44(p, count=127)
682             self.assertEqual(p.tun_if.get_rx_stats(), 127)
683             self.assertEqual(p.tun_if.get_tx_stats(), 127)
684
685     def test_tun_rr_44(self):
686         """ Round-robin packets acrros multiple interface """
687         tx = []
688         for p in self.multi_params:
689             tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
690                                             src=p.remote_tun_if_host,
691                                             dst=self.pg1.remote_ip4)
692         rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
693
694         for rx, p in zip(rxs, self.multi_params):
695             self.verify_decrypted(p, [rx])
696
697         tx = []
698         for p in self.multi_params:
699             tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
700                                     dst=p.remote_tun_if_host)
701         rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
702
703         for rx, p in zip(rxs, self.multi_params):
704             self.verify_encrypted(p, p.vpp_tun_sa, [rx])
705
706
707 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
708                             TemplateIpsec,
709                             IpsecTun4):
710     """ IPsec IPv4 Tunnel interface all Algos """
711
712     encryption_type = ESP
713     tun4_encrypt_node_name = "esp4-encrypt-tun"
714     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
715
716     def setUp(self):
717         super(TestIpsec4TunIfEspAll, self).setUp()
718
719         self.tun_if = self.pg0
720         p = self.ipv4_params
721
722         self.config_network(p)
723         self.config_sa_tra(p)
724         self.config_protect(p)
725
726     def tearDown(self):
727         p = self.ipv4_params
728         self.unconfig_protect(p)
729         self.unconfig_network(p)
730         self.unconfig_sa(p)
731
732         super(TestIpsec4TunIfEspAll, self).tearDown()
733
734     def rekey(self, p):
735         #
736         # change the key and the SPI
737         #
738         np = copy.copy(p)
739         p.crypt_key = b'X' + p.crypt_key[1:]
740         p.scapy_tun_spi += 1
741         p.scapy_tun_sa_id += 1
742         p.vpp_tun_spi += 1
743         p.vpp_tun_sa_id += 1
744         p.tun_if.local_spi = p.vpp_tun_spi
745         p.tun_if.remote_spi = p.scapy_tun_spi
746
747         config_tun_params(p, self.encryption_type, p.tun_if)
748
749         p.tun_sa_out = VppIpsecSA(self,
750                                   p.scapy_tun_sa_id,
751                                   p.scapy_tun_spi,
752                                   p.auth_algo_vpp_id,
753                                   p.auth_key,
754                                   p.crypt_algo_vpp_id,
755                                   p.crypt_key,
756                                   self.vpp_esp_protocol,
757                                   flags=p.flags,
758                                   salt=p.salt)
759         p.tun_sa_in = VppIpsecSA(self,
760                                  p.vpp_tun_sa_id,
761                                  p.vpp_tun_spi,
762                                  p.auth_algo_vpp_id,
763                                  p.auth_key,
764                                  p.crypt_algo_vpp_id,
765                                  p.crypt_key,
766                                  self.vpp_esp_protocol,
767                                  flags=p.flags,
768                                  salt=p.salt)
769         p.tun_sa_in.add_vpp_config()
770         p.tun_sa_out.add_vpp_config()
771
772         self.config_protect(p)
773         np.tun_sa_out.remove_vpp_config()
774         np.tun_sa_in.remove_vpp_config()
775         self.logger.info(self.vapi.cli("sh ipsec sa"))
776
777     def test_tun_44(self):
778         """IPSEC tunnel all algos """
779
780         # foreach VPP crypto engine
781         engines = ["ia32", "ipsecmb", "openssl"]
782
783         # foreach crypto algorithm
784         algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
785                                  IPSEC_API_CRYPTO_ALG_AES_GCM_128),
786                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
787                                 IPSEC_API_INTEG_ALG_NONE),
788                   'scapy-crypto': "AES-GCM",
789                   'scapy-integ': "NULL",
790                   'key': b"JPjyOWBeVEQiMe7h",
791                   'salt': 3333},
792                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
793                                  IPSEC_API_CRYPTO_ALG_AES_GCM_192),
794                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
795                                 IPSEC_API_INTEG_ALG_NONE),
796                   'scapy-crypto': "AES-GCM",
797                   'scapy-integ': "NULL",
798                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
799                   'salt': 0},
800                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
801                                  IPSEC_API_CRYPTO_ALG_AES_GCM_256),
802                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
803                                 IPSEC_API_INTEG_ALG_NONE),
804                   'scapy-crypto': "AES-GCM",
805                   'scapy-integ': "NULL",
806                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
807                   'salt': 9999},
808                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
809                                  IPSEC_API_CRYPTO_ALG_AES_CBC_128),
810                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
811                                 IPSEC_API_INTEG_ALG_SHA1_96),
812                   'scapy-crypto': "AES-CBC",
813                   'scapy-integ': "HMAC-SHA1-96",
814                   'salt': 0,
815                   'key': b"JPjyOWBeVEQiMe7h"},
816                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
817                                  IPSEC_API_CRYPTO_ALG_AES_CBC_192),
818                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
819                                 IPSEC_API_INTEG_ALG_SHA_512_256),
820                   'scapy-crypto': "AES-CBC",
821                   'scapy-integ': "SHA2-512-256",
822                   'salt': 0,
823                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
824                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
825                                  IPSEC_API_CRYPTO_ALG_AES_CBC_256),
826                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
827                                 IPSEC_API_INTEG_ALG_SHA_256_128),
828                   'scapy-crypto': "AES-CBC",
829                   'scapy-integ': "SHA2-256-128",
830                   'salt': 0,
831                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
832                  {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
833                                  IPSEC_API_CRYPTO_ALG_NONE),
834                   'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
835                                 IPSEC_API_INTEG_ALG_SHA1_96),
836                   'scapy-crypto': "NULL",
837                   'scapy-integ': "HMAC-SHA1-96",
838                   'salt': 0,
839                   'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
840
841         for engine in engines:
842             self.vapi.cli("set crypto handler all %s" % engine)
843
844             #
845             # loop through each of the algorithms
846             #
847             for algo in algos:
848                 # with self.subTest(algo=algo['scapy']):
849
850                 p = self.ipv4_params
851                 p.auth_algo_vpp_id = algo['vpp-integ']
852                 p.crypt_algo_vpp_id = algo['vpp-crypto']
853                 p.crypt_algo = algo['scapy-crypto']
854                 p.auth_algo = algo['scapy-integ']
855                 p.crypt_key = algo['key']
856                 p.salt = algo['salt']
857
858                 #
859                 # rekey the tunnel
860                 #
861                 self.rekey(p)
862                 self.verify_tun_44(p, count=127)
863
864
865 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
866                                TemplateIpsec,
867                                IpsecTun4):
868     """ IPsec IPv4 Tunnel interface no Algos """
869
870     encryption_type = ESP
871     tun4_encrypt_node_name = "esp4-encrypt-tun"
872     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
873
874     def setUp(self):
875         super(TestIpsec4TunIfEspNoAlgo, self).setUp()
876
877         self.tun_if = self.pg0
878         p = self.ipv4_params
879         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
880                               IPSEC_API_INTEG_ALG_NONE)
881         p.auth_algo = 'NULL'
882         p.auth_key = []
883
884         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
885                                IPSEC_API_CRYPTO_ALG_NONE)
886         p.crypt_algo = 'NULL'
887         p.crypt_key = []
888
889     def tearDown(self):
890         super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
891
892     def test_tun_44(self):
893         """ IPSec SA with NULL algos """
894         p = self.ipv4_params
895
896         self.config_network(p)
897         self.config_sa_tra(p)
898         self.config_protect(p)
899
900         tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
901                            dst=p.remote_tun_if_host)
902         self.send_and_assert_no_replies(self.pg1, tx)
903
904         self.unconfig_protect(p)
905         self.unconfig_sa(p)
906         self.unconfig_network(p)
907
908
909 @tag_fixme_vpp_workers
910 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
911                               TemplateIpsec,
912                               IpsecTun6):
913     """ IPsec IPv6 Multi Tunnel interface """
914
915     encryption_type = ESP
916     tun6_encrypt_node_name = "esp6-encrypt-tun"
917     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
918
919     def setUp(self):
920         super(TestIpsec6MultiTunIfEsp, self).setUp()
921
922         self.tun_if = self.pg0
923
924         self.multi_params = []
925         self.pg0.generate_remote_hosts(10)
926         self.pg0.configure_ipv6_neighbors()
927
928         for ii in range(10):
929             p = copy.copy(self.ipv6_params)
930
931             p.remote_tun_if_host = "1111::%d" % (ii + 1)
932             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
933             p.scapy_tun_spi = p.scapy_tun_spi + ii
934             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
935             p.vpp_tun_spi = p.vpp_tun_spi + ii
936
937             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
938             p.scapy_tra_spi = p.scapy_tra_spi + ii
939             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
940             p.vpp_tra_spi = p.vpp_tra_spi + ii
941             p.tun_dst = self.pg0.remote_hosts[ii].ip6
942
943             self.multi_params.append(p)
944             self.config_network(p)
945             self.config_sa_tra(p)
946             self.config_protect(p)
947
948     def tearDown(self):
949         super(TestIpsec6MultiTunIfEsp, self).tearDown()
950
951     def test_tun_66(self):
952         """Multiple IPSEC tunnel interfaces """
953         for p in self.multi_params:
954             self.verify_tun_66(p, count=127)
955             self.assertEqual(p.tun_if.get_rx_stats(), 127)
956             self.assertEqual(p.tun_if.get_tx_stats(), 127)
957
958
959 class TestIpsecGreTebIfEsp(TemplateIpsec,
960                            IpsecTun4Tests):
961     """ Ipsec GRE TEB ESP - TUN tests """
962     tun4_encrypt_node_name = "esp4-encrypt-tun"
963     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
964     encryption_type = ESP
965     omac = "00:11:22:33:44:55"
966
967     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
968                          payload_size=100):
969         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
970                 sa.encrypt(IP(src=self.pg0.remote_ip4,
971                               dst=self.pg0.local_ip4) /
972                            GRE() /
973                            Ether(dst=self.omac) /
974                            IP(src="1.1.1.1", dst="1.1.1.2") /
975                            UDP(sport=1144, dport=2233) /
976                            Raw(b'X' * payload_size))
977                 for i in range(count)]
978
979     def gen_pkts(self, sw_intf, src, dst, count=1,
980                  payload_size=100):
981         return [Ether(dst=self.omac) /
982                 IP(src="1.1.1.1", dst="1.1.1.2") /
983                 UDP(sport=1144, dport=2233) /
984                 Raw(b'X' * payload_size)
985                 for i in range(count)]
986
987     def verify_decrypted(self, p, rxs):
988         for rx in rxs:
989             self.assert_equal(rx[Ether].dst, self.omac)
990             self.assert_equal(rx[IP].dst, "1.1.1.2")
991
992     def verify_encrypted(self, p, sa, rxs):
993         for rx in rxs:
994             try:
995                 pkt = sa.decrypt(rx[IP])
996                 if not pkt.haslayer(IP):
997                     pkt = IP(pkt[Raw].load)
998                 self.assert_packet_checksums_valid(pkt)
999                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1000                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1001                 self.assertTrue(pkt.haslayer(GRE))
1002                 e = pkt[Ether]
1003                 self.assertEqual(e[Ether].dst, self.omac)
1004                 self.assertEqual(e[IP].dst, "1.1.1.2")
1005             except (IndexError, AssertionError):
1006                 self.logger.debug(ppp("Unexpected packet:", rx))
1007                 try:
1008                     self.logger.debug(ppp("Decrypted packet:", pkt))
1009                 except:
1010                     pass
1011                 raise
1012
1013     def setUp(self):
1014         super(TestIpsecGreTebIfEsp, self).setUp()
1015
1016         self.tun_if = self.pg0
1017
1018         p = self.ipv4_params
1019
1020         bd1 = VppBridgeDomain(self, 1)
1021         bd1.add_vpp_config()
1022
1023         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1024                                   p.auth_algo_vpp_id, p.auth_key,
1025                                   p.crypt_algo_vpp_id, p.crypt_key,
1026                                   self.vpp_esp_protocol,
1027                                   self.pg0.local_ip4,
1028                                   self.pg0.remote_ip4)
1029         p.tun_sa_out.add_vpp_config()
1030
1031         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1032                                  p.auth_algo_vpp_id, p.auth_key,
1033                                  p.crypt_algo_vpp_id, p.crypt_key,
1034                                  self.vpp_esp_protocol,
1035                                  self.pg0.remote_ip4,
1036                                  self.pg0.local_ip4)
1037         p.tun_sa_in.add_vpp_config()
1038
1039         p.tun_if = VppGreInterface(self,
1040                                    self.pg0.local_ip4,
1041                                    self.pg0.remote_ip4,
1042                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1043                                          GRE_API_TUNNEL_TYPE_TEB))
1044         p.tun_if.add_vpp_config()
1045
1046         p.tun_protect = VppIpsecTunProtect(self,
1047                                            p.tun_if,
1048                                            p.tun_sa_out,
1049                                            [p.tun_sa_in])
1050
1051         p.tun_protect.add_vpp_config()
1052
1053         p.tun_if.admin_up()
1054         p.tun_if.config_ip4()
1055         config_tun_params(p, self.encryption_type, p.tun_if)
1056
1057         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1058         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1059
1060         self.vapi.cli("clear ipsec sa")
1061         self.vapi.cli("sh adj")
1062         self.vapi.cli("sh ipsec tun")
1063
1064     def tearDown(self):
1065         p = self.ipv4_params
1066         p.tun_if.unconfig_ip4()
1067         super(TestIpsecGreTebIfEsp, self).tearDown()
1068
1069
1070 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1071                                IpsecTun4Tests):
1072     """ Ipsec GRE TEB ESP - TUN tests """
1073     tun4_encrypt_node_name = "esp4-encrypt-tun"
1074     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1075     encryption_type = ESP
1076     omac = "00:11:22:33:44:55"
1077
1078     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1079                          payload_size=100):
1080         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1081                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1082                               dst=self.pg0.local_ip4) /
1083                            GRE() /
1084                            Ether(dst=self.omac) /
1085                            IP(src="1.1.1.1", dst="1.1.1.2") /
1086                            UDP(sport=1144, dport=2233) /
1087                            Raw(b'X' * payload_size))
1088                 for i in range(count)]
1089
1090     def gen_pkts(self, sw_intf, src, dst, count=1,
1091                  payload_size=100):
1092         return [Ether(dst=self.omac) /
1093                 Dot1Q(vlan=11) /
1094                 IP(src="1.1.1.1", dst="1.1.1.2") /
1095                 UDP(sport=1144, dport=2233) /
1096                 Raw(b'X' * payload_size)
1097                 for i in range(count)]
1098
1099     def verify_decrypted(self, p, rxs):
1100         for rx in rxs:
1101             self.assert_equal(rx[Ether].dst, self.omac)
1102             self.assert_equal(rx[Dot1Q].vlan, 11)
1103             self.assert_equal(rx[IP].dst, "1.1.1.2")
1104
1105     def verify_encrypted(self, p, sa, rxs):
1106         for rx in rxs:
1107             try:
1108                 pkt = sa.decrypt(rx[IP])
1109                 if not pkt.haslayer(IP):
1110                     pkt = IP(pkt[Raw].load)
1111                 self.assert_packet_checksums_valid(pkt)
1112                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1113                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1114                 self.assertTrue(pkt.haslayer(GRE))
1115                 e = pkt[Ether]
1116                 self.assertEqual(e[Ether].dst, self.omac)
1117                 self.assertFalse(e.haslayer(Dot1Q))
1118                 self.assertEqual(e[IP].dst, "1.1.1.2")
1119             except (IndexError, AssertionError):
1120                 self.logger.debug(ppp("Unexpected packet:", rx))
1121                 try:
1122                     self.logger.debug(ppp("Decrypted packet:", pkt))
1123                 except:
1124                     pass
1125                 raise
1126
1127     def setUp(self):
1128         super(TestIpsecGreTebVlanIfEsp, self).setUp()
1129
1130         self.tun_if = self.pg0
1131
1132         p = self.ipv4_params
1133
1134         bd1 = VppBridgeDomain(self, 1)
1135         bd1.add_vpp_config()
1136
1137         self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1138         self.vapi.l2_interface_vlan_tag_rewrite(
1139             sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1140             push_dot1q=11)
1141         self.pg1_11.admin_up()
1142
1143         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1144                                   p.auth_algo_vpp_id, p.auth_key,
1145                                   p.crypt_algo_vpp_id, p.crypt_key,
1146                                   self.vpp_esp_protocol,
1147                                   self.pg0.local_ip4,
1148                                   self.pg0.remote_ip4)
1149         p.tun_sa_out.add_vpp_config()
1150
1151         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1152                                  p.auth_algo_vpp_id, p.auth_key,
1153                                  p.crypt_algo_vpp_id, p.crypt_key,
1154                                  self.vpp_esp_protocol,
1155                                  self.pg0.remote_ip4,
1156                                  self.pg0.local_ip4)
1157         p.tun_sa_in.add_vpp_config()
1158
1159         p.tun_if = VppGreInterface(self,
1160                                    self.pg0.local_ip4,
1161                                    self.pg0.remote_ip4,
1162                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1163                                          GRE_API_TUNNEL_TYPE_TEB))
1164         p.tun_if.add_vpp_config()
1165
1166         p.tun_protect = VppIpsecTunProtect(self,
1167                                            p.tun_if,
1168                                            p.tun_sa_out,
1169                                            [p.tun_sa_in])
1170
1171         p.tun_protect.add_vpp_config()
1172
1173         p.tun_if.admin_up()
1174         p.tun_if.config_ip4()
1175         config_tun_params(p, self.encryption_type, p.tun_if)
1176
1177         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1178         VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1179
1180         self.vapi.cli("clear ipsec sa")
1181
1182     def tearDown(self):
1183         p = self.ipv4_params
1184         p.tun_if.unconfig_ip4()
1185         super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1186         self.pg1_11.admin_down()
1187         self.pg1_11.remove_vpp_config()
1188
1189
1190 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1191                               IpsecTun4Tests):
1192     """ Ipsec GRE TEB ESP - Tra tests """
1193     tun4_encrypt_node_name = "esp4-encrypt-tun"
1194     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1195     encryption_type = ESP
1196     omac = "00:11:22:33:44:55"
1197
1198     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1199                          payload_size=100):
1200         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1201                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1202                               dst=self.pg0.local_ip4) /
1203                            GRE() /
1204                            Ether(dst=self.omac) /
1205                            IP(src="1.1.1.1", dst="1.1.1.2") /
1206                            UDP(sport=1144, dport=2233) /
1207                            Raw(b'X' * payload_size))
1208                 for i in range(count)]
1209
1210     def gen_pkts(self, sw_intf, src, dst, count=1,
1211                  payload_size=100):
1212         return [Ether(dst=self.omac) /
1213                 IP(src="1.1.1.1", dst="1.1.1.2") /
1214                 UDP(sport=1144, dport=2233) /
1215                 Raw(b'X' * payload_size)
1216                 for i in range(count)]
1217
1218     def verify_decrypted(self, p, rxs):
1219         for rx in rxs:
1220             self.assert_equal(rx[Ether].dst, self.omac)
1221             self.assert_equal(rx[IP].dst, "1.1.1.2")
1222
1223     def verify_encrypted(self, p, sa, rxs):
1224         for rx in rxs:
1225             try:
1226                 pkt = sa.decrypt(rx[IP])
1227                 if not pkt.haslayer(IP):
1228                     pkt = IP(pkt[Raw].load)
1229                 self.assert_packet_checksums_valid(pkt)
1230                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1231                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1232                 self.assertTrue(pkt.haslayer(GRE))
1233                 e = pkt[Ether]
1234                 self.assertEqual(e[Ether].dst, self.omac)
1235                 self.assertEqual(e[IP].dst, "1.1.1.2")
1236             except (IndexError, AssertionError):
1237                 self.logger.debug(ppp("Unexpected packet:", rx))
1238                 try:
1239                     self.logger.debug(ppp("Decrypted packet:", pkt))
1240                 except:
1241                     pass
1242                 raise
1243
1244     def setUp(self):
1245         super(TestIpsecGreTebIfEspTra, self).setUp()
1246
1247         self.tun_if = self.pg0
1248
1249         p = self.ipv4_params
1250
1251         bd1 = VppBridgeDomain(self, 1)
1252         bd1.add_vpp_config()
1253
1254         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1255                                   p.auth_algo_vpp_id, p.auth_key,
1256                                   p.crypt_algo_vpp_id, p.crypt_key,
1257                                   self.vpp_esp_protocol)
1258         p.tun_sa_out.add_vpp_config()
1259
1260         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1261                                  p.auth_algo_vpp_id, p.auth_key,
1262                                  p.crypt_algo_vpp_id, p.crypt_key,
1263                                  self.vpp_esp_protocol)
1264         p.tun_sa_in.add_vpp_config()
1265
1266         p.tun_if = VppGreInterface(self,
1267                                    self.pg0.local_ip4,
1268                                    self.pg0.remote_ip4,
1269                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1270                                          GRE_API_TUNNEL_TYPE_TEB))
1271         p.tun_if.add_vpp_config()
1272
1273         p.tun_protect = VppIpsecTunProtect(self,
1274                                            p.tun_if,
1275                                            p.tun_sa_out,
1276                                            [p.tun_sa_in])
1277
1278         p.tun_protect.add_vpp_config()
1279
1280         p.tun_if.admin_up()
1281         p.tun_if.config_ip4()
1282         config_tra_params(p, self.encryption_type, p.tun_if)
1283
1284         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1285         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1286
1287         self.vapi.cli("clear ipsec sa")
1288
1289     def tearDown(self):
1290         p = self.ipv4_params
1291         p.tun_if.unconfig_ip4()
1292         super(TestIpsecGreTebIfEspTra, self).tearDown()
1293
1294
1295 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1296                                  IpsecTun4Tests):
1297     """ Ipsec GRE TEB UDP ESP - Tra tests """
1298     tun4_encrypt_node_name = "esp4-encrypt-tun"
1299     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1300     encryption_type = ESP
1301     omac = "00:11:22:33:44:55"
1302
1303     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1304                          payload_size=100):
1305         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1306                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1307                               dst=self.pg0.local_ip4) /
1308                            GRE() /
1309                            Ether(dst=self.omac) /
1310                            IP(src="1.1.1.1", dst="1.1.1.2") /
1311                            UDP(sport=1144, dport=2233) /
1312                            Raw(b'X' * payload_size))
1313                 for i in range(count)]
1314
1315     def gen_pkts(self, sw_intf, src, dst, count=1,
1316                  payload_size=100):
1317         return [Ether(dst=self.omac) /
1318                 IP(src="1.1.1.1", dst="1.1.1.2") /
1319                 UDP(sport=1144, dport=2233) /
1320                 Raw(b'X' * payload_size)
1321                 for i in range(count)]
1322
1323     def verify_decrypted(self, p, rxs):
1324         for rx in rxs:
1325             self.assert_equal(rx[Ether].dst, self.omac)
1326             self.assert_equal(rx[IP].dst, "1.1.1.2")
1327
1328     def verify_encrypted(self, p, sa, rxs):
1329         for rx in rxs:
1330             self.assertTrue(rx.haslayer(UDP))
1331             self.assertEqual(rx[UDP].dport, 4545)
1332             self.assertEqual(rx[UDP].sport, 5454)
1333             try:
1334                 pkt = sa.decrypt(rx[IP])
1335                 if not pkt.haslayer(IP):
1336                     pkt = IP(pkt[Raw].load)
1337                 self.assert_packet_checksums_valid(pkt)
1338                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1339                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1340                 self.assertTrue(pkt.haslayer(GRE))
1341                 e = pkt[Ether]
1342                 self.assertEqual(e[Ether].dst, self.omac)
1343                 self.assertEqual(e[IP].dst, "1.1.1.2")
1344             except (IndexError, AssertionError):
1345                 self.logger.debug(ppp("Unexpected packet:", rx))
1346                 try:
1347                     self.logger.debug(ppp("Decrypted packet:", pkt))
1348                 except:
1349                     pass
1350                 raise
1351
1352     def setUp(self):
1353         super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1354
1355         self.tun_if = self.pg0
1356
1357         p = self.ipv4_params
1358         p = self.ipv4_params
1359         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1360                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
1361         p.nat_header = UDP(sport=5454, dport=4545)
1362
1363         bd1 = VppBridgeDomain(self, 1)
1364         bd1.add_vpp_config()
1365
1366         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1367                                   p.auth_algo_vpp_id, p.auth_key,
1368                                   p.crypt_algo_vpp_id, p.crypt_key,
1369                                   self.vpp_esp_protocol,
1370                                   flags=p.flags,
1371                                   udp_src=5454,
1372                                   udp_dst=4545)
1373         p.tun_sa_out.add_vpp_config()
1374
1375         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1376                                  p.auth_algo_vpp_id, p.auth_key,
1377                                  p.crypt_algo_vpp_id, p.crypt_key,
1378                                  self.vpp_esp_protocol,
1379                                  flags=(p.flags |
1380                                         VppEnum.vl_api_ipsec_sad_flags_t.
1381                                         IPSEC_API_SAD_FLAG_IS_INBOUND),
1382                                  udp_src=4545,
1383                                  udp_dst=5454)
1384         p.tun_sa_in.add_vpp_config()
1385
1386         p.tun_if = VppGreInterface(self,
1387                                    self.pg0.local_ip4,
1388                                    self.pg0.remote_ip4,
1389                                    type=(VppEnum.vl_api_gre_tunnel_type_t.
1390                                          GRE_API_TUNNEL_TYPE_TEB))
1391         p.tun_if.add_vpp_config()
1392
1393         p.tun_protect = VppIpsecTunProtect(self,
1394                                            p.tun_if,
1395                                            p.tun_sa_out,
1396                                            [p.tun_sa_in])
1397
1398         p.tun_protect.add_vpp_config()
1399
1400         p.tun_if.admin_up()
1401         p.tun_if.config_ip4()
1402         config_tra_params(p, self.encryption_type, p.tun_if)
1403
1404         VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1405         VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1406
1407         self.vapi.cli("clear ipsec sa")
1408         self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1409
1410     def tearDown(self):
1411         p = self.ipv4_params
1412         p.tun_if.unconfig_ip4()
1413         super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1414
1415
1416 class TestIpsecGreIfEsp(TemplateIpsec,
1417                         IpsecTun4Tests):
1418     """ Ipsec GRE ESP - TUN tests """
1419     tun4_encrypt_node_name = "esp4-encrypt-tun"
1420     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1421     encryption_type = ESP
1422
1423     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1424                          payload_size=100):
1425         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1426                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1427                               dst=self.pg0.local_ip4) /
1428                            GRE() /
1429                            IP(src=self.pg1.local_ip4,
1430                               dst=self.pg1.remote_ip4) /
1431                            UDP(sport=1144, dport=2233) /
1432                            Raw(b'X' * payload_size))
1433                 for i in range(count)]
1434
1435     def gen_pkts(self, sw_intf, src, dst, count=1,
1436                  payload_size=100):
1437         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1438                 IP(src="1.1.1.1", dst="1.1.1.2") /
1439                 UDP(sport=1144, dport=2233) /
1440                 Raw(b'X' * payload_size)
1441                 for i in range(count)]
1442
1443     def verify_decrypted(self, p, rxs):
1444         for rx in rxs:
1445             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1446             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1447
1448     def verify_encrypted(self, p, sa, rxs):
1449         for rx in rxs:
1450             try:
1451                 pkt = sa.decrypt(rx[IP])
1452                 if not pkt.haslayer(IP):
1453                     pkt = IP(pkt[Raw].load)
1454                 self.assert_packet_checksums_valid(pkt)
1455                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1456                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1457                 self.assertTrue(pkt.haslayer(GRE))
1458                 e = pkt[GRE]
1459                 self.assertEqual(e[IP].dst, "1.1.1.2")
1460             except (IndexError, AssertionError):
1461                 self.logger.debug(ppp("Unexpected packet:", rx))
1462                 try:
1463                     self.logger.debug(ppp("Decrypted packet:", pkt))
1464                 except:
1465                     pass
1466                 raise
1467
1468     def setUp(self):
1469         super(TestIpsecGreIfEsp, self).setUp()
1470
1471         self.tun_if = self.pg0
1472
1473         p = self.ipv4_params
1474
1475         bd1 = VppBridgeDomain(self, 1)
1476         bd1.add_vpp_config()
1477
1478         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1479                                   p.auth_algo_vpp_id, p.auth_key,
1480                                   p.crypt_algo_vpp_id, p.crypt_key,
1481                                   self.vpp_esp_protocol,
1482                                   self.pg0.local_ip4,
1483                                   self.pg0.remote_ip4)
1484         p.tun_sa_out.add_vpp_config()
1485
1486         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1487                                  p.auth_algo_vpp_id, p.auth_key,
1488                                  p.crypt_algo_vpp_id, p.crypt_key,
1489                                  self.vpp_esp_protocol,
1490                                  self.pg0.remote_ip4,
1491                                  self.pg0.local_ip4)
1492         p.tun_sa_in.add_vpp_config()
1493
1494         p.tun_if = VppGreInterface(self,
1495                                    self.pg0.local_ip4,
1496                                    self.pg0.remote_ip4)
1497         p.tun_if.add_vpp_config()
1498
1499         p.tun_protect = VppIpsecTunProtect(self,
1500                                            p.tun_if,
1501                                            p.tun_sa_out,
1502                                            [p.tun_sa_in])
1503         p.tun_protect.add_vpp_config()
1504
1505         p.tun_if.admin_up()
1506         p.tun_if.config_ip4()
1507         config_tun_params(p, self.encryption_type, p.tun_if)
1508
1509         VppIpRoute(self, "1.1.1.2", 32,
1510                    [VppRoutePath(p.tun_if.remote_ip4,
1511                                  0xffffffff)]).add_vpp_config()
1512
1513     def tearDown(self):
1514         p = self.ipv4_params
1515         p.tun_if.unconfig_ip4()
1516         super(TestIpsecGreIfEsp, self).tearDown()
1517
1518
1519 class TestIpsecGreIfEspTra(TemplateIpsec,
1520                            IpsecTun4Tests):
1521     """ Ipsec GRE ESP - TRA tests """
1522     tun4_encrypt_node_name = "esp4-encrypt-tun"
1523     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1524     encryption_type = ESP
1525
1526     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1527                          payload_size=100):
1528         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1529                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1530                               dst=self.pg0.local_ip4) /
1531                            GRE() /
1532                            IP(src=self.pg1.local_ip4,
1533                               dst=self.pg1.remote_ip4) /
1534                            UDP(sport=1144, dport=2233) /
1535                            Raw(b'X' * payload_size))
1536                 for i in range(count)]
1537
1538     def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1539                                 payload_size=100):
1540         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1541                 sa.encrypt(IP(src=self.pg0.remote_ip4,
1542                               dst=self.pg0.local_ip4) /
1543                            GRE() /
1544                            UDP(sport=1144, dport=2233) /
1545                            Raw(b'X' * payload_size))
1546                 for i in range(count)]
1547
1548     def gen_pkts(self, sw_intf, src, dst, count=1,
1549                  payload_size=100):
1550         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1551                 IP(src="1.1.1.1", dst="1.1.1.2") /
1552                 UDP(sport=1144, dport=2233) /
1553                 Raw(b'X' * payload_size)
1554                 for i in range(count)]
1555
1556     def verify_decrypted(self, p, rxs):
1557         for rx in rxs:
1558             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1559             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1560
1561     def verify_encrypted(self, p, sa, rxs):
1562         for rx in rxs:
1563             try:
1564                 pkt = sa.decrypt(rx[IP])
1565                 if not pkt.haslayer(IP):
1566                     pkt = IP(pkt[Raw].load)
1567                 self.assert_packet_checksums_valid(pkt)
1568                 self.assertTrue(pkt.haslayer(GRE))
1569                 e = pkt[GRE]
1570                 self.assertEqual(e[IP].dst, "1.1.1.2")
1571             except (IndexError, AssertionError):
1572                 self.logger.debug(ppp("Unexpected packet:", rx))
1573                 try:
1574                     self.logger.debug(ppp("Decrypted packet:", pkt))
1575                 except:
1576                     pass
1577                 raise
1578
1579     def setUp(self):
1580         super(TestIpsecGreIfEspTra, self).setUp()
1581
1582         self.tun_if = self.pg0
1583
1584         p = self.ipv4_params
1585
1586         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1587                                   p.auth_algo_vpp_id, p.auth_key,
1588                                   p.crypt_algo_vpp_id, p.crypt_key,
1589                                   self.vpp_esp_protocol)
1590         p.tun_sa_out.add_vpp_config()
1591
1592         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1593                                  p.auth_algo_vpp_id, p.auth_key,
1594                                  p.crypt_algo_vpp_id, p.crypt_key,
1595                                  self.vpp_esp_protocol)
1596         p.tun_sa_in.add_vpp_config()
1597
1598         p.tun_if = VppGreInterface(self,
1599                                    self.pg0.local_ip4,
1600                                    self.pg0.remote_ip4)
1601         p.tun_if.add_vpp_config()
1602
1603         p.tun_protect = VppIpsecTunProtect(self,
1604                                            p.tun_if,
1605                                            p.tun_sa_out,
1606                                            [p.tun_sa_in])
1607         p.tun_protect.add_vpp_config()
1608
1609         p.tun_if.admin_up()
1610         p.tun_if.config_ip4()
1611         config_tra_params(p, self.encryption_type, p.tun_if)
1612
1613         VppIpRoute(self, "1.1.1.2", 32,
1614                    [VppRoutePath(p.tun_if.remote_ip4,
1615                                  0xffffffff)]).add_vpp_config()
1616
1617     def tearDown(self):
1618         p = self.ipv4_params
1619         p.tun_if.unconfig_ip4()
1620         super(TestIpsecGreIfEspTra, self).tearDown()
1621
1622     def test_gre_non_ip(self):
1623         p = self.ipv4_params
1624         tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1625                                           src=p.remote_tun_if_host,
1626                                           dst=self.pg1.remote_ip6)
1627         self.send_and_assert_no_replies(self.tun_if, tx)
1628         node_name = ('/err/%s/unsupported payload' %
1629                      self.tun4_decrypt_node_name[0])
1630         self.assertEqual(1, self.statistics.get_err_counter(node_name))
1631
1632
1633 class TestIpsecGre6IfEspTra(TemplateIpsec,
1634                             IpsecTun6Tests):
1635     """ Ipsec GRE ESP - TRA tests """
1636     tun6_encrypt_node_name = "esp6-encrypt-tun"
1637     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1638     encryption_type = ESP
1639
1640     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1641                           payload_size=100):
1642         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1643                 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1644                                 dst=self.pg0.local_ip6) /
1645                            GRE() /
1646                            IPv6(src=self.pg1.local_ip6,
1647                                 dst=self.pg1.remote_ip6) /
1648                            UDP(sport=1144, dport=2233) /
1649                            Raw(b'X' * payload_size))
1650                 for i in range(count)]
1651
1652     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1653                   payload_size=100):
1654         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1655                 IPv6(src="1::1", dst="1::2") /
1656                 UDP(sport=1144, dport=2233) /
1657                 Raw(b'X' * payload_size)
1658                 for i in range(count)]
1659
1660     def verify_decrypted6(self, p, rxs):
1661         for rx in rxs:
1662             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1663             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1664
1665     def verify_encrypted6(self, p, sa, rxs):
1666         for rx in rxs:
1667             try:
1668                 pkt = sa.decrypt(rx[IPv6])
1669                 if not pkt.haslayer(IPv6):
1670                     pkt = IPv6(pkt[Raw].load)
1671                 self.assert_packet_checksums_valid(pkt)
1672                 self.assertTrue(pkt.haslayer(GRE))
1673                 e = pkt[GRE]
1674                 self.assertEqual(e[IPv6].dst, "1::2")
1675             except (IndexError, AssertionError):
1676                 self.logger.debug(ppp("Unexpected packet:", rx))
1677                 try:
1678                     self.logger.debug(ppp("Decrypted packet:", pkt))
1679                 except:
1680                     pass
1681                 raise
1682
1683     def setUp(self):
1684         super(TestIpsecGre6IfEspTra, self).setUp()
1685
1686         self.tun_if = self.pg0
1687
1688         p = self.ipv6_params
1689
1690         bd1 = VppBridgeDomain(self, 1)
1691         bd1.add_vpp_config()
1692
1693         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1694                                   p.auth_algo_vpp_id, p.auth_key,
1695                                   p.crypt_algo_vpp_id, p.crypt_key,
1696                                   self.vpp_esp_protocol)
1697         p.tun_sa_out.add_vpp_config()
1698
1699         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1700                                  p.auth_algo_vpp_id, p.auth_key,
1701                                  p.crypt_algo_vpp_id, p.crypt_key,
1702                                  self.vpp_esp_protocol)
1703         p.tun_sa_in.add_vpp_config()
1704
1705         p.tun_if = VppGreInterface(self,
1706                                    self.pg0.local_ip6,
1707                                    self.pg0.remote_ip6)
1708         p.tun_if.add_vpp_config()
1709
1710         p.tun_protect = VppIpsecTunProtect(self,
1711                                            p.tun_if,
1712                                            p.tun_sa_out,
1713                                            [p.tun_sa_in])
1714         p.tun_protect.add_vpp_config()
1715
1716         p.tun_if.admin_up()
1717         p.tun_if.config_ip6()
1718         config_tra_params(p, self.encryption_type, p.tun_if)
1719
1720         r = VppIpRoute(self, "1::2", 128,
1721                        [VppRoutePath(p.tun_if.remote_ip6,
1722                                      0xffffffff,
1723                                      proto=DpoProto.DPO_PROTO_IP6)])
1724         r.add_vpp_config()
1725
1726     def tearDown(self):
1727         p = self.ipv6_params
1728         p.tun_if.unconfig_ip6()
1729         super(TestIpsecGre6IfEspTra, self).tearDown()
1730
1731
1732 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1733     """ Ipsec mGRE ESP v4 TRA tests """
1734     tun4_encrypt_node_name = "esp4-encrypt-tun"
1735     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1736     encryption_type = ESP
1737
1738     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1739                          payload_size=100):
1740         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1741                 sa.encrypt(IP(src=p.tun_dst,
1742                               dst=self.pg0.local_ip4) /
1743                            GRE() /
1744                            IP(src=self.pg1.local_ip4,
1745                               dst=self.pg1.remote_ip4) /
1746                            UDP(sport=1144, dport=2233) /
1747                            Raw(b'X' * payload_size))
1748                 for i in range(count)]
1749
1750     def gen_pkts(self, sw_intf, src, dst, count=1,
1751                  payload_size=100):
1752         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1753                 IP(src="1.1.1.1", dst=dst) /
1754                 UDP(sport=1144, dport=2233) /
1755                 Raw(b'X' * payload_size)
1756                 for i in range(count)]
1757
1758     def verify_decrypted(self, p, rxs):
1759         for rx in rxs:
1760             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1761             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1762
1763     def verify_encrypted(self, p, sa, rxs):
1764         for rx in rxs:
1765             try:
1766                 pkt = sa.decrypt(rx[IP])
1767                 if not pkt.haslayer(IP):
1768                     pkt = IP(pkt[Raw].load)
1769                 self.assert_packet_checksums_valid(pkt)
1770                 self.assertTrue(pkt.haslayer(GRE))
1771                 e = pkt[GRE]
1772                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1773             except (IndexError, AssertionError):
1774                 self.logger.debug(ppp("Unexpected packet:", rx))
1775                 try:
1776                     self.logger.debug(ppp("Decrypted packet:", pkt))
1777                 except:
1778                     pass
1779                 raise
1780
1781     def setUp(self):
1782         super(TestIpsecMGreIfEspTra4, self).setUp()
1783
1784         N_NHS = 16
1785         self.tun_if = self.pg0
1786         p = self.ipv4_params
1787         p.tun_if = VppGreInterface(self,
1788                                    self.pg0.local_ip4,
1789                                    "0.0.0.0",
1790                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1791                                          TUNNEL_API_MODE_MP))
1792         p.tun_if.add_vpp_config()
1793         p.tun_if.admin_up()
1794         p.tun_if.config_ip4()
1795         p.tun_if.generate_remote_hosts(N_NHS)
1796         self.pg0.generate_remote_hosts(N_NHS)
1797         self.pg0.configure_ipv4_neighbors()
1798
1799         # setup some SAs for several next-hops on the interface
1800         self.multi_params = []
1801
1802         for ii in range(N_NHS):
1803             p = copy.copy(self.ipv4_params)
1804
1805             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1806             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1807             p.scapy_tun_spi = p.scapy_tun_spi + ii
1808             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1809             p.vpp_tun_spi = p.vpp_tun_spi + ii
1810
1811             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1812             p.scapy_tra_spi = p.scapy_tra_spi + ii
1813             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1814             p.vpp_tra_spi = p.vpp_tra_spi + ii
1815             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1816                                       p.auth_algo_vpp_id, p.auth_key,
1817                                       p.crypt_algo_vpp_id, p.crypt_key,
1818                                       self.vpp_esp_protocol)
1819             p.tun_sa_out.add_vpp_config()
1820
1821             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1822                                      p.auth_algo_vpp_id, p.auth_key,
1823                                      p.crypt_algo_vpp_id, p.crypt_key,
1824                                      self.vpp_esp_protocol)
1825             p.tun_sa_in.add_vpp_config()
1826
1827             p.tun_protect = VppIpsecTunProtect(
1828                 self,
1829                 p.tun_if,
1830                 p.tun_sa_out,
1831                 [p.tun_sa_in],
1832                 nh=p.tun_if.remote_hosts[ii].ip4)
1833             p.tun_protect.add_vpp_config()
1834             config_tra_params(p, self.encryption_type, p.tun_if)
1835             self.multi_params.append(p)
1836
1837             VppIpRoute(self, p.remote_tun_if_host, 32,
1838                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1839                                      p.tun_if.sw_if_index)]).add_vpp_config()
1840
1841             # in this v4 variant add the teibs after the protect
1842             p.teib = VppTeib(self, p.tun_if,
1843                              p.tun_if.remote_hosts[ii].ip4,
1844                              self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1845             p.tun_dst = self.pg0.remote_hosts[ii].ip4
1846         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1847
1848     def tearDown(self):
1849         p = self.ipv4_params
1850         p.tun_if.unconfig_ip4()
1851         super(TestIpsecMGreIfEspTra4, self).tearDown()
1852
1853     def test_tun_44(self):
1854         """mGRE IPSEC 44"""
1855         N_PKTS = 63
1856         for p in self.multi_params:
1857             self.verify_tun_44(p, count=N_PKTS)
1858             p.teib.remove_vpp_config()
1859             self.verify_tun_dropped_44(p, count=N_PKTS)
1860             p.teib.add_vpp_config()
1861             self.verify_tun_44(p, count=N_PKTS)
1862
1863
1864 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1865     """ Ipsec mGRE ESP v6 TRA tests """
1866     tun6_encrypt_node_name = "esp6-encrypt-tun"
1867     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1868     encryption_type = ESP
1869
1870     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1871                           payload_size=100):
1872         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1873                 sa.encrypt(IPv6(src=p.tun_dst,
1874                                 dst=self.pg0.local_ip6) /
1875                            GRE() /
1876                            IPv6(src=self.pg1.local_ip6,
1877                                 dst=self.pg1.remote_ip6) /
1878                            UDP(sport=1144, dport=2233) /
1879                            Raw(b'X' * payload_size))
1880                 for i in range(count)]
1881
1882     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1883                   payload_size=100):
1884         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1885                 IPv6(src="1::1", dst=dst) /
1886                 UDP(sport=1144, dport=2233) /
1887                 Raw(b'X' * payload_size)
1888                 for i in range(count)]
1889
1890     def verify_decrypted6(self, p, rxs):
1891         for rx in rxs:
1892             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1893             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1894
1895     def verify_encrypted6(self, p, sa, rxs):
1896         for rx in rxs:
1897             try:
1898                 pkt = sa.decrypt(rx[IPv6])
1899                 if not pkt.haslayer(IPv6):
1900                     pkt = IPv6(pkt[Raw].load)
1901                 self.assert_packet_checksums_valid(pkt)
1902                 self.assertTrue(pkt.haslayer(GRE))
1903                 e = pkt[GRE]
1904                 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1905             except (IndexError, AssertionError):
1906                 self.logger.debug(ppp("Unexpected packet:", rx))
1907                 try:
1908                     self.logger.debug(ppp("Decrypted packet:", pkt))
1909                 except:
1910                     pass
1911                 raise
1912
1913     def setUp(self):
1914         super(TestIpsecMGreIfEspTra6, self).setUp()
1915
1916         self.vapi.cli("set logging class ipsec level debug")
1917
1918         N_NHS = 16
1919         self.tun_if = self.pg0
1920         p = self.ipv6_params
1921         p.tun_if = VppGreInterface(self,
1922                                    self.pg0.local_ip6,
1923                                    "::",
1924                                    mode=(VppEnum.vl_api_tunnel_mode_t.
1925                                          TUNNEL_API_MODE_MP))
1926         p.tun_if.add_vpp_config()
1927         p.tun_if.admin_up()
1928         p.tun_if.config_ip6()
1929         p.tun_if.generate_remote_hosts(N_NHS)
1930         self.pg0.generate_remote_hosts(N_NHS)
1931         self.pg0.configure_ipv6_neighbors()
1932
1933         # setup some SAs for several next-hops on the interface
1934         self.multi_params = []
1935
1936         for ii in range(N_NHS):
1937             p = copy.copy(self.ipv6_params)
1938
1939             p.remote_tun_if_host = "1::%d" % (ii + 1)
1940             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1941             p.scapy_tun_spi = p.scapy_tun_spi + ii
1942             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1943             p.vpp_tun_spi = p.vpp_tun_spi + ii
1944
1945             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1946             p.scapy_tra_spi = p.scapy_tra_spi + ii
1947             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1948             p.vpp_tra_spi = p.vpp_tra_spi + ii
1949             p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1950                                       p.auth_algo_vpp_id, p.auth_key,
1951                                       p.crypt_algo_vpp_id, p.crypt_key,
1952                                       self.vpp_esp_protocol)
1953             p.tun_sa_out.add_vpp_config()
1954
1955             p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1956                                      p.auth_algo_vpp_id, p.auth_key,
1957                                      p.crypt_algo_vpp_id, p.crypt_key,
1958                                      self.vpp_esp_protocol)
1959             p.tun_sa_in.add_vpp_config()
1960
1961             # in this v6 variant add the teibs first then the protection
1962             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1963             VppTeib(self, p.tun_if,
1964                     p.tun_if.remote_hosts[ii].ip6,
1965                     p.tun_dst).add_vpp_config()
1966
1967             p.tun_protect = VppIpsecTunProtect(
1968                 self,
1969                 p.tun_if,
1970                 p.tun_sa_out,
1971                 [p.tun_sa_in],
1972                 nh=p.tun_if.remote_hosts[ii].ip6)
1973             p.tun_protect.add_vpp_config()
1974             config_tra_params(p, self.encryption_type, p.tun_if)
1975             self.multi_params.append(p)
1976
1977             VppIpRoute(self, p.remote_tun_if_host, 128,
1978                        [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1979                                      p.tun_if.sw_if_index)]).add_vpp_config()
1980             p.tun_dst = self.pg0.remote_hosts[ii].ip6
1981
1982         self.logger.info(self.vapi.cli("sh log"))
1983         self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1984         self.logger.info(self.vapi.cli("sh adj 41"))
1985
1986     def tearDown(self):
1987         p = self.ipv6_params
1988         p.tun_if.unconfig_ip6()
1989         super(TestIpsecMGreIfEspTra6, self).tearDown()
1990
1991     def test_tun_66(self):
1992         """mGRE IPSec 66"""
1993         for p in self.multi_params:
1994             self.verify_tun_66(p, count=63)
1995
1996
1997 @tag_fixme_vpp_workers
1998 class TestIpsec4TunProtect(TemplateIpsec,
1999                            TemplateIpsec4TunProtect,
2000                            IpsecTun4):
2001     """ IPsec IPv4 Tunnel protect - transport mode"""
2002
2003     def setUp(self):
2004         super(TestIpsec4TunProtect, self).setUp()
2005
2006         self.tun_if = self.pg0
2007
2008     def tearDown(self):
2009         super(TestIpsec4TunProtect, self).tearDown()
2010
2011     def test_tun_44(self):
2012         """IPSEC tunnel protect"""
2013
2014         p = self.ipv4_params
2015
2016         self.config_network(p)
2017         self.config_sa_tra(p)
2018         self.config_protect(p)
2019
2020         self.verify_tun_44(p, count=127)
2021         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2022         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2023
2024         self.vapi.cli("clear ipsec sa")
2025         self.verify_tun_64(p, count=127)
2026         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2027         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2028
2029         # rekey - create new SAs and update the tunnel protection
2030         np = copy.copy(p)
2031         np.crypt_key = b'X' + p.crypt_key[1:]
2032         np.scapy_tun_spi += 100
2033         np.scapy_tun_sa_id += 1
2034         np.vpp_tun_spi += 100
2035         np.vpp_tun_sa_id += 1
2036         np.tun_if.local_spi = p.vpp_tun_spi
2037         np.tun_if.remote_spi = p.scapy_tun_spi
2038
2039         self.config_sa_tra(np)
2040         self.config_protect(np)
2041         self.unconfig_sa(p)
2042
2043         self.verify_tun_44(np, count=127)
2044         self.assertEqual(p.tun_if.get_rx_stats(), 381)
2045         self.assertEqual(p.tun_if.get_tx_stats(), 381)
2046
2047         # teardown
2048         self.unconfig_protect(np)
2049         self.unconfig_sa(np)
2050         self.unconfig_network(p)
2051
2052
2053 @tag_fixme_vpp_workers
2054 class TestIpsec4TunProtectUdp(TemplateIpsec,
2055                               TemplateIpsec4TunProtect,
2056                               IpsecTun4):
2057     """ IPsec IPv4 Tunnel protect - transport mode"""
2058
2059     def setUp(self):
2060         super(TestIpsec4TunProtectUdp, self).setUp()
2061
2062         self.tun_if = self.pg0
2063
2064         p = self.ipv4_params
2065         p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2066                    IPSEC_API_SAD_FLAG_UDP_ENCAP)
2067         p.nat_header = UDP(sport=4500, dport=4500)
2068         self.config_network(p)
2069         self.config_sa_tra(p)
2070         self.config_protect(p)
2071
2072     def tearDown(self):
2073         p = self.ipv4_params
2074         self.unconfig_protect(p)
2075         self.unconfig_sa(p)
2076         self.unconfig_network(p)
2077         super(TestIpsec4TunProtectUdp, self).tearDown()
2078
2079     def verify_encrypted(self, p, sa, rxs):
2080         # ensure encrypted packets are recieved with the default UDP ports
2081         for rx in rxs:
2082             self.assertEqual(rx[UDP].sport, 4500)
2083             self.assertEqual(rx[UDP].dport, 4500)
2084         super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2085
2086     def test_tun_44(self):
2087         """IPSEC UDP tunnel protect"""
2088
2089         p = self.ipv4_params
2090
2091         self.verify_tun_44(p, count=127)
2092         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2093         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2094
2095     def test_keepalive(self):
2096         """ IPSEC NAT Keepalive """
2097         self.verify_keepalive(self.ipv4_params)
2098
2099
2100 @tag_fixme_vpp_workers
2101 class TestIpsec4TunProtectTun(TemplateIpsec,
2102                               TemplateIpsec4TunProtect,
2103                               IpsecTun4):
2104     """ IPsec IPv4 Tunnel protect - tunnel mode"""
2105
2106     encryption_type = ESP
2107     tun4_encrypt_node_name = "esp4-encrypt-tun"
2108     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2109
2110     def setUp(self):
2111         super(TestIpsec4TunProtectTun, self).setUp()
2112
2113         self.tun_if = self.pg0
2114
2115     def tearDown(self):
2116         super(TestIpsec4TunProtectTun, self).tearDown()
2117
2118     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2119                          payload_size=100):
2120         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2121                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2122                               dst=sw_intf.local_ip4) /
2123                            IP(src=src, dst=dst) /
2124                            UDP(sport=1144, dport=2233) /
2125                            Raw(b'X' * payload_size))
2126                 for i in range(count)]
2127
2128     def gen_pkts(self, sw_intf, src, dst, count=1,
2129                  payload_size=100):
2130         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2131                 IP(src=src, dst=dst) /
2132                 UDP(sport=1144, dport=2233) /
2133                 Raw(b'X' * payload_size)
2134                 for i in range(count)]
2135
2136     def verify_decrypted(self, p, rxs):
2137         for rx in rxs:
2138             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2139             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2140             self.assert_packet_checksums_valid(rx)
2141
2142     def verify_encrypted(self, p, sa, rxs):
2143         for rx in rxs:
2144             try:
2145                 pkt = sa.decrypt(rx[IP])
2146                 if not pkt.haslayer(IP):
2147                     pkt = IP(pkt[Raw].load)
2148                 self.assert_packet_checksums_valid(pkt)
2149                 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2150                 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2151                 inner = pkt[IP].payload
2152                 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2153
2154             except (IndexError, AssertionError):
2155                 self.logger.debug(ppp("Unexpected packet:", rx))
2156                 try:
2157                     self.logger.debug(ppp("Decrypted packet:", pkt))
2158                 except:
2159                     pass
2160                 raise
2161
2162     def test_tun_44(self):
2163         """IPSEC tunnel protect """
2164
2165         p = self.ipv4_params
2166
2167         self.config_network(p)
2168         self.config_sa_tun(p)
2169         self.config_protect(p)
2170
2171         # also add an output features on the tunnel and physical interface
2172         # so we test they still work
2173         r_all = AclRule(True,
2174                         src_prefix="0.0.0.0/0",
2175                         dst_prefix="0.0.0.0/0",
2176                         proto=0)
2177         a = VppAcl(self, [r_all]).add_vpp_config()
2178
2179         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2180         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2181
2182         self.verify_tun_44(p, count=127)
2183
2184         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2185         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2186
2187         # rekey - create new SAs and update the tunnel protection
2188         np = copy.copy(p)
2189         np.crypt_key = b'X' + p.crypt_key[1:]
2190         np.scapy_tun_spi += 100
2191         np.scapy_tun_sa_id += 1
2192         np.vpp_tun_spi += 100
2193         np.vpp_tun_sa_id += 1
2194         np.tun_if.local_spi = p.vpp_tun_spi
2195         np.tun_if.remote_spi = p.scapy_tun_spi
2196
2197         self.config_sa_tun(np)
2198         self.config_protect(np)
2199         self.unconfig_sa(p)
2200
2201         self.verify_tun_44(np, count=127)
2202         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2203         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2204
2205         # teardown
2206         self.unconfig_protect(np)
2207         self.unconfig_sa(np)
2208         self.unconfig_network(p)
2209
2210
2211 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2212                                   TemplateIpsec4TunProtect,
2213                                   IpsecTun4):
2214     """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2215
2216     encryption_type = ESP
2217     tun4_encrypt_node_name = "esp4-encrypt-tun"
2218     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2219
2220     def setUp(self):
2221         super(TestIpsec4TunProtectTunDrop, self).setUp()
2222
2223         self.tun_if = self.pg0
2224
2225     def tearDown(self):
2226         super(TestIpsec4TunProtectTunDrop, self).tearDown()
2227
2228     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2229                          payload_size=100):
2230         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2231                 sa.encrypt(IP(src=sw_intf.remote_ip4,
2232                               dst="5.5.5.5") /
2233                            IP(src=src, dst=dst) /
2234                            UDP(sport=1144, dport=2233) /
2235                            Raw(b'X' * payload_size))
2236                 for i in range(count)]
2237
2238     def test_tun_drop_44(self):
2239         """IPSEC tunnel protect bogus tunnel header """
2240
2241         p = self.ipv4_params
2242
2243         self.config_network(p)
2244         self.config_sa_tun(p)
2245         self.config_protect(p)
2246
2247         tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2248                                    src=p.remote_tun_if_host,
2249                                    dst=self.pg1.remote_ip4,
2250                                    count=63)
2251         self.send_and_assert_no_replies(self.tun_if, tx)
2252
2253         # teardown
2254         self.unconfig_protect(p)
2255         self.unconfig_sa(p)
2256         self.unconfig_network(p)
2257
2258
2259 @tag_fixme_vpp_workers
2260 class TestIpsec6TunProtect(TemplateIpsec,
2261                            TemplateIpsec6TunProtect,
2262                            IpsecTun6):
2263     """ IPsec IPv6 Tunnel protect - transport mode"""
2264
2265     encryption_type = ESP
2266     tun6_encrypt_node_name = "esp6-encrypt-tun"
2267     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2268
2269     def setUp(self):
2270         super(TestIpsec6TunProtect, self).setUp()
2271
2272         self.tun_if = self.pg0
2273
2274     def tearDown(self):
2275         super(TestIpsec6TunProtect, self).tearDown()
2276
2277     def test_tun_66(self):
2278         """IPSEC tunnel protect 6o6"""
2279
2280         p = self.ipv6_params
2281
2282         self.config_network(p)
2283         self.config_sa_tra(p)
2284         self.config_protect(p)
2285
2286         self.verify_tun_66(p, count=127)
2287         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2288         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2289
2290         # rekey - create new SAs and update the tunnel protection
2291         np = copy.copy(p)
2292         np.crypt_key = b'X' + p.crypt_key[1:]
2293         np.scapy_tun_spi += 100
2294         np.scapy_tun_sa_id += 1
2295         np.vpp_tun_spi += 100
2296         np.vpp_tun_sa_id += 1
2297         np.tun_if.local_spi = p.vpp_tun_spi
2298         np.tun_if.remote_spi = p.scapy_tun_spi
2299
2300         self.config_sa_tra(np)
2301         self.config_protect(np)
2302         self.unconfig_sa(p)
2303
2304         self.verify_tun_66(np, count=127)
2305         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2306         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2307
2308         # bounce the interface state
2309         p.tun_if.admin_down()
2310         self.verify_drop_tun_66(np, count=127)
2311         node = ('/err/ipsec6-tun-input/%s' %
2312                 'ipsec packets received on disabled interface')
2313         self.assertEqual(127, self.statistics.get_err_counter(node))
2314         p.tun_if.admin_up()
2315         self.verify_tun_66(np, count=127)
2316
2317         # 3 phase rekey
2318         #  1) add two input SAs [old, new]
2319         #  2) swap output SA to [new]
2320         #  3) use only [new] input SA
2321         np3 = copy.copy(np)
2322         np3.crypt_key = b'Z' + p.crypt_key[1:]
2323         np3.scapy_tun_spi += 100
2324         np3.scapy_tun_sa_id += 1
2325         np3.vpp_tun_spi += 100
2326         np3.vpp_tun_sa_id += 1
2327         np3.tun_if.local_spi = p.vpp_tun_spi
2328         np3.tun_if.remote_spi = p.scapy_tun_spi
2329
2330         self.config_sa_tra(np3)
2331
2332         # step 1;
2333         p.tun_protect.update_vpp_config(np.tun_sa_out,
2334                                         [np.tun_sa_in, np3.tun_sa_in])
2335         self.verify_tun_66(np, np, count=127)
2336         self.verify_tun_66(np3, np, count=127)
2337
2338         # step 2;
2339         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2340                                         [np.tun_sa_in, np3.tun_sa_in])
2341         self.verify_tun_66(np, np3, count=127)
2342         self.verify_tun_66(np3, np3, count=127)
2343
2344         # step 1;
2345         p.tun_protect.update_vpp_config(np3.tun_sa_out,
2346                                         [np3.tun_sa_in])
2347         self.verify_tun_66(np3, np3, count=127)
2348         self.verify_drop_tun_rx_66(np, count=127)
2349
2350         self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
2351         self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
2352         self.unconfig_sa(np)
2353
2354         # teardown
2355         self.unconfig_protect(np3)
2356         self.unconfig_sa(np3)
2357         self.unconfig_network(p)
2358
2359     def test_tun_46(self):
2360         """IPSEC tunnel protect 4o6"""
2361
2362         p = self.ipv6_params
2363
2364         self.config_network(p)
2365         self.config_sa_tra(p)
2366         self.config_protect(p)
2367
2368         self.verify_tun_46(p, count=127)
2369         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2370         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2371
2372         # teardown
2373         self.unconfig_protect(p)
2374         self.unconfig_sa(p)
2375         self.unconfig_network(p)
2376
2377
2378 @tag_fixme_vpp_workers
2379 class TestIpsec6TunProtectTun(TemplateIpsec,
2380                               TemplateIpsec6TunProtect,
2381                               IpsecTun6):
2382     """ IPsec IPv6 Tunnel protect - tunnel mode"""
2383
2384     encryption_type = ESP
2385     tun6_encrypt_node_name = "esp6-encrypt-tun"
2386     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2387
2388     def setUp(self):
2389         super(TestIpsec6TunProtectTun, self).setUp()
2390
2391         self.tun_if = self.pg0
2392
2393     def tearDown(self):
2394         super(TestIpsec6TunProtectTun, self).tearDown()
2395
2396     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2397                           payload_size=100):
2398         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2399                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2400                                 dst=sw_intf.local_ip6) /
2401                            IPv6(src=src, dst=dst) /
2402                            UDP(sport=1166, dport=2233) /
2403                            Raw(b'X' * payload_size))
2404                 for i in range(count)]
2405
2406     def gen_pkts6(self, p, sw_intf, src, dst, count=1,
2407                   payload_size=100):
2408         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2409                 IPv6(src=src, dst=dst) /
2410                 UDP(sport=1166, dport=2233) /
2411                 Raw(b'X' * payload_size)
2412                 for i in range(count)]
2413
2414     def verify_decrypted6(self, p, rxs):
2415         for rx in rxs:
2416             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2417             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2418             self.assert_packet_checksums_valid(rx)
2419
2420     def verify_encrypted6(self, p, sa, rxs):
2421         for rx in rxs:
2422             try:
2423                 pkt = sa.decrypt(rx[IPv6])
2424                 if not pkt.haslayer(IPv6):
2425                     pkt = IPv6(pkt[Raw].load)
2426                 self.assert_packet_checksums_valid(pkt)
2427                 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2428                 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2429                 inner = pkt[IPv6].payload
2430                 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2431
2432             except (IndexError, AssertionError):
2433                 self.logger.debug(ppp("Unexpected packet:", rx))
2434                 try:
2435                     self.logger.debug(ppp("Decrypted packet:", pkt))
2436                 except:
2437                     pass
2438                 raise
2439
2440     def test_tun_66(self):
2441         """IPSEC tunnel protect """
2442
2443         p = self.ipv6_params
2444
2445         self.config_network(p)
2446         self.config_sa_tun(p)
2447         self.config_protect(p)
2448
2449         self.verify_tun_66(p, count=127)
2450
2451         self.assertEqual(p.tun_if.get_rx_stats(), 127)
2452         self.assertEqual(p.tun_if.get_tx_stats(), 127)
2453
2454         # rekey - create new SAs and update the tunnel protection
2455         np = copy.copy(p)
2456         np.crypt_key = b'X' + p.crypt_key[1:]
2457         np.scapy_tun_spi += 100
2458         np.scapy_tun_sa_id += 1
2459         np.vpp_tun_spi += 100
2460         np.vpp_tun_sa_id += 1
2461         np.tun_if.local_spi = p.vpp_tun_spi
2462         np.tun_if.remote_spi = p.scapy_tun_spi
2463
2464         self.config_sa_tun(np)
2465         self.config_protect(np)
2466         self.unconfig_sa(p)
2467
2468         self.verify_tun_66(np, count=127)
2469         self.assertEqual(p.tun_if.get_rx_stats(), 254)
2470         self.assertEqual(p.tun_if.get_tx_stats(), 254)
2471
2472         # teardown
2473         self.unconfig_protect(np)
2474         self.unconfig_sa(np)
2475         self.unconfig_network(p)
2476
2477
2478 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2479                                   TemplateIpsec6TunProtect,
2480                                   IpsecTun6):
2481     """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2482
2483     encryption_type = ESP
2484     tun6_encrypt_node_name = "esp6-encrypt-tun"
2485     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2486
2487     def setUp(self):
2488         super(TestIpsec6TunProtectTunDrop, self).setUp()
2489
2490         self.tun_if = self.pg0
2491
2492     def tearDown(self):
2493         super(TestIpsec6TunProtectTunDrop, self).tearDown()
2494
2495     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2496                           payload_size=100):
2497         # the IP destination of the revelaed packet does not match
2498         # that assigned to the tunnel
2499         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2500                 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2501                                 dst="5::5") /
2502                            IPv6(src=src, dst=dst) /
2503                            UDP(sport=1144, dport=2233) /
2504                            Raw(b'X' * payload_size))
2505                 for i in range(count)]
2506
2507     def test_tun_drop_66(self):
2508         """IPSEC 6 tunnel protect bogus tunnel header """
2509
2510         p = self.ipv6_params
2511
2512         self.config_network(p)
2513         self.config_sa_tun(p)
2514         self.config_protect(p)
2515
2516         tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2517                                     src=p.remote_tun_if_host,
2518                                     dst=self.pg1.remote_ip6,
2519                                     count=63)
2520         self.send_and_assert_no_replies(self.tun_if, tx)
2521
2522         self.unconfig_protect(p)
2523         self.unconfig_sa(p)
2524         self.unconfig_network(p)
2525
2526
2527 class TemplateIpsecItf4(object):
2528     """ IPsec Interface IPv4 """
2529
2530     encryption_type = ESP
2531     tun4_encrypt_node_name = "esp4-encrypt-tun"
2532     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2533     tun4_input_node = "ipsec4-tun-input"
2534
2535     def config_sa_tun(self, p, src, dst):
2536         config_tun_params(p, self.encryption_type, None, src, dst)
2537
2538         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2539                                   p.auth_algo_vpp_id, p.auth_key,
2540                                   p.crypt_algo_vpp_id, p.crypt_key,
2541                                   self.vpp_esp_protocol,
2542                                   src, dst,
2543                                   flags=p.flags)
2544         p.tun_sa_out.add_vpp_config()
2545
2546         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2547                                  p.auth_algo_vpp_id, p.auth_key,
2548                                  p.crypt_algo_vpp_id, p.crypt_key,
2549                                  self.vpp_esp_protocol,
2550                                  dst, src,
2551                                  flags=p.flags)
2552         p.tun_sa_in.add_vpp_config()
2553
2554     def config_protect(self, p):
2555         p.tun_protect = VppIpsecTunProtect(self,
2556                                            p.tun_if,
2557                                            p.tun_sa_out,
2558                                            [p.tun_sa_in])
2559         p.tun_protect.add_vpp_config()
2560
2561     def config_network(self, p, instance=0xffffffff):
2562         p.tun_if = VppIpsecInterface(self, instance=instance)
2563
2564         p.tun_if.add_vpp_config()
2565         p.tun_if.admin_up()
2566         p.tun_if.config_ip4()
2567         p.tun_if.config_ip6()
2568
2569         p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2570                              [VppRoutePath(p.tun_if.remote_ip4,
2571                                            0xffffffff)])
2572         p.route.add_vpp_config()
2573         r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2574                        [VppRoutePath(p.tun_if.remote_ip6,
2575                                      0xffffffff,
2576                                      proto=DpoProto.DPO_PROTO_IP6)])
2577         r.add_vpp_config()
2578
2579     def unconfig_network(self, p):
2580         p.route.remove_vpp_config()
2581         p.tun_if.remove_vpp_config()
2582
2583     def unconfig_protect(self, p):
2584         p.tun_protect.remove_vpp_config()
2585
2586     def unconfig_sa(self, p):
2587         p.tun_sa_out.remove_vpp_config()
2588         p.tun_sa_in.remove_vpp_config()
2589
2590
2591 @tag_fixme_vpp_workers
2592 class TestIpsecItf4(TemplateIpsec,
2593                     TemplateIpsecItf4,
2594                     IpsecTun4):
2595     """ IPsec Interface IPv4 """
2596
2597     def setUp(self):
2598         super(TestIpsecItf4, self).setUp()
2599
2600         self.tun_if = self.pg0
2601
2602     def tearDown(self):
2603         super(TestIpsecItf4, self).tearDown()
2604
2605     def test_tun_instance_44(self):
2606         p = self.ipv4_params
2607         self.config_network(p, instance=3)
2608
2609         with self.assertRaises(CliFailedCommandError):
2610             self.vapi.cli("show interface ipsec0")
2611
2612         output = self.vapi.cli("show interface ipsec3")
2613         self.assertTrue("unknown" not in output)
2614
2615         self.unconfig_network(p)
2616
2617     def test_tun_44(self):
2618         """IPSEC interface IPv4"""
2619
2620         n_pkts = 127
2621         p = self.ipv4_params
2622
2623         self.config_network(p)
2624         config_tun_params(p, self.encryption_type, None,
2625                           self.pg0.local_ip4,
2626                           self.pg0.remote_ip4)
2627         self.verify_tun_dropped_44(p, count=n_pkts)
2628         self.config_sa_tun(p,
2629                            self.pg0.local_ip4,
2630                            self.pg0.remote_ip4)
2631         self.config_protect(p)
2632
2633         self.verify_tun_44(p, count=n_pkts)
2634         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2635         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2636
2637         p.tun_if.admin_down()
2638         self.verify_tun_dropped_44(p, count=n_pkts)
2639         p.tun_if.admin_up()
2640         self.verify_tun_44(p, count=n_pkts)
2641
2642         self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2643         self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2644
2645         # it's a v6 packet when its encrypted
2646         self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2647
2648         self.verify_tun_64(p, count=n_pkts)
2649         self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2650         self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2651
2652         self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2653
2654         self.vapi.cli("clear interfaces")
2655
2656         # rekey - create new SAs and update the tunnel protection
2657         np = copy.copy(p)
2658         np.crypt_key = b'X' + p.crypt_key[1:]
2659         np.scapy_tun_spi += 100
2660         np.scapy_tun_sa_id += 1
2661         np.vpp_tun_spi += 100
2662         np.vpp_tun_sa_id += 1
2663         np.tun_if.local_spi = p.vpp_tun_spi
2664         np.tun_if.remote_spi = p.scapy_tun_spi
2665
2666         self.config_sa_tun(np,
2667                            self.pg0.local_ip4,
2668                            self.pg0.remote_ip4)
2669         self.config_protect(np)
2670         self.unconfig_sa(p)
2671
2672         self.verify_tun_44(np, count=n_pkts)
2673         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2674         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2675
2676         # teardown
2677         self.unconfig_protect(np)
2678         self.unconfig_sa(np)
2679         self.unconfig_network(p)
2680
2681     def test_tun_44_null(self):
2682         """IPSEC interface IPv4 NULL auth/crypto"""
2683
2684         n_pkts = 127
2685         p = copy.copy(self.ipv4_params)
2686
2687         p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2688                               IPSEC_API_INTEG_ALG_NONE)
2689         p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2690                                IPSEC_API_CRYPTO_ALG_NONE)
2691         p.crypt_algo = "NULL"
2692         p.auth_algo = "NULL"
2693
2694         self.config_network(p)
2695         self.config_sa_tun(p,
2696                            self.pg0.local_ip4,
2697                            self.pg0.remote_ip4)
2698         self.config_protect(p)
2699
2700         self.logger.info(self.vapi.cli("sh ipsec sa"))
2701         self.verify_tun_44(p, count=n_pkts)
2702
2703         # teardown
2704         self.unconfig_protect(p)
2705         self.unconfig_sa(p)
2706         self.unconfig_network(p)
2707
2708     def test_tun_44_police(self):
2709         """IPSEC interface IPv4 with input policer"""
2710         n_pkts = 127
2711         p = self.ipv4_params
2712
2713         self.config_network(p)
2714         self.config_sa_tun(p,
2715                            self.pg0.local_ip4,
2716                            self.pg0.remote_ip4)
2717         self.config_protect(p)
2718
2719         action_tx = PolicerAction(
2720             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2721             0)
2722         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2723                              conform_action=action_tx,
2724                              exceed_action=action_tx,
2725                              violate_action=action_tx)
2726         policer.add_vpp_config()
2727
2728         # Start policing on tun
2729         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
2730
2731         self.verify_tun_44(p, count=n_pkts)
2732         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2733         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2734
2735         stats = policer.get_stats()
2736
2737         # Single rate, 2 colour policer - expect conform, violate but no exceed
2738         self.assertGreater(stats['conform_packets'], 0)
2739         self.assertEqual(stats['exceed_packets'], 0)
2740         self.assertGreater(stats['violate_packets'], 0)
2741
2742         # Stop policing on tun
2743         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
2744         self.verify_tun_44(p, count=n_pkts)
2745
2746         # No new policer stats
2747         statsnew = policer.get_stats()
2748         self.assertEqual(stats, statsnew)
2749
2750         # teardown
2751         policer.remove_vpp_config()
2752         self.unconfig_protect(p)
2753         self.unconfig_sa(p)
2754         self.unconfig_network(p)
2755
2756
2757 class TestIpsecItf4MPLS(TemplateIpsec,
2758                         TemplateIpsecItf4,
2759                         IpsecTun4):
2760     """ IPsec Interface MPLSoIPv4 """
2761
2762     tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2763
2764     def setUp(self):
2765         super(TestIpsecItf4MPLS, self).setUp()
2766
2767         self.tun_if = self.pg0
2768
2769     def tearDown(self):
2770         super(TestIpsecItf4MPLS, self).tearDown()
2771
2772     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2773                          payload_size=100):
2774         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2775                 sa.encrypt(MPLS(label=44, ttl=3) /
2776                            IP(src=src, dst=dst) /
2777                            UDP(sport=1166, dport=2233) /
2778                            Raw(b'X' * payload_size))
2779                 for i in range(count)]
2780
2781     def verify_encrypted(self, p, sa, rxs):
2782         for rx in rxs:
2783             try:
2784                 pkt = sa.decrypt(rx[IP])
2785                 if not pkt.haslayer(IP):
2786                     pkt = IP(pkt[Raw].load)
2787                 self.assert_packet_checksums_valid(pkt)
2788                 self.assert_equal(pkt[MPLS].label, 44)
2789                 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2790             except (IndexError, AssertionError):
2791                 self.logger.debug(ppp("Unexpected packet:", rx))
2792                 try:
2793                     self.logger.debug(ppp("Decrypted packet:", pkt))
2794                 except:
2795                     pass
2796                 raise
2797
2798     def test_tun_mpls_o_ip4(self):
2799         """IPSEC interface MPLS over IPv4"""
2800
2801         n_pkts = 127
2802         p = self.ipv4_params
2803         f = FibPathProto
2804
2805         tbl = VppMplsTable(self, 0)
2806         tbl.add_vpp_config()
2807
2808         self.config_network(p)
2809         # deag MPLS routes from the tunnel
2810         r4 = VppMplsRoute(self, 44, 1,
2811                           [VppRoutePath(
2812                               self.pg1.remote_ip4,
2813                               self.pg1.sw_if_index)]).add_vpp_config()
2814         p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2815                                      p.tun_if.sw_if_index,
2816                                      labels=[VppMplsLabel(44)])])
2817         p.tun_if.enable_mpls()
2818
2819         self.config_sa_tun(p,
2820                            self.pg0.local_ip4,
2821                            self.pg0.remote_ip4)
2822         self.config_protect(p)
2823
2824         self.verify_tun_44(p, count=n_pkts)
2825
2826         # cleanup
2827         p.tun_if.disable_mpls()
2828         self.unconfig_protect(p)
2829         self.unconfig_sa(p)
2830         self.unconfig_network(p)
2831
2832
2833 class TemplateIpsecItf6(object):
2834     """ IPsec Interface IPv6 """
2835
2836     encryption_type = ESP
2837     tun6_encrypt_node_name = "esp6-encrypt-tun"
2838     tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2839     tun6_input_node = "ipsec6-tun-input"
2840
2841     def config_sa_tun(self, p, src, dst):
2842         config_tun_params(p, self.encryption_type, None, src, dst)
2843
2844         if not hasattr(p, 'tun_flags'):
2845             p.tun_flags = None
2846         if not hasattr(p, 'hop_limit'):
2847             p.hop_limit = 255
2848
2849         p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2850                                   p.auth_algo_vpp_id, p.auth_key,
2851                                   p.crypt_algo_vpp_id, p.crypt_key,
2852                                   self.vpp_esp_protocol,
2853                                   src, dst,
2854                                   flags=p.flags,
2855                                   tun_flags=p.tun_flags,
2856                                   hop_limit=p.hop_limit)
2857         p.tun_sa_out.add_vpp_config()
2858
2859         p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2860                                  p.auth_algo_vpp_id, p.auth_key,
2861                                  p.crypt_algo_vpp_id, p.crypt_key,
2862                                  self.vpp_esp_protocol,
2863                                  dst, src,
2864                                  flags=p.flags)
2865         p.tun_sa_in.add_vpp_config()
2866
2867     def config_protect(self, p):
2868         p.tun_protect = VppIpsecTunProtect(self,
2869                                            p.tun_if,
2870                                            p.tun_sa_out,
2871                                            [p.tun_sa_in])
2872         p.tun_protect.add_vpp_config()
2873
2874     def config_network(self, p):
2875         p.tun_if = VppIpsecInterface(self)
2876
2877         p.tun_if.add_vpp_config()
2878         p.tun_if.admin_up()
2879         p.tun_if.config_ip4()
2880         p.tun_if.config_ip6()
2881
2882         r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2883                        [VppRoutePath(p.tun_if.remote_ip4,
2884                                      0xffffffff)])
2885         r.add_vpp_config()
2886
2887         p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2888                              [VppRoutePath(p.tun_if.remote_ip6,
2889                                            0xffffffff,
2890                                            proto=DpoProto.DPO_PROTO_IP6)])
2891         p.route.add_vpp_config()
2892
2893     def unconfig_network(self, p):
2894         p.route.remove_vpp_config()
2895         p.tun_if.remove_vpp_config()
2896
2897     def unconfig_protect(self, p):
2898         p.tun_protect.remove_vpp_config()
2899
2900     def unconfig_sa(self, p):
2901         p.tun_sa_out.remove_vpp_config()
2902         p.tun_sa_in.remove_vpp_config()
2903
2904
2905 @tag_fixme_vpp_workers
2906 class TestIpsecItf6(TemplateIpsec,
2907                     TemplateIpsecItf6,
2908                     IpsecTun6):
2909     """ IPsec Interface IPv6 """
2910
2911     def setUp(self):
2912         super(TestIpsecItf6, self).setUp()
2913
2914         self.tun_if = self.pg0
2915
2916     def tearDown(self):
2917         super(TestIpsecItf6, self).tearDown()
2918
2919     def test_tun_66(self):
2920         """IPSEC interface IPv6"""
2921
2922         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2923         n_pkts = 127
2924         p = self.ipv6_params
2925         p.inner_hop_limit = 24
2926         p.outer_hop_limit = 23
2927         p.outer_flow_label = 243224
2928         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2929
2930         self.config_network(p)
2931         config_tun_params(p, self.encryption_type, None,
2932                           self.pg0.local_ip6,
2933                           self.pg0.remote_ip6)
2934         self.verify_drop_tun_66(p, count=n_pkts)
2935         self.config_sa_tun(p,
2936                            self.pg0.local_ip6,
2937                            self.pg0.remote_ip6)
2938         self.config_protect(p)
2939
2940         self.verify_tun_66(p, count=n_pkts)
2941         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2942         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2943
2944         p.tun_if.admin_down()
2945         self.verify_drop_tun_66(p, count=n_pkts)
2946         p.tun_if.admin_up()
2947         self.verify_tun_66(p, count=n_pkts)
2948
2949         self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2950         self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2951
2952         # it's a v4 packet when its encrypted
2953         self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2954
2955         self.verify_tun_46(p, count=n_pkts)
2956         self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2957         self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2958
2959         self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2960
2961         self.vapi.cli("clear interfaces")
2962
2963         # rekey - create new SAs and update the tunnel protection
2964         np = copy.copy(p)
2965         np.crypt_key = b'X' + p.crypt_key[1:]
2966         np.scapy_tun_spi += 100
2967         np.scapy_tun_sa_id += 1
2968         np.vpp_tun_spi += 100
2969         np.vpp_tun_sa_id += 1
2970         np.tun_if.local_spi = p.vpp_tun_spi
2971         np.tun_if.remote_spi = p.scapy_tun_spi
2972         np.inner_hop_limit = 24
2973         np.outer_hop_limit = 128
2974         np.inner_flow_label = 0xabcde
2975         np.outer_flow_label = 0xabcde
2976         np.hop_limit = 128
2977         np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
2978
2979         self.config_sa_tun(np,
2980                            self.pg0.local_ip6,
2981                            self.pg0.remote_ip6)
2982         self.config_protect(np)
2983         self.unconfig_sa(p)
2984
2985         self.verify_tun_66(np, count=n_pkts)
2986         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2987         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2988
2989         # teardown
2990         self.unconfig_protect(np)
2991         self.unconfig_sa(np)
2992         self.unconfig_network(p)
2993
2994     def test_tun_66_police(self):
2995         """IPSEC interface IPv6 with input policer"""
2996         tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2997         n_pkts = 127
2998         p = self.ipv6_params
2999         p.inner_hop_limit = 24
3000         p.outer_hop_limit = 23
3001         p.outer_flow_label = 243224
3002         p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3003
3004         self.config_network(p)
3005         self.config_sa_tun(p,
3006                            self.pg0.local_ip6,
3007                            self.pg0.remote_ip6)
3008         self.config_protect(p)
3009
3010         action_tx = PolicerAction(
3011             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
3012             0)
3013         policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
3014                              conform_action=action_tx,
3015                              exceed_action=action_tx,
3016                              violate_action=action_tx)
3017         policer.add_vpp_config()
3018
3019         # Start policing on tun
3020         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3021
3022         self.verify_tun_66(p, count=n_pkts)
3023         self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3024         self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3025
3026         stats = policer.get_stats()
3027
3028         # Single rate, 2 colour policer - expect conform, violate but no exceed
3029         self.assertGreater(stats['conform_packets'], 0)
3030         self.assertEqual(stats['exceed_packets'], 0)
3031         self.assertGreater(stats['violate_packets'], 0)
3032
3033         # Stop policing on tun
3034         policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3035         self.verify_tun_66(p, count=n_pkts)
3036
3037         # No new policer stats
3038         statsnew = policer.get_stats()
3039         self.assertEqual(stats, statsnew)
3040
3041         # teardown
3042         policer.remove_vpp_config()
3043         self.unconfig_protect(p)
3044         self.unconfig_sa(p)
3045         self.unconfig_network(p)
3046
3047
3048 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3049     """ Ipsec P2MP ESP v4 tests """
3050     tun4_encrypt_node_name = "esp4-encrypt-tun"
3051     tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3052     encryption_type = ESP
3053
3054     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
3055                          payload_size=100):
3056         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3057                 sa.encrypt(IP(src=self.pg1.local_ip4,
3058                               dst=self.pg1.remote_ip4) /
3059                            UDP(sport=1144, dport=2233) /
3060                            Raw(b'X' * payload_size))
3061                 for i in range(count)]
3062
3063     def gen_pkts(self, sw_intf, src, dst, count=1,
3064                  payload_size=100):
3065         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3066                 IP(src="1.1.1.1", dst=dst) /
3067                 UDP(sport=1144, dport=2233) /
3068                 Raw(b'X' * payload_size)
3069                 for i in range(count)]
3070
3071     def verify_decrypted(self, p, rxs):
3072         for rx in rxs:
3073             self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3074             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3075
3076     def verify_encrypted(self, p, sa, rxs):
3077         for rx in rxs:
3078             try:
3079                 self.assertEqual(rx[IP].tos,
3080                                  VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
3081                 self.assertEqual(rx[IP].ttl, p.hop_limit)
3082                 pkt = sa.decrypt(rx[IP])
3083                 if not pkt.haslayer(IP):
3084                     pkt = IP(pkt[Raw].load)
3085                 self.assert_packet_checksums_valid(pkt)
3086                 e = pkt[IP]
3087                 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3088             except (IndexError, AssertionError):
3089                 self.logger.debug(ppp("Unexpected packet:", rx))
3090                 try:
3091                     self.logger.debug(ppp("Decrypted packet:", pkt))
3092                 except:
3093                     pass
3094                 raise
3095
3096     def setUp(self):
3097         super(TestIpsecMIfEsp4, self).setUp()
3098
3099         N_NHS = 16
3100         self.tun_if = self.pg0
3101         p = self.ipv4_params
3102         p.tun_if = VppIpsecInterface(self,
3103                                      mode=(VppEnum.vl_api_tunnel_mode_t.
3104                                            TUNNEL_API_MODE_MP))
3105         p.tun_if.add_vpp_config()
3106         p.tun_if.admin_up()
3107         p.tun_if.config_ip4()
3108         p.tun_if.unconfig_ip4()
3109         p.tun_if.config_ip4()
3110         p.tun_if.generate_remote_hosts(N_NHS)
3111         self.pg0.generate_remote_hosts(N_NHS)
3112         self.pg0.configure_ipv4_neighbors()
3113
3114         r_all = AclRule(True,
3115                         src_prefix="0.0.0.0/0",
3116                         dst_prefix="0.0.0.0/0",
3117                         proto=0)
3118         a = VppAcl(self, [r_all]).add_vpp_config()
3119
3120         VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3121         VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3122
3123         # setup some SAs for several next-hops on the interface
3124         self.multi_params = []
3125
3126         for ii in range(N_NHS):
3127             p = copy.copy(self.ipv4_params)
3128
3129             p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3130             p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3131             p.scapy_tun_spi = p.scapy_tun_spi + ii
3132             p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3133             p.vpp_tun_spi = p.vpp_tun_spi + ii
3134
3135             p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3136             p.scapy_tra_spi = p.scapy_tra_spi + ii
3137             p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3138             p.vpp_tra_spi = p.vpp_tra_spi + ii
3139             p.hop_limit = ii+10
3140             p.tun_sa_out = VppIpsecSA(
3141                 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
3142                 p.auth_algo_vpp_id, p.auth_key,
3143                 p.crypt_algo_vpp_id, p.crypt_key,
3144                 self.vpp_esp_protocol,
3145                 self.pg0.local_ip4,
3146                 self.pg0.remote_hosts[ii].ip4,
3147                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3148                 hop_limit=p.hop_limit)
3149             p.tun_sa_out.add_vpp_config()
3150
3151             p.tun_sa_in = VppIpsecSA(
3152                 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
3153                 p.auth_algo_vpp_id, p.auth_key,
3154                 p.crypt_algo_vpp_id, p.crypt_key,
3155                 self.vpp_esp_protocol,
3156                 self.pg0.remote_hosts[ii].ip4,
3157                 self.pg0.local_ip4,
3158                 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3159                 hop_limit=p.hop_limit)
3160             p.tun_sa_in.add_vpp_config()
3161
3162             p.tun_protect = VppIpsecTunProtect(
3163                 self,
3164                 p.tun_if,
3165                 p.tun_sa_out,
3166                 [p.tun_sa_in],
3167                 nh=p.tun_if.remote_hosts[ii].ip4)
3168             p.tun_protect.add_vpp_config()
3169             config_tun_params(p, self.encryption_type, None,
3170                               self.pg0.local_ip4,
3171                               self.pg0.remote_hosts[ii].ip4)
3172             self.multi_params.append(p)
3173
3174             p.via_tun_route = VppIpRoute(
3175                 self, p.remote_tun_if_host, 32,
3176                 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
3177                               p.tun_if.sw_if_index)]).add_vpp_config()
3178
3179             p.tun_dst = self.pg0.remote_hosts[ii].ip4
3180
3181     def tearDown(self):
3182         p = self.ipv4_params
3183         p.tun_if.unconfig_ip4()
3184         super(TestIpsecMIfEsp4, self).tearDown()
3185
3186     def test_tun_44(self):
3187         """P2MP IPSEC 44"""
3188         N_PKTS = 63
3189         for p in self.multi_params:
3190             self.verify_tun_44(p, count=N_PKTS)
3191
3192         # remove one tunnel protect, the rest should still work
3193         self.multi_params[0].tun_protect.remove_vpp_config()
3194         self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3195         self.multi_params[0].via_tun_route.remove_vpp_config()
3196         self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3197
3198         for p in self.multi_params[1:]:
3199             self.verify_tun_44(p, count=N_PKTS)
3200
3201         self.multi_params[0].tun_protect.add_vpp_config()
3202         self.multi_params[0].via_tun_route.add_vpp_config()
3203
3204         for p in self.multi_params:
3205             self.verify_tun_44(p, count=N_PKTS)
3206
3207
3208 class TestIpsecItf6MPLS(TemplateIpsec,
3209                         TemplateIpsecItf6,
3210                         IpsecTun6):
3211     """ IPsec Interface MPLSoIPv6 """
3212
3213     tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3214
3215     def setUp(self):
3216         super(TestIpsecItf6MPLS, self).setUp()
3217
3218         self.tun_if = self.pg0
3219
3220     def tearDown(self):
3221         super(TestIpsecItf6MPLS, self).tearDown()
3222
3223     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
3224                           payload_size=100):
3225         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3226                 sa.encrypt(MPLS(label=66, ttl=3) /
3227                            IPv6(src=src, dst=dst) /
3228                            UDP(sport=1166, dport=2233) /
3229                            Raw(b'X' * payload_size))
3230                 for i in range(count)]
3231
3232     def verify_encrypted6(self, p, sa, rxs):
3233         for rx in rxs:
3234             try:
3235                 pkt = sa.decrypt(rx[IPv6])
3236                 if not pkt.haslayer(IPv6):
3237                     pkt = IP(pkt[Raw].load)
3238                 self.assert_packet_checksums_valid(pkt)
3239                 self.assert_equal(pkt[MPLS].label, 66)
3240                 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3241             except (IndexError, AssertionError):
3242                 self.logger.debug(ppp("Unexpected packet:", rx))
3243                 try:
3244                     self.logger.debug(ppp("Decrypted packet:", pkt))
3245                 except:
3246                     pass
3247                 raise
3248
3249     def test_tun_mpls_o_ip6(self):
3250         """IPSEC interface MPLS over IPv6"""
3251
3252         n_pkts = 127
3253         p = self.ipv6_params
3254         f = FibPathProto
3255
3256         tbl = VppMplsTable(self, 0)
3257         tbl.add_vpp_config()
3258
3259         self.config_network(p)
3260         # deag MPLS routes from the tunnel
3261         r6 = VppMplsRoute(self, 66, 1,
3262                           [VppRoutePath(
3263                               self.pg1.remote_ip6,
3264                               self.pg1.sw_if_index)],
3265                           eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
3266         p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
3267                                      p.tun_if.sw_if_index,
3268                                      labels=[VppMplsLabel(66)])])
3269         p.tun_if.enable_mpls()
3270
3271         self.config_sa_tun(p,
3272                            self.pg0.local_ip6,
3273                            self.pg0.remote_ip6)
3274         self.config_protect(p)
3275
3276         self.verify_tun_66(p, count=n_pkts)
3277
3278         # cleanup
3279         p.tun_if.disable_mpls()
3280         self.unconfig_protect(p)
3281         self.unconfig_sa(p)
3282         self.unconfig_network(p)
3283
3284
3285 if __name__ == '__main__':
3286     unittest.main(testRunner=VppTestRunner)