198dd739d943eb53aef8063dd962ada311bb2769
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import ip_address, ip_network, \
19     IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
23
24 N_PKTS = 15
25 N_REMOTE_HOSTS = 3
26
27 SRC = 0
28 DST = 1
29
30
31 class CnatCommonTestCase(VppTestCase):
32     """ CNat common test class """
33
34     #
35     # turn the scanner off whilst testing otherwise sessions
36     # will time out
37     #
38     extra_vpp_punt_config = ["cnat", "{",
39                              "session-db-buckets", "64",
40                              "session-cleanup-timeout", "0.1",
41                              "session-max-age", "1",
42                              "tcp-max-age", "1",
43                              "scanner", "off", "}"]
44
45     @classmethod
46     def setUpClass(cls):
47         super(CnatCommonTestCase, cls).setUpClass()
48
49     @classmethod
50     def tearDownClass(cls):
51         super(CnatCommonTestCase, cls).tearDownClass()
52
53
54 class Endpoint(object):
55     """ CNat endpoint """
56
57     def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
58         self.port = port
59         self.is_v6 = is_v6
60         self.sw_if_index = INVALID_INDEX
61         if pg is not None and pgi is not None:
62             # pg interface specified and remote index
63             self.ip = self.get_ip46(pg.remote_hosts[pgi])
64         elif pg is not None:
65             self.ip = None
66             self.sw_if_index = pg.sw_if_index
67         elif ip is not None:
68             self.ip = ip
69         else:
70             self.ip = "::" if self.is_v6 else "0.0.0.0"
71
72     def get_ip46(self, obj):
73         if self.is_v6:
74             return obj.ip6
75         return obj.ip4
76
77     def udpate(self, **kwargs):
78         self.__init__(**kwargs)
79
80     def _vpp_if_af(self):
81         if self.is_v6:
82             return VppEnum.vl_api_address_family_t.ADDRESS_IP6
83         return VppEnum.vl_api_address_family_t.ADDRESS_IP4
84
85     def encode(self):
86         return {'addr': self.ip,
87                 'port': self.port,
88                 'sw_if_index': self.sw_if_index,
89                 'if_af': self._vpp_if_af()}
90
91     def __str__(self):
92         return ("%s:%d" % (self.ip, self.port))
93
94
95 class Translation(VppObject):
96
97     def __init__(self, test, iproto, vip, paths):
98         self._test = test
99         self.vip = vip
100         self.iproto = iproto
101         self.paths = paths
102         self.id = None
103
104     def __str__(self):
105         return ("%s %s %s" % (self.vip, self.iproto, self.paths))
106
107     def _vl4_proto(self):
108         ip_proto = VppEnum.vl_api_ip_proto_t
109         return {
110             UDP: ip_proto.IP_API_PROTO_UDP,
111             TCP: ip_proto.IP_API_PROTO_TCP,
112         }[self.iproto]
113
114     def _encoded_paths(self):
115         return [{'src_ep': src.encode(),
116                  'dst_ep': dst.encode()} for (src, dst) in self.paths]
117
118     def add_vpp_config(self):
119         r = self._test.vapi.cnat_translation_update(
120             {'vip': self.vip.encode(),
121              'ip_proto': self._vl4_proto(),
122              'n_paths': len(self.paths),
123              'paths': self._encoded_paths()})
124         self._test.registry.register(self, self._test.logger)
125         self.id = r.id
126         return self
127
128     def remove_vpp_config(self):
129         assert(self.id is not None)
130         self._test.vapi.cnat_translation_del(id=self.id)
131         return self
132
133     def query_vpp_config(self):
134         for t in self._test.vapi.cnat_translation_dump():
135             if self.id == t.translation.id:
136                 return t.translation
137         return None
138
139
140 class CnatTestContext(object):
141     """
142     Usage :
143
144     ctx = CnatTestContext(self, TCP, is_v6=True)
145
146     # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
147     ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
148
149     # We expect this to be NATed as
150     # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
151     ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
152
153     # After running cnat_expect, we can send back the received packet
154     # and expect it be 'unnated' so that we get the original packet
155     ctx.cnat_send_return().cnat_expect_return()
156
157     # same thing for ICMP errors
158     ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
159     """
160
161     def __init__(self, test, L4PROTO, is_v6):
162         self.L4PROTO = L4PROTO
163         self.is_v6 = is_v6
164         self._test = test
165
166     def get_ip46(self, obj):
167         if self.is_v6:
168             return obj.ip6
169         return obj.ip4
170
171     @property
172     def IP46(self):
173         return IPv6 if self.is_v6 else IP
174
175     def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
176                   no_replies=False):
177         if isinstance(src_id, int):
178             self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
179         else:
180             self.dst_addr = src_id
181         if isinstance(dst_id, int):
182             self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
183         else:
184             self.dst_addr = dst_id
185         self.src_port = src_port  # also ICMP id
186         self.dst_port = dst_port  # also ICMP type
187
188         if self.L4PROTO in [TCP, UDP]:
189             l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
190         elif self.L4PROTO in [ICMP] and not self.is_v6:
191             l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
192         elif self.L4PROTO in [ICMP] and self.is_v6:
193             l4 = ICMPv6EchoRequest(id=self.src_port)
194         p1 = (Ether(src=src_pg.remote_mac,
195                     dst=src_pg.local_mac) /
196               self.IP46(src=self.src_addr, dst=self.dst_addr) /
197               l4 /
198               Raw())
199
200         if no_replies:
201             self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
202         else:
203             self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
204         self.expected_src_pg = src_pg
205         self.expected_dst_pg = dst_pg
206         return self
207
208     def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
209         if isinstance(src_id, int):
210             self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
211         else:
212             self.expect_src_addr = src_id
213         if isinstance(dst_id, int):
214             self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
215         else:
216             self.expect_dst_addr = dst_id
217         self.expect_src_port = src_port
218         self.expect_dst_port = dst_port
219
220         if self.expect_src_port is None:
221             if self.L4PROTO in [TCP, UDP]:
222                 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
223             elif self.L4PROTO in [ICMP] and not self.is_v6:
224                 self.expect_src_port = self.rxs[0][self.L4PROTO].id
225             elif self.L4PROTO in [ICMP] and self.is_v6:
226                 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
227
228         for rx in self.rxs:
229             self._test.assert_packet_checksums_valid(rx)
230             self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
231             self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
232             if self.L4PROTO in [TCP, UDP]:
233                 self._test.assertEqual(
234                     rx[self.L4PROTO].dport, self.expect_dst_port)
235                 self._test.assertEqual(
236                     rx[self.L4PROTO].sport, self.expect_src_port)
237             elif self.L4PROTO in [ICMP] and not self.is_v6:
238                 self._test.assertEqual(
239                     rx[self.L4PROTO].type, self.expect_dst_port)
240                 self._test.assertEqual(
241                     rx[self.L4PROTO].id, self.expect_src_port)
242             elif self.L4PROTO in [ICMP] and self.is_v6:
243                 self._test.assertEqual(
244                     rx[ICMPv6EchoRequest].id, self.expect_src_port)
245         return self
246
247     def cnat_send_return(self):
248         """This sends the return traffic"""
249         if self.L4PROTO in [TCP, UDP]:
250             l4 = self.L4PROTO(sport=self.expect_dst_port,
251                               dport=self.expect_src_port)
252         elif self.L4PROTO in [ICMP] and not self.is_v6:
253             # icmp type 0 if echo reply
254             l4 = self.L4PROTO(id=self.expect_src_port, type=0)
255         elif self.L4PROTO in [ICMP] and self.is_v6:
256             l4 = ICMPv6EchoReply(id=self.expect_src_port)
257         src_mac = self.expected_dst_pg.remote_mac
258         p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
259               self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
260               l4 /
261               Raw())
262
263         self.return_rxs = self._test.send_and_expect(
264             self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
265         return self
266
267     def cnat_expect_return(self):
268         for rx in self.return_rxs:
269             self._test.assert_packet_checksums_valid(rx)
270             self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
271             self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
272             if self.L4PROTO in [TCP, UDP]:
273                 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
274                 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
275             elif self.L4PROTO in [ICMP] and not self.is_v6:
276                 # icmp type 0 if echo reply
277                 self._test.assertEqual(rx[self.L4PROTO].type, 0)
278                 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
279             elif self.L4PROTO in [ICMP] and self.is_v6:
280                 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
281         return self
282
283     def cnat_send_icmp_return_error(self):
284         """
285         This called after cnat_expect will send an icmp error
286         on the reverse path
287         """
288         ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
289         InnerIP = self.rxs[0][self.IP46]
290         p1 = (
291             Ether(src=self.expected_dst_pg.remote_mac,
292                   dst=self.expected_dst_pg.local_mac) /
293             self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
294             ICMPelem / InnerIP)
295         self.return_rxs = self._test.send_and_expect(
296             self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
297         return self
298
299     def cnat_expect_icmp_error_return(self):
300         ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
301         IP46err = IPerror6 if self.is_v6 else IPerror
302         L4err = TCPerror if self.L4PROTO is TCP else UDPerror
303         for rx in self.return_rxs:
304             self._test.assert_packet_checksums_valid(rx)
305             self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
306             self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
307             self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
308             self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
309             self._test.assertEqual(
310                 rx[ICMP46][IP46err][L4err].sport, self.src_port)
311             self._test.assertEqual(
312                 rx[ICMP46][IP46err][L4err].dport, self.dst_port)
313         return self
314
315 # -------------------------------------------------------------------
316 # -------------------------------------------------------------------
317 # -------------------------------------------------------------------
318 # -------------------------------------------------------------------
319
320
321 class TestCNatTranslation(CnatCommonTestCase):
322     """ CNat Translation """
323
324     @classmethod
325     def setUpClass(cls):
326         super(TestCNatTranslation, cls).setUpClass()
327
328     @classmethod
329     def tearDownClass(cls):
330         super(TestCNatTranslation, cls).tearDownClass()
331
332     def setUp(self):
333         super(TestCNatTranslation, self).setUp()
334
335         self.create_pg_interfaces(range(3))
336         self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
337         self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
338
339         for i in self.pg_interfaces:
340             i.admin_up()
341             i.config_ip4()
342             i.resolve_arp()
343             i.config_ip6()
344             i.resolve_ndp()
345             i.configure_ipv4_neighbors()
346             i.configure_ipv6_neighbors()
347
348     def tearDown(self):
349         for translation in self.translations:
350             translation.remove_vpp_config()
351
352         self.vapi.cnat_session_purge()
353         self.assertFalse(self.vapi.cnat_session_dump())
354
355         for i in self.pg_interfaces:
356             i.unconfig_ip4()
357             i.unconfig_ip6()
358             i.admin_down()
359         super(TestCNatTranslation, self).tearDown()
360
361     def cnat_translation(self):
362         """ CNat Translation """
363         self.logger.info(self.vapi.cli("sh cnat client"))
364         self.logger.info(self.vapi.cli("sh cnat translation"))
365
366         for nbr, translation in enumerate(self.translations):
367             vip = translation.vip
368
369             #
370             # Test Flows to the VIP
371             #
372             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
373             for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
374                 # from client to vip
375                 ctx.cnat_send(self.pg0, src_pgi, sport,
376                               self.pg1, vip.ip, vip.port)
377                 dst_port = translation.paths[0][DST].port
378                 ctx.cnat_expect(self.pg0, src_pgi, sport,
379                                 self.pg1, nbr, dst_port)
380                 # from vip to client
381                 ctx.cnat_send_return().cnat_expect_return()
382
383                 #
384                 # packets to the VIP that do not match a
385                 # translation are dropped
386                 #
387                 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
388                               vip.ip, 6666, no_replies=True)
389
390                 #
391                 # packets from the VIP that do not match a
392                 # session are forwarded
393                 #
394                 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
395                 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
396
397             #
398             # modify the translation to use a different backend
399             #
400             old_dst_port = translation.paths[0][DST].port
401             translation.paths[0][DST].udpate(
402                 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
403             translation.add_vpp_config()
404
405             #
406             # existing flows follow the old path
407             #
408             for src_pgi in range(N_REMOTE_HOSTS):
409                 for sport in [1234, 1233]:
410                     # from client to vip
411                     ctx.cnat_send(self.pg0, src_pgi, sport,
412                                   self.pg1, vip.ip, vip.port)
413                     ctx.cnat_expect(self.pg0, src_pgi, sport,
414                                     self.pg1, nbr, old_dst_port)
415                     # from vip to client
416                     ctx.cnat_send_return().cnat_expect_return()
417
418             #
419             # new flows go to the new backend
420             #
421             for src_pgi in range(N_REMOTE_HOSTS):
422                 ctx.cnat_send(self.pg0, src_pgi, 9999,
423                               self.pg2, vip.ip, vip.port)
424                 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
425
426             self.logger.info(self.vapi.cli("sh cnat session verbose"))
427
428         #
429         # turn the scanner back on and wait until the sessions
430         # all disapper
431         #
432         self.vapi.cli("test cnat scanner on")
433
434         n_tries = 0
435         sessions = self.vapi.cnat_session_dump()
436         while (len(sessions) and n_tries < 100):
437             n_tries += 1
438             sessions = self.vapi.cnat_session_dump()
439             self.sleep(2)
440             self.logger.info(self.vapi.cli("show cnat session verbose"))
441
442         self.assertTrue(n_tries < 100)
443         self.vapi.cli("test cnat scanner off")
444
445         #
446         # load some flows again and purge
447         #
448         for translation in self.translations:
449             vip = translation.vip
450             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
451             for src_pgi in range(N_REMOTE_HOSTS):
452                 for sport in [1234, 1233]:
453                     # from client to vip
454                     ctx.cnat_send(self.pg0, src_pgi, sport,
455                                   self.pg2, vip.ip, vip.port)
456                     ctx.cnat_expect(self.pg0, src_pgi,
457                                     sport, self.pg2, 0, 5000)
458
459     def _test_icmp(self):
460
461         #
462         # Testing ICMP
463         #
464         for nbr, translation in enumerate(self.translations):
465             vip = translation.vip
466             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
467
468             #
469             # NATing ICMP errors
470             #
471             ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
472             dst_port = translation.paths[0][DST].port
473             ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
474             ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
475
476             #
477             # ICMP errors with no VIP associated should not be
478             # modified
479             #
480             ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
481             dst_port = translation.paths[0][DST].port
482             ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
483             ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
484
485     def _make_translations_v4(self):
486         self.translations = []
487         self.translations.append(Translation(
488             self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
489             [(
490                 Endpoint(is_v6=False),
491                 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
492             )]
493         ).add_vpp_config())
494         self.translations.append(Translation(
495             self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
496             [(
497                 Endpoint(is_v6=False),
498                 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
499             )]
500         ).add_vpp_config())
501         self.translations.append(Translation(
502             self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
503             [(
504                 Endpoint(is_v6=False),
505                 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
506             )]
507         ).add_vpp_config())
508
509     def _make_translations_v6(self):
510         self.translations = []
511         self.translations.append(Translation(
512             self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True),
513             [(
514                 Endpoint(is_v6=True),
515                 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
516             )]
517         ).add_vpp_config())
518         self.translations.append(Translation(
519             self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True),
520             [(
521                 Endpoint(is_v6=True),
522                 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
523             )]
524         ).add_vpp_config())
525         self.translations.append(Translation(
526             self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True),
527             [(
528                 Endpoint(is_v6=True),
529                 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
530             )]
531         ).add_vpp_config())
532
533     def test_icmp4(self):
534         # """ CNat Translation icmp v4 """
535         self._make_translations_v4()
536         self._test_icmp()
537
538     def test_icmp6(self):
539         # """ CNat Translation icmp v6 """
540         self._make_translations_v6()
541         self._test_icmp()
542
543     def test_cnat6(self):
544         # """ CNat Translation ipv6 """
545         self._make_translations_v6()
546         self.cnat_translation()
547
548     def test_cnat4(self):
549         # """ CNat Translation ipv4 """
550         self._make_translations_v4()
551         self.cnat_translation()
552
553
554 class TestCNatSourceNAT(CnatCommonTestCase):
555     """ CNat Source NAT """
556
557     @classmethod
558     def setUpClass(cls):
559         super(TestCNatSourceNAT, cls).setUpClass()
560
561     @classmethod
562     def tearDownClass(cls):
563         super(TestCNatSourceNAT, cls).tearDownClass()
564
565     def _enable_disable_snat(self, is_enable=True):
566         self.vapi.cnat_set_snat_addresses(
567             snat_ip4=self.pg2.remote_hosts[0].ip4,
568             snat_ip6=self.pg2.remote_hosts[0].ip6,
569             sw_if_index=INVALID_INDEX)
570         self.vapi.feature_enable_disable(
571             enable=1 if is_enable else 0,
572             arc_name="ip6-unicast",
573             feature_name="cnat-snat-ip6",
574             sw_if_index=self.pg0.sw_if_index)
575         self.vapi.feature_enable_disable(
576             enable=1 if is_enable else 0,
577             arc_name="ip4-unicast",
578             feature_name="cnat-snat-ip4",
579             sw_if_index=self.pg0.sw_if_index)
580
581         policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
582         self.vapi.cnat_set_snat_policy(
583             policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
584         for i in self.pg_interfaces:
585             self.vapi.cnat_snat_policy_add_del_if(
586                 sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
587                 table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
588             self.vapi.cnat_snat_policy_add_del_if(
589                 sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
590                 table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
591
592     def setUp(self):
593         super(TestCNatSourceNAT, self).setUp()
594
595         self.create_pg_interfaces(range(3))
596         self.pg1.generate_remote_hosts(2)
597
598         for i in self.pg_interfaces:
599             i.admin_up()
600             i.config_ip4()
601             i.resolve_arp()
602             i.config_ip6()
603             i.resolve_ndp()
604             i.configure_ipv6_neighbors()
605             i.configure_ipv4_neighbors()
606
607         self._enable_disable_snat(is_enable=True)
608
609     def tearDown(self):
610         self._enable_disable_snat(is_enable=True)
611
612         self.vapi.cnat_session_purge()
613         for i in self.pg_interfaces:
614             i.unconfig_ip4()
615             i.unconfig_ip6()
616             i.admin_down()
617         super(TestCNatSourceNAT, self).tearDown()
618
619     def test_snat_v6(self):
620         # """ CNat Source Nat v6 """
621         self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
622         self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
623         self.sourcenat_test_icmp_echo_conf(is_v6=True)
624
625     def test_snat_v4(self):
626         # """ CNat Source Nat v4 """
627         self.sourcenat_test_tcp_udp_conf(TCP)
628         self.sourcenat_test_tcp_udp_conf(UDP)
629         self.sourcenat_test_icmp_echo_conf()
630
631     def sourcenat_test_icmp_echo_conf(self, is_v6=False):
632         ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
633         # 8 is ICMP type echo (v4 only)
634         ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8)
635         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
636         ctx.cnat_send_return().cnat_expect_return()
637
638     def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
639         ctx = CnatTestContext(self, L4PROTO, is_v6)
640         # we should source NAT
641         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
642         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
643         ctx.cnat_send_return().cnat_expect_return()
644
645         # exclude dst address of pg1.1 from snat
646         if is_v6:
647             exclude_prefix = ip_network(
648                 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
649         else:
650             exclude_prefix = ip_network(
651                 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)
652
653         # add remote host to exclude list
654         self.vapi.cnat_snat_policy_add_del_exclude_pfx(
655             prefix=exclude_prefix, is_add=1)
656
657         # We should not source NAT the id=1
658         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
659         ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
660         ctx.cnat_send_return().cnat_expect_return()
661
662         # But we should source NAT the id=0
663         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
664         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
665         ctx.cnat_send_return().cnat_expect_return()
666
667         # remove remote host from exclude list
668         self.vapi.cnat_snat_policy_add_del_exclude_pfx(
669             prefix=exclude_prefix, is_add=0)
670         self.vapi.cnat_session_purge()
671
672         # We should source NAT again
673         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
674         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
675         ctx.cnat_send_return().cnat_expect_return()
676
677         # test return ICMP error nating
678         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
679         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
680         ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
681
682         self.vapi.cnat_session_purge()
683
684
685 class TestCNatDHCP(CnatCommonTestCase):
686     """ CNat Translation """
687
688     @classmethod
689     def setUpClass(cls):
690         super(TestCNatDHCP, cls).setUpClass()
691
692     @classmethod
693     def tearDownClass(cls):
694         super(TestCNatDHCP, cls).tearDownClass()
695
696     def tearDown(self):
697         for i in self.pg_interfaces:
698             i.admin_down()
699         super(TestCNatDHCP, self).tearDown()
700
701     def make_addr(self, sw_if_index, addr_id, is_v6):
702         if is_v6:
703             return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
704         return "172.16.%u.%u" % (sw_if_index, addr_id)
705
706     def make_prefix(self, sw_if_index, addr_id, is_v6):
707         if is_v6:
708             return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
709         return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
710
711     def check_resolved(self, tr, addr_id, is_v6=False):
712         qt = tr.query_vpp_config()
713         self.assertEqual(str(qt.vip.addr), self.make_addr(
714             tr.vip.sw_if_index, addr_id, is_v6))
715         self.assertEqual(len(qt.paths), len(tr.paths))
716         for path_tr, path_qt in zip(tr.paths, qt.paths):
717             src_qt = path_qt.src_ep
718             dst_qt = path_qt.dst_ep
719             src_tr, dst_tr = path_tr
720             self.assertEqual(str(src_qt.addr), self.make_addr(
721                 src_tr.sw_if_index, addr_id, is_v6))
722             self.assertEqual(str(dst_qt.addr), self.make_addr(
723                 dst_tr.sw_if_index, addr_id, is_v6))
724
725     def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
726         self.vapi.sw_interface_add_del_address(
727             sw_if_index=pg.sw_if_index,
728             prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
729             is_add=1 if is_add else 0)
730
731     def _test_dhcp_v46(self, is_v6):
732         self.create_pg_interfaces(range(4))
733         for i in self.pg_interfaces:
734             i.admin_up()
735         paths = [
736             (Endpoint(pg=self.pg1, is_v6=is_v6),
737              Endpoint(pg=self.pg2, is_v6=is_v6)),
738             (Endpoint(pg=self.pg1, is_v6=is_v6),
739              Endpoint(pg=self.pg3, is_v6=is_v6))
740         ]
741         ep = Endpoint(pg=self.pg0, is_v6=is_v6)
742         t = Translation(self, TCP, ep, paths).add_vpp_config()
743         # Add an address on every interface
744         # and check it is reflected in the cnat config
745         for pg in self.pg_interfaces:
746             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
747         self.check_resolved(t, addr_id=0, is_v6=is_v6)
748         # Add a new address on every interface, remove the old one
749         # and check it is reflected in the cnat config
750         for pg in self.pg_interfaces:
751             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
752             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
753         self.check_resolved(t, addr_id=1, is_v6=is_v6)
754         # remove the configuration
755         for pg in self.pg_interfaces:
756             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
757         t.remove_vpp_config()
758
759     def test_dhcp_v4(self):
760         self._test_dhcp_v46(False)
761
762     def test_dhcp_v6(self):
763         self._test_dhcp_v46(True)
764
765     def test_dhcp_snat(self):
766         self.create_pg_interfaces(range(1))
767         for i in self.pg_interfaces:
768             i.admin_up()
769         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
770         # Add an address on every interface
771         # and check it is reflected in the cnat config
772         for pg in self.pg_interfaces:
773             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
774             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
775         r = self.vapi.cnat_get_snat_addresses()
776         self.assertEqual(str(r.snat_ip4), self.make_addr(
777             self.pg0.sw_if_index, addr_id=0, is_v6=False))
778         self.assertEqual(str(r.snat_ip6), self.make_addr(
779             self.pg0.sw_if_index, addr_id=0, is_v6=True))
780         # Add a new address on every interface, remove the old one
781         # and check it is reflected in the cnat config
782         for pg in self.pg_interfaces:
783             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
784             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
785             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
786             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
787         r = self.vapi.cnat_get_snat_addresses()
788         self.assertEqual(str(r.snat_ip4), self.make_addr(
789             self.pg0.sw_if_index, addr_id=1, is_v6=False))
790         self.assertEqual(str(r.snat_ip6), self.make_addr(
791             self.pg0.sw_if_index, addr_id=1, is_v6=True))
792         # remove the configuration
793         for pg in self.pg_interfaces:
794             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
795             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
796         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
797
798
799 if __name__ == '__main__':
800     unittest.main(testRunner=VppTestRunner)