tests: replace pycodestyle with black
[vpp.git] / test / test_linux_cp.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6, Raw
7 from scapy.layers.l2 import Ether, ARP, Dot1Q
8
9 from util import reassemble4
10 from vpp_object import VppObject
11 from framework import VppTestCase, VppTestRunner
12 from vpp_ipip_tun_interface import VppIpIpTunInterface
13 from template_ipsec import (
14     TemplateIpsec,
15     IpsecTun4Tests,
16     IpsecTun4,
17     mk_scapy_crypt_key,
18     config_tun_params,
19 )
20 from template_ipsec import (
21     TemplateIpsec,
22     IpsecTun4Tests,
23     IpsecTun4,
24     mk_scapy_crypt_key,
25     config_tun_params,
26 )
27 from test_ipsec_tun_if_esp import TemplateIpsecItf4
28 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
29
30
31 class VppLcpPair(VppObject):
32     def __init__(self, test, phy, host):
33         self._test = test
34         self.phy = phy
35         self.host = host
36
37     def add_vpp_config(self):
38         self._test.vapi.cli("test lcp add phy %s host %s" % (self.phy, self.host))
39         self._test.registry.register(self, self._test.logger)
40         return self
41
42     def remove_vpp_config(self):
43         self._test.vapi.cli("test lcp del phy %s host %s" % (self.phy, self.host))
44
45     def object_id(self):
46         return "lcp:%d:%d" % (self.phy.sw_if_index, self.host.sw_if_index)
47
48     def query_vpp_config(self):
49         pairs = list(self._test.vapi.vpp.details_iter(self._test.vapi.lcp_itf_pair_get))
50
51         for p in pairs:
52             if (
53                 p.phy_sw_if_index == self.phy.sw_if_index
54                 and p.host_sw_if_index == self.host.sw_if_index
55             ):
56                 return True
57         return False
58
59
60 class TestLinuxCP(VppTestCase):
61     """Linux Control Plane"""
62
63     extra_vpp_plugin_config = [
64         "plugin",
65         "linux_cp_plugin.so",
66         "{",
67         "enable",
68         "}",
69         "plugin",
70         "linux_cp_unittest_plugin.so",
71         "{",
72         "enable",
73         "}",
74     ]
75
76     @classmethod
77     def setUpClass(cls):
78         super(TestLinuxCP, cls).setUpClass()
79
80     @classmethod
81     def tearDownClass(cls):
82         super(TestLinuxCP, cls).tearDownClass()
83
84     def setUp(self):
85         super(TestLinuxCP, self).setUp()
86
87         # create 4 pg interfaces so we can create two pairs
88         self.create_pg_interfaces(range(4))
89
90         # create on ip4 and one ip6 pg tun
91         self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5))
92         self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6))
93
94         for i in self.pg_interfaces:
95             i.admin_up()
96
97     def tearDown(self):
98         for i in self.pg_interfaces:
99             i.admin_down()
100         super(TestLinuxCP, self).tearDown()
101
102     def test_linux_cp_tap(self):
103         """Linux CP TAP"""
104
105         #
106         # Setup
107         #
108
109         arp_opts = {"who-has": 1, "is-at": 2}
110
111         # create two pairs, wihch a bunch of hots on the phys
112         hosts = [self.pg0, self.pg1]
113         phys = [self.pg2, self.pg3]
114         N_HOSTS = 4
115
116         for phy in phys:
117             phy.config_ip4()
118             phy.generate_remote_hosts(4)
119             phy.configure_ipv4_neighbors()
120
121         pair1 = VppLcpPair(self, phys[0], hosts[0]).add_vpp_config()
122         pair2 = VppLcpPair(self, phys[1], hosts[1]).add_vpp_config()
123
124         self.logger.info(self.vapi.cli("sh lcp adj verbose"))
125         self.logger.info(self.vapi.cli("sh lcp"))
126
127         #
128         # Traffic Tests
129         #
130
131         # hosts to phys
132         for phy, host in zip(phys, hosts):
133             for j in range(N_HOSTS):
134                 p = (
135                     Ether(src=phy.local_mac, dst=phy.remote_hosts[j].mac)
136                     / IP(src=phy.local_ip4, dst=phy.remote_hosts[j].ip4)
137                     / UDP(sport=1234, dport=1234)
138                     / Raw()
139                 )
140
141                 rxs = self.send_and_expect(host, [p], phy)
142
143                 # verify packet is unchanged
144                 for rx in rxs:
145                     self.assertEqual(p.show2(True), rx.show2(True))
146
147                 # ARPs x-connect to phy
148                 p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP(
149                     op="who-has",
150                     hwdst=phy.remote_hosts[j].mac,
151                     hwsrc=phy.local_mac,
152                     psrc=phy.local_ip4,
153                     pdst=phy.remote_hosts[j].ip4,
154                 )
155
156                 rxs = self.send_and_expect(host, [p], phy)
157
158                 # verify packet is unchanged
159                 for rx in rxs:
160                     self.assertEqual(p.show2(True), rx.show2(True))
161
162         # phy to host
163         for phy, host in zip(phys, hosts):
164             for j in range(N_HOSTS):
165                 p = (
166                     Ether(dst=phy.local_mac, src=phy.remote_hosts[j].mac)
167                     / IP(dst=phy.local_ip4, src=phy.remote_hosts[j].ip4)
168                     / UDP(sport=1234, dport=1234)
169                     / Raw()
170                 )
171
172                 rxs = self.send_and_expect(phy, [p], host)
173
174                 # verify packet is unchanged
175                 for rx in rxs:
176                     self.assertEqual(p.show2(True), rx.show2(True))
177
178                 # ARPs rx'd on the phy are sent to the host
179                 p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP(
180                     op="is-at",
181                     hwsrc=phy.remote_hosts[j].mac,
182                     hwdst=phy.local_mac,
183                     pdst=phy.local_ip4,
184                     psrc=phy.remote_hosts[j].ip4,
185                 )
186
187                 rxs = self.send_and_expect(phy, [p], host)
188
189                 # verify packet is unchanged
190                 for rx in rxs:
191                     self.assertEqual(p.show2(True), rx.show2(True))
192
193         # cleanup
194         for phy in phys:
195             phy.unconfig_ip4()
196
197     def test_linux_cp_tun(self):
198         """Linux CP TUN"""
199
200         #
201         # Setup
202         #
203         N_PKTS = 31
204
205         # create two pairs, wihch a bunch of hots on the phys
206         hosts = [self.pg4, self.pg5]
207         phy = self.pg2
208
209         phy.config_ip4()
210         phy.config_ip6()
211         phy.resolve_arp()
212         phy.resolve_ndp()
213
214         tun4 = VppIpIpTunInterface(
215             self, phy, phy.local_ip4, phy.remote_ip4
216         ).add_vpp_config()
217         tun6 = VppIpIpTunInterface(
218             self, phy, phy.local_ip6, phy.remote_ip6
219         ).add_vpp_config()
220         tuns = [tun4, tun6]
221
222         tun4.admin_up()
223         tun4.config_ip4()
224         tun6.admin_up()
225         tun6.config_ip6()
226
227         pair1 = VppLcpPair(self, tuns[0], hosts[0]).add_vpp_config()
228         pair2 = VppLcpPair(self, tuns[1], hosts[1]).add_vpp_config()
229
230         self.logger.info(self.vapi.cli("sh lcp adj verbose"))
231         self.logger.info(self.vapi.cli("sh lcp"))
232         self.logger.info(self.vapi.cli("sh ip punt redirect"))
233
234         #
235         # Traffic Tests
236         #
237
238         # host to phy for v4
239         p = IP(src=tun4.local_ip4, dst="2.2.2.2") / UDP(sport=1234, dport=1234) / Raw()
240
241         rxs = self.send_and_expect(self.pg4, p * N_PKTS, phy)
242
243         # verify inner packet is unchanged and has the tunnel encap
244         for rx in rxs:
245             self.assertEqual(rx[Ether].dst, phy.remote_mac)
246             self.assertEqual(rx[IP].dst, phy.remote_ip4)
247             self.assertEqual(rx[IP].src, phy.local_ip4)
248             inner = IP(rx[IP].payload)
249             self.assertEqual(inner.src, tun4.local_ip4)
250             self.assertEqual(inner.dst, "2.2.2.2")
251
252         # host to phy for v6
253         p = IPv6(src=tun6.local_ip6, dst="2::2") / UDP(sport=1234, dport=1234) / Raw()
254
255         rxs = self.send_and_expect(self.pg5, p * N_PKTS, phy)
256
257         # verify inner packet is unchanged and has the tunnel encap
258         for rx in rxs:
259             self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
260             self.assertEqual(rx[IPv6].src, phy.local_ip6)
261             inner = IPv6(rx[IPv6].payload)
262             self.assertEqual(inner.src, tun6.local_ip6)
263             self.assertEqual(inner.dst, "2::2")
264
265         # phy to host v4
266         p = (
267             Ether(dst=phy.local_mac, src=phy.remote_mac)
268             / IP(dst=phy.local_ip4, src=phy.remote_ip4)
269             / IP(dst=tun4.local_ip4, src=tun4.remote_ip4)
270             / UDP(sport=1234, dport=1234)
271             / Raw()
272         )
273
274         rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4)
275         for rx in rxs:
276             rx = IP(rx)
277             self.assertEqual(rx[IP].dst, tun4.local_ip4)
278             self.assertEqual(rx[IP].src, tun4.remote_ip4)
279
280         # phy to host v6
281         p = (
282             Ether(dst=phy.local_mac, src=phy.remote_mac)
283             / IPv6(dst=phy.local_ip6, src=phy.remote_ip6)
284             / IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6)
285             / UDP(sport=1234, dport=1234)
286             / Raw()
287         )
288
289         rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5)
290         for rx in rxs:
291             rx = IPv6(rx)
292             self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
293             self.assertEqual(rx[IPv6].src, tun6.remote_ip6)
294
295         # cleanup
296         phy.unconfig_ip4()
297         phy.unconfig_ip6()
298
299         tun4.unconfig_ip4()
300         tun6.unconfig_ip6()
301
302
303 class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
304     """IPsec Interface IPv4"""
305
306     extra_vpp_plugin_config = [
307         "plugin",
308         "linux_cp_plugin.so",
309         "{",
310         "enable",
311         "}",
312         "plugin",
313         "linux_cp_unittest_plugin.so",
314         "{",
315         "enable",
316         "}",
317     ]
318
319     def setUp(self):
320         super(TestLinuxCPIpsec, self).setUp()
321
322         self.tun_if = self.pg0
323         self.pg_interfaces += self.create_pg_ip4_interfaces(range(3, 4))
324         self.pg_interfaces += self.create_pg_ip6_interfaces(range(4, 5))
325
326     def tearDown(self):
327         super(TestLinuxCPIpsec, self).tearDown()
328
329     def verify_encrypted(self, p, sa, rxs):
330         decrypt_pkts = []
331         for rx in rxs:
332             if p.nat_header:
333                 self.assertEqual(rx[UDP].dport, 4500)
334             self.assert_packet_checksums_valid(rx)
335             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
336             try:
337                 rx_ip = rx[IP]
338                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
339                 if not decrypt_pkt.haslayer(IP):
340                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
341                 if rx_ip.proto == socket.IPPROTO_ESP:
342                     self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
343                 decrypt_pkts.append(decrypt_pkt)
344                 self.assert_equal(decrypt_pkt.src, p.tun_if.local_ip4)
345                 self.assert_equal(decrypt_pkt.dst, p.tun_if.remote_ip4)
346             except:
347                 self.logger.debug(ppp("Unexpected packet:", rx))
348                 try:
349                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
350                 except:
351                     pass
352                 raise
353         pkts = reassemble4(decrypt_pkts)
354         for pkt in pkts:
355             self.assert_packet_checksums_valid(pkt)
356
357     def verify_decrypted(self, p, rxs):
358         for rx in rxs:
359             rx = IP(rx)
360             self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
361             self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
362             self.assert_packet_checksums_valid(rx)
363
364     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
365         return [
366             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
367             / sa.encrypt(
368                 IP(src=src, dst=dst)
369                 / UDP(sport=1111, dport=2222)
370                 / Raw(b"X" * payload_size)
371             )
372             for i in range(count)
373         ]
374
375     def test_linux_cp_ipsec4_tun(self):
376         """Linux CP Ipsec TUN"""
377
378         #
379         # Setup
380         #
381         N_PKTS = 31
382
383         # the pg that paris with the tunnel
384         self.host = self.pg3
385
386         # tunnel and protection setup
387         p = self.ipv4_params
388
389         self.config_network(p)
390         self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
391         self.config_protect(p)
392
393         pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()
394
395         self.logger.info(self.vapi.cli("sh int addr"))
396         self.logger.info(self.vapi.cli("sh lcp"))
397         self.logger.info(self.vapi.cli("sh ip punt redirect"))
398
399         #
400         # Traffic Tests
401         #
402
403         # host to phy for v4
404         pkt = (
405             IP(src=p.tun_if.local_ip4, dst=p.tun_if.remote_ip4)
406             / UDP(sport=1234, dport=1234)
407             / Raw()
408         )
409
410         rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
411         self.verify_encrypted(p, p.vpp_tun_sa, rxs)
412
413         # phy to host for v4
414         pkts = self.gen_encrypt_pkts(
415             p,
416             p.scapy_tun_sa,
417             self.tun_if,
418             src=p.tun_if.remote_ip4,
419             dst=p.tun_if.local_ip4,
420             count=N_PKTS,
421         )
422         rxs = self.send_and_expect(self.tun_if, pkts, self.host)
423         self.verify_decrypted(p, rxs)
424
425         # cleanup
426         pair.remove_vpp_config()
427         self.unconfig_protect(p)
428         self.unconfig_sa(p)
429         self.unconfig_network(p)
430
431
432 if __name__ == "__main__":
433     unittest.main(testRunner=VppTestRunner)