5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP
11 from scapy.layers.inet6 import IPv6
13 from ipaddress import ip_address, ip_network, \
14 IPv4Address, IPv6Address, IPv4Network, IPv6Network
16 from vpp_object import VppObject
17 from vpp_papi import VppEnum
22 def find_cnat_translation(test, id):
23 ts = test.vapi.cnat_translation_dump()
25 if id == t.translation.id:
33 def __init__(self, ip, port, l4p=TCP):
39 return {'addr': self.ip,
43 return ("%s:%d" % (self.ip, self.port))
46 class EpTuple(object):
49 def __init__(self, src, dst):
54 return {'src_ep': self.src.encode(),
55 'dst_ep': self.dst.encode()}
58 return ("%s->%s" % (self.src, self.dst))
61 class VppCNatTranslation(VppObject):
63 def __init__(self, test, iproto, vip, paths):
68 self.encoded_paths = []
69 for path in self.paths:
70 self.encoded_paths.append(path.encode())
74 ip_proto = VppEnum.vl_api_ip_proto_t
76 UDP: ip_proto.IP_API_PROTO_UDP,
77 TCP: ip_proto.IP_API_PROTO_TCP,
81 r = self._test.vapi.cnat_translation_del(id=self.id)
83 def add_vpp_config(self):
84 r = self._test.vapi.cnat_translation_update(
85 {'vip': self.vip.encode(),
86 'ip_proto': self.vl4_proto,
87 'n_paths': len(self.paths),
88 'paths': self.encoded_paths})
89 self._test.registry.register(self, self._test.logger)
92 def modify_vpp_config(self, paths):
94 self.encoded_paths = []
95 for path in self.paths:
96 self.encoded_paths.append(path.encode())
98 r = self._test.vapi.cnat_translation_update(
99 {'vip': self.vip.encode(),
100 'ip_proto': self.vl4_proto,
101 'n_paths': len(self.paths),
102 'paths': self.encoded_paths})
103 self._test.registry.register(self, self._test.logger)
105 def remove_vpp_config(self):
106 self._test.vapi.cnat_translation_del(self.id)
108 def query_vpp_config(self):
109 return find_cnat_translation(self._test, self.id)
112 return ("cnat-translation-%s" % (self.vip))
115 c = self._test.statistics.get_counter("/net/cnat-translation")
119 class VppCNATSourceNat(VppObject):
121 def __init__(self, test, address, exclude_subnets=[]):
123 self.address = address
124 self.exclude_subnets = exclude_subnets
126 def add_vpp_config(self):
127 a = ip_address(self.address)
129 self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
131 self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
132 for subnet in self.exclude_subnets:
133 self.cnat_exclude_subnet(subnet, True)
135 def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
136 add = 1 if isAdd else 0
137 self._test.vapi.cnat_add_del_snat_prefix(
138 prefix=exclude_subnet, is_add=add)
140 def query_vpp_config(self):
143 def remove_vpp_config(self):
147 class TestCNatTranslation(VppTestCase):
148 """ CNat Translation """
149 extra_vpp_punt_config = ["cnat", "{",
150 "session-db-buckets", "64",
151 "session-cleanup-timeout", "0.1",
152 "session-max-age", "1",
154 "scanner", "off", "}"]
158 super(TestCNatTranslation, cls).setUpClass()
161 def tearDownClass(cls):
162 super(TestCNatTranslation, cls).tearDownClass()
165 super(TestCNatTranslation, self).setUp()
167 self.create_pg_interfaces(range(3))
169 for i in self.pg_interfaces:
177 for i in self.pg_interfaces:
181 super(TestCNatTranslation, self).tearDown()
183 def cnat_create_translation(self, vip, nbr, isV6=False):
184 ip_v = "ip6" if isV6 else "ip4"
185 dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
186 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
187 t1 = VppCNatTranslation(
189 [EpTuple(sep, dep), EpTuple(sep, dep)])
193 def cnat_test_translation(self, t1, nbr, sports, isV6=False):
194 ip_v = "ip6" if isV6 else "ip4"
195 ip_class = IPv6 if isV6 else IP
201 for src in self.pg0.remote_hosts:
204 p1 = (Ether(dst=self.pg0.local_mac,
206 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
207 vip.l4p(sport=sport, dport=vip.port) /
210 self.vapi.cli("trace add pg-input 1")
211 rxs = self.send_and_expect(self.pg0,
216 self.assert_packet_checksums_valid(rx)
219 getattr(self.pg1.remote_hosts[nbr], ip_v))
220 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
224 self.assertEqual(rx[vip.l4p].sport, sport)
227 p1 = (Ether(dst=self.pg1.local_mac,
228 src=self.pg1.remote_mac) /
229 ip_class(src=getattr(
230 self.pg1.remote_hosts[nbr],
232 dst=getattr(src, ip_v)) /
233 vip.l4p(sport=4000 + nbr, dport=sport) /
236 rxs = self.send_and_expect(self.pg1,
241 self.assert_packet_checksums_valid(rx)
245 self.assertEqual(rx[vip.l4p].dport, sport)
246 self.assertEqual(rx[ip_class].src, vip.ip)
247 self.assertEqual(rx[vip.l4p].sport, vip.port)
250 # packets to the VIP that do not match a
251 # translation are dropped
253 p1 = (Ether(dst=self.pg0.local_mac,
255 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
256 vip.l4p(sport=sport, dport=6666) /
259 self.send_and_assert_no_replies(self.pg0,
264 # packets from the VIP that do not match a
265 # session are forwarded
267 p1 = (Ether(dst=self.pg1.local_mac,
268 src=self.pg1.remote_mac) /
269 ip_class(src=getattr(
270 self.pg1.remote_hosts[nbr],
272 dst=getattr(src, ip_v)) /
273 vip.l4p(sport=6666, dport=sport) /
276 rxs = self.send_and_expect(self.pg1,
280 self.assertEqual(t1.get_stats()['packets'],
283 len(self.pg0.remote_hosts))
285 def cnat_test_translation_update(self, t1, sports, isV6=False):
286 ip_v = "ip6" if isV6 else "ip4"
287 ip_class = IPv6 if isV6 else IP
291 # modify the translation to use a different backend
293 dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
294 sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
295 t1.modify_vpp_config([EpTuple(sep, dep)])
298 # existing flows follow the old path
300 for src in self.pg0.remote_hosts:
303 p1 = (Ether(dst=self.pg0.local_mac,
305 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
306 vip.l4p(sport=sport, dport=vip.port) /
309 rxs = self.send_and_expect(self.pg0,
314 # new flows go to the new backend
316 for src in self.pg0.remote_hosts:
317 p1 = (Ether(dst=self.pg0.local_mac,
319 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
320 vip.l4p(sport=9999, dport=vip.port) /
323 rxs = self.send_and_expect(self.pg0,
327 def cnat_translation(self, vips, isV6=False):
328 """ CNat Translation """
330 ip_class = IPv6 if isV6 else IP
331 ip_v = "ip6" if isV6 else "ip4"
332 sports = [1234, 1233]
335 # turn the scanner off whilst testing otherwise sessions
338 self.vapi.cli("test cnat scanner off")
340 sessions = self.vapi.cnat_session_dump()
343 for nbr, vip in enumerate(vips):
344 trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))
346 self.logger.info(self.vapi.cli("sh cnat client"))
347 self.logger.info(self.vapi.cli("sh cnat translation"))
352 for nbr, vip in enumerate(vips):
353 self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
354 self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
356 self.logger.info(self.vapi.cli(
357 "sh ip6 fib %s" % self.pg0.remote_ip6))
359 self.logger.info(self.vapi.cli(
360 "sh ip fib %s" % self.pg0.remote_ip4))
361 self.logger.info(self.vapi.cli("sh cnat session verbose"))
364 # turn the scanner back on and wait until the sessions
367 self.vapi.cli("test cnat scanner on")
370 sessions = self.vapi.cnat_session_dump()
371 while (len(sessions) and n_tries < 100):
373 sessions = self.vapi.cnat_session_dump()
376 self.assertTrue(n_tries < 100)
379 # load some flows again and purge
382 for src in self.pg0.remote_hosts:
385 p1 = (Ether(dst=self.pg0.local_mac,
387 ip_class(src=getattr(src, ip_v), dst=vip.ip) /
388 vip.l4p(sport=sport, dport=vip.port) /
390 self.send_and_expect(self.pg0,
397 self.assertTrue(self.vapi.cnat_session_dump())
398 self.vapi.cnat_session_purge()
399 self.assertFalse(self.vapi.cnat_session_dump())
401 def test_cnat6(self):
402 # """ CNat Translation ipv6 """
406 Ep("30::2", 5553, UDP),
409 self.pg0.generate_remote_hosts(len(vips))
410 self.pg0.configure_ipv6_neighbors()
411 self.pg1.generate_remote_hosts(len(vips))
412 self.pg1.configure_ipv6_neighbors()
414 self.cnat_translation(vips, isV6=True)
416 def test_cnat4(self):
417 # """ CNat Translation ipv4 """
420 Ep("30.0.0.1", 5555),
421 Ep("30.0.0.2", 5554),
422 Ep("30.0.0.2", 5553, UDP),
425 self.pg0.generate_remote_hosts(len(vips))
426 self.pg0.configure_ipv4_neighbors()
427 self.pg1.generate_remote_hosts(len(vips))
428 self.pg1.configure_ipv4_neighbors()
430 self.cnat_translation(vips)
433 class TestCNatSourceNAT(VppTestCase):
434 """ CNat Source NAT """
435 extra_vpp_punt_config = ["cnat", "{",
436 "session-max-age", "1",
437 "tcp-max-age", "1", "}"]
441 super(TestCNatSourceNAT, cls).setUpClass()
444 def tearDownClass(cls):
445 super(TestCNatSourceNAT, cls).tearDownClass()
448 super(TestCNatSourceNAT, self).setUp()
450 self.create_pg_interfaces(range(3))
452 for i in self.pg_interfaces:
460 for i in self.pg_interfaces:
464 super(TestCNatSourceNAT, self).tearDown()
466 def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
467 t1 = VppCNATSourceNat(self, srcNatAddr)
469 cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
470 cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
471 self.vapi.feature_enable_disable(
473 arc_name=cnat_arc_name,
474 feature_name=cnat_feature_name,
475 sw_if_index=interface.sw_if_index)
479 def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
480 ip_v = "ip6" if isV6 else "ip4"
481 ip_class = IPv6 if isV6 else IP
482 sports = [1234, 1235, 1236]
483 dports = [6661, 6662, 6663]
485 self.pg0.generate_remote_hosts(1)
486 self.pg0.configure_ipv4_neighbors()
487 self.pg0.configure_ipv6_neighbors()
488 self.pg1.generate_remote_hosts(len(sports))
489 self.pg1.configure_ipv4_neighbors()
490 self.pg1.configure_ipv6_neighbors()
492 self.vapi.cli("test cnat scanner on")
493 t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
495 for nbr, remote_host in enumerate(self.pg1.remote_hosts):
496 # from pods to outside network
499 dst=self.pg0.local_mac,
500 src=self.pg0.remote_hosts[0].mac) /
502 src=getattr(self.pg0.remote_hosts[0], ip_v),
503 dst=getattr(remote_host, ip_v)) /
504 l4p(sport=sports[nbr], dport=dports[nbr]) /
507 rxs = self.send_and_expect(
512 self.assert_packet_checksums_valid(rx)
515 getattr(remote_host, ip_v))
516 self.assertEqual(rx[l4p].dport, dports[nbr])
520 sport = rx[l4p].sport
522 # from outside to pods
525 dst=self.pg1.local_mac,
526 src=self.pg1.remote_hosts[nbr].mac) /
527 ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) /
528 l4p(sport=dports[nbr], dport=sport) /
531 rxs = self.send_and_expect(
537 self.assert_packet_checksums_valid(rx)
540 getattr(self.pg0.remote_hosts[0], ip_v))
541 self.assertEqual(rx[l4p].dport, sports[nbr])
542 self.assertEqual(rx[l4p].sport, dports[nbr])
545 getattr(remote_host, ip_v))
547 # add remote host to exclude list
548 subnet_mask = 100 if isV6 else 16
549 subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask)
550 exclude_subnet = ip_network(subnet, strict=False)
552 t1.cnat_exclude_subnet(exclude_subnet)
553 self.vapi.cnat_session_purge()
555 rxs = self.send_and_expect(
560 self.assert_packet_checksums_valid(rx)
563 getattr(remote_host, ip_v))
564 self.assertEqual(rx[l4p].dport, dports[nbr])
567 getattr(self.pg0.remote_hosts[0], ip_v))
569 # remove remote host from exclude list
570 t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
571 self.vapi.cnat_session_purge()
573 rxs = self.send_and_expect(
579 self.assert_packet_checksums_valid(rx)
582 getattr(remote_host, ip_v))
583 self.assertEqual(rx[l4p].dport, dports[nbr])
588 def test_cnat6_sourcenat(self):
589 # """ CNat Source Nat ipv6 """
590 self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
591 self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
593 def test_cnat4_sourcenat(self):
594 # """ CNat Source Nat ipv4 """
595 self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
596 self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
599 if __name__ == '__main__':
600 unittest.main(testRunner=VppTestRunner)