vppinfra: toeplitz hash
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import ip_address, ip_network, \
19     IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
23
# Number of copies of the crafted packet injected by each send helper
# (the helpers send `p1 * N_PKTS`).
N_PKTS = 15
# Number of remote hosts generated on pg0/pg1 by the translation tests.
N_REMOTE_HOSTS = 3

# Positions of the source / destination Endpoint inside a translation
# path tuple, e.g. translation.paths[0][DST].
SRC = 0
DST = 1
29
30
class CnatCommonTestCase(VppTestCase):
    """ CNat common test class """

    # The cnat scanner is kept off for the duration of the tests: were it
    # running, the sessions created by a test would time out underneath it.
    extra_vpp_punt_config = [
        "cnat", "{",
        "session-db-buckets", "64",
        "session-cleanup-timeout", "0.1",
        "session-max-age", "1",
        "tcp-max-age", "1",
        "scanner", "off",
        "}",
    ]

    @classmethod
    def setUpClass(cls):
        """ Delegate class-level setup to the parent test case """
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Delegate class-level teardown to the parent test case """
        super().tearDownClass()
52
53
class Endpoint(object):
    """ CNat endpoint

    An (address, port) pair used as VIP or as one side of a translation
    path. The address comes from exactly one of, in priority order:

    * ``pg`` and ``pgi``: the address of ``pg.remote_hosts[pgi]``
    * ``pg`` alone: no address (``ip`` is None), only the interface's
      ``sw_if_index`` is recorded
    * ``ip``: the literal address given
    * nothing: the unspecified address of the selected family
    """

    def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
        self.port = port
        self.is_v6 = is_v6
        self.sw_if_index = INVALID_INDEX
        if pg is not None and pgi is not None:
            # pg interface specified and remote index
            self.ip = self.get_ip46(pg.remote_hosts[pgi])
        elif pg is not None:
            # interface only: keep its index, leave the address unset
            self.ip = None
            self.sw_if_index = pg.sw_if_index
        elif ip is not None:
            self.ip = ip
        else:
            self.ip = "::" if self.is_v6 else "0.0.0.0"

    def get_ip46(self, obj):
        """ Return obj.ip6 for a v6 endpoint, obj.ip4 otherwise """
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def update(self, **kwargs):
        """ Re-initialise this endpoint in place

        Takes the same keyword arguments as the constructor; anything not
        passed falls back to the constructor default, it is NOT preserved
        from the current state.
        """
        self.__init__(**kwargs)

    # Historical misspelling of update(); kept so existing callers that
    # still use the old name continue to work.
    udpate = update

    def _vpp_if_af(self):
        """ Address family enum value matching is_v6 """
        if self.is_v6:
            return VppEnum.vl_api_address_family_t.ADDRESS_IP6
        return VppEnum.vl_api_address_family_t.ADDRESS_IP4

    def encode(self):
        """ Return the endpoint as the dict passed to the cnat API calls """
        return {'addr': self.ip,
                'port': self.port,
                'sw_if_index': self.sw_if_index,
                'if_af': self._vpp_if_af()}

    def __str__(self):
        return ("%s:%d" % (self.ip, self.port))
93
94
class Translation(VppObject):
    """ CNat translation: a VIP, an L4 protocol and a list of paths

    Each path is a (source Endpoint, destination Endpoint) tuple.
    """

    def __init__(self, test, iproto, vip, paths):
        self._test = test
        self.vip = vip
        self.iproto = iproto
        self.paths = paths
        # set by add_vpp_config() from the API reply
        self.id = None

    def __str__(self):
        return ("%s %s %s" % (self.vip, self.iproto, self.paths))

    def _vl4_proto(self):
        """ Map the scapy L4 class to the API protocol enum

        Raises KeyError for anything other than UDP or TCP.
        """
        api_protos = VppEnum.vl_api_ip_proto_t
        by_scapy_layer = {
            UDP: api_protos.IP_API_PROTO_UDP,
            TCP: api_protos.IP_API_PROTO_TCP,
        }
        return by_scapy_layer[self.iproto]

    def _encoded_paths(self):
        """ Encode every (src, dst) path for the API """
        encoded = []
        for src_ep, dst_ep in self.paths:
            encoded.append({'src_ep': src_ep.encode(),
                            'dst_ep': dst_ep.encode()})
        return encoded

    def add_vpp_config(self):
        """ Push the translation through cnat_translation_update

        Registers the object with the test registry and stores the id
        returned in the reply. Returns self.
        """
        translation = {'vip': self.vip.encode(),
                       'ip_proto': self._vl4_proto(),
                       'n_paths': len(self.paths),
                       'paths': self._encoded_paths()}
        reply = self._test.vapi.cnat_translation_update(translation)
        self._test.registry.register(self, self._test.logger)
        self.id = reply.id
        return self

    def remove_vpp_config(self):
        """ Delete the translation by id; it must have been added first """
        assert self.id is not None
        self._test.vapi.cnat_translation_del(id=self.id)
        return self

    def query_vpp_config(self):
        """ Return the dumped translation carrying our id, or None """
        dumped = self._test.vapi.cnat_translation_dump()
        return next(
            (t.translation for t in dumped if self.id == t.translation.id),
            None)
138
139
class CnatTestContext(object):
    """
    Usage :

    ctx = CnatTestContext(self, TCP, is_v6=True)

    # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)

    # We expect this to be NATed as
    # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)

    # After running cnat_expect, we can send back the received packet
    # and expect it be 'unnated' so that we get the original packet
    ctx.cnat_send_return().cnat_expect_return()

    # same thing for ICMP errors
    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    In cnat_send / cnat_expect the src_id and dst_id arguments are either
    an int (index into the pg's remote_hosts) or a literal address.
    For ICMP the "source port" carries the ICMP id and the "destination
    port" the ICMP type (v4 only).
    """

    def __init__(self, test, L4PROTO, is_v6):
        self.L4PROTO = L4PROTO
        self.is_v6 = is_v6
        self._test = test

    def get_ip46(self, obj):
        """ Return obj.ip6 in a v6 context, obj.ip4 otherwise """
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    @property
    def IP46(self):
        """ Scapy IP layer class for the context's address family """
        return IPv6 if self.is_v6 else IP

    def _resolve_addr(self, pg, host_id):
        """ Turn a remote-host index into its address; pass literals through
        """
        if isinstance(host_id, int):
            return self.get_ip46(pg.remote_hosts[host_id])
        return host_id

    def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
                  no_replies=False):
        """ Send N_PKTS packets into src_pg and capture them on dst_pg

        With no_replies set, assert that nothing comes out instead; the
        previously captured packets (self.rxs) are then left untouched.
        Returns self so that calls can be chained.
        """
        # A literal source address used to be stored in dst_addr, which
        # left src_addr unset (or stale from a previous send).
        self.src_addr = self._resolve_addr(src_pg, src_id)
        self.dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.src_port = src_port  # also ICMP id
        self.dst_port = dst_port  # also ICMP type

        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoRequest(id=self.src_port)
        p1 = (Ether(src=src_pg.remote_mac,
                    dst=src_pg.local_mac) /
              self.IP46(src=self.src_addr, dst=self.dst_addr) /
              l4 /
              Raw())

        if no_replies:
            self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
        else:
            self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
        # remembered for the return-path helpers
        self.expected_src_pg = src_pg
        self.expected_dst_pg = dst_pg
        return self

    def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
        """ Check the packets captured by the last cnat_send

        Passing src_port=None accepts whatever source port (or ICMP id)
        the first received packet carries, and requires all the others
        to match it. Returns self.
        """
        self.expect_src_addr = self._resolve_addr(src_pg, src_id)
        self.expect_dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.expect_src_port = src_port
        self.expect_dst_port = dst_port

        if self.expect_src_port is None:
            if self.L4PROTO in [TCP, UDP]:
                self.expect_src_port = self.rxs[0][self.L4PROTO].sport
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self.expect_src_port = self.rxs[0][self.L4PROTO].id
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id

        for rx in self.rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
            self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(
                    rx[self.L4PROTO].dport, self.expect_dst_port)
                self._test.assertEqual(
                    rx[self.L4PROTO].sport, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self._test.assertEqual(
                    rx[self.L4PROTO].type, self.expect_dst_port)
                self._test.assertEqual(
                    rx[self.L4PROTO].id, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(
                    rx[ICMPv6EchoRequest].id, self.expect_src_port)
        return self

    def cnat_send_return(self):
        """This sends the return traffic"""
        # the reply mirrors what cnat_expect validated: expected
        # destination becomes the source and vice versa
        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.expect_dst_port,
                              dport=self.expect_src_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            # icmp type 0 if echo reply
            l4 = self.L4PROTO(id=self.expect_src_port, type=0)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoReply(id=self.expect_src_port)
        src_mac = self.expected_dst_pg.remote_mac
        p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
              self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
              l4 /
              Raw())

        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
        return self

    def cnat_expect_return(self):
        """ Check the return traffic maps back onto the original flow

        The packets captured by cnat_send_return must be addressed to the
        original sender and appear to come from the original destination.
        Returns self.
        """
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                # icmp type 0 if echo reply
                self._test.assertEqual(rx[self.L4PROTO].type, 0)
                self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
        return self

    def cnat_send_icmp_return_error(self):
        """
        This called after cnat_expect will send an icmp error
        on the reverse path
        """
        ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
        # the error embeds the first packet received by cnat_send
        InnerIP = self.rxs[0][self.IP46]
        p1 = (
            Ether(src=self.expected_dst_pg.remote_mac,
                  dst=self.expected_dst_pg.local_mac) /
            self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
            ICMPelem / InnerIP)
        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
        return self

    def cnat_expect_icmp_error_return(self):
        """ Check the ICMP error was translated back, inner packet included

        Both the outer header and the packet embedded in the error must
        carry the addresses and ports of the original flow. The embedded
        L4 header is read as TCP when the context protocol is TCP and as
        UDP otherwise. Returns self.
        """
        ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
        IP46err = IPerror6 if self.is_v6 else IPerror
        L4err = TCPerror if self.L4PROTO is TCP else UDPerror
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
            self._test.assertEqual(
                rx[ICMP46][IP46err][L4err].sport, self.src_port)
            self._test.assertEqual(
                rx[ICMP46][IP46err][L4err].dport, self.dst_port)
        return self
314
315 # -------------------------------------------------------------------
316 # -------------------------------------------------------------------
317 # -------------------------------------------------------------------
318 # -------------------------------------------------------------------
319
320
class TestCNatTranslation(CnatCommonTestCase):
    """ CNat Translation """

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        """ Bring up three pg interfaces with v4/v6 addresses and neighbors
        """
        super(TestCNatTranslation, self).setUp()

        # tearDown walks this list; make sure it exists even when a test
        # fails before it has created any translation
        self.translations = []

        self.create_pg_interfaces(range(3))
        self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
        self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv4_neighbors()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ Remove the translations, purge the sessions, unconfigure """
        for translation in self.translations:
            translation.remove_vpp_config()

        self.vapi.cnat_session_purge()
        self.assertFalse(self.vapi.cnat_session_dump())

        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    def cnat_translation(self):
        """ CNat Translation """
        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        # every (client host index, client source port) flow exercised
        flows = list(product(range(N_REMOTE_HOSTS), [1234, 1233]))

        # translation number nbr is backed by pg1.remote_hosts[nbr]
        for nbr, translation in enumerate(self.translations):
            vip = translation.vip

            #
            # Test Flows to the VIP
            #
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in flows:
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport,
                              self.pg1, vip.ip, vip.port)
                dst_port = translation.paths[0][DST].port
                ctx.cnat_expect(self.pg0, src_pgi, sport,
                                self.pg1, nbr, dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
                              vip.ip, 6666, no_replies=True)

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
                ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)

            #
            # modify the translation to use a different backend
            #
            old_dst_port = translation.paths[0][DST].port
            translation.paths[0][DST].udpate(
                pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
            translation.add_vpp_config()

            #
            # existing flows follow the old path
            #
            for src_pgi, sport in flows:
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport,
                              self.pg1, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, sport,
                                self.pg1, nbr, old_dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

            #
            # new flows go to the new backend
            #
            for src_pgi in range(N_REMOTE_HOSTS):
                ctx.cnat_send(self.pg0, src_pgi, 9999,
                              self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)

            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")
        self.virtual_sleep(2)
        sessions = self.vapi.cnat_session_dump()
        self.assertEqual(len(sessions), 0)
        self.vapi.cli("test cnat scanner off")

        #
        # load some flows again; they are purged in tearDown
        #
        for translation in self.translations:
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in flows:
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport,
                              self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi,
                                sport, self.pg2, 0, 5000)

    def _test_icmp(self):
        """ Check ICMP errors on the return path, with and without a VIP """

        #
        # Testing ICMP
        #
        for nbr, translation in enumerate(self.translations):
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)

            #
            # NATing ICMP errors
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
            dst_port = translation.paths[0][DST].port
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

            #
            # ICMP errors with no VIP associated should not be
            # modified
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    def _make_translations(self, vip_ips, is_v6):
        """ Create one single-path translation per VIP address

        The n-th translation is backed by pg1.remote_hosts[n]; the tests
        rely on that when they check where the traffic is delivered.
        """
        # (L4 protocol, VIP port, backend port) of the n-th translation
        l4_specs = ((TCP, 5555, 4001),
                    (TCP, 5554, 4002),
                    (UDP, 5553, 4003))
        self.translations = []
        for pgi, vip_ip in enumerate(vip_ips):
            iproto, vip_port, backend_port = l4_specs[pgi]
            translation = Translation(
                self, iproto,
                Endpoint(ip=vip_ip, port=vip_port, is_v6=is_v6),
                [(
                    Endpoint(is_v6=is_v6),
                    Endpoint(pg=self.pg1, pgi=pgi,
                             port=backend_port, is_v6=is_v6),
                )]
            )
            self.translations.append(translation.add_vpp_config())

    def _make_translations_v4(self):
        """ Create the IPv4 translations used by the tests """
        self._make_translations(
            ["30.0.0.1", "30.0.0.2", "30.0.0.2"], is_v6=False)

    def _make_translations_v6(self):
        """ Create the IPv6 translations used by the tests """
        self._make_translations(["30::1", "30::2", "30::2"], is_v6=True)

    def test_icmp4(self):
        # """ CNat Translation icmp v4 """
        self._make_translations_v4()
        self._test_icmp()

    def test_icmp6(self):
        # """ CNat Translation icmp v6 """
        self._make_translations_v6()
        self._test_icmp()

    def test_cnat6(self):
        # """ CNat Translation ipv6 """
        self._make_translations_v6()
        self.cnat_translation()

    def test_cnat4(self):
        # """ CNat Translation ipv4 """
        self._make_translations_v4()
        self.cnat_translation()
545
546
class TestCNatSourceNAT(CnatCommonTestCase):
    """ CNat Source NAT """

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def _enable_disable_snat(self, is_enable=True):
        """ Enable or disable source NAT for traffic entering pg0

        The SNAT addresses (those of pg2.remote_hosts[0]) and the policy
        are (re)set in both cases; only the input features on pg0 and the
        per-interface policy entries follow is_enable.
        """
        enable = 1 if is_enable else 0

        self.vapi.cnat_set_snat_addresses(
            snat_ip4=self.pg2.remote_hosts[0].ip4,
            snat_ip6=self.pg2.remote_hosts[0].ip6,
            sw_if_index=INVALID_INDEX)
        self.vapi.feature_enable_disable(
            enable=enable,
            arc_name="ip6-unicast",
            feature_name="cnat-snat-ip6",
            sw_if_index=self.pg0.sw_if_index)
        self.vapi.feature_enable_disable(
            enable=enable,
            arc_name="ip4-unicast",
            feature_name="cnat-snat-ip4",
            sw_if_index=self.pg0.sw_if_index)

        policy_tables = VppEnum.vl_api_cnat_snat_policy_table_t
        self.vapi.cnat_set_snat_policy(
            policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
        for i in self.pg_interfaces:
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=i.sw_if_index, is_add=enable,
                table=policy_tables.CNAT_POLICY_INCLUDE_V6)
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=i.sw_if_index, is_add=enable,
                table=policy_tables.CNAT_POLICY_INCLUDE_V4)

    def setUp(self):
        """ Bring up three pg interfaces and enable source NAT """
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))
        self.pg1.generate_remote_hosts(2)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv6_neighbors()
            i.configure_ipv4_neighbors()

        self._enable_disable_snat(is_enable=True)

    def tearDown(self):
        """ Undo setUp: disable source NAT, purge sessions, unconfigure """
        # this used to pass is_enable=True, i.e. it enabled the features
        # a second time instead of removing them
        self._enable_disable_snat(is_enable=False)

        self.vapi.cnat_session_purge()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def test_snat_v6(self):
        # """ CNat Source Nat v6 """
        self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
        self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
        self.sourcenat_test_icmp_echo_conf(is_v6=True)

    def test_snat_v4(self):
        # """ CNat Source Nat v4 """
        self.sourcenat_test_tcp_udp_conf(TCP)
        self.sourcenat_test_tcp_udp_conf(UDP)
        self.sourcenat_test_icmp_echo_conf()

    def sourcenat_test_icmp_echo_conf(self, is_v6=False):
        """ An ICMP echo from pg0 is source NATed and its reply un-NATed """
        ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
        # 8 is ICMP type echo (v4 only)
        ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
        ctx.cnat_send_return().cnat_expect_return()

    def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
        """ Source NAT of TCP/UDP flows, exclude prefixes and ICMP errors """
        ctx = CnatTestContext(self, L4PROTO, is_v6)
        # we should source NAT
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # exclude dst address of pg1.1 from snat
        if is_v6:
            exclude_prefix = ip_network(
                "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
        else:
            exclude_prefix = ip_network(
                "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)

        # add remote host to exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(
            prefix=exclude_prefix, is_add=1)

        # We should not source NAT the id=1
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # But we should source NAT the id=0
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # remove remote host from exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(
            prefix=exclude_prefix, is_add=0)
        self.vapi.cnat_session_purge()

        # We should source NAT again
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # test return ICMP error nating
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

        self.vapi.cnat_session_purge()
676
677
class TestCNatDHCP(CnatCommonTestCase):
    """ CNat Translation """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def tearDown(self):
        """ Bring every pg interface down before the common teardown """
        for intf in self.pg_interfaces:
            intf.admin_down()
        super().tearDown()

    def make_addr(self, sw_if_index, addr_id, is_v6):
        """ Build the test address number addr_id of an interface """
        if not is_v6:
            return "172.16.%u.%u" % (sw_if_index, addr_id)
        # the v6 host part is shifted by one so that it is never zero
        return "fd01:%x::%u" % (sw_if_index, addr_id + 1)

    def make_prefix(self, sw_if_index, addr_id, is_v6):
        """ Same address as make_addr, as a host prefix (/32 or /128) """
        addr = self.make_addr(sw_if_index, addr_id, is_v6)
        prefix_len = 128 if is_v6 else 32
        return "%s/%d" % (addr, prefix_len)

    def check_resolved(self, tr, addr_id, is_v6=False):
        """ Check that VPP resolved every endpoint of a translation

        The VIP and both endpoints of each path, as dumped by VPP, must
        carry address number addr_id of the interface they refer to.
        """
        dumped = tr.query_vpp_config()
        expected_vip = self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
        self.assertEqual(str(dumped.vip.addr), expected_vip)
        self.assertEqual(len(dumped.paths), len(tr.paths))
        for (src_ep, dst_ep), dumped_path in zip(tr.paths, dumped.paths):
            expected_src = self.make_addr(src_ep.sw_if_index, addr_id, is_v6)
            self.assertEqual(str(dumped_path.src_ep.addr), expected_src)
            expected_dst = self.make_addr(dst_ep.sw_if_index, addr_id, is_v6)
            self.assertEqual(str(dumped_path.dst_ep.addr), expected_dst)

    def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
        """ Add or remove test address number addr_id on an interface """
        prefix = self.make_prefix(pg.sw_if_index, addr_id, is_v6)
        self.vapi.sw_interface_add_del_address(
            sw_if_index=pg.sw_if_index,
            prefix=prefix,
            is_add=1 if is_add else 0)

    def _test_dhcp_v46(self, is_v6):
        """ Interface-based translation follows the interface addresses """
        self.create_pg_interfaces(range(4))
        for intf in self.pg_interfaces:
            intf.admin_up()

        # two paths, both sourced from pg1, towards pg2 and pg3
        paths = [
            (Endpoint(pg=self.pg1, is_v6=is_v6),
             Endpoint(pg=backend, is_v6=is_v6))
            for backend in (self.pg2, self.pg3)
        ]
        vip = Endpoint(pg=self.pg0, is_v6=is_v6)
        translation = Translation(self, TCP, vip, paths).add_vpp_config()

        # Add an address on every interface
        # and check it is reflected in the cnat config
        for intf in self.pg_interfaces:
            self.add_del_address(intf, addr_id=0, is_add=True, is_v6=is_v6)
        self.check_resolved(translation, addr_id=0, is_v6=is_v6)

        # Add a new address on every interface, remove the old one
        # and check it is reflected in the cnat config
        for intf in self.pg_interfaces:
            self.add_del_address(intf, addr_id=1, is_add=True, is_v6=is_v6)
            self.add_del_address(intf, addr_id=0, is_add=False, is_v6=is_v6)
        self.check_resolved(translation, addr_id=1, is_v6=is_v6)

        # remove the configuration
        for intf in self.pg_interfaces:
            self.add_del_address(intf, addr_id=1, is_add=False, is_v6=is_v6)
        translation.remove_vpp_config()

    def test_dhcp_v4(self):
        self._test_dhcp_v46(False)

    def test_dhcp_v6(self):
        self._test_dhcp_v46(True)

    def test_dhcp_snat(self):
        self.create_pg_interfaces(range(1))
        for intf in self.pg_interfaces:
            intf.admin_up()
        self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)

        # v4 is always handled before v6, for every operation below
        families = (False, True)

        # Add an address on every interface
        # and check it is reflected in the cnat config
        for intf in self.pg_interfaces:
            for is_v6 in families:
                self.add_del_address(
                    intf, addr_id=0, is_add=True, is_v6=is_v6)
        reply = self.vapi.cnat_get_snat_addresses()
        self.assertEqual(str(reply.snat_ip4), self.make_addr(
            self.pg0.sw_if_index, addr_id=0, is_v6=False))
        self.assertEqual(str(reply.snat_ip6), self.make_addr(
            self.pg0.sw_if_index, addr_id=0, is_v6=True))

        # Add a new address on every interface, remove the old one
        # and check it is reflected in the cnat config
        for intf in self.pg_interfaces:
            for is_v6 in families:
                self.add_del_address(
                    intf, addr_id=1, is_add=True, is_v6=is_v6)
            for is_v6 in families:
                self.add_del_address(
                    intf, addr_id=0, is_add=False, is_v6=is_v6)
        reply = self.vapi.cnat_get_snat_addresses()
        self.assertEqual(str(reply.snat_ip4), self.make_addr(
            self.pg0.sw_if_index, addr_id=1, is_v6=False))
        self.assertEqual(str(reply.snat_ip6), self.make_addr(
            self.pg0.sw_if_index, addr_id=1, is_v6=True))

        # remove the configuration
        for intf in self.pg_interfaces:
            for is_v6 in families:
                self.add_del_address(
                    intf, addr_id=1, is_add=False, is_v6=is_v6)
        self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
789         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
790
791
# When the module is executed as a script, run its test cases through
# unittest using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)