tests: Use errno value rather than a specific int
[vpp.git] / test / test_linux_cp.py
1 #!/usr/bin/env python3
2
import socket
import unittest

from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, Raw
from scapy.layers.ipsec import ESP
from scapy.layers.l2 import Ether, ARP

from util import ppp
from util import reassemble4
from vpp_object import VppObject
from framework import VppTestCase
from asfframework import VppTestRunner
from vpp_ipip_tun_interface import VppIpIpTunInterface
from template_ipsec import (
    TemplateIpsec,
    IpsecTun4,
)
from test_ipsec_tun_if_esp import TemplateIpsecItf4
23
24
class VppLcpPair(VppObject):
    """A linux-cp interface pair (phy <-> host) driven from a test case.

    The pair is created and deleted with the ``test lcp`` CLI commands and
    its presence is checked against the ``lcp_itf_pair_get`` details.
    """

    def __init__(self, test, phy, host):
        """Store the owning test case and the two interfaces to pair.

        :param test: test case supplying ``vapi``, ``registry`` and ``logger``
        :param phy: interface used as the phy side of the pair
        :param host: interface used as the host side of the pair
        """
        self._test = test
        self.phy = phy
        self.host = host

    def add_vpp_config(self):
        """Create the pair in VPP, register it for cleanup and return self."""
        command = "test lcp add phy %s host %s" % (self.phy, self.host)
        self._test.vapi.cli(command)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the pair from VPP."""
        command = "test lcp del phy %s host %s" % (self.phy, self.host)
        self._test.vapi.cli(command)

    def object_id(self):
        """Return an identifier built from both sw_if_index values."""
        indices = (self.phy.sw_if_index, self.host.sw_if_index)
        return "lcp:%d:%d" % indices

    def query_vpp_config(self):
        """Return True when VPP reports a pair with both of our indices."""
        vapi = self._test.vapi
        # materialise the complete reply before searching it
        entries = list(vapi.vpp.details_iter(vapi.lcp_itf_pair_get))

        return any(
            entry.phy_sw_if_index == self.phy.sw_if_index
            and entry.host_sw_if_index == self.host.sw_if_index
            for entry in entries
        )
52
53
class TestLinuxCP(VppTestCase):
    """Linux Control Plane"""

    # NOTE(review): the class and test-method docstrings are deliberately
    # left short and unchanged; the runner presumably displays them as
    # suite/test names - confirm before rewording them.

    # enable the linux-cp plugin and its unittest plugin (the latter is
    # assumed to provide the "test lcp ..." CLI used by VppLcpPair)
    extra_vpp_plugin_config = [
        "plugin",
        "linux_cp_plugin.so",
        "{",
        "enable",
        "}",
        "plugin",
        "linux_cp_unittest_plugin.so",
        "{",
        "enable",
        "}",
    ]

    @classmethod
    def setUpClass(cls):
        super(TestLinuxCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestLinuxCP, cls).tearDownClass()

    def setUp(self):
        """Create pg0-pg5 and bring every interface admin-up."""
        super(TestLinuxCP, self).setUp()

        # create 4 pg interfaces so we can create two pairs
        self.create_pg_interfaces(range(4))

        # create one ip4 and one ip6 pg interface for the tunnel pairs
        self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5))
        self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        """Bring every interface admin-down, then run the base teardown."""
        for i in self.pg_interfaces:
            i.admin_down()
        super(TestLinuxCP, self).tearDown()

    def _verify_unchanged(self, sent, rxs):
        """Assert that every received packet dissects exactly like *sent*.

        :param sent: the scapy packet that was transmitted
        :param rxs: the packets captured on the receiving interface
        """
        # the dump of the sent packet is the same for every comparison,
        # so build it once
        expected = sent.show2(True)
        for rx in rxs:
            self.assertEqual(expected, rx.show2(True))

    def test_linux_cp_tap(self):
        """Linux CP TAP"""

        #
        # Setup
        #

        # create two pairs, with a bunch of hosts on the phys
        hosts = [self.pg0, self.pg1]
        phys = [self.pg2, self.pg3]
        N_HOSTS = 4

        for phy in phys:
            phy.config_ip4()
            phy.generate_remote_hosts(N_HOSTS)
            phy.configure_ipv4_neighbors()

        # add_vpp_config() registers each pair with the test registry, so
        # no local reference needs to be kept
        for phy, host in zip(phys, hosts):
            VppLcpPair(self, phy, host).add_vpp_config()

        self.logger.info(self.vapi.cli("sh lcp adj verbose"))
        self.logger.info(self.vapi.cli("sh lcp"))

        #
        # Traffic Tests
        #

        # hosts to phys
        for phy, host in zip(phys, hosts):
            for j in range(N_HOSTS):
                p = (
                    Ether(src=phy.local_mac, dst=phy.remote_hosts[j].mac)
                    / IP(src=phy.local_ip4, dst=phy.remote_hosts[j].ip4)
                    / UDP(sport=1234, dport=1234)
                    / Raw()
                )

                rxs = self.send_and_expect(host, [p], phy)
                self._verify_unchanged(p, rxs)

                # ARPs x-connect to phy
                p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP(
                    op="who-has",
                    hwdst=phy.remote_hosts[j].mac,
                    hwsrc=phy.local_mac,
                    psrc=phy.local_ip4,
                    pdst=phy.remote_hosts[j].ip4,
                )

                rxs = self.send_and_expect(host, [p], phy)
                self._verify_unchanged(p, rxs)

        # phy to host
        for phy, host in zip(phys, hosts):
            for j in range(N_HOSTS):
                p = (
                    Ether(dst=phy.local_mac, src=phy.remote_hosts[j].mac)
                    / IP(dst=phy.local_ip4, src=phy.remote_hosts[j].ip4)
                    / UDP(sport=1234, dport=1234)
                    / Raw()
                )

                rxs = self.send_and_expect(phy, [p], host)
                self._verify_unchanged(p, rxs)

                # ARPs rx'd on the phy are sent to the host
                p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP(
                    op="is-at",
                    hwsrc=phy.remote_hosts[j].mac,
                    hwdst=phy.local_mac,
                    pdst=phy.local_ip4,
                    psrc=phy.remote_hosts[j].ip4,
                )

                rxs = self.send_and_expect(phy, [p], host)
                self._verify_unchanged(p, rxs)

        # cleanup
        for phy in phys:
            phy.unconfig_ip4()

    def test_linux_cp_tun(self):
        """Linux CP TUN"""

        #
        # Setup
        #
        N_PKTS = 31

        # one phy carrying an ip4 and an ip6 tunnel; each tunnel is paired
        # with its own host-side pg interface
        host4, host6 = self.pg4, self.pg5
        phy = self.pg2

        phy.config_ip4()
        phy.config_ip6()
        phy.resolve_arp()
        phy.resolve_ndp()

        tun4 = VppIpIpTunInterface(
            self, phy, phy.local_ip4, phy.remote_ip4
        ).add_vpp_config()
        tun6 = VppIpIpTunInterface(
            self, phy, phy.local_ip6, phy.remote_ip6
        ).add_vpp_config()

        tun4.admin_up()
        tun4.config_ip4()
        tun6.admin_up()
        tun6.config_ip6()

        # add_vpp_config() registers each pair with the test registry, so
        # no local reference needs to be kept
        VppLcpPair(self, tun4, host4).add_vpp_config()
        VppLcpPair(self, tun6, host6).add_vpp_config()

        self.logger.info(self.vapi.cli("sh lcp adj verbose"))
        self.logger.info(self.vapi.cli("sh lcp"))
        self.logger.info(self.vapi.cli("sh ip punt redirect"))

        #
        # Traffic Tests
        #

        # host to phy for v4
        p = IP(src=tun4.local_ip4, dst="2.2.2.2") / UDP(sport=1234, dport=1234) / Raw()

        rxs = self.send_and_expect(host4, p * N_PKTS, phy)

        # verify inner packet is unchanged and has the tunnel encap
        for rx in rxs:
            self.assertEqual(rx[Ether].dst, phy.remote_mac)
            self.assertEqual(rx[IP].dst, phy.remote_ip4)
            self.assertEqual(rx[IP].src, phy.local_ip4)
            inner = IP(rx[IP].payload)
            self.assertEqual(inner.src, tun4.local_ip4)
            self.assertEqual(inner.dst, "2.2.2.2")

        # host to phy for v6
        p = IPv6(src=tun6.local_ip6, dst="2::2") / UDP(sport=1234, dport=1234) / Raw()

        rxs = self.send_and_expect(host6, p * N_PKTS, phy)

        # verify inner packet is unchanged and has the tunnel encap
        for rx in rxs:
            self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
            self.assertEqual(rx[IPv6].src, phy.local_ip6)
            inner = IPv6(rx[IPv6].payload)
            self.assertEqual(inner.src, tun6.local_ip6)
            self.assertEqual(inner.dst, "2::2")

        # phy to host v4
        p = (
            Ether(dst=phy.local_mac, src=phy.remote_mac)
            / IP(dst=phy.local_ip4, src=phy.remote_ip4)
            / IP(dst=tun4.local_ip4, src=tun4.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw()
        )

        rxs = self.send_and_expect(phy, p * N_PKTS, host4)
        for rx in rxs:
            # re-dissect the capture starting at the IP header
            rx = IP(rx)
            self.assertEqual(rx[IP].dst, tun4.local_ip4)
            self.assertEqual(rx[IP].src, tun4.remote_ip4)

        # phy to host v6
        p = (
            Ether(dst=phy.local_mac, src=phy.remote_mac)
            / IPv6(dst=phy.local_ip6, src=phy.remote_ip6)
            / IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6)
            / UDP(sport=1234, dport=1234)
            / Raw()
        )

        rxs = self.send_and_expect(phy, p * N_PKTS, host6)
        for rx in rxs:
            # re-dissect the capture starting at the IPv6 header
            rx = IPv6(rx)
            self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
            self.assertEqual(rx[IPv6].src, tun6.remote_ip6)

        # cleanup
        phy.unconfig_ip4()
        phy.unconfig_ip6()

        tun4.unconfig_ip4()
        tun6.unconfig_ip6()
295
296
class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    # NOTE(review): the class and test-method docstrings are deliberately
    # left unchanged; the runner presumably displays them as suite/test
    # names - confirm before rewording them.

    # enable the linux-cp plugin and its unittest plugin (the latter is
    # assumed to provide the "test lcp ..." CLI used by VppLcpPair)
    extra_vpp_plugin_config = [
        "plugin",
        "linux_cp_plugin.so",
        "{",
        "enable",
        "}",
        "plugin",
        "linux_cp_unittest_plugin.so",
        "{",
        "enable",
        "}",
    ]

    def setUp(self):
        """Run the base setup, use pg0 as tunnel interface, add pg3 and pg4."""
        super(TestLinuxCPIpsec, self).setUp()

        self.tun_if = self.pg0
        self.pg_interfaces += self.create_pg_ip4_interfaces(range(3, 4))
        self.pg_interfaces += self.create_pg_ip6_interfaces(range(4, 5))

    def tearDown(self):
        super(TestLinuxCPIpsec, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Check packets captured on the encrypted side of the tunnel.

        Every packet must have valid checksums and a consistent IP length,
        must decrypt with ``p.vpp_tun_sa`` and must carry the tunnel
        interface's local/remote ip4 addresses as inner source/destination.
        The decrypted packets are finally reassembled and their checksums
        verified.

        :param p: IPsec parameters; ``nat_header``, ``vpp_tun_sa`` and
            ``tun_if`` are read
        :param sa: SA handed to ``verify_esp_padding``
        :param rxs: the captured packets
        :raises: re-raises whatever check or decryption step failed, after
            logging the offending packet at debug level
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)

            # reset per packet so the error path can never log a stale
            # result left over from an earlier iteration
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    # decrypt() returned an undissected payload; parse it
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, p.tun_if.local_ip4)
                self.assert_equal(decrypt_pkt.dst, p.tun_if.remote_ip4)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # best-effort logging only: the original failure
                        # re-raised below must not be masked
                        pass
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_decrypted(self, p, rxs):
        """Check packets delivered in clear to the host interface.

        :param p: IPsec parameters; ``tun_if`` is read
        :param rxs: the captured packets
        """
        for rx in rxs:
            # re-dissect the capture starting at the IP header
            rx = IP(rx)
            self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
            self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
            self.assert_packet_checksums_valid(rx)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Build *count* encrypted UDP packets addressed to *sw_intf*.

        :param p: IPsec parameters (not used by this implementation)
        :param sa: SA whose ``encrypt()`` wraps the inner packet
        :param sw_intf: interface supplying the Ethernet addresses
        :param src: inner IP source address
        :param dst: inner IP destination address
        :param count: number of packets to build
        :param payload_size: number of ``X`` payload bytes per packet
        :returns: list of packets ready to send
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=src, dst=dst)
                / UDP(sport=1111, dport=2222)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def test_linux_cp_ipsec4_tun(self):
        """Linux CP Ipsec TUN"""

        #
        # Setup
        #
        N_PKTS = 31

        # the pg that pairs with the tunnel
        self.host = self.pg3

        # tunnel and protection setup
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()

        self.logger.info(self.vapi.cli("sh int addr"))
        self.logger.info(self.vapi.cli("sh lcp"))
        self.logger.info(self.vapi.cli("sh ip punt redirect"))

        #
        # Traffic Tests
        #

        # host to phy for v4
        pkt = (
            IP(src=p.tun_if.local_ip4, dst=p.tun_if.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw()
        )

        rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
        self.verify_encrypted(p, p.vpp_tun_sa, rxs)

        # phy to host for v4
        pkts = self.gen_encrypt_pkts(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.tun_if.remote_ip4,
            dst=p.tun_if.local_ip4,
            count=N_PKTS,
        )
        rxs = self.send_and_expect(self.tun_if, pkts, self.host)
        self.verify_decrypted(p, rxs)

        # cleanup, in reverse order of the setup
        pair.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
424
425
# Running this file directly executes its test cases under VppTestRunner
# instead of unittest's default runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)