tests: stabilize wireguard ratelimiting test
[vpp.git] / test / test_wireguard.py
1 #!/usr/bin/env python3
2 """ Wg tests """
3
4 import datetime
5 import base64
6 import os
7
8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.wireguard import (
15     Wireguard,
16     WireguardResponse,
17     WireguardInitiation,
18     WireguardTransport,
19     WireguardCookieReply,
20 )
21 from cryptography.hazmat.primitives.asymmetric.x25519 import (
22     X25519PrivateKey,
23     X25519PublicKey,
24 )
25 from cryptography.hazmat.primitives.serialization import (
26     Encoding,
27     PrivateFormat,
28     PublicFormat,
29     NoEncryption,
30 )
31 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
32 from cryptography.hazmat.primitives.hmac import HMAC
33 from cryptography.hazmat.backends import default_backend
34 from noise.connection import NoiseConnection, Keypair
35
36 from Crypto.Cipher import ChaCha20_Poly1305
37 from Crypto.Random import get_random_bytes
38
39 from vpp_ipip_tun_interface import VppIpIpTunInterface
40 from vpp_interface import VppInterface
41 from vpp_pg_interface import is_ipv6_misc
42 from vpp_ip_route import VppIpRoute, VppRoutePath
43 from vpp_object import VppObject
44 from vpp_papi import VppEnum
45 from framework import is_distro_ubuntu2204, is_distro_debian11
46 from framework import VppTestCase
47 from re import compile
48 import unittest
49
50 """ TestWg is a subclass of  VPPTestCase classes.
51
52 Wg test.
53
54 """
55
56
57 def private_key_bytes(k):
58     return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
59
60
61 def public_key_bytes(k):
62     return k.public_bytes(Encoding.Raw, PublicFormat.Raw)
63
64
65 def get_field_bytes(pkt, name):
66     fld, val = pkt.getfield_and_val(name)
67     return fld.i2m(pkt, val)
68
69
70 class VppWgInterface(VppInterface):
71     """
72     VPP WireGuard interface
73     """
74
75     def __init__(self, test, src, port):
76         super(VppWgInterface, self).__init__(test)
77
78         self.port = port
79         self.src = src
80         self.private_key = X25519PrivateKey.generate()
81         self.public_key = self.private_key.public_key()
82
83         # cookie related params
84         self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
85
86     def public_key_bytes(self):
87         return public_key_bytes(self.public_key)
88
89     def private_key_bytes(self):
90         return private_key_bytes(self.private_key)
91
92     def add_vpp_config(self):
93         r = self.test.vapi.wireguard_interface_create(
94             interface={
95                 "user_instance": 0xFFFFFFFF,
96                 "port": self.port,
97                 "src_ip": self.src,
98                 "private_key": private_key_bytes(self.private_key),
99                 "generate_key": False,
100             }
101         )
102         self.set_sw_if_index(r.sw_if_index)
103         self.test.registry.register(self, self.test.logger)
104         return self
105
106     def remove_vpp_config(self):
107         self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
108
109     def query_vpp_config(self):
110         ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
111         for t in ts:
112             if (
113                 t.interface.sw_if_index == self._sw_if_index
114                 and str(t.interface.src_ip) == self.src
115                 and t.interface.port == self.port
116                 and t.interface.private_key == private_key_bytes(self.private_key)
117             ):
118                 return True
119         return False
120
121     def want_events(self, peer_index=0xFFFFFFFF):
122         self.test.vapi.want_wireguard_peer_events(
123             enable_disable=1,
124             pid=os.getpid(),
125             sw_if_index=self._sw_if_index,
126             peer_index=peer_index,
127         )
128
129     def wait_events(self, expect, peers, timeout=5):
130         for i in range(len(peers)):
131             rv = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
132             self.test.assertEqual(rv.peer_index, peers[i])
133             self.test.assertEqual(rv.flags, expect)
134
135     def __str__(self):
136         return self.object_id()
137
138     def object_id(self):
139         return "wireguard-%d" % self._sw_if_index
140
141
142 NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
143 NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"
144
145 HANDSHAKE_COUNTING_INTERVAL = 0.5
146 UNDER_LOAD_INTERVAL = 1.0
147 HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD = 40
148 HANDSHAKE_NUM_BEFORE_RATELIMITING = 5
149
150
151 class VppWgPeer(VppObject):
152     def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
153         self._test = test
154         self.itf = itf
155         self.endpoint = endpoint
156         self.port = port
157         self.allowed_ips = allowed_ips
158         self.persistent_keepalive = persistent_keepalive
159
160         # remote peer's public
161         self.private_key = X25519PrivateKey.generate()
162         self.public_key = self.private_key.public_key()
163
164         # cookie related params
165         self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
166         self.last_sent_cookie = None
167         self.last_mac1 = None
168         self.last_received_cookie = None
169
170         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
171
172     def change_endpoint(self, endpoint, port):
173         self.endpoint = endpoint
174         self.port = port
175
176     def add_vpp_config(self):
177         rv = self._test.vapi.wireguard_peer_add(
178             peer={
179                 "public_key": self.public_key_bytes(),
180                 "port": self.port,
181                 "endpoint": self.endpoint,
182                 "n_allowed_ips": len(self.allowed_ips),
183                 "allowed_ips": self.allowed_ips,
184                 "sw_if_index": self.itf.sw_if_index,
185                 "persistent_keepalive": self.persistent_keepalive,
186             }
187         )
188         self.index = rv.peer_index
189         self.receiver_index = self.index + 1
190         self._test.registry.register(self, self._test.logger)
191         return self
192
193     def remove_vpp_config(self):
194         self._test.vapi.wireguard_peer_remove(peer_index=self.index)
195
196     def object_id(self):
197         return "wireguard-peer-%s" % self.index
198
199     def public_key_bytes(self):
200         return public_key_bytes(self.public_key)
201
202     def query_vpp_config(self):
203         peers = self._test.vapi.wireguard_peers_dump()
204
205         for p in peers:
206             # "::" endpoint will be returned as "0.0.0.0" in peer's details
207             endpoint = "0.0.0.0" if self.endpoint == "::" else self.endpoint
208             if (
209                 p.peer.public_key == self.public_key_bytes()
210                 and p.peer.port == self.port
211                 and str(p.peer.endpoint) == endpoint
212                 and p.peer.sw_if_index == self.itf.sw_if_index
213                 and len(self.allowed_ips) == p.peer.n_allowed_ips
214             ):
215                 self.allowed_ips.sort()
216                 p.peer.allowed_ips.sort()
217
218                 for (a1, a2) in zip(self.allowed_ips, p.peer.allowed_ips):
219                     if str(a1) != str(a2):
220                         return False
221                 return True
222         return False
223
224     def mk_tunnel_header(self, tx_itf, is_ip6=False):
225         if is_ip6 is False:
226             return (
227                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
228                 / IP(src=self.endpoint, dst=self.itf.src)
229                 / UDP(sport=self.port, dport=self.itf.port)
230             )
231         else:
232             return (
233                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
234                 / IPv6(src=self.endpoint, dst=self.itf.src)
235                 / UDP(sport=self.port, dport=self.itf.port)
236             )
237
238     def noise_reset(self):
239         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
240
241     def noise_init(self, public_key=None):
242         self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
243         self.noise.set_psks(psk=bytes(bytearray(32)))
244
245         if not public_key:
246             public_key = self.itf.public_key
247
248         # local/this private
249         self.noise.set_keypair_from_private_bytes(
250             Keypair.STATIC, private_key_bytes(self.private_key)
251         )
252         # remote's public
253         self.noise.set_keypair_from_public_bytes(
254             Keypair.REMOTE_STATIC, public_key_bytes(public_key)
255         )
256
257         self.noise.start_handshake()
258
259     def mk_cookie(self, p, tx_itf, is_resp=False, is_ip6=False):
260         self.verify_header(p, is_ip6)
261
262         wg_pkt = Wireguard(p[Raw])
263
264         if is_resp:
265             self._test.assertEqual(wg_pkt[Wireguard].message_type, 2)
266             self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
267             self._test.assertEqual(wg_pkt[WireguardResponse].mac2, bytes([0] * 16))
268         else:
269             self._test.assertEqual(wg_pkt[Wireguard].message_type, 1)
270             self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
271             self._test.assertEqual(wg_pkt[WireguardInitiation].mac2, bytes([0] * 16))
272
273         # collect info from wg packet (initiation or response)
274         src = get_field_bytes(p[IPv6 if is_ip6 else IP], "src")
275         sport = p[UDP].sport.to_bytes(2, byteorder="big")
276         if is_resp:
277             mac1 = wg_pkt[WireguardResponse].mac1
278             sender_index = wg_pkt[WireguardResponse].sender_index
279         else:
280             mac1 = wg_pkt[WireguardInitiation].mac1
281             sender_index = wg_pkt[WireguardInitiation].sender_index
282
283         # make cookie reply
284         cookie_reply = Wireguard() / WireguardCookieReply()
285         cookie_reply[Wireguard].message_type = 3
286         cookie_reply[Wireguard].reserved_zero = 0
287         cookie_reply[WireguardCookieReply].receiver_index = sender_index
288         nonce = get_random_bytes(24)
289         cookie_reply[WireguardCookieReply].nonce = nonce
290
291         # generate cookie data
292         changing_secret = get_random_bytes(32)
293         self.last_sent_cookie = blake2s(
294             src + sport, digest_size=16, key=changing_secret
295         ).digest()
296
297         # encrypt cookie data
298         cipher = ChaCha20_Poly1305.new(key=self.cookie_key, nonce=nonce)
299         cipher.update(mac1)
300         ciphertext, tag = cipher.encrypt_and_digest(self.last_sent_cookie)
301         cookie_reply[WireguardCookieReply].encrypted_cookie = ciphertext + tag
302
303         # prepare cookie reply to be sent
304         cookie_reply = self.mk_tunnel_header(tx_itf, is_ip6) / cookie_reply
305
306         return cookie_reply
307
308     def consume_cookie(self, p, is_ip6=False):
309         self.verify_header(p, is_ip6)
310
311         cookie_reply = Wireguard(p[Raw])
312
313         self._test.assertEqual(cookie_reply[Wireguard].message_type, 3)
314         self._test.assertEqual(cookie_reply[Wireguard].reserved_zero, 0)
315         self._test.assertEqual(
316             cookie_reply[WireguardCookieReply].receiver_index, self.receiver_index
317         )
318
319         # collect info from cookie reply
320         nonce = cookie_reply[WireguardCookieReply].nonce
321         encrypted_cookie = cookie_reply[WireguardCookieReply].encrypted_cookie
322         ciphertext, tag = encrypted_cookie[:16], encrypted_cookie[16:]
323
324         # decrypt cookie data
325         cipher = ChaCha20_Poly1305.new(key=self.itf.cookie_key, nonce=nonce)
326         cipher.update(self.last_mac1)
327         self.last_received_cookie = cipher.decrypt_and_verify(ciphertext, tag)
328
329     def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
330         self.noise.set_as_initiator()
331         self.noise_init(public_key)
332
333         p = Wireguard() / WireguardInitiation()
334
335         p[Wireguard].message_type = 1
336         p[Wireguard].reserved_zero = 0
337         p[WireguardInitiation].sender_index = self.receiver_index
338
339         # some random data for the message
340         #  lifted from the noise protocol's wireguard example
341         now = datetime.datetime.now()
342         tai = struct.pack(
343             "!qi",
344             4611686018427387914 + int(now.timestamp()),
345             int(now.microsecond * 1e3),
346         )
347         b = self.noise.write_message(payload=tai)
348
349         # load noise into init message
350         p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
351         p[WireguardInitiation].encrypted_static = b[32:80]
352         p[WireguardInitiation].encrypted_timestamp = b[80:108]
353
354         # generate the mac1 hash
355         mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
356         mac1 = blake2s(bytes(p)[0:116], digest_size=16, key=mac_key).digest()
357         p[WireguardInitiation].mac1 = mac1
358         self.last_mac1 = mac1
359
360         # generate the mac2 hash
361         if self.last_received_cookie:
362             mac2 = blake2s(
363                 bytes(p)[0:132], digest_size=16, key=self.last_received_cookie
364             ).digest()
365             p[WireguardInitiation].mac2 = mac2
366             self.last_received_cookie = None
367         else:
368             p[WireguardInitiation].mac2 = bytearray(16)
369
370         p = self.mk_tunnel_header(tx_itf, is_ip6) / p
371
372         return p
373
374     def verify_header(self, p, is_ip6=False):
375         if is_ip6 is False:
376             self._test.assertEqual(p[IP].src, self.itf.src)
377             self._test.assertEqual(p[IP].dst, self.endpoint)
378         else:
379             self._test.assertEqual(p[IPv6].src, self.itf.src)
380             self._test.assertEqual(p[IPv6].dst, self.endpoint)
381         self._test.assertEqual(p[UDP].sport, self.itf.port)
382         self._test.assertEqual(p[UDP].dport, self.port)
383         self._test.assert_packet_checksums_valid(p)
384
385     def consume_init(self, p, tx_itf, is_ip6=False, is_mac2=False):
386         self.noise.set_as_responder()
387         self.noise_init(self.itf.public_key)
388         self.verify_header(p, is_ip6)
389
390         init = Wireguard(p[Raw])
391
392         self._test.assertEqual(init[Wireguard].message_type, 1)
393         self._test.assertEqual(init[Wireguard].reserved_zero, 0)
394
395         self.sender = init[WireguardInitiation].sender_index
396
397         # validate the mac1 hash
398         mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
399         mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
400         self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
401
402         # validate the mac2 hash
403         if is_mac2:
404             self._test.assertNotEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
405             self._test.assertNotEqual(self.last_sent_cookie, None)
406             mac2 = blake2s(
407                 bytes(init)[0:-16], digest_size=16, key=self.last_sent_cookie
408             ).digest()
409             self._test.assertEqual(init[WireguardInitiation].mac2, mac2)
410             self.last_sent_cookie = None
411         else:
412             self._test.assertEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
413
414         # this passes only unencrypted_ephemeral, encrypted_static,
415         # encrypted_timestamp fields of the init
416         payload = self.noise.read_message(bytes(init)[8:-32])
417
418         # build the response
419         b = self.noise.write_message()
420         mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
421         resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
422             sender_index=self.receiver_index,
423             receiver_index=self.sender,
424             unencrypted_ephemeral=b[0:32],
425             encrypted_nothing=b[32:],
426         )
427         mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
428         resp[WireguardResponse].mac1 = mac1
429         self.last_mac1 = mac1
430
431         resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
432         self._test.assertTrue(self.noise.handshake_finished)
433
434         return resp
435
436     def consume_response(self, p, is_ip6=False):
437         self.verify_header(p, is_ip6)
438
439         resp = Wireguard(p[Raw])
440
441         self._test.assertEqual(resp[Wireguard].message_type, 2)
442         self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
443         self._test.assertEqual(
444             resp[WireguardResponse].receiver_index, self.receiver_index
445         )
446
447         self.sender = resp[Wireguard].sender_index
448
449         payload = self.noise.read_message(bytes(resp)[12:60])
450         self._test.assertEqual(payload, b"")
451         self._test.assertTrue(self.noise.handshake_finished)
452
453     def decrypt_transport(self, p, is_ip6=False):
454         self.verify_header(p, is_ip6)
455
456         p = Wireguard(p[Raw])
457         self._test.assertEqual(p[Wireguard].message_type, 4)
458         self._test.assertEqual(p[Wireguard].reserved_zero, 0)
459         self._test.assertEqual(
460             p[WireguardTransport].receiver_index, self.receiver_index
461         )
462
463         d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
464         return d
465
466     def encrypt_transport(self, p):
467         return self.noise.encrypt(bytes(p))
468
469     def validate_encapped(self, rxs, tx, is_ip6=False):
470         for rx in rxs:
471             if is_ip6 is False:
472                 rx = IP(self.decrypt_transport(rx, is_ip6=is_ip6))
473
474                 # check the original packet is present
475                 self._test.assertEqual(rx[IP].dst, tx[IP].dst)
476                 self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
477             else:
478                 rx = IPv6(self.decrypt_transport(rx, is_ip6=is_ip6))
479
480                 # check the original packet is present
481                 self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
482                 self._test.assertEqual(rx[IPv6].hlim, tx[IPv6].hlim - 1)
483
484     def want_events(self):
485         self._test.vapi.want_wireguard_peer_events(
486             enable_disable=1,
487             pid=os.getpid(),
488             peer_index=self.index,
489             sw_if_index=self.itf.sw_if_index,
490         )
491
492     def wait_event(self, expect, timeout=5):
493         rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
494         self._test.assertEqual(rv.flags, expect)
495         self._test.assertEqual(rv.peer_index, self.index)
496
497
498 def is_handshake_init(p):
499     wg_p = Wireguard(p[Raw])
500
501     return wg_p[Wireguard].message_type == 1
502
503
504 class TestWg(VppTestCase):
505     """Wireguard Test Case"""
506
507     error_str = compile(r"Error")
508
509     wg4_output_node_name = "/err/wg4-output-tun/"
510     wg4_input_node_name = "/err/wg4-input/"
511     wg6_output_node_name = "/err/wg6-output-tun/"
512     wg6_input_node_name = "/err/wg6-input/"
513     kp4_error = wg4_output_node_name + "Keypair error"
514     mac4_error = wg4_input_node_name + "Invalid MAC handshake"
515     peer4_in_err = wg4_input_node_name + "Peer error"
516     peer4_out_err = wg4_output_node_name + "Peer error"
517     kp6_error = wg6_output_node_name + "Keypair error"
518     mac6_error = wg6_input_node_name + "Invalid MAC handshake"
519     peer6_in_err = wg6_input_node_name + "Peer error"
520     peer6_out_err = wg6_output_node_name + "Peer error"
521     cookie_dec4_err = wg4_input_node_name + "Failed during Cookie decryption"
522     cookie_dec6_err = wg6_input_node_name + "Failed during Cookie decryption"
523     ratelimited4_err = wg4_input_node_name + "Handshake ratelimited"
524     ratelimited6_err = wg6_input_node_name + "Handshake ratelimited"
525
526     @classmethod
527     def setUpClass(cls):
528         super(TestWg, cls).setUpClass()
529         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
530             cls, "vpp"
531         ):
532             return
533         try:
534             cls.create_pg_interfaces(range(3))
535             for i in cls.pg_interfaces:
536                 i.admin_up()
537                 i.config_ip4()
538                 i.config_ip6()
539                 i.resolve_arp()
540                 i.resolve_ndp()
541
542         except Exception:
543             super(TestWg, cls).tearDownClass()
544             raise
545
546     @classmethod
547     def tearDownClass(cls):
548         super(TestWg, cls).tearDownClass()
549
550     def setUp(self):
551         super(VppTestCase, self).setUp()
552         self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
553         self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
554         self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
555         self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
556         self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
557         self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
558         self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
559         self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
560         self.base_cookie_dec4_err = self.statistics.get_err_counter(
561             self.cookie_dec4_err
562         )
563         self.base_cookie_dec6_err = self.statistics.get_err_counter(
564             self.cookie_dec6_err
565         )
566         self.base_ratelimited4_err = self.statistics.get_err_counter(
567             self.ratelimited4_err
568         )
569         self.base_ratelimited6_err = self.statistics.get_err_counter(
570             self.ratelimited6_err
571         )
572
573     def send_and_assert_no_replies_ignoring_init(
574         self, intf, pkts, remark="", timeout=None
575     ):
576         self.pg_send(intf, pkts)
577
578         def _filter_out_fn(p):
579             return is_ipv6_misc(p) or is_handshake_init(p)
580
581         try:
582             if not timeout:
583                 timeout = 1
584             for i in self.pg_interfaces:
585                 i.assert_nothing_captured(
586                     timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
587                 )
588                 timeout = 0.1
589         finally:
590             pass
591
592     def test_wg_interface(self):
593         """Simple interface creation"""
594         port = 12312
595
596         # Create interface
597         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
598
599         self.logger.info(self.vapi.cli("sh int"))
600
601         # delete interface
602         wg0.remove_vpp_config()
603
604     def test_handshake_hash(self):
605         """test hashing an init message"""
606         # a init packet generated by linux given the key below
607         h = (
608             "0100000098b9032b"
609             "55cc4b39e73c3d24"
610             "a2a1ab884b524a81"
611             "1808bb86640fb70d"
612             "e93154fec1879125"
613             "ab012624a27f0b75"
614             "c0a2582f438ddb5f"
615             "8e768af40b4ab444"
616             "02f9ff473e1b797e"
617             "80d39d93c5480c82"
618             "a3d4510f70396976"
619             "586fb67300a5167b"
620             "ae6ca3ff3dfd00eb"
621             "59be198810f5aa03"
622             "6abc243d2155ee4f"
623             "2336483900aef801"
624             "08752cd700000000"
625             "0000000000000000"
626             "00000000"
627         )
628
629         b = bytearray.fromhex(h)
630         tgt = Wireguard(b)
631
632         pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
633         pub = X25519PublicKey.from_public_bytes(pubb)
634
635         self.assertEqual(pubb, public_key_bytes(pub))
636
637         # strip the macs and build a new packet
638         init = b[0:-32]
639         mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
640         init += blake2s(init, digest_size=16, key=mac_key).digest()
641         init += b"\x00" * 16
642
643         act = Wireguard(init)
644
645         self.assertEqual(tgt, act)
646
647     def _test_wg_send_cookie_tmpl(self, is_resp, is_ip6):
648         port = 12323
649
650         # create wg interface
651         if is_ip6:
652             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
653             wg0.admin_up()
654             wg0.config_ip6()
655         else:
656             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
657             wg0.admin_up()
658             wg0.config_ip4()
659
660         self.pg_enable_capture(self.pg_interfaces)
661         self.pg_start()
662
663         # create a peer
664         if is_ip6:
665             peer_1 = VppWgPeer(
666                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
667             ).add_vpp_config()
668         else:
669             peer_1 = VppWgPeer(
670                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
671             ).add_vpp_config()
672         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
673
674         if is_resp:
675             # prepare and send a handshake initiation
676             # expect the peer to send a handshake response
677             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
678             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
679         else:
680             # wait for the peer to send a handshake initiation
681             rxs = self.pg1.get_capture(1, timeout=2)
682
683         # prepare and send a wrong cookie reply
684         # expect no replies and the cookie error incremented
685         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
686         cookie.nonce = b"1234567890"
687         self.send_and_assert_no_replies(self.pg1, [cookie], timeout=0.1)
688         if is_ip6:
689             self.assertEqual(
690                 self.base_cookie_dec6_err + 1,
691                 self.statistics.get_err_counter(self.cookie_dec6_err),
692             )
693         else:
694             self.assertEqual(
695                 self.base_cookie_dec4_err + 1,
696                 self.statistics.get_err_counter(self.cookie_dec4_err),
697             )
698
699         # prepare and send a correct cookie reply
700         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
701         self.pg_send(self.pg1, [cookie])
702
703         # wait for the peer to send a handshake initiation with mac2 set
704         rxs = self.pg1.get_capture(1, timeout=6)
705
706         # verify the initiation and its mac2
707         peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6, is_mac2=True)
708
709         # remove configs
710         peer_1.remove_vpp_config()
711         wg0.remove_vpp_config()
712
713     def test_wg_send_cookie_on_init_v4(self):
714         """Send cookie on handshake initiation (v4)"""
715         self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=False)
716
717     def test_wg_send_cookie_on_init_v6(self):
718         """Send cookie on handshake initiation (v6)"""
719         self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=True)
720
721     def test_wg_send_cookie_on_resp_v4(self):
722         """Send cookie on handshake response (v4)"""
723         self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=False)
724
725     def test_wg_send_cookie_on_resp_v6(self):
726         """Send cookie on handshake response (v6)"""
727         self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=True)
728
729     def _test_wg_receive_cookie_tmpl(self, is_resp, is_ip6):
730         port = 12323
731
732         # create wg interface
733         if is_ip6:
734             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
735             wg0.admin_up()
736             wg0.config_ip6()
737         else:
738             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
739             wg0.admin_up()
740             wg0.config_ip4()
741
742         self.pg_enable_capture(self.pg_interfaces)
743         self.pg_start()
744
745         # create a peer
746         if is_ip6:
747             peer_1 = VppWgPeer(
748                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
749             ).add_vpp_config()
750         else:
751             peer_1 = VppWgPeer(
752                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
753             ).add_vpp_config()
754         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
755
756         if is_resp:
757             # wait for the peer to send a handshake initiation
758             rxs = self.pg1.get_capture(1, timeout=2)
759             # prepare and send a bunch of handshake responses
760             # expect to switch to under load state
761             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
762             txs = [resp] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
763             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
764             # reset noise to be able to turn into initiator later
765             peer_1.noise_reset()
766         else:
767             # prepare and send a bunch of handshake initiations
768             # expect to switch to under load state
769             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
770             txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
771             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
772
773         # expect the peer to send a cookie reply
774         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
775
776         # prepare and send a handshake initiation with wrong mac2
777         # expect a cookie reply
778         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
779         init.mac2 = b"1234567890"
780         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
781         peer_1.consume_cookie(rxs[0], is_ip6=is_ip6)
782
783         # prepare and send a handshake initiation with correct mac2
784         # expect a handshake response
785         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
786         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
787
788         # verify the response
789         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
790
791         # clear up under load state
792         self.sleep(UNDER_LOAD_INTERVAL)
793
794         # remove configs
795         peer_1.remove_vpp_config()
796         wg0.remove_vpp_config()
797
798     def test_wg_receive_cookie_on_init_v4(self):
799         """Receive cookie on handshake initiation (v4)"""
800         self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=False)
801
802     def test_wg_receive_cookie_on_init_v6(self):
803         """Receive cookie on handshake initiation (v6)"""
804         self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=True)
805
806     def test_wg_receive_cookie_on_resp_v4(self):
807         """Receive cookie on handshake response (v4)"""
808         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=False)
809
810     def test_wg_receive_cookie_on_resp_v6(self):
811         """Receive cookie on handshake response (v6)"""
812         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=True)
813
814     def test_wg_under_load_interval(self):
815         """Under load interval"""
816         port = 12323
817
818         # create wg interface
819         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
820         wg0.admin_up()
821         wg0.config_ip4()
822
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pg_start()
825
826         # create a peer
827         peer_1 = VppWgPeer(
828             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
829         ).add_vpp_config()
830         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
831
832         # prepare and send a bunch of handshake initiations
833         # expect to switch to under load state
834         init = peer_1.mk_handshake(self.pg1)
835         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
836         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
837
838         # expect the peer to send a cookie reply
839         peer_1.consume_cookie(rxs[-1])
840
841         # sleep till the next counting interval
842         # expect under load state is still active
843         self.sleep(HANDSHAKE_COUNTING_INTERVAL)
844
845         # prepare and send a handshake initiation with wrong mac2
846         # expect a cookie reply
847         init = peer_1.mk_handshake(self.pg1)
848         init.mac2 = b"1234567890"
849         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
850         peer_1.consume_cookie(rxs[0])
851
852         # sleep till the end of being under load
853         # expect under load state is over
854         self.sleep(UNDER_LOAD_INTERVAL - HANDSHAKE_COUNTING_INTERVAL)
855
856         # prepare and send a handshake initiation with wrong mac2
857         # expect a handshake response
858         init = peer_1.mk_handshake(self.pg1)
859         init.mac2 = b"1234567890"
860         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
861
862         # verify the response
863         peer_1.consume_response(rxs[0])
864
865         # remove configs
866         peer_1.remove_vpp_config()
867         wg0.remove_vpp_config()
868
869     def _test_wg_handshake_ratelimiting_tmpl(self, is_ip6):
870         port = 12323
871
872         # create wg interface
873         if is_ip6:
874             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
875             wg0.admin_up()
876             wg0.config_ip6()
877         else:
878             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
879             wg0.admin_up()
880             wg0.config_ip4()
881
882         self.pg_enable_capture(self.pg_interfaces)
883         self.pg_start()
884
885         # create a peer
886         if is_ip6:
887             peer_1 = VppWgPeer(
888                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
889             ).add_vpp_config()
890         else:
891             peer_1 = VppWgPeer(
892                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
893             ).add_vpp_config()
894         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
895
896         # prepare and send a bunch of handshake initiations
897         # expect to switch to under load state
898         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
899         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
900         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
901
902         # expect the peer to send a cookie reply
903         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
904
905         # prepare and send a bunch of handshake initiations with correct mac2
906         # expect a handshake response and then ratelimiting
907         NUM_TO_REJECT = 10
908         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
909         txs = [init] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + NUM_TO_REJECT)
910         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
911
912         if is_ip6:
913             self.assertEqual(
914                 self.base_ratelimited6_err + NUM_TO_REJECT,
915                 self.statistics.get_err_counter(self.ratelimited6_err),
916             )
917         else:
918             self.assertEqual(
919                 self.base_ratelimited4_err + NUM_TO_REJECT,
920                 self.statistics.get_err_counter(self.ratelimited4_err),
921             )
922
923         # verify the response
924         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
925
926         # clear up under load state
927         self.sleep(UNDER_LOAD_INTERVAL)
928
929         # remove configs
930         peer_1.remove_vpp_config()
931         wg0.remove_vpp_config()
932
933     def test_wg_handshake_ratelimiting_v4(self):
934         """Handshake ratelimiting (v4)"""
935         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=False)
936
937     def test_wg_handshake_ratelimiting_v6(self):
938         """Handshake ratelimiting (v6)"""
939         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=True)
940
941     def test_wg_handshake_ratelimiting_multi_peer(self):
942         """Handshake ratelimiting (multiple peer)"""
943         port = 12323
944
945         # create wg interface
946         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
947         wg0.admin_up()
948         wg0.config_ip4()
949
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952
953         # create two peers
954         NUM_PEERS = 2
955         self.pg1.generate_remote_hosts(NUM_PEERS)
956         self.pg1.configure_ipv4_neighbors()
957
958         peer_1 = VppWgPeer(
959             self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
960         ).add_vpp_config()
961         peer_2 = VppWgPeer(
962             self, wg0, self.pg1.remote_hosts[1].ip4, port + 1, ["10.11.4.0/24"]
963         ).add_vpp_config()
964         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 2)
965
966         # (peer_1) prepare and send a bunch of handshake initiations
967         # expect not to switch to under load state
968         init_1 = peer_1.mk_handshake(self.pg1)
969         txs = [init_1] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
970         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
971
972         # (peer_1) expect the peer to send a handshake response
973         peer_1.consume_response(rxs[0])
974         peer_1.noise_reset()
975
976         # (peer_1) send another bunch of handshake initiations
977         # expect to switch to under load state
978         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
979
980         # (peer_1) expect the peer to send a cookie reply
981         peer_1.consume_cookie(rxs[-1])
982
983         # (peer_2) prepare and send a handshake initiation
984         # expect a cookie reply
985         init_2 = peer_2.mk_handshake(self.pg1)
986         rxs = self.send_and_expect(self.pg1, [init_2], self.pg1)
987         peer_2.consume_cookie(rxs[0])
988
989         # (peer_1) (peer_2) prepare and send a bunch of handshake initiations with correct mac2
990         # expect a handshake response and then ratelimiting
991         PEER_1_NUM_TO_REJECT = 2
992         PEER_2_NUM_TO_REJECT = 5
993         init_1 = peer_1.mk_handshake(self.pg1)
994         txs = [init_1] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_1_NUM_TO_REJECT)
995         init_2 = peer_2.mk_handshake(self.pg1)
996         txs += [init_2] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_2_NUM_TO_REJECT)
997         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
998
999         self.assertTrue(
1000             self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT
1001             < self.statistics.get_err_counter(self.ratelimited4_err)
1002             <= self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT + PEER_2_NUM_TO_REJECT
1003         )
1004
1005         # (peer_1) (peer_2) verify the response
1006         peer_1.consume_response(rxs[0])
1007         peer_2.consume_response(rxs[1])
1008
1009         # clear up under load state
1010         self.sleep(UNDER_LOAD_INTERVAL)
1011
1012         # remove configs
1013         peer_1.remove_vpp_config()
1014         peer_2.remove_vpp_config()
1015         wg0.remove_vpp_config()
1016
1017     def _test_wg_peer_roaming_on_handshake_tmpl(self, is_endpoint_set, is_resp, is_ip6):
1018         port = 12323
1019
1020         # create wg interface
1021         if is_ip6:
1022             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1023             wg0.admin_up()
1024             wg0.config_ip6()
1025         else:
1026             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1027             wg0.admin_up()
1028             wg0.config_ip4()
1029
1030         self.pg_enable_capture(self.pg_interfaces)
1031         self.pg_start()
1032
1033         # create more remote hosts
1034         NUM_REMOTE_HOSTS = 2
1035         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1036         if is_ip6:
1037             self.pg1.configure_ipv6_neighbors()
1038         else:
1039             self.pg1.configure_ipv4_neighbors()
1040
1041         # create a peer
1042         if is_ip6:
1043             peer_1 = VppWgPeer(
1044                 test=self,
1045                 itf=wg0,
1046                 endpoint=self.pg1.remote_hosts[0].ip6 if is_endpoint_set else "::",
1047                 port=port + 1 if is_endpoint_set else 0,
1048                 allowed_ips=["1::3:0/112"],
1049             ).add_vpp_config()
1050         else:
1051             peer_1 = VppWgPeer(
1052                 test=self,
1053                 itf=wg0,
1054                 endpoint=self.pg1.remote_hosts[0].ip4 if is_endpoint_set else "0.0.0.0",
1055                 port=port + 1 if is_endpoint_set else 0,
1056                 allowed_ips=["10.11.3.0/24"],
1057             ).add_vpp_config()
1058         self.assertTrue(peer_1.query_vpp_config())
1059
1060         if is_resp:
1061             # wait for the peer to send a handshake initiation
1062             rxs = self.pg1.get_capture(1, timeout=2)
1063             # prepare a handshake response
1064             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1065             # change endpoint
1066             if is_ip6:
1067                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1068                 resp[IPv6].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1069             else:
1070                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1071                 resp[IP].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1072             # send the handshake response
1073             # expect a keepalive message sent to the new endpoint
1074             rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1075             # verify the keepalive message
1076             b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1077             self.assertEqual(0, len(b))
1078         else:
1079             # change endpoint
1080             if is_ip6:
1081                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1082             else:
1083                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1084             # prepare and send a handshake initiation
1085             # expect a handshake response sent to the new endpoint
1086             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
1087             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
1088             # verify the response
1089             peer_1.consume_response(rxs[0], is_ip6=is_ip6)
1090         self.assertTrue(peer_1.query_vpp_config())
1091
1092         # remove configs
1093         peer_1.remove_vpp_config()
1094         wg0.remove_vpp_config()
1095
1096     def test_wg_peer_roaming_on_init_v4(self):
1097         """Peer roaming on handshake initiation (v4)"""
1098         self._test_wg_peer_roaming_on_handshake_tmpl(
1099             is_endpoint_set=False, is_resp=False, is_ip6=False
1100         )
1101
1102     def test_wg_peer_roaming_on_init_v6(self):
1103         """Peer roaming on handshake initiation (v6)"""
1104         self._test_wg_peer_roaming_on_handshake_tmpl(
1105             is_endpoint_set=False, is_resp=False, is_ip6=True
1106         )
1107
1108     def test_wg_peer_roaming_on_resp_v4(self):
1109         """Peer roaming on handshake response (v4)"""
1110         self._test_wg_peer_roaming_on_handshake_tmpl(
1111             is_endpoint_set=True, is_resp=True, is_ip6=False
1112         )
1113
1114     def test_wg_peer_roaming_on_resp_v6(self):
1115         """Peer roaming on handshake response (v6)"""
1116         self._test_wg_peer_roaming_on_handshake_tmpl(
1117             is_endpoint_set=True, is_resp=True, is_ip6=True
1118         )
1119
1120     def _test_wg_peer_roaming_on_data_tmpl(self, is_async, is_ip6):
1121         self.vapi.wg_set_async_mode(is_async)
1122         port = 12323
1123
1124         # create wg interface
1125         if is_ip6:
1126             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1127             wg0.admin_up()
1128             wg0.config_ip6()
1129         else:
1130             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1131             wg0.admin_up()
1132             wg0.config_ip4()
1133
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136
1137         # create more remote hosts
1138         NUM_REMOTE_HOSTS = 2
1139         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1140         if is_ip6:
1141             self.pg1.configure_ipv6_neighbors()
1142         else:
1143             self.pg1.configure_ipv4_neighbors()
1144
1145         # create a peer
1146         if is_ip6:
1147             peer_1 = VppWgPeer(
1148                 self, wg0, self.pg1.remote_hosts[0].ip6, port + 1, ["1::3:0/112"]
1149             ).add_vpp_config()
1150         else:
1151             peer_1 = VppWgPeer(
1152                 self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
1153             ).add_vpp_config()
1154         self.assertTrue(peer_1.query_vpp_config())
1155
1156         # create a route to rewrite traffic into the wg interface
1157         if is_ip6:
1158             r1 = VppIpRoute(
1159                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1160             ).add_vpp_config()
1161         else:
1162             r1 = VppIpRoute(
1163                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1164             ).add_vpp_config()
1165
1166         # wait for the peer to send a handshake initiation
1167         rxs = self.pg1.get_capture(1, timeout=2)
1168
1169         # prepare and send a handshake response
1170         # expect a keepalive message
1171         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1172         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1173
1174         # verify the keepalive message
1175         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1176         self.assertEqual(0, len(b))
1177
1178         # change endpoint
1179         if is_ip6:
1180             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1181         else:
1182             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1183
1184         # prepare and send a data packet
1185         # expect endpoint change
1186         if is_ip6:
1187             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1188         else:
1189             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1190         data = (
1191             peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
1192             / Wireguard(message_type=4, reserved_zero=0)
1193             / WireguardTransport(
1194                 receiver_index=peer_1.sender,
1195                 counter=0,
1196                 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1197                     ip_header / UDP(sport=222, dport=223) / Raw()
1198                 ),
1199             )
1200         )
1201         rxs = self.send_and_expect(self.pg1, [data], self.pg0)
1202         if is_ip6:
1203             self.assertEqual(rxs[0][IPv6].dst, self.pg0.remote_ip6)
1204             self.assertEqual(rxs[0][IPv6].hlim, 19)
1205         else:
1206             self.assertEqual(rxs[0][IP].dst, self.pg0.remote_ip4)
1207             self.assertEqual(rxs[0][IP].ttl, 19)
1208         self.assertTrue(peer_1.query_vpp_config())
1209
1210         # prepare and send a packet that will be rewritten into the wg interface
1211         # expect a data packet sent to the new endpoint
1212         if is_ip6:
1213             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1214         else:
1215             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1216         p = (
1217             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1218             / ip_header
1219             / UDP(sport=555, dport=556)
1220             / Raw()
1221         )
1222         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
1223
1224         # verify the data packet
1225         peer_1.validate_encapped(rxs, p, is_ip6=is_ip6)
1226
1227         # remove configs
1228         r1.remove_vpp_config()
1229         peer_1.remove_vpp_config()
1230         wg0.remove_vpp_config()
1231
1232     def test_wg_peer_roaming_on_data_v4_sync(self):
1233         """Peer roaming on data packet (v4, sync)"""
1234         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=False)
1235
1236     def test_wg_peer_roaming_on_data_v6_sync(self):
1237         """Peer roaming on data packet (v6, sync)"""
1238         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=True)
1239
1240     def test_wg_peer_roaming_on_data_v4_async(self):
1241         """Peer roaming on data packet (v4, async)"""
1242         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=False)
1243
1244     def test_wg_peer_roaming_on_data_v6_async(self):
1245         """Peer roaming on data packet (v6, async)"""
1246         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=True)
1247
1248     def test_wg_peer_resp(self):
1249         """Send handshake response"""
1250         port = 12323
1251
1252         # Create interfaces
1253         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1254         wg0.admin_up()
1255         wg0.config_ip4()
1256
1257         self.pg_enable_capture(self.pg_interfaces)
1258         self.pg_start()
1259
1260         peer_1 = VppWgPeer(
1261             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1262         ).add_vpp_config()
1263         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1264
1265         r1 = VppIpRoute(
1266             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1267         ).add_vpp_config()
1268
1269         # wait for the peer to send a handshake
1270         rx = self.pg1.get_capture(1, timeout=2)
1271
1272         # consume the handshake in the noise protocol and
1273         # generate the response
1274         resp = peer_1.consume_init(rx[0], self.pg1)
1275
1276         # send the response, get keepalive
1277         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1278
1279         for rx in rxs:
1280             b = peer_1.decrypt_transport(rx)
1281             self.assertEqual(0, len(b))
1282
1283         # send a packets that are routed into the tunnel
1284         p = (
1285             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1286             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1287             / UDP(sport=555, dport=556)
1288             / Raw(b"\x00" * 80)
1289         )
1290
1291         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1292
1293         peer_1.validate_encapped(rxs, p)
1294
1295         # send packets into the tunnel, expect to receive them on
1296         # the other side
1297         p = [
1298             (
1299                 peer_1.mk_tunnel_header(self.pg1)
1300                 / Wireguard(message_type=4, reserved_zero=0)
1301                 / WireguardTransport(
1302                     receiver_index=peer_1.sender,
1303                     counter=ii,
1304                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1305                         (
1306                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1307                             / UDP(sport=222, dport=223)
1308                             / Raw()
1309                         )
1310                     ),
1311                 )
1312             )
1313             for ii in range(255)
1314         ]
1315
1316         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1317
1318         for rx in rxs:
1319             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1320             self.assertEqual(rx[IP].ttl, 19)
1321
1322         r1.remove_vpp_config()
1323         peer_1.remove_vpp_config()
1324         wg0.remove_vpp_config()
1325
1326     def test_wg_peer_v4o4(self):
1327         """Test v4o4"""
1328
1329         port = 12333
1330
1331         # Create interfaces
1332         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1333         wg0.admin_up()
1334         wg0.config_ip4()
1335
1336         peer_1 = VppWgPeer(
1337             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1338         ).add_vpp_config()
1339         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1340
1341         r1 = VppIpRoute(
1342             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1343         ).add_vpp_config()
1344         r2 = VppIpRoute(
1345             self, "20.22.3.0", 24, [VppRoutePath("20.22.3.1", wg0.sw_if_index)]
1346         ).add_vpp_config()
1347
1348         # route a packet into the wg interface
1349         #  use the allowed-ip prefix
1350         #  this is dropped because the peer is not initiated
1351         p = (
1352             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1353             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1354             / UDP(sport=555, dport=556)
1355             / Raw()
1356         )
1357         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1358         self.assertEqual(
1359             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1360         )
1361
1362         # route a packet into the wg interface
1363         #  use a not allowed-ip prefix
1364         #  this is dropped because there is no matching peer
1365         p = (
1366             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1367             / IP(src=self.pg0.remote_ip4, dst="20.22.3.2")
1368             / UDP(sport=555, dport=556)
1369             / Raw()
1370         )
1371         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1372         self.assertEqual(
1373             self.base_peer4_out_err + 1,
1374             self.statistics.get_err_counter(self.peer4_out_err),
1375         )
1376
1377         # send a handsake from the peer with an invalid MAC
1378         p = peer_1.mk_handshake(self.pg1)
1379         p[WireguardInitiation].mac1 = b"foobar"
1380         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1381         self.assertEqual(
1382             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1383         )
1384
1385         # send a handsake from the peer but signed by the wrong key.
1386         p = peer_1.mk_handshake(
1387             self.pg1, False, X25519PrivateKey.generate().public_key()
1388         )
1389         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1390         self.assertEqual(
1391             self.base_peer4_in_err + 1,
1392             self.statistics.get_err_counter(self.peer4_in_err),
1393         )
1394
1395         # send a valid handsake init for which we expect a response
1396         p = peer_1.mk_handshake(self.pg1)
1397
1398         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1399
1400         peer_1.consume_response(rx[0])
1401
1402         # route a packet into the wg interface
1403         #  this is dropped because the peer is still not initiated
1404         p = (
1405             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1406             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1407             / UDP(sport=555, dport=556)
1408             / Raw()
1409         )
1410         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1411         self.assertEqual(
1412             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1413         )
1414
1415         # send a data packet from the peer through the tunnel
1416         # this completes the handshake
1417         p = (
1418             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1419             / UDP(sport=222, dport=223)
1420             / Raw()
1421         )
1422         d = peer_1.encrypt_transport(p)
1423         p = peer_1.mk_tunnel_header(self.pg1) / (
1424             Wireguard(message_type=4, reserved_zero=0)
1425             / WireguardTransport(
1426                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1427             )
1428         )
1429         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1430
1431         for rx in rxs:
1432             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1433             self.assertEqual(rx[IP].ttl, 19)
1434
1435         # send a packets that are routed into the tunnel
1436         p = (
1437             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1438             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1439             / UDP(sport=555, dport=556)
1440             / Raw(b"\x00" * 80)
1441         )
1442
1443         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1444
1445         for rx in rxs:
1446             rx = IP(peer_1.decrypt_transport(rx))
1447
1448             # check the original packet is present
1449             self.assertEqual(rx[IP].dst, p[IP].dst)
1450             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1451
1452         # send packets into the tunnel, expect to receive them on
1453         # the other side
1454         p = [
1455             (
1456                 peer_1.mk_tunnel_header(self.pg1)
1457                 / Wireguard(message_type=4, reserved_zero=0)
1458                 / WireguardTransport(
1459                     receiver_index=peer_1.sender,
1460                     counter=ii + 1,
1461                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1462                         (
1463                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1464                             / UDP(sport=222, dport=223)
1465                             / Raw()
1466                         )
1467                     ),
1468                 )
1469             )
1470             for ii in range(255)
1471         ]
1472
1473         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1474
1475         for rx in rxs:
1476             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1477             self.assertEqual(rx[IP].ttl, 19)
1478
1479         r1.remove_vpp_config()
1480         r2.remove_vpp_config()
1481         peer_1.remove_vpp_config()
1482         wg0.remove_vpp_config()
1483
1484     def test_wg_peer_v6o6(self):
1485         """Test v6o6"""
1486
1487         port = 12343
1488
1489         # Create interfaces
1490         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1491         wg0.admin_up()
1492         wg0.config_ip6()
1493
1494         peer_1 = VppWgPeer(
1495             self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
1496         ).add_vpp_config()
1497         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1498
1499         r1 = VppIpRoute(
1500             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1501         ).add_vpp_config()
1502         r2 = VppIpRoute(
1503             self, "22::3:0", 112, [VppRoutePath("22::3:1", wg0.sw_if_index)]
1504         ).add_vpp_config()
1505
1506         # route a packet into the wg interface
1507         #  use the allowed-ip prefix
1508         #  this is dropped because the peer is not initiated
1509
1510         p = (
1511             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1512             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1513             / UDP(sport=555, dport=556)
1514             / Raw()
1515         )
1516         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1517
1518         self.assertEqual(
1519             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1520         )
1521
1522         # route a packet into the wg interface
1523         #  use a not allowed-ip prefix
1524         #  this is dropped because there is no matching peer
1525         p = (
1526             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1527             / IPv6(src=self.pg0.remote_ip6, dst="22::3:2")
1528             / UDP(sport=555, dport=556)
1529             / Raw()
1530         )
1531         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1532         self.assertEqual(
1533             self.base_peer6_out_err + 1,
1534             self.statistics.get_err_counter(self.peer6_out_err),
1535         )
1536
1537         # send a handsake from the peer with an invalid MAC
1538         p = peer_1.mk_handshake(self.pg1, True)
1539         p[WireguardInitiation].mac1 = b"foobar"
1540         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1541
1542         self.assertEqual(
1543             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1544         )
1545
1546         # send a handsake from the peer but signed by the wrong key.
1547         p = peer_1.mk_handshake(
1548             self.pg1, True, X25519PrivateKey.generate().public_key()
1549         )
1550         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1551         self.assertEqual(
1552             self.base_peer6_in_err + 1,
1553             self.statistics.get_err_counter(self.peer6_in_err),
1554         )
1555
1556         # send a valid handsake init for which we expect a response
1557         p = peer_1.mk_handshake(self.pg1, True)
1558
1559         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1560
1561         peer_1.consume_response(rx[0], True)
1562
1563         # route a packet into the wg interface
1564         #  this is dropped because the peer is still not initiated
1565         p = (
1566             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1567             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1568             / UDP(sport=555, dport=556)
1569             / Raw()
1570         )
1571         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1572         self.assertEqual(
1573             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1574         )
1575
1576         # send a data packet from the peer through the tunnel
1577         # this completes the handshake
1578         p = (
1579             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1580             / UDP(sport=222, dport=223)
1581             / Raw()
1582         )
1583         d = peer_1.encrypt_transport(p)
1584         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1585             Wireguard(message_type=4, reserved_zero=0)
1586             / WireguardTransport(
1587                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1588             )
1589         )
1590         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1591
1592         for rx in rxs:
1593             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1594             self.assertEqual(rx[IPv6].hlim, 19)
1595
1596         # send a packets that are routed into the tunnel
1597         p = (
1598             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1599             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1600             / UDP(sport=555, dport=556)
1601             / Raw(b"\x00" * 80)
1602         )
1603
1604         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1605
1606         for rx in rxs:
1607             rx = IPv6(peer_1.decrypt_transport(rx, True))
1608
1609             # check the original packet is present
1610             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1611             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1612
1613         # send packets into the tunnel, expect to receive them on
1614         # the other side
1615         p = [
1616             (
1617                 peer_1.mk_tunnel_header(self.pg1, True)
1618                 / Wireguard(message_type=4, reserved_zero=0)
1619                 / WireguardTransport(
1620                     receiver_index=peer_1.sender,
1621                     counter=ii + 1,
1622                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1623                         (
1624                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1625                             / UDP(sport=222, dport=223)
1626                             / Raw()
1627                         )
1628                     ),
1629                 )
1630             )
1631             for ii in range(255)
1632         ]
1633
1634         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1635
1636         for rx in rxs:
1637             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1638             self.assertEqual(rx[IPv6].hlim, 19)
1639
1640         r1.remove_vpp_config()
1641         r2.remove_vpp_config()
1642         peer_1.remove_vpp_config()
1643         wg0.remove_vpp_config()
1644
1645     def test_wg_peer_v6o4(self):
1646         """Test v6o4"""
1647
1648         port = 12353
1649
1650         # Create interfaces
1651         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1652         wg0.admin_up()
1653         wg0.config_ip6()
1654
1655         peer_1 = VppWgPeer(
1656             self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
1657         ).add_vpp_config()
1658         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1659
1660         r1 = VppIpRoute(
1661             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1662         ).add_vpp_config()
1663
1664         # route a packet into the wg interface
1665         #  use the allowed-ip prefix
1666         #  this is dropped because the peer is not initiated
1667         p = (
1668             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1669             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1670             / UDP(sport=555, dport=556)
1671             / Raw()
1672         )
1673         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1674         self.assertEqual(
1675             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1676         )
1677
1678         # send a handsake from the peer with an invalid MAC
1679         p = peer_1.mk_handshake(self.pg1)
1680         p[WireguardInitiation].mac1 = b"foobar"
1681         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1682
1683         self.assertEqual(
1684             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1685         )
1686
1687         # send a handsake from the peer but signed by the wrong key.
1688         p = peer_1.mk_handshake(
1689             self.pg1, False, X25519PrivateKey.generate().public_key()
1690         )
1691         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1692         self.assertEqual(
1693             self.base_peer4_in_err + 1,
1694             self.statistics.get_err_counter(self.peer4_in_err),
1695         )
1696
1697         # send a valid handsake init for which we expect a response
1698         p = peer_1.mk_handshake(self.pg1)
1699
1700         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1701
1702         peer_1.consume_response(rx[0])
1703
1704         # route a packet into the wg interface
1705         #  this is dropped because the peer is still not initiated
1706         p = (
1707             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1708             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1709             / UDP(sport=555, dport=556)
1710             / Raw()
1711         )
1712         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1713         self.assertEqual(
1714             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1715         )
1716
1717         # send a data packet from the peer through the tunnel
1718         # this completes the handshake
1719         p = (
1720             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1721             / UDP(sport=222, dport=223)
1722             / Raw()
1723         )
1724         d = peer_1.encrypt_transport(p)
1725         p = peer_1.mk_tunnel_header(self.pg1) / (
1726             Wireguard(message_type=4, reserved_zero=0)
1727             / WireguardTransport(
1728                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1729             )
1730         )
1731         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1732
1733         for rx in rxs:
1734             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1735             self.assertEqual(rx[IPv6].hlim, 19)
1736
1737         # send a packets that are routed into the tunnel
1738         p = (
1739             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1740             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1741             / UDP(sport=555, dport=556)
1742             / Raw(b"\x00" * 80)
1743         )
1744
1745         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1746
1747         for rx in rxs:
1748             rx = IPv6(peer_1.decrypt_transport(rx))
1749
1750             # check the original packet is present
1751             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1752             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1753
1754         # send packets into the tunnel, expect to receive them on
1755         # the other side
1756         p = [
1757             (
1758                 peer_1.mk_tunnel_header(self.pg1)
1759                 / Wireguard(message_type=4, reserved_zero=0)
1760                 / WireguardTransport(
1761                     receiver_index=peer_1.sender,
1762                     counter=ii + 1,
1763                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1764                         (
1765                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1766                             / UDP(sport=222, dport=223)
1767                             / Raw()
1768                         )
1769                     ),
1770                 )
1771             )
1772             for ii in range(255)
1773         ]
1774
1775         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1776
1777         for rx in rxs:
1778             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1779             self.assertEqual(rx[IPv6].hlim, 19)
1780
1781         r1.remove_vpp_config()
1782         peer_1.remove_vpp_config()
1783         wg0.remove_vpp_config()
1784
1785     def test_wg_peer_v4o6(self):
1786         """Test v4o6"""
1787
1788         port = 12363
1789
1790         # Create interfaces
1791         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1792         wg0.admin_up()
1793         wg0.config_ip4()
1794
1795         peer_1 = VppWgPeer(
1796             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1797         ).add_vpp_config()
1798         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1799
1800         r1 = VppIpRoute(
1801             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1802         ).add_vpp_config()
1803
1804         # route a packet into the wg interface
1805         #  use the allowed-ip prefix
1806         #  this is dropped because the peer is not initiated
1807         p = (
1808             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1809             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1810             / UDP(sport=555, dport=556)
1811             / Raw()
1812         )
1813         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1814         self.assertEqual(
1815             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1816         )
1817
1818         # send a handsake from the peer with an invalid MAC
1819         p = peer_1.mk_handshake(self.pg1, True)
1820         p[WireguardInitiation].mac1 = b"foobar"
1821         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1822         self.assertEqual(
1823             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1824         )
1825
1826         # send a handsake from the peer but signed by the wrong key.
1827         p = peer_1.mk_handshake(
1828             self.pg1, True, X25519PrivateKey.generate().public_key()
1829         )
1830         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1831         self.assertEqual(
1832             self.base_peer6_in_err + 1,
1833             self.statistics.get_err_counter(self.peer6_in_err),
1834         )
1835
1836         # send a valid handsake init for which we expect a response
1837         p = peer_1.mk_handshake(self.pg1, True)
1838
1839         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1840
1841         peer_1.consume_response(rx[0], True)
1842
1843         # route a packet into the wg interface
1844         #  this is dropped because the peer is still not initiated
1845         p = (
1846             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1847             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1848             / UDP(sport=555, dport=556)
1849             / Raw()
1850         )
1851         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1852         self.assertEqual(
1853             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1854         )
1855
1856         # send a data packet from the peer through the tunnel
1857         # this completes the handshake
1858         p = (
1859             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1860             / UDP(sport=222, dport=223)
1861             / Raw()
1862         )
1863         d = peer_1.encrypt_transport(p)
1864         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1865             Wireguard(message_type=4, reserved_zero=0)
1866             / WireguardTransport(
1867                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1868             )
1869         )
1870         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1871
1872         for rx in rxs:
1873             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1874             self.assertEqual(rx[IP].ttl, 19)
1875
1876         # send a packets that are routed into the tunnel
1877         p = (
1878             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1879             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1880             / UDP(sport=555, dport=556)
1881             / Raw(b"\x00" * 80)
1882         )
1883
1884         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1885
1886         for rx in rxs:
1887             rx = IP(peer_1.decrypt_transport(rx, True))
1888
1889             # check the original packet is present
1890             self.assertEqual(rx[IP].dst, p[IP].dst)
1891             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1892
1893         # send packets into the tunnel, expect to receive them on
1894         # the other side
1895         p = [
1896             (
1897                 peer_1.mk_tunnel_header(self.pg1, True)
1898                 / Wireguard(message_type=4, reserved_zero=0)
1899                 / WireguardTransport(
1900                     receiver_index=peer_1.sender,
1901                     counter=ii + 1,
1902                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1903                         (
1904                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1905                             / UDP(sport=222, dport=223)
1906                             / Raw()
1907                         )
1908                     ),
1909                 )
1910             )
1911             for ii in range(255)
1912         ]
1913
1914         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1915
1916         for rx in rxs:
1917             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1918             self.assertEqual(rx[IP].ttl, 19)
1919
1920         r1.remove_vpp_config()
1921         peer_1.remove_vpp_config()
1922         wg0.remove_vpp_config()
1923
1924     def test_wg_multi_peer(self):
1925         """multiple peer setup"""
1926         port = 12373
1927
1928         # Create interfaces
1929         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1930         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1931         wg0.admin_up()
1932         wg1.admin_up()
1933
1934         # Check peer counter
1935         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1936
1937         self.pg_enable_capture(self.pg_interfaces)
1938         self.pg_start()
1939
1940         # Create many peers on sencond interface
1941         NUM_PEERS = 16
1942         self.pg2.generate_remote_hosts(NUM_PEERS)
1943         self.pg2.configure_ipv4_neighbors()
1944         self.pg1.generate_remote_hosts(NUM_PEERS)
1945         self.pg1.configure_ipv4_neighbors()
1946
1947         peers_1 = []
1948         peers_2 = []
1949         routes_1 = []
1950         routes_2 = []
1951         for i in range(NUM_PEERS):
1952             peers_1.append(
1953                 VppWgPeer(
1954                     self,
1955                     wg0,
1956                     self.pg1.remote_hosts[i].ip4,
1957                     port + 1 + i,
1958                     ["10.0.%d.4/32" % i],
1959                 ).add_vpp_config()
1960             )
1961             routes_1.append(
1962                 VppIpRoute(
1963                     self,
1964                     "10.0.%d.4" % i,
1965                     32,
1966                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1967                 ).add_vpp_config()
1968             )
1969
1970             peers_2.append(
1971                 VppWgPeer(
1972                     self,
1973                     wg1,
1974                     self.pg2.remote_hosts[i].ip4,
1975                     port + 100 + i,
1976                     ["10.100.%d.4/32" % i],
1977                 ).add_vpp_config()
1978             )
1979             routes_2.append(
1980                 VppIpRoute(
1981                     self,
1982                     "10.100.%d.4" % i,
1983                     32,
1984                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
1985                 ).add_vpp_config()
1986             )
1987
1988         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1989
1990         self.logger.info(self.vapi.cli("show wireguard peer"))
1991         self.logger.info(self.vapi.cli("show wireguard interface"))
1992         self.logger.info(self.vapi.cli("show adj 37"))
1993         self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
1994         self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
1995
1996         # remove routes
1997         for r in routes_1:
1998             r.remove_vpp_config()
1999         for r in routes_2:
2000             r.remove_vpp_config()
2001
2002         # remove peers
2003         for p in peers_1:
2004             self.assertTrue(p.query_vpp_config())
2005             p.remove_vpp_config()
2006         for p in peers_2:
2007             self.assertTrue(p.query_vpp_config())
2008             p.remove_vpp_config()
2009
2010         wg0.remove_vpp_config()
2011         wg1.remove_vpp_config()
2012
2013     def test_wg_multi_interface(self):
2014         """Multi-tunnel on the same port"""
2015         port = 12500
2016
2017         # Create many wireguard interfaces
2018         NUM_IFS = 4
2019         self.pg1.generate_remote_hosts(NUM_IFS)
2020         self.pg1.configure_ipv4_neighbors()
2021         self.pg0.generate_remote_hosts(NUM_IFS)
2022         self.pg0.configure_ipv4_neighbors()
2023
2024         # Create interfaces with a peer on each
2025         peers = []
2026         routes = []
2027         wg_ifs = []
2028         for i in range(NUM_IFS):
2029             # Use the same port for each interface
2030             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2031             wg0.admin_up()
2032             wg0.config_ip4()
2033             wg_ifs.append(wg0)
2034             peers.append(
2035                 VppWgPeer(
2036                     self,
2037                     wg0,
2038                     self.pg1.remote_hosts[i].ip4,
2039                     port + 1 + i,
2040                     ["10.0.%d.0/24" % i],
2041                 ).add_vpp_config()
2042             )
2043
2044             routes.append(
2045                 VppIpRoute(
2046                     self,
2047                     "10.0.%d.0" % i,
2048                     24,
2049                     [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
2050                 ).add_vpp_config()
2051             )
2052
2053         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
2054
2055         for i in range(NUM_IFS):
2056             # send a valid handsake init for which we expect a response
2057             p = peers[i].mk_handshake(self.pg1)
2058             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2059             peers[i].consume_response(rx[0])
2060
2061             # send a data packet from the peer through the tunnel
2062             # this completes the handshake
2063             p = (
2064                 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
2065                 / UDP(sport=222, dport=223)
2066                 / Raw()
2067             )
2068             d = peers[i].encrypt_transport(p)
2069             p = peers[i].mk_tunnel_header(self.pg1) / (
2070                 Wireguard(message_type=4, reserved_zero=0)
2071                 / WireguardTransport(
2072                     receiver_index=peers[i].sender,
2073                     counter=0,
2074                     encrypted_encapsulated_packet=d,
2075                 )
2076             )
2077             rxs = self.send_and_expect(self.pg1, [p], self.pg0)
2078             for rx in rxs:
2079                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2080                 self.assertEqual(rx[IP].ttl, 19)
2081
2082         # send a packets that are routed into the tunnel
2083         for i in range(NUM_IFS):
2084             p = (
2085                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2086                 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
2087                 / UDP(sport=555, dport=556)
2088                 / Raw(b"\x00" * 80)
2089             )
2090
2091             rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
2092
2093             for rx in rxs:
2094                 rx = IP(peers[i].decrypt_transport(rx))
2095
2096                 # check the oringial packet is present
2097                 self.assertEqual(rx[IP].dst, p[IP].dst)
2098                 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
2099
2100         # send packets into the tunnel
2101         for i in range(NUM_IFS):
2102             p = [
2103                 (
2104                     peers[i].mk_tunnel_header(self.pg1)
2105                     / Wireguard(message_type=4, reserved_zero=0)
2106                     / WireguardTransport(
2107                         receiver_index=peers[i].sender,
2108                         counter=ii + 1,
2109                         encrypted_encapsulated_packet=peers[i].encrypt_transport(
2110                             (
2111                                 IP(
2112                                     src="10.0.%d.4" % i,
2113                                     dst=self.pg0.remote_hosts[i].ip4,
2114                                     ttl=20,
2115                                 )
2116                                 / UDP(sport=222, dport=223)
2117                                 / Raw()
2118                             )
2119                         ),
2120                     )
2121                 )
2122                 for ii in range(64)
2123             ]
2124
2125             rxs = self.send_and_expect(self.pg1, p, self.pg0)
2126
2127             for rx in rxs:
2128                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2129                 self.assertEqual(rx[IP].ttl, 19)
2130
2131         for r in routes:
2132             r.remove_vpp_config()
2133         for p in peers:
2134             p.remove_vpp_config()
2135         for i in wg_ifs:
2136             i.remove_vpp_config()
2137
2138     def test_wg_event(self):
2139         """Test events"""
2140         port = 12600
2141         ESTABLISHED_FLAG = (
2142             VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
2143         )
2144         DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
2145
2146         # Create interfaces
2147         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2148         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2149         wg0.admin_up()
2150         wg1.admin_up()
2151
2152         # Check peer counter
2153         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2154
2155         self.pg_enable_capture(self.pg_interfaces)
2156         self.pg_start()
2157
2158         # Create peers
2159         NUM_PEERS = 2
2160         self.pg2.generate_remote_hosts(NUM_PEERS)
2161         self.pg2.configure_ipv4_neighbors()
2162         self.pg1.generate_remote_hosts(NUM_PEERS)
2163         self.pg1.configure_ipv4_neighbors()
2164
2165         peers_0 = []
2166         peers_1 = []
2167         routes_0 = []
2168         routes_1 = []
2169         for i in range(NUM_PEERS):
2170             peers_0.append(
2171                 VppWgPeer(
2172                     self,
2173                     wg0,
2174                     self.pg1.remote_hosts[i].ip4,
2175                     port + 1 + i,
2176                     ["10.0.%d.4/32" % i],
2177                 ).add_vpp_config()
2178             )
2179             routes_0.append(
2180                 VppIpRoute(
2181                     self,
2182                     "10.0.%d.4" % i,
2183                     32,
2184                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2185                 ).add_vpp_config()
2186             )
2187
2188             peers_1.append(
2189                 VppWgPeer(
2190                     self,
2191                     wg1,
2192                     self.pg2.remote_hosts[i].ip4,
2193                     port + 100 + i,
2194                     ["10.100.%d.4/32" % i],
2195                 ).add_vpp_config()
2196             )
2197             routes_1.append(
2198                 VppIpRoute(
2199                     self,
2200                     "10.100.%d.4" % i,
2201                     32,
2202                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2203                 ).add_vpp_config()
2204             )
2205
2206         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2207
2208         # Want events from the first perr of wg0
2209         # and from all wg1 peers
2210         peers_0[0].want_events()
2211         wg1.want_events()
2212
2213         for i in range(NUM_PEERS):
2214             # send a valid handsake init for which we expect a response
2215             p = peers_0[i].mk_handshake(self.pg1)
2216             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2217             peers_0[i].consume_response(rx[0])
2218             if i == 0:
2219                 peers_0[0].wait_event(ESTABLISHED_FLAG)
2220
2221             p = peers_1[i].mk_handshake(self.pg2)
2222             rx = self.send_and_expect(self.pg2, [p], self.pg2)
2223             peers_1[i].consume_response(rx[0])
2224
2225         wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
2226
2227         # remove routes
2228         for r in routes_0:
2229             r.remove_vpp_config()
2230         for r in routes_1:
2231             r.remove_vpp_config()
2232
2233         # remove peers
2234         for i in range(NUM_PEERS):
2235             self.assertTrue(peers_0[i].query_vpp_config())
2236             peers_0[i].remove_vpp_config()
2237             if i == 0:
2238                 peers_0[i].wait_event(0)
2239                 peers_0[i].wait_event(DEAD_FLAG)
2240         for p in peers_1:
2241             self.assertTrue(p.query_vpp_config())
2242             p.remove_vpp_config()
2243             p.wait_event(0)
2244             p.wait_event(DEAD_FLAG)
2245
2246         wg0.remove_vpp_config()
2247         wg1.remove_vpp_config()
2248
2249     def test_wg_sending_handshake_when_admin_down(self):
2250         """Sending handshake when admin down"""
2251         port = 12323
2252
2253         # create wg interface
2254         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2255         wg0.config_ip4()
2256
2257         # create a peer
2258         peer_1 = VppWgPeer(
2259             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2260         ).add_vpp_config()
2261         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2262
2263         self.pg_enable_capture(self.pg_interfaces)
2264         self.pg_start()
2265
2266         # wait for the peer to send a handshake initiation
2267         # expect no handshakes
2268         for i in range(2):
2269             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2270
2271         self.pg_enable_capture(self.pg_interfaces)
2272         self.pg_start()
2273
2274         # administratively enable the wg interface
2275         # expect the peer to send a handshake initiation
2276         wg0.admin_up()
2277         rxs = self.pg1.get_capture(1, timeout=2)
2278         peer_1.consume_init(rxs[0], self.pg1)
2279
2280         self.pg_enable_capture(self.pg_interfaces)
2281         self.pg_start()
2282
2283         # administratively disable the wg interface
2284         # expect no handshakes
2285         wg0.admin_down()
2286         for i in range(6):
2287             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2288
2289         # remove configs
2290         peer_1.remove_vpp_config()
2291         wg0.remove_vpp_config()
2292
2293     def test_wg_sending_data_when_admin_down(self):
2294         """Sending data when admin down"""
2295         port = 12323
2296
2297         # create wg interface
2298         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2299         wg0.admin_up()
2300         wg0.config_ip4()
2301
2302         self.pg_enable_capture(self.pg_interfaces)
2303         self.pg_start()
2304
2305         # create a peer
2306         peer_1 = VppWgPeer(
2307             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2308         ).add_vpp_config()
2309         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2310
2311         # create a route to rewrite traffic into the wg interface
2312         r1 = VppIpRoute(
2313             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2314         ).add_vpp_config()
2315
2316         # wait for the peer to send a handshake initiation
2317         rxs = self.pg1.get_capture(1, timeout=2)
2318
2319         # prepare and send a handshake response
2320         # expect a keepalive message
2321         resp = peer_1.consume_init(rxs[0], self.pg1)
2322         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2323
2324         # verify the keepalive message
2325         b = peer_1.decrypt_transport(rxs[0])
2326         self.assertEqual(0, len(b))
2327
2328         # prepare and send a packet that will be rewritten into the wg interface
2329         # expect a data packet sent
2330         p = (
2331             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2332             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2333             / UDP(sport=555, dport=556)
2334             / Raw()
2335         )
2336         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2337
2338         # verify the data packet
2339         peer_1.validate_encapped(rxs, p)
2340
2341         # administratively disable the wg interface
2342         wg0.admin_down()
2343
2344         # send a packet that will be rewritten into the wg interface
2345         # expect no data packets sent
2346         self.send_and_assert_no_replies(self.pg0, [p])
2347
2348         # administratively enable the wg interface
2349         # expect the peer to send a handshake initiation
2350         wg0.admin_up()
2351         peer_1.noise_reset()
2352         rxs = self.pg1.get_capture(1, timeout=2)
2353         resp = peer_1.consume_init(rxs[0], self.pg1)
2354
2355         # send a packet that will be rewritten into the wg interface
2356         # expect no data packets sent because the peer is not initiated
2357         self.send_and_assert_no_replies(self.pg0, [p])
2358         self.assertEqual(
2359             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
2360         )
2361
2362         # send a handshake response and expect a keepalive message
2363         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2364
2365         # verify the keepalive message
2366         b = peer_1.decrypt_transport(rxs[0])
2367         self.assertEqual(0, len(b))
2368
2369         # send a packet that will be rewritten into the wg interface
2370         # expect a data packet sent
2371         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2372
2373         # verify the data packet
2374         peer_1.validate_encapped(rxs, p)
2375
2376         # remove configs
2377         r1.remove_vpp_config()
2378         peer_1.remove_vpp_config()
2379         wg0.remove_vpp_config()
2380
2381
2382 class WireguardHandoffTests(TestWg):
2383     """Wireguard Tests in multi worker setup"""
2384
2385     vpp_worker_count = 2
2386
2387     def test_wg_peer_init(self):
2388         """Handoff"""
2389
2390         port = 12383
2391
2392         # Create interfaces
2393         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2394         wg0.admin_up()
2395         wg0.config_ip4()
2396
2397         peer_1 = VppWgPeer(
2398             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
2399         ).add_vpp_config()
2400         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2401
2402         r1 = VppIpRoute(
2403             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2404         ).add_vpp_config()
2405
2406         # send a valid handsake init for which we expect a response
2407         p = peer_1.mk_handshake(self.pg1)
2408
2409         rx = self.send_and_expect(self.pg1, [p], self.pg1)
2410
2411         peer_1.consume_response(rx[0])
2412
2413         # send a data packet from the peer through the tunnel
2414         # this completes the handshake and pins the peer to worker 0
2415         p = (
2416             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2417             / UDP(sport=222, dport=223)
2418             / Raw()
2419         )
2420         d = peer_1.encrypt_transport(p)
2421         p = peer_1.mk_tunnel_header(self.pg1) / (
2422             Wireguard(message_type=4, reserved_zero=0)
2423             / WireguardTransport(
2424                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
2425             )
2426         )
2427         rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
2428
2429         for rx in rxs:
2430             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2431             self.assertEqual(rx[IP].ttl, 19)
2432
2433         # send a packets that are routed into the tunnel
2434         # and pins the peer tp worker 1
2435         pe = (
2436             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2437             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2438             / UDP(sport=555, dport=556)
2439             / Raw(b"\x00" * 80)
2440         )
2441         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
2442         peer_1.validate_encapped(rxs, pe)
2443
2444         # send packets into the tunnel, from the other worker
2445         p = [
2446             (
2447                 peer_1.mk_tunnel_header(self.pg1)
2448                 / Wireguard(message_type=4, reserved_zero=0)
2449                 / WireguardTransport(
2450                     receiver_index=peer_1.sender,
2451                     counter=ii + 1,
2452                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2453                         (
2454                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2455                             / UDP(sport=222, dport=223)
2456                             / Raw()
2457                         )
2458                     ),
2459                 )
2460             )
2461             for ii in range(255)
2462         ]
2463
2464         rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
2465
2466         for rx in rxs:
2467             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2468             self.assertEqual(rx[IP].ttl, 19)
2469
2470         # send a packets that are routed into the tunnel
2471         # from worker 0
2472         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
2473
2474         peer_1.validate_encapped(rxs, pe)
2475
2476         r1.remove_vpp_config()
2477         peer_1.remove_vpp_config()
2478         wg0.remove_vpp_config()
2479
2480     @unittest.skip("test disabled")
2481     def test_wg_multi_interface(self):
2482         """Multi-tunnel on the same port"""
2483
2484
2485 class TestWgFIB(VppTestCase):
2486     """Wireguard FIB Test Case"""
2487
2488     @classmethod
2489     def setUpClass(cls):
2490         super(TestWgFIB, cls).setUpClass()
2491
2492     @classmethod
2493     def tearDownClass(cls):
2494         super(TestWgFIB, cls).tearDownClass()
2495
2496     def setUp(self):
2497         super(TestWgFIB, self).setUp()
2498
2499         self.create_pg_interfaces(range(2))
2500
2501         for i in self.pg_interfaces:
2502             i.admin_up()
2503             i.config_ip4()
2504
2505     def tearDown(self):
2506         for i in self.pg_interfaces:
2507             i.unconfig_ip4()
2508             i.admin_down()
2509         super(TestWgFIB, self).tearDown()
2510
2511     def test_wg_fib_tracking(self):
2512         """FIB tracking"""
2513         port = 12323
2514
2515         # create wg interface
2516         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2517         wg0.admin_up()
2518         wg0.config_ip4()
2519
2520         self.pg_enable_capture(self.pg_interfaces)
2521         self.pg_start()
2522
2523         # create a peer
2524         peer_1 = VppWgPeer(
2525             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2526         ).add_vpp_config()
2527         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2528
2529         # create a route to rewrite traffic into the wg interface
2530         r1 = VppIpRoute(
2531             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2532         ).add_vpp_config()
2533
2534         # resolve ARP and expect the adjacency to update
2535         self.pg1.resolve_arp()
2536
2537         # wait for the peer to send a handshake initiation
2538         rxs = self.pg1.get_capture(2, timeout=6)
2539
2540         # prepare and send a handshake response
2541         # expect a keepalive message
2542         resp = peer_1.consume_init(rxs[1], self.pg1)
2543         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2544
2545         # verify the keepalive message
2546         b = peer_1.decrypt_transport(rxs[0])
2547         self.assertEqual(0, len(b))
2548
2549         # prepare and send a packet that will be rewritten into the wg interface
2550         # expect a data packet sent
2551         p = (
2552             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2553             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2554             / UDP(sport=555, dport=556)
2555             / Raw()
2556         )
2557         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2558
2559         # verify the data packet
2560         peer_1.validate_encapped(rxs, p)
2561
2562         # remove configs
2563         r1.remove_vpp_config()
2564         peer_1.remove_vpp_config()
2565         wg0.remove_vpp_config()