cnat: Disable default scanner process
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP
11 from scapy.layers.inet6 import IPv6
12
13 from ipaddress import ip_address, ip_network, \
14     IPv4Address, IPv6Address, IPv4Network, IPv6Network
15
16 from vpp_object import VppObject
17 from vpp_papi import VppEnum
18
19 N_PKTS = 15
20
21
22 def find_cnat_translation(test, id):
23     ts = test.vapi.cnat_translation_dump()
24     for t in ts:
25         if id == t.translation.id:
26             return True
27     return False
28
29
30 class Ep(object):
31     """ CNat endpoint """
32
33     def __init__(self, ip, port, l4p=TCP):
34         self.ip = ip
35         self.port = port
36         self.l4p = l4p
37
38     def encode(self):
39         return {'addr': self.ip,
40                 'port': self.port}
41
42     def __str__(self):
43         return ("%s:%d" % (self.ip, self.port))
44
45
46 class EpTuple(object):
47     """ CNat endpoint """
48
49     def __init__(self, src, dst):
50         self.src = src
51         self.dst = dst
52
53     def encode(self):
54         return {'src_ep': self.src.encode(),
55                 'dst_ep': self.dst.encode()}
56
57     def __str__(self):
58         return ("%s->%s" % (self.src, self.dst))
59
60
61 class VppCNatTranslation(VppObject):
62
63     def __init__(self, test, iproto, vip, paths):
64         self._test = test
65         self.vip = vip
66         self.iproto = iproto
67         self.paths = paths
68         self.encoded_paths = []
69         for path in self.paths:
70             self.encoded_paths.append(path.encode())
71
72     @property
73     def vl4_proto(self):
74         ip_proto = VppEnum.vl_api_ip_proto_t
75         return {
76             UDP: ip_proto.IP_API_PROTO_UDP,
77             TCP: ip_proto.IP_API_PROTO_TCP,
78         }[self.iproto]
79
80     def delete(self):
81         r = self._test.vapi.cnat_translation_del(id=self.id)
82
83     def add_vpp_config(self):
84         r = self._test.vapi.cnat_translation_update(
85             {'vip': self.vip.encode(),
86              'ip_proto': self.vl4_proto,
87              'n_paths': len(self.paths),
88              'paths': self.encoded_paths})
89         self._test.registry.register(self, self._test.logger)
90         self.id = r.id
91
92     def modify_vpp_config(self, paths):
93         self.paths = paths
94         self.encoded_paths = []
95         for path in self.paths:
96             self.encoded_paths.append(path.encode())
97
98         r = self._test.vapi.cnat_translation_update(
99             {'vip': self.vip.encode(),
100              'ip_proto': self.vl4_proto,
101              'n_paths': len(self.paths),
102              'paths': self.encoded_paths})
103         self._test.registry.register(self, self._test.logger)
104
105     def remove_vpp_config(self):
106         self._test.vapi.cnat_translation_del(self.id)
107
108     def query_vpp_config(self):
109         return find_cnat_translation(self._test, self.id)
110
111     def object_id(self):
112         return ("cnat-translation-%s" % (self.vip))
113
114     def get_stats(self):
115         c = self._test.statistics.get_counter("/net/cnat-translation")
116         return c[0][self.id]
117
118
119 class VppCNATSourceNat(VppObject):
120
121     def __init__(self, test, address, exclude_subnets=[]):
122         self._test = test
123         self.address = address
124         self.exclude_subnets = exclude_subnets
125
126     def add_vpp_config(self):
127         a = ip_address(self.address)
128         if 4 == a.version:
129             self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
130         else:
131             self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
132         for subnet in self.exclude_subnets:
133             self.cnat_exclude_subnet(subnet, True)
134
135     def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
136         add = 1 if isAdd else 0
137         self._test.vapi.cnat_add_del_snat_prefix(
138             prefix=exclude_subnet, is_add=add)
139
140     def query_vpp_config(self):
141         return False
142
143     def remove_vpp_config(self):
144         return False
145
146
147 class TestCNatTranslation(VppTestCase):
148     """ CNat Translation """
149     extra_vpp_punt_config = ["cnat", "{",
150                              "session-db-buckets", "64",
151                              "session-cleanup-timeout", "0.1",
152                              "session-max-age", "1",
153                              "tcp-max-age", "1",
154                              "scanner", "off", "}"]
155
156     @classmethod
157     def setUpClass(cls):
158         super(TestCNatTranslation, cls).setUpClass()
159
160     @classmethod
161     def tearDownClass(cls):
162         super(TestCNatTranslation, cls).tearDownClass()
163
164     def setUp(self):
165         super(TestCNatTranslation, self).setUp()
166
167         self.create_pg_interfaces(range(3))
168
169         for i in self.pg_interfaces:
170             i.admin_up()
171             i.config_ip4()
172             i.resolve_arp()
173             i.config_ip6()
174             i.resolve_ndp()
175
176     def tearDown(self):
177         for i in self.pg_interfaces:
178             i.unconfig_ip4()
179             i.unconfig_ip6()
180             i.admin_down()
181         super(TestCNatTranslation, self).tearDown()
182
183     def cnat_create_translation(self, vip, nbr, isV6=False):
184         ip_v = "ip6" if isV6 else "ip4"
185         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
186         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
187         t1 = VppCNatTranslation(
188             self, vip.l4p, vip,
189             [EpTuple(sep, dep), EpTuple(sep, dep)])
190         t1.add_vpp_config()
191         return t1
192
193     def cnat_test_translation(self, t1, nbr, sports, isV6=False):
194         ip_v = "ip6" if isV6 else "ip4"
195         ip_class = IPv6 if isV6 else IP
196         vip = t1.vip
197
198         #
199         # Flows
200         #
201         for src in self.pg0.remote_hosts:
202             for sport in sports:
203                 # from client to vip
204                 p1 = (Ether(dst=self.pg0.local_mac,
205                             src=src.mac) /
206                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
207                       vip.l4p(sport=sport, dport=vip.port) /
208                       Raw())
209
210                 self.vapi.cli("trace add pg-input 1")
211                 rxs = self.send_and_expect(self.pg0,
212                                            p1 * N_PKTS,
213                                            self.pg1)
214
215                 for rx in rxs:
216                     self.assert_packet_checksums_valid(rx)
217                     self.assertEqual(
218                         rx[ip_class].dst,
219                         getattr(self.pg1.remote_hosts[nbr], ip_v))
220                     self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
221                     self.assertEqual(
222                         rx[ip_class].src,
223                         getattr(src, ip_v))
224                     self.assertEqual(rx[vip.l4p].sport, sport)
225
226                 # from vip to client
227                 p1 = (Ether(dst=self.pg1.local_mac,
228                             src=self.pg1.remote_mac) /
229                       ip_class(src=getattr(
230                           self.pg1.remote_hosts[nbr],
231                           ip_v),
232                           dst=getattr(src, ip_v)) /
233                       vip.l4p(sport=4000 + nbr, dport=sport) /
234                       Raw())
235
236                 rxs = self.send_and_expect(self.pg1,
237                                            p1 * N_PKTS,
238                                            self.pg0)
239
240                 for rx in rxs:
241                     self.assert_packet_checksums_valid(rx)
242                     self.assertEqual(
243                         rx[ip_class].dst,
244                         getattr(src, ip_v))
245                     self.assertEqual(rx[vip.l4p].dport, sport)
246                     self.assertEqual(rx[ip_class].src, vip.ip)
247                     self.assertEqual(rx[vip.l4p].sport, vip.port)
248
249                 #
250                 # packets to the VIP that do not match a
251                 # translation are dropped
252                 #
253                 p1 = (Ether(dst=self.pg0.local_mac,
254                             src=src.mac) /
255                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
256                       vip.l4p(sport=sport, dport=6666) /
257                       Raw())
258
259                 self.send_and_assert_no_replies(self.pg0,
260                                                 p1 * N_PKTS,
261                                                 self.pg1)
262
263                 #
264                 # packets from the VIP that do not match a
265                 # session are forwarded
266                 #
267                 p1 = (Ether(dst=self.pg1.local_mac,
268                             src=self.pg1.remote_mac) /
269                       ip_class(src=getattr(
270                           self.pg1.remote_hosts[nbr],
271                           ip_v),
272                           dst=getattr(src, ip_v)) /
273                       vip.l4p(sport=6666, dport=sport) /
274                       Raw())
275
276                 rxs = self.send_and_expect(self.pg1,
277                                            p1 * N_PKTS,
278                                            self.pg0)
279
280         self.assertEqual(t1.get_stats()['packets'],
281                          N_PKTS *
282                          len(sports) *
283                          len(self.pg0.remote_hosts))
284
285     def cnat_test_translation_update(self, t1, sports, isV6=False):
286         ip_v = "ip6" if isV6 else "ip4"
287         ip_class = IPv6 if isV6 else IP
288         vip = t1.vip
289
290         #
291         # modify the translation to use a different backend
292         #
293         dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
294         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
295         t1.modify_vpp_config([EpTuple(sep, dep)])
296
297         #
298         # existing flows follow the old path
299         #
300         for src in self.pg0.remote_hosts:
301             for sport in sports:
302                 # from client to vip
303                 p1 = (Ether(dst=self.pg0.local_mac,
304                             src=src.mac) /
305                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
306                       vip.l4p(sport=sport, dport=vip.port) /
307                       Raw())
308
309                 rxs = self.send_and_expect(self.pg0,
310                                            p1 * N_PKTS,
311                                            self.pg1)
312
313         #
314         # new flows go to the new backend
315         #
316         for src in self.pg0.remote_hosts:
317             p1 = (Ether(dst=self.pg0.local_mac,
318                         src=src.mac) /
319                   ip_class(src=getattr(src, ip_v), dst=vip.ip) /
320                   vip.l4p(sport=9999, dport=vip.port) /
321                   Raw())
322
323             rxs = self.send_and_expect(self.pg0,
324                                        p1 * N_PKTS,
325                                        self.pg2)
326
327     def cnat_translation(self, vips, isV6=False):
328         """ CNat Translation """
329
330         ip_class = IPv6 if isV6 else IP
331         ip_v = "ip6" if isV6 else "ip4"
332         sports = [1234, 1233]
333
334         #
335         # turn the scanner off whilst testing otherwise sessions
336         # will time out
337         #
338         self.vapi.cli("test cnat scanner off")
339
340         sessions = self.vapi.cnat_session_dump()
341
342         trs = []
343         for nbr, vip in enumerate(vips):
344             trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))
345
346         self.logger.info(self.vapi.cli("sh cnat client"))
347         self.logger.info(self.vapi.cli("sh cnat translation"))
348
349         #
350         # translations
351         #
352         for nbr, vip in enumerate(vips):
353             self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
354             self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
355             if isV6:
356                 self.logger.info(self.vapi.cli(
357                     "sh ip6 fib %s" % self.pg0.remote_ip6))
358             else:
359                 self.logger.info(self.vapi.cli(
360                     "sh ip fib %s" % self.pg0.remote_ip4))
361             self.logger.info(self.vapi.cli("sh cnat session verbose"))
362
363         #
364         # turn the scanner back on and wait until the sessions
365         # all disapper
366         #
367         self.vapi.cli("test cnat scanner on")
368
369         n_tries = 0
370         sessions = self.vapi.cnat_session_dump()
371         while (len(sessions) and n_tries < 100):
372             n_tries += 1
373             sessions = self.vapi.cnat_session_dump()
374             self.sleep(2)
375
376         self.assertTrue(n_tries < 100)
377
378         #
379         # load some flows again and purge
380         #
381         for vip in vips:
382             for src in self.pg0.remote_hosts:
383                 for sport in sports:
384                     # from client to vip
385                     p1 = (Ether(dst=self.pg0.local_mac,
386                                 src=src.mac) /
387                           ip_class(src=getattr(src, ip_v), dst=vip.ip) /
388                           vip.l4p(sport=sport, dport=vip.port) /
389                           Raw())
390                     self.send_and_expect(self.pg0,
391                                          p1 * N_PKTS,
392                                          self.pg2)
393
394         for tr in trs:
395             tr.delete()
396
397         self.assertTrue(self.vapi.cnat_session_dump())
398         self.vapi.cnat_session_purge()
399         self.assertFalse(self.vapi.cnat_session_dump())
400
401     def test_cnat6(self):
402         # """ CNat Translation ipv6 """
403         vips = [
404             Ep("30::1", 5555),
405             Ep("30::2", 5554),
406             Ep("30::2", 5553, UDP),
407         ]
408
409         self.pg0.generate_remote_hosts(len(vips))
410         self.pg0.configure_ipv6_neighbors()
411         self.pg1.generate_remote_hosts(len(vips))
412         self.pg1.configure_ipv6_neighbors()
413
414         self.cnat_translation(vips, isV6=True)
415
416     def test_cnat4(self):
417         # """ CNat Translation ipv4 """
418
419         vips = [
420             Ep("30.0.0.1", 5555),
421             Ep("30.0.0.2", 5554),
422             Ep("30.0.0.2", 5553, UDP),
423         ]
424
425         self.pg0.generate_remote_hosts(len(vips))
426         self.pg0.configure_ipv4_neighbors()
427         self.pg1.generate_remote_hosts(len(vips))
428         self.pg1.configure_ipv4_neighbors()
429
430         self.cnat_translation(vips)
431
432
433 class TestCNatSourceNAT(VppTestCase):
434     """ CNat Source NAT """
435     extra_vpp_punt_config = ["cnat", "{",
436                              "session-max-age", "1",
437                              "tcp-max-age", "1", "}"]
438
439     @classmethod
440     def setUpClass(cls):
441         super(TestCNatSourceNAT, cls).setUpClass()
442
443     @classmethod
444     def tearDownClass(cls):
445         super(TestCNatSourceNAT, cls).tearDownClass()
446
447     def setUp(self):
448         super(TestCNatSourceNAT, self).setUp()
449
450         self.create_pg_interfaces(range(3))
451
452         for i in self.pg_interfaces:
453             i.admin_up()
454             i.config_ip4()
455             i.resolve_arp()
456             i.config_ip6()
457             i.resolve_ndp()
458
459     def tearDown(self):
460         for i in self.pg_interfaces:
461             i.unconfig_ip4()
462             i.unconfig_ip6()
463             i.admin_down()
464         super(TestCNatSourceNAT, self).tearDown()
465
466     def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
467         t1 = VppCNATSourceNat(self, srcNatAddr)
468         t1.add_vpp_config()
469         cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
470         cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
471         self.vapi.feature_enable_disable(
472             enable=1,
473             arc_name=cnat_arc_name,
474             feature_name=cnat_feature_name,
475             sw_if_index=interface.sw_if_index)
476
477         return t1
478
479     def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
480         ip_v = "ip6" if isV6 else "ip4"
481         ip_class = IPv6 if isV6 else IP
482         sports = [1234, 1235, 1236]
483         dports = [6661, 6662, 6663]
484
485         self.pg0.generate_remote_hosts(1)
486         self.pg0.configure_ipv4_neighbors()
487         self.pg0.configure_ipv6_neighbors()
488         self.pg1.generate_remote_hosts(len(sports))
489         self.pg1.configure_ipv4_neighbors()
490         self.pg1.configure_ipv6_neighbors()
491
492         self.vapi.cli("test cnat scanner on")
493         t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
494
495         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
496             # from pods to outside network
497             p1 = (
498                 Ether(
499                     dst=self.pg0.local_mac,
500                     src=self.pg0.remote_hosts[0].mac) /
501                 ip_class(
502                     src=getattr(self.pg0.remote_hosts[0], ip_v),
503                     dst=getattr(remote_host, ip_v)) /
504                 l4p(sport=sports[nbr], dport=dports[nbr]) /
505                 Raw())
506
507             rxs = self.send_and_expect(
508                 self.pg0,
509                 p1 * N_PKTS,
510                 self.pg1)
511             for rx in rxs:
512                 self.assert_packet_checksums_valid(rx)
513                 self.assertEqual(
514                     rx[ip_class].dst,
515                     getattr(remote_host, ip_v))
516                 self.assertEqual(rx[l4p].dport, dports[nbr])
517                 self.assertEqual(
518                     rx[ip_class].src,
519                     srcNatAddr)
520                 sport = rx[l4p].sport
521
522             # from outside to pods
523             p2 = (
524                 Ether(
525                     dst=self.pg1.local_mac,
526                     src=self.pg1.remote_hosts[nbr].mac) /
527                 ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) /
528                 l4p(sport=dports[nbr], dport=sport) /
529                 Raw())
530
531             rxs = self.send_and_expect(
532                 self.pg1,
533                 p2 * N_PKTS,
534                 self.pg0)
535
536             for rx in rxs:
537                 self.assert_packet_checksums_valid(rx)
538                 self.assertEqual(
539                     rx[ip_class].dst,
540                     getattr(self.pg0.remote_hosts[0], ip_v))
541                 self.assertEqual(rx[l4p].dport, sports[nbr])
542                 self.assertEqual(rx[l4p].sport, dports[nbr])
543                 self.assertEqual(
544                     rx[ip_class].src,
545                     getattr(remote_host, ip_v))
546
547             # add remote host to exclude list
548             subnet_mask = 100 if isV6 else 16
549             subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask)
550             exclude_subnet = ip_network(subnet, strict=False)
551
552             t1.cnat_exclude_subnet(exclude_subnet)
553             self.vapi.cnat_session_purge()
554
555             rxs = self.send_and_expect(
556                 self.pg0,
557                 p1 * N_PKTS,
558                 self.pg1)
559             for rx in rxs:
560                 self.assert_packet_checksums_valid(rx)
561                 self.assertEqual(
562                     rx[ip_class].dst,
563                     getattr(remote_host, ip_v))
564                 self.assertEqual(rx[l4p].dport, dports[nbr])
565                 self.assertEqual(
566                     rx[ip_class].src,
567                     getattr(self.pg0.remote_hosts[0], ip_v))
568
569             # remove remote host from exclude list
570             t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
571             self.vapi.cnat_session_purge()
572
573             rxs = self.send_and_expect(
574                 self.pg0,
575                 p1 * N_PKTS,
576                 self.pg1)
577
578             for rx in rxs:
579                 self.assert_packet_checksums_valid(rx)
580                 self.assertEqual(
581                     rx[ip_class].dst,
582                     getattr(remote_host, ip_v))
583                 self.assertEqual(rx[l4p].dport, dports[nbr])
584                 self.assertEqual(
585                     rx[ip_class].src,
586                     srcNatAddr)
587
588     def test_cnat6_sourcenat(self):
589         # """ CNat Source Nat ipv6 """
590         self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
591         self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
592
593     def test_cnat4_sourcenat(self):
594         # """ CNat Source Nat ipv4 """
595         self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
596         self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
597
598
599 if __name__ == '__main__':
600     unittest.main(testRunner=VppTestRunner)