cnat: Destination based NAT
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP
11 from scapy.layers.inet6 import IPv6
12
13 from ipaddress import ip_address, ip_network, \
14     IPv4Address, IPv6Address, IPv4Network, IPv6Network
15
16 from vpp_object import VppObject
17 from vpp_papi import VppEnum
18
# Number of copies of each crafted packet handed to send_and_expect() /
# send_and_assert_no_replies() in the tests below (used as `pkt * N_PKTS`).
N_PKTS = 15
20
21
def find_cnat_translation(test, id):
    """ Report whether VPP currently lists a cnat translation with this id

    :param test: test case object; its ``vapi.cnat_translation_dump()`` is
        called to fetch the translations.
    :param id: translation id to look for.
    :returns: True if any dumped entry's ``translation.id`` equals *id*,
        False otherwise (including when the dump is empty).
    """
    dumped = test.vapi.cnat_translation_dump()
    return any(id == entry.translation.id for entry in dumped)
28
29
class Ep(object):
    """ CNat endpoint: an address, a port and an L4 protocol class """

    def __init__(self, ip, port, l4p=TCP):
        # l4p is the scapy layer class (TCP or UDP) used both to build
        # packets and to pick the API protocol for the translation.
        self.ip, self.port, self.l4p = ip, port, l4p

    def encode(self):
        """ Return the endpoint as a dict with 'addr' and 'port' keys """
        encoded = {}
        encoded['addr'] = self.ip
        encoded['port'] = self.port
        return encoded

    def __str__(self):
        return ":".join((str(self.ip), "%d" % self.port))
44
45
class EpTuple(object):
    """ CNat endpoint pair: a source endpoint and a destination endpoint """

    def __init__(self, src, dst):
        self.src, self.dst = src, dst

    def encode(self):
        """ Return a dict with both encoded endpoints: src_ep and dst_ep """
        encoded = {}
        encoded['src_ep'] = self.src.encode()
        encoded['dst_ep'] = self.dst.encode()
        return encoded

    def __str__(self):
        return "->".join((str(self.src), str(self.dst)))
59
60
class VppCNatTranslation(VppObject):
    """ A cnat translation: a VIP endpoint mapped onto a list of paths

    add_vpp_config() creates the translation in VPP and stores the id
    returned by the API in ``self.id``; until then ``self.id`` is None.
    """

    def __init__(self, test, iproto, vip, paths):
        """
        :param test: test case providing ``vapi``, ``registry``, ``logger``
            and ``statistics``.
        :param iproto: scapy layer class of the translation, UDP or TCP.
        :param vip: endpoint with an ``encode()`` method (e.g. an Ep).
        :param paths: list of objects with an ``encode()`` method
            (e.g. EpTuple instances).
        """
        self._test = test
        self.vip = vip
        self.iproto = iproto
        # set by add_vpp_config() from the API reply
        self.id = None
        self._set_paths(paths)

    def _set_paths(self, paths):
        """ Store the paths together with their API-encoded form """
        self.paths = paths
        self.encoded_paths = [path.encode() for path in self.paths]

    def _update(self):
        """ Send the current vip, protocol and paths to VPP

        Also registers this object with the test's object registry.

        :returns: the reply of the cnat_translation_update API call.
        """
        reply = self._test.vapi.cnat_translation_update(
            {'vip': self.vip.encode(),
             'ip_proto': self.vl4_proto,
             'n_paths': len(self.paths),
             'paths': self.encoded_paths})
        self._test.registry.register(self, self._test.logger)
        return reply

    @property
    def vl4_proto(self):
        """ The API ip-proto enum value matching ``self.iproto``

        Raises KeyError if ``self.iproto`` is neither UDP nor TCP.
        """
        ip_proto = VppEnum.vl_api_ip_proto_t
        return {
            UDP: ip_proto.IP_API_PROTO_UDP,
            TCP: ip_proto.IP_API_PROTO_TCP,
        }[self.iproto]

    def delete(self):
        """ Delete the translation from VPP (same as remove_vpp_config) """
        self.remove_vpp_config()

    def add_vpp_config(self):
        """ Create the translation in VPP and remember the returned id """
        self.id = self._update().id

    def modify_vpp_config(self, paths):
        """ Replace the translation's paths and push the update to VPP

        :param paths: new list of path objects with an ``encode()`` method.
        """
        self._set_paths(paths)
        self._update()

    def remove_vpp_config(self):
        """ Delete the translation identified by ``self.id`` from VPP """
        self._test.vapi.cnat_translation_del(id=self.id)

    def query_vpp_config(self):
        """ Return True if VPP's translation dump contains ``self.id`` """
        return find_cnat_translation(self._test, self.id)

    def object_id(self):
        """ Return the registry name of this object, built from the VIP """
        # one-element tuple so the formatting is safe for any vip type
        return "cnat-translation-%s" % (self.vip,)

    def get_stats(self):
        """ Return this translation's entry of /net/cnat-translation

        The counter is read from the test's statistics object and indexed
        by ``self.id``.
        """
        c = self._test.statistics.get_counter("/net/cnat-translation")
        return c[0][self.id]
117
118
class VppCNATSourceNat(VppObject):
    """ CNat source-NAT address plus the prefixes on its exclude list """

    def __init__(self, test, address, exclude_subnets=None):
        """
        :param test: test case providing ``vapi``.
        :param address: source NAT address (IPv4 or IPv6), in any form
            accepted by ``ipaddress.ip_address``.
        :param exclude_subnets: optional list of prefixes that are added
            to the snat exclude list by add_vpp_config(); defaults to an
            empty list.
        """
        self._test = test
        self.address = address
        # A fresh list per instance: a literal [] default would be shared
        # by every object created without the argument.
        self.exclude_subnets = (
            [] if exclude_subnets is None else exclude_subnets)

    def add_vpp_config(self):
        """ Set the snat address in VPP, then add every excluded prefix """
        if 4 == ip_address(self.address).version:
            self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
        else:
            self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
        for subnet in self.exclude_subnets:
            self.cnat_exclude_subnet(subnet, True)

    def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
        """ Add or remove one prefix on VPP's snat prefix list

        :param exclude_subnet: the prefix to add or remove.
        :param isAdd: truthy to add the prefix, falsy to remove it.
        """
        add = 1 if isAdd else 0
        self._test.vapi.cnat_add_del_snat_prefix(
                prefix=exclude_subnet, is_add=add)

    def query_vpp_config(self):
        """ Always False: the configuration is not queried back """
        return False

    def remove_vpp_config(self):
        """ Always False: nothing is removed from VPP by this method """
        return False
145
146
class TestCNatTranslation(VppTestCase):
    """ CNat Translation """
    extra_vpp_punt_config = ["cnat", "{",
                             "session-max-age", "1",
                             "tcp-max-age", "1", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        super(TestCNatTranslation, self).setUp()

        self.create_pg_interfaces(range(3))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    @staticmethod
    def _family(isV6):
        """ Return (remote-host attribute name, scapy IP layer class) """
        return ("ip6", IPv6) if isV6 else ("ip4", IP)

    @staticmethod
    def _any_ep(isV6):
        """ Return the all-zeros address, port 0 endpoint of the family """
        return Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)

    def _pkt_from_client(self, src, vip, sport, dport, isV6):
        """ Build a packet from pg0 host *src* to the VIP address

        The L4 layer is ``vip.l4p`` with the given source and
        destination ports.
        """
        ip_v, ip_class = self._family(isV6)
        return (Ether(dst=self.pg0.local_mac, src=src.mac) /
                ip_class(src=getattr(src, ip_v), dst=vip.ip) /
                vip.l4p(sport=sport, dport=dport) /
                Raw())

    def _pkt_from_backend(self, backend, src, l4p, sport, dport, isV6):
        """ Build a packet from pg1 host *backend* to pg0 host *src* """
        ip_v, ip_class = self._family(isV6)
        return (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                ip_class(src=getattr(backend, ip_v),
                         dst=getattr(src, ip_v)) /
                l4p(sport=sport, dport=dport) /
                Raw())

    def cnat_create_translation(self, vip, nbr, isV6=False):
        """ Create a translation from *vip* to pg1 remote host *nbr*

        The backend port is 4000 + nbr and the same path is installed
        twice.

        :returns: the configured VppCNatTranslation.
        """
        ip_v, _ = self._family(isV6)
        dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
        sep = self._any_ep(isV6)
        t1 = VppCNatTranslation(
            self, vip.l4p, vip,
            [EpTuple(sep, dep), EpTuple(sep, dep)])
        t1.add_vpp_config()
        return t1

    def cnat_test_translation(self, t1, nbr, sports, isV6=False):
        """ Send flows through translation *t1* and verify the rewrites

        :param t1: translation created by cnat_create_translation().
        :param nbr: index of the pg1 remote host acting as backend.
        :param sports: client source ports, one flow per port and client.
        :param isV6: use IPv6 instead of IPv4.
        """
        ip_v, ip_class = self._family(isV6)
        vip = t1.vip
        backend = self.pg1.remote_hosts[nbr]
        backend_ip = getattr(backend, ip_v)
        backend_port = 4000 + nbr

        #
        # Flows
        #
        for src in self.pg0.remote_hosts:
            src_ip = getattr(src, ip_v)
            for sport in sports:
                # from client to vip
                p1 = self._pkt_from_client(src, vip, sport, vip.port, isV6)

                self.vapi.cli("trace add pg-input 1")
                rxs = self.send_and_expect(self.pg0,
                                           p1 * N_PKTS,
                                           self.pg1)

                for rx in rxs:
                    self.assert_packet_checksums_valid(rx)
                    self.assertEqual(rx[ip_class].dst, backend_ip)
                    self.assertEqual(rx[vip.l4p].dport, backend_port)
                    self.assertEqual(rx[ip_class].src, src_ip)
                    self.assertEqual(rx[vip.l4p].sport, sport)

                # from vip to client
                p1 = self._pkt_from_backend(backend, src, vip.l4p,
                                            backend_port, sport, isV6)

                rxs = self.send_and_expect(self.pg1,
                                           p1 * N_PKTS,
                                           self.pg0)

                for rx in rxs:
                    self.assert_packet_checksums_valid(rx)
                    self.assertEqual(rx[ip_class].dst, src_ip)
                    self.assertEqual(rx[vip.l4p].dport, sport)
                    self.assertEqual(rx[ip_class].src, vip.ip)
                    self.assertEqual(rx[vip.l4p].sport, vip.port)

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                p1 = self._pkt_from_client(src, vip, sport, 6666, isV6)

                self.send_and_assert_no_replies(self.pg0,
                                                p1 * N_PKTS,
                                                self.pg1)

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                p1 = self._pkt_from_backend(backend, src, vip.l4p,
                                            6666, sport, isV6)

                self.send_and_expect(self.pg1,
                                     p1 * N_PKTS,
                                     self.pg0)

        self.assertEqual(t1.get_stats()['packets'],
                         N_PKTS *
                         len(sports) *
                         len(self.pg0.remote_hosts))

    def cnat_test_translation_update(self, t1, sports, isV6=False):
        """ Re-point *t1* at pg2 and check old and new flows

        Flows using the ports in *sports* are expected on pg1, flows from
        source port 9999 are expected on pg2.
        """
        ip_v, _ = self._family(isV6)
        vip = t1.vip

        #
        # modify the translation to use a different backend
        #
        dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
        sep = self._any_ep(isV6)
        t1.modify_vpp_config([EpTuple(sep, dep)])

        #
        # existing flows follow the old path
        #
        for src in self.pg0.remote_hosts:
            for sport in sports:
                # from client to vip
                p1 = self._pkt_from_client(src, vip, sport, vip.port, isV6)

                self.send_and_expect(self.pg0,
                                     p1 * N_PKTS,
                                     self.pg1)

        #
        # new flows go to the new backend
        #
        for src in self.pg0.remote_hosts:
            p1 = self._pkt_from_client(src, vip, 9999, vip.port, isV6)

            self.send_and_expect(self.pg0,
                                 p1 * N_PKTS,
                                 self.pg2)

    def cnat_translation(self, vips, isV6=False):
        """ Run the translation scenario for every VIP in *vips*

        Creates one translation per VIP, tests and updates each of them,
        waits for the sessions to expire, then reloads flows, deletes the
        translations and purges the remaining sessions.
        """
        sports = [1234, 1233]

        #
        # turn the scanner off whilst testing otherwise sessions
        # will time out
        #
        self.vapi.cli("test cnat scanner off")

        trs = [self.cnat_create_translation(vip, nbr, isV6=isV6)
               for nbr, vip in enumerate(vips)]

        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        #
        # translations
        #
        for nbr, tr in enumerate(trs):
            self.cnat_test_translation(tr, nbr, sports, isV6=isV6)
            self.cnat_test_translation_update(tr, sports, isV6=isV6)
            if isV6:
                self.logger.info(self.vapi.cli(
                    "sh ip6 fib %s" % self.pg0.remote_ip6))
            else:
                self.logger.info(self.vapi.cli(
                    "sh ip fib %s" % self.pg0.remote_ip4))
            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")

        n_tries = 0
        sessions = self.vapi.cnat_session_dump()
        while sessions and n_tries < 100:
            n_tries += 1
            # sleep before polling again, so no time is wasted once the
            # dump has come back empty
            self.sleep(2)
            sessions = self.vapi.cnat_session_dump()

        # assert on what matters: no session is left after the wait
        self.assertFalse(sessions, "cnat sessions did not time out")

        #
        # load some flows again and purge
        #
        for vip in vips:
            for src in self.pg0.remote_hosts:
                for sport in sports:
                    # from client to vip
                    p1 = self._pkt_from_client(
                        src, vip, sport, vip.port, isV6)
                    self.send_and_expect(self.pg0,
                                         p1 * N_PKTS,
                                         self.pg2)

        for tr in trs:
            tr.delete()

        self.assertTrue(self.vapi.cnat_session_dump())
        self.vapi.cnat_session_purge()
        self.assertFalse(self.vapi.cnat_session_dump())

    def test_cnat6(self):
        # """ CNat Translation ipv6 """
        vips = [
            Ep("30::1", 5555),
            Ep("30::2", 5554),
            Ep("30::2", 5553, UDP),
        ]

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv6_neighbors()
        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv6_neighbors()

        self.cnat_translation(vips, isV6=True)

    def test_cnat4(self):
        # """ CNat Translation ipv4 """

        vips = [
            Ep("30.0.0.1", 5555),
            Ep("30.0.0.2", 5554),
            Ep("30.0.0.2", 5553, UDP),
        ]

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv4_neighbors()
        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv4_neighbors()

        self.cnat_translation(vips)
428
429
class TestCNatSourceNAT(VppTestCase):
    """ CNat Source NAT """
    extra_vpp_punt_config = ["cnat", "{",
                             "session-max-age", "1",
                             "tcp-max-age", "1", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def setUp(self):
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def cnat_create_translation(self, srcNatAddr, interface, isV6=False):
        """ Set the snat address and enable the snat feature

        :param srcNatAddr: address configured as the source NAT address.
        :param interface: interface on which the feature is enabled.
        :param isV6: pick the ip6 arc and feature instead of the ip4 ones.
        :returns: the configured VppCNATSourceNat object.
        """
        t1 = VppCNATSourceNat(self, srcNatAddr)
        t1.add_vpp_config()
        if isV6:
            cnat_arc_name = "ip6-unicast"
            cnat_feature_name = "ip6-cnat-snat"
        else:
            cnat_arc_name = "ip4-unicast"
            cnat_feature_name = "ip4-cnat-snat"
        self.vapi.feature_enable_disable(
            enable=1,
            arc_name=cnat_arc_name,
            feature_name=cnat_feature_name,
            sw_if_index=interface.sw_if_index)

        return t1

    def _check_rx(self, rxs, ip_class, l4p, dst, dport, src):
        """ Verify checksums, destination address/port and source address

        :returns: the last packet of *rxs*, or None if *rxs* is empty.
        """
        last = None
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(rx[ip_class].dst, dst)
            self.assertEqual(rx[l4p].dport, dport)
            self.assertEqual(rx[ip_class].src, src)
            last = rx
        return last

    def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
        """ Check source NAT from pg0 towards each pg1 remote host

        For every pg1 host: traffic from the pg0 host must leave with
        *srcNatAddr* as source, the return traffic must be translated
        back, and while the destination is on the exclude list the
        original source address must be kept.

        :param srcNatAddr: address to configure as the snat address.
        :param l4p: scapy L4 layer class used for the flows (TCP or UDP).
        :param isV6: use IPv6 instead of IPv4.
        """
        ip_v = "ip6" if isV6 else "ip4"
        ip_class = IPv6 if isV6 else IP
        sports = [1234, 1235, 1236]
        dports = [6661, 6662, 6663]

        self.pg0.generate_remote_hosts(1)
        self.pg0.configure_ipv4_neighbors()
        self.pg0.configure_ipv6_neighbors()
        self.pg1.generate_remote_hosts(len(sports))
        self.pg1.configure_ipv4_neighbors()
        self.pg1.configure_ipv6_neighbors()

        self.vapi.cli("test cnat scanner on")
        # forward isV6 so the feature is enabled on the arc that matches
        # the address family under test
        t1 = self.cnat_create_translation(srcNatAddr, self.pg0, isV6=isV6)

        pod = self.pg0.remote_hosts[0]
        pod_ip = getattr(pod, ip_v)

        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
            remote_ip = getattr(remote_host, ip_v)

            # from pods to outside network
            p1 = (
                Ether(dst=self.pg0.local_mac, src=pod.mac) /
                ip_class(src=pod_ip, dst=remote_ip) /
                l4p(sport=sports[nbr], dport=dports[nbr]) /
                Raw())

            rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1)
            last = self._check_rx(rxs, ip_class, l4p,
                                  remote_ip, dports[nbr], srcNatAddr)
            # source port seen on the translated packets; the return
            # traffic below is addressed to it
            sport = last[l4p].sport

            # from outside to pods
            p2 = (
                Ether(dst=self.pg1.local_mac, src=remote_host.mac) /
                ip_class(src=remote_ip, dst=srcNatAddr) /
                l4p(sport=dports[nbr], dport=sport) /
                Raw())

            rxs = self.send_and_expect(self.pg1, p2 * N_PKTS, self.pg0)

            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[ip_class].dst, pod_ip)
                self.assertEqual(rx[l4p].dport, sports[nbr])
                self.assertEqual(rx[l4p].sport, dports[nbr])
                self.assertEqual(rx[ip_class].src, remote_ip)

            # add remote host to exclude list
            subnet_mask = 100 if isV6 else 16
            subnet = remote_ip + "/" + str(subnet_mask)
            exclude_subnet = ip_network(subnet, strict=False)

            t1.cnat_exclude_subnet(exclude_subnet)
            self.vapi.cnat_session_purge()

            rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1)
            self._check_rx(rxs, ip_class, l4p,
                           remote_ip, dports[nbr], pod_ip)

            # remove remote host from exclude list
            t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
            self.vapi.cnat_session_purge()

            rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1)
            self._check_rx(rxs, ip_class, l4p,
                           remote_ip, dports[nbr], srcNatAddr)

    # NOTE(review): the IPv6 variant below is disabled in this file; the
    # reason is not recorded here - confirm before re-enabling.
    # def test_cnat6_sourcenat(self):
    #     # """ CNat Source Nat ipv6 """
    #     self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
    #     self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)

    def test_cnat4_sourcenat(self):
        # """ CNat Source Nat ipv4 """
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
594
# When executed as a script, run the tests in this module through
# unittest using the VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)