8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.wireguard import (
20 from cryptography.hazmat.primitives.asymmetric.x25519 import (
24 from cryptography.hazmat.primitives.serialization import (
30 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
31 from cryptography.hazmat.primitives.hmac import HMAC
32 from cryptography.hazmat.backends import default_backend
33 from noise.connection import NoiseConnection, Keypair
35 from vpp_ipip_tun_interface import VppIpIpTunInterface
36 from vpp_interface import VppInterface
37 from vpp_ip_route import VppIpRoute, VppRoutePath
38 from vpp_object import VppObject
39 from vpp_papi import VppEnum
40 from framework import VppTestCase
41 from re import compile
44 """ TestWg is a subclass of VPPTestCase classes.
def private_key_bytes(k):
    """Return the raw, unencrypted serialization of the private key ``k``.

    ``k`` must expose ``private_bytes(encoding, format, encryption)``.
    """
    no_encryption = NoEncryption()
    return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, no_encryption)
def public_key_bytes(k):
    """Return the raw serialization of the public key ``k``.

    ``k`` must expose ``public_bytes(encoding, format)``.
    """
    encoding, fmt = Encoding.Raw, PublicFormat.Raw
    return k.public_bytes(encoding, fmt)
59 class VppWgInterface(VppInterface):
61 VPP WireGuard interface
64 def __init__(self, test, src, port):
65 super(VppWgInterface, self).__init__(test)
69 self.private_key = X25519PrivateKey.generate()
70 self.public_key = self.private_key.public_key()
def public_key_bytes(self):
    """Return this interface's public key in raw byte form."""
    key = self.public_key
    # Delegates to the module-level helper of the same name.
    return public_key_bytes(key)
def private_key_bytes(self):
    """Return this interface's private key in raw byte form."""
    key = self.private_key
    # Delegates to the module-level helper of the same name.
    return private_key_bytes(key)
78 def add_vpp_config(self):
79 r = self.test.vapi.wireguard_interface_create(
81 "user_instance": 0xFFFFFFFF,
84 "private_key": private_key_bytes(self.private_key),
85 "generate_key": False,
88 self.set_sw_if_index(r.sw_if_index)
89 self.test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
    """Ask VPP to delete this wireguard interface, identified by sw_if_index."""
    vapi = self.test.vapi
    vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
95 def query_vpp_config(self):
96 ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
99 t.interface.sw_if_index == self._sw_if_index
100 and str(t.interface.src_ip) == self.src
101 and t.interface.port == self.port
102 and t.interface.private_key == private_key_bytes(self.private_key)
107 def want_events(self, peer_index=0xFFFFFFFF):
108 self.test.vapi.want_wireguard_peer_events(
111 sw_if_index=self._sw_if_index,
112 peer_index=peer_index,
def wait_events(self, expect, peers, timeout=5):
    """Wait for one ``wireguard_peer_event`` per entry in ``peers``.

    Events are consumed in the order of ``peers``; each received event
    must carry the matching peer index and the expected flags.

    :param expect: flags value every event is asserted to carry
    :param peers: iterable of peer indices, in the expected arrival order
    :param timeout: value passed to ``wait_for_event`` for each event
    """
    # Iterate the peers directly instead of indexing via range(len(...)).
    for peer_index in peers:
        rv = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
        self.test.assertEqual(rv.peer_index, peer_index)
        self.test.assertEqual(rv.flags, expect)
122 return self.object_id()
125 return "wireguard-%d" % self._sw_if_index
128 def find_route(test, prefix, is_ip6, table_id=0):
129 routes = test.vapi.ip_route_dump(table_id, is_ip6)
132 if table_id == e.route.table_id and str(e.route.prefix) == str(prefix):
137 NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
138 NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"
141 class VppWgPeer(VppObject):
142 def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
145 self.endpoint = endpoint
147 self.allowed_ips = allowed_ips
148 self.persistent_keepalive = persistent_keepalive
150 # remote peer's public
151 self.private_key = X25519PrivateKey.generate()
152 self.public_key = self.private_key.public_key()
154 self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
156 def add_vpp_config(self, is_ip6=False):
157 rv = self._test.vapi.wireguard_peer_add(
159 "public_key": self.public_key_bytes(),
161 "endpoint": self.endpoint,
162 "n_allowed_ips": len(self.allowed_ips),
163 "allowed_ips": self.allowed_ips,
164 "sw_if_index": self.itf.sw_if_index,
165 "persistent_keepalive": self.persistent_keepalive,
168 self.index = rv.peer_index
169 self.receiver_index = self.index + 1
170 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """Ask VPP to remove this peer, identified by its peer index."""
    vapi = self._test.vapi
    vapi.wireguard_peer_remove(peer_index=self.index)
177 return "wireguard-peer-%s" % self.index
def public_key_bytes(self):
    """Return this peer's public key in raw byte form."""
    key = self.public_key
    # Delegates to the module-level helper of the same name.
    return public_key_bytes(key)
182 def query_vpp_config(self):
183 peers = self._test.vapi.wireguard_peers_dump()
187 p.peer.public_key == self.public_key_bytes()
188 and p.peer.port == self.port
189 and str(p.peer.endpoint) == self.endpoint
190 and p.peer.sw_if_index == self.itf.sw_if_index
191 and len(self.allowed_ips) == p.peer.n_allowed_ips
193 self.allowed_ips.sort()
194 p.peer.allowed_ips.sort()
196 for (a1, a2) in zip(self.allowed_ips, p.peer.allowed_ips):
197 if str(a1) != str(a2):
def set_responder(self):
    """Put this peer's noise connection into the responder role."""
    noise = self.noise
    noise.set_as_responder()
205 def mk_tunnel_header(self, tx_itf, is_ip6=False):
208 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
209 / IP(src=self.endpoint, dst=self.itf.src)
210 / UDP(sport=self.port, dport=self.itf.port)
214 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
215 / IPv6(src=self.endpoint, dst=self.itf.src)
216 / UDP(sport=self.port, dport=self.itf.port)
219 def noise_init(self, public_key=None):
220 self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
221 self.noise.set_psks(psk=bytes(bytearray(32)))
224 public_key = self.itf.public_key
227 self.noise.set_keypair_from_private_bytes(
228 Keypair.STATIC, private_key_bytes(self.private_key)
231 self.noise.set_keypair_from_public_bytes(
232 Keypair.REMOTE_STATIC, public_key_bytes(public_key)
235 self.noise.start_handshake()
237 def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
238 self.noise.set_as_initiator()
239 self.noise_init(public_key)
241 p = Wireguard() / WireguardInitiation()
243 p[Wireguard].message_type = 1
244 p[Wireguard].reserved_zero = 0
245 p[WireguardInitiation].sender_index = self.receiver_index
247 # some random data for the message
248 # lifted from the noise protocol's wireguard example
249 now = datetime.datetime.now()
252 4611686018427387914 + int(now.timestamp()),
253 int(now.microsecond * 1e3),
255 b = self.noise.write_message(payload=tai)
257 # load noise into init message
258 p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
259 p[WireguardInitiation].encrypted_static = b[32:80]
260 p[WireguardInitiation].encrypted_timestamp = b[80:108]
262 # generate the mac1 hash
263 mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
264 p[WireguardInitiation].mac1 = blake2s(
265 bytes(p)[0:116], digest_size=16, key=mac_key
267 p[WireguardInitiation].mac2 = bytearray(16)
269 p = self.mk_tunnel_header(tx_itf, is_ip6) / p
273 def verify_header(self, p, is_ip6=False):
275 self._test.assertEqual(p[IP].src, self.itf.src)
276 self._test.assertEqual(p[IP].dst, self.endpoint)
278 self._test.assertEqual(p[IPv6].src, self.itf.src)
279 self._test.assertEqual(p[IPv6].dst, self.endpoint)
280 self._test.assertEqual(p[UDP].sport, self.itf.port)
281 self._test.assertEqual(p[UDP].dport, self.port)
282 self._test.assert_packet_checksums_valid(p)
284 def consume_init(self, p, tx_itf, is_ip6=False):
285 self.noise.set_as_responder()
286 self.noise_init(self.itf.public_key)
287 self.verify_header(p, is_ip6)
289 init = Wireguard(p[Raw])
291 self._test.assertEqual(init[Wireguard].message_type, 1)
292 self._test.assertEqual(init[Wireguard].reserved_zero, 0)
294 self.sender = init[WireguardInitiation].sender_index
297 mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
298 mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
299 self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
301 # this passes only unencrypted_ephemeral, encrypted_static,
302 # encrypted_timestamp fields of the init
303 payload = self.noise.read_message(bytes(init)[8:-32])
306 b = self.noise.write_message()
307 mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
308 resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
309 sender_index=self.receiver_index,
310 receiver_index=self.sender,
311 unencrypted_ephemeral=b[0:32],
312 encrypted_nothing=b[32:],
314 mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
315 resp[WireguardResponse].mac1 = mac1
317 resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
318 self._test.assertTrue(self.noise.handshake_finished)
322 def consume_response(self, p, is_ip6=False):
323 self.verify_header(p, is_ip6)
325 resp = Wireguard(p[Raw])
327 self._test.assertEqual(resp[Wireguard].message_type, 2)
328 self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
329 self._test.assertEqual(
330 resp[WireguardResponse].receiver_index, self.receiver_index
333 self.sender = resp[Wireguard].sender_index
335 payload = self.noise.read_message(bytes(resp)[12:60])
336 self._test.assertEqual(payload, b"")
337 self._test.assertTrue(self.noise.handshake_finished)
339 def decrypt_transport(self, p, is_ip6=False):
340 self.verify_header(p, is_ip6)
342 p = Wireguard(p[Raw])
343 self._test.assertEqual(p[Wireguard].message_type, 4)
344 self._test.assertEqual(p[Wireguard].reserved_zero, 0)
345 self._test.assertEqual(
346 p[WireguardTransport].receiver_index, self.receiver_index
349 d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
def encrypt_transport(self, p):
    """Encrypt ``p`` with this peer's noise connection.

    ``p`` is converted with ``bytes()`` before encryption; the result of
    ``self.noise.encrypt`` is returned unchanged.
    """
    plaintext = bytes(p)
    return self.noise.encrypt(plaintext)
355 def validate_encapped(self, rxs, tx, is_ip6=False):
358 rx = IP(self.decrypt_transport(rx))
360 # check the original packet is present
361 self._test.assertEqual(rx[IP].dst, tx[IP].dst)
362 self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
364 rx = IPv6(self.decrypt_transport(rx))
366 # check the original packet is present
367 self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
368 self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl - 1)
370 def want_events(self):
371 self._test.vapi.want_wireguard_peer_events(
374 peer_index=self.index,
375 sw_if_index=self.itf.sw_if_index,
def wait_event(self, expect, timeout=5):
    """Wait for a single ``wireguard_peer_event`` and check it is ours.

    The event must carry the flags ``expect`` and this peer's index.
    """
    test = self._test
    event = test.vapi.wait_for_event(timeout, "wireguard_peer_event")
    test.assertEqual(event.flags, expect)
    test.assertEqual(event.peer_index, self.index)
384 class TestWg(VppTestCase):
385 """Wireguard Test Case"""
387 error_str = compile(r"Error")
389 wg4_output_node_name = "/err/wg4-output-tun/"
390 wg4_input_node_name = "/err/wg4-input/"
391 wg6_output_node_name = "/err/wg6-output-tun/"
392 wg6_input_node_name = "/err/wg6-input/"
393 kp4_error = wg4_output_node_name + "Keypair error"
394 mac4_error = wg4_input_node_name + "Invalid MAC handshake"
395 peer4_in_err = wg4_input_node_name + "Peer error"
396 peer4_out_err = wg4_output_node_name + "Peer error"
397 kp6_error = wg6_output_node_name + "Keypair error"
398 mac6_error = wg6_input_node_name + "Invalid MAC handshake"
399 peer6_in_err = wg6_input_node_name + "Peer error"
400 peer6_out_err = wg6_output_node_name + "Peer error"
404 super(TestWg, cls).setUpClass()
406 cls.create_pg_interfaces(range(3))
407 for i in cls.pg_interfaces:
415 super(TestWg, cls).tearDownClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown.

    NOTE(review): the decorator line is not visible here; presumably this
    is a ``@classmethod`` - confirm.
    """
    # Zero-argument super() resolves to the same parent as super(TestWg, cls).
    super().tearDownClass()
423 super(VppTestCase, self).setUp()
424 self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
425 self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
426 self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
427 self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
428 self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
429 self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
430 self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
431 self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
433 def test_wg_interface(self):
434 """Simple interface creation"""
438 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
440 self.logger.info(self.vapi.cli("sh int"))
443 wg0.remove_vpp_config()
445 def test_handshake_hash(self):
446 """test hashing an init message"""
447 # a init packet generated by linux given the key below
470 b = bytearray.fromhex(h)
473 pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
474 pub = X25519PublicKey.from_public_bytes(pubb)
476 self.assertEqual(pubb, public_key_bytes(pub))
478 # strip the macs and build a new packet
480 mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
481 init += blake2s(init, digest_size=16, key=mac_key).digest()
484 act = Wireguard(init)
486 self.assertEqual(tgt, act)
488 def test_wg_peer_resp(self):
489 """Send handshake response"""
493 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
497 self.pg_enable_capture(self.pg_interfaces)
501 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
503 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
506 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
509 # wait for the peer to send a handshake
510 rx = self.pg1.get_capture(1, timeout=2)
512 # consume the handshake in the noise protocol and
513 # generate the response
514 resp = peer_1.consume_init(rx[0], self.pg1)
516 # send the response, get keepalive
517 rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
520 b = peer_1.decrypt_transport(rx)
521 self.assertEqual(0, len(b))
523 # send packets that are routed into the tunnel
525 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
526 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
527 / UDP(sport=555, dport=556)
531 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
533 peer_1.validate_encapped(rxs, p)
535 # send packets into the tunnel, expect to receive them on
539 peer_1.mk_tunnel_header(self.pg1)
540 / Wireguard(message_type=4, reserved_zero=0)
541 / WireguardTransport(
542 receiver_index=peer_1.sender,
544 encrypted_encapsulated_packet=peer_1.encrypt_transport(
546 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
547 / UDP(sport=222, dport=223)
556 rxs = self.send_and_expect(self.pg1, p, self.pg0)
559 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
560 self.assertEqual(rx[IP].ttl, 19)
562 r1.remove_vpp_config()
563 peer_1.remove_vpp_config()
564 wg0.remove_vpp_config()
566 def test_wg_peer_v4o4(self):
572 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
577 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
579 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
582 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
585 self, "20.22.3.0", 24, [VppRoutePath("20.22.3.1", wg0.sw_if_index)]
588 # route a packet into the wg interface
589 # use the allowed-ip prefix
590 # this is dropped because the peer is not initiated
592 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
593 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
594 / UDP(sport=555, dport=556)
597 self.send_and_assert_no_replies(self.pg0, [p])
599 self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
602 # route a packet into the wg interface
603 # use a not allowed-ip prefix
604 # this is dropped because there is no matching peer
606 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
607 / IP(src=self.pg0.remote_ip4, dst="20.22.3.2")
608 / UDP(sport=555, dport=556)
611 self.send_and_assert_no_replies(self.pg0, [p])
613 self.base_peer4_out_err + 1,
614 self.statistics.get_err_counter(self.peer4_out_err),
617 # send a handshake from the peer with an invalid MAC
618 p = peer_1.mk_handshake(self.pg1)
619 p[WireguardInitiation].mac1 = b"foobar"
620 self.send_and_assert_no_replies(self.pg1, [p])
622 self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
625 # send a handshake from the peer but signed by the wrong key.
626 p = peer_1.mk_handshake(
627 self.pg1, False, X25519PrivateKey.generate().public_key()
629 self.send_and_assert_no_replies(self.pg1, [p])
631 self.base_peer4_in_err + 1,
632 self.statistics.get_err_counter(self.peer4_in_err),
635 # send a valid handshake init for which we expect a response
636 p = peer_1.mk_handshake(self.pg1)
638 rx = self.send_and_expect(self.pg1, [p], self.pg1)
640 peer_1.consume_response(rx[0])
642 # route a packet into the wg interface
643 # this is dropped because the peer is still not initiated
645 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
646 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
647 / UDP(sport=555, dport=556)
650 self.send_and_assert_no_replies(self.pg0, [p])
652 self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
655 # send a data packet from the peer through the tunnel
656 # this completes the handshake
658 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
659 / UDP(sport=222, dport=223)
662 d = peer_1.encrypt_transport(p)
663 p = peer_1.mk_tunnel_header(self.pg1) / (
664 Wireguard(message_type=4, reserved_zero=0)
665 / WireguardTransport(
666 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
669 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
672 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
673 self.assertEqual(rx[IP].ttl, 19)
675 # send packets that are routed into the tunnel
677 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
678 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
679 / UDP(sport=555, dport=556)
683 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
686 rx = IP(peer_1.decrypt_transport(rx))
688 # check the original packet is present
689 self.assertEqual(rx[IP].dst, p[IP].dst)
690 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
692 # send packets into the tunnel, expect to receive them on
696 peer_1.mk_tunnel_header(self.pg1)
697 / Wireguard(message_type=4, reserved_zero=0)
698 / WireguardTransport(
699 receiver_index=peer_1.sender,
701 encrypted_encapsulated_packet=peer_1.encrypt_transport(
703 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
704 / UDP(sport=222, dport=223)
713 rxs = self.send_and_expect(self.pg1, p, self.pg0)
716 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
717 self.assertEqual(rx[IP].ttl, 19)
719 r1.remove_vpp_config()
720 r2.remove_vpp_config()
721 peer_1.remove_vpp_config()
722 wg0.remove_vpp_config()
724 def test_wg_peer_v6o6(self):
730 wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
735 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
736 ).add_vpp_config(True)
737 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
740 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
743 self, "22::3:0", 112, [VppRoutePath("22::3:1", wg0.sw_if_index)]
746 # route a packet into the wg interface
747 # use the allowed-ip prefix
748 # this is dropped because the peer is not initiated
751 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
752 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
753 / UDP(sport=555, dport=556)
756 self.send_and_assert_no_replies(self.pg0, [p])
759 self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
762 # route a packet into the wg interface
763 # use a not allowed-ip prefix
764 # this is dropped because there is no matching peer
766 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
767 / IPv6(src=self.pg0.remote_ip6, dst="22::3:2")
768 / UDP(sport=555, dport=556)
771 self.send_and_assert_no_replies(self.pg0, [p])
773 self.base_peer6_out_err + 1,
774 self.statistics.get_err_counter(self.peer6_out_err),
777 # send a handshake from the peer with an invalid MAC
778 p = peer_1.mk_handshake(self.pg1, True)
779 p[WireguardInitiation].mac1 = b"foobar"
780 self.send_and_assert_no_replies(self.pg1, [p])
783 self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
786 # send a handshake from the peer but signed by the wrong key.
787 p = peer_1.mk_handshake(
788 self.pg1, True, X25519PrivateKey.generate().public_key()
790 self.send_and_assert_no_replies(self.pg1, [p])
792 self.base_peer6_in_err + 1,
793 self.statistics.get_err_counter(self.peer6_in_err),
796 # send a valid handshake init for which we expect a response
797 p = peer_1.mk_handshake(self.pg1, True)
799 rx = self.send_and_expect(self.pg1, [p], self.pg1)
801 peer_1.consume_response(rx[0], True)
803 # route a packet into the wg interface
804 # this is dropped because the peer is still not initiated
806 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
807 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
808 / UDP(sport=555, dport=556)
811 self.send_and_assert_no_replies(self.pg0, [p])
813 self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
816 # send a data packet from the peer through the tunnel
817 # this completes the handshake
819 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
820 / UDP(sport=222, dport=223)
823 d = peer_1.encrypt_transport(p)
824 p = peer_1.mk_tunnel_header(self.pg1, True) / (
825 Wireguard(message_type=4, reserved_zero=0)
826 / WireguardTransport(
827 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
830 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
833 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
834 self.assertEqual(rx[IPv6].hlim, 19)
836 # send packets that are routed into the tunnel
838 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
839 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
840 / UDP(sport=555, dport=556)
844 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
847 rx = IPv6(peer_1.decrypt_transport(rx, True))
849 # check the original packet is present
850 self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
851 self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
853 # send packets into the tunnel, expect to receive them on
857 peer_1.mk_tunnel_header(self.pg1, True)
858 / Wireguard(message_type=4, reserved_zero=0)
859 / WireguardTransport(
860 receiver_index=peer_1.sender,
862 encrypted_encapsulated_packet=peer_1.encrypt_transport(
864 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
865 / UDP(sport=222, dport=223)
874 rxs = self.send_and_expect(self.pg1, p, self.pg0)
877 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
878 self.assertEqual(rx[IPv6].hlim, 19)
880 r1.remove_vpp_config()
881 r2.remove_vpp_config()
882 peer_1.remove_vpp_config()
883 wg0.remove_vpp_config()
885 def test_wg_peer_v6o4(self):
891 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
896 self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
897 ).add_vpp_config(True)
898 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
901 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
904 # route a packet into the wg interface
905 # use the allowed-ip prefix
906 # this is dropped because the peer is not initiated
908 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
909 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
910 / UDP(sport=555, dport=556)
913 self.send_and_assert_no_replies(self.pg0, [p])
915 self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
918 # send a handshake from the peer with an invalid MAC
919 p = peer_1.mk_handshake(self.pg1)
920 p[WireguardInitiation].mac1 = b"foobar"
921 self.send_and_assert_no_replies(self.pg1, [p])
924 self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
927 # send a handshake from the peer but signed by the wrong key.
928 p = peer_1.mk_handshake(
929 self.pg1, False, X25519PrivateKey.generate().public_key()
931 self.send_and_assert_no_replies(self.pg1, [p])
933 self.base_peer4_in_err + 1,
934 self.statistics.get_err_counter(self.peer4_in_err),
937 # send a valid handshake init for which we expect a response
938 p = peer_1.mk_handshake(self.pg1)
940 rx = self.send_and_expect(self.pg1, [p], self.pg1)
942 peer_1.consume_response(rx[0])
944 # route a packet into the wg interface
945 # this is dropped because the peer is still not initiated
947 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
948 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
949 / UDP(sport=555, dport=556)
952 self.send_and_assert_no_replies(self.pg0, [p])
954 self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
957 # send a data packet from the peer through the tunnel
958 # this completes the handshake
960 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
961 / UDP(sport=222, dport=223)
964 d = peer_1.encrypt_transport(p)
965 p = peer_1.mk_tunnel_header(self.pg1) / (
966 Wireguard(message_type=4, reserved_zero=0)
967 / WireguardTransport(
968 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
971 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
974 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
975 self.assertEqual(rx[IPv6].hlim, 19)
977 # send packets that are routed into the tunnel
979 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
980 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
981 / UDP(sport=555, dport=556)
985 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
988 rx = IPv6(peer_1.decrypt_transport(rx))
990 # check the original packet is present
991 self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
992 self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
994 # send packets into the tunnel, expect to receive them on
998 peer_1.mk_tunnel_header(self.pg1)
999 / Wireguard(message_type=4, reserved_zero=0)
1000 / WireguardTransport(
1001 receiver_index=peer_1.sender,
1003 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1005 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1006 / UDP(sport=222, dport=223)
1012 for ii in range(255)
1015 rxs = self.send_and_expect(self.pg1, p, self.pg0)
1018 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1019 self.assertEqual(rx[IPv6].hlim, 19)
1021 r1.remove_vpp_config()
1022 peer_1.remove_vpp_config()
1023 wg0.remove_vpp_config()
1025 def test_wg_peer_v4o6(self):
1031 wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1036 self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1038 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1041 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1044 # route a packet into the wg interface
1045 # use the allowed-ip prefix
1046 # this is dropped because the peer is not initiated
1048 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1049 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1050 / UDP(sport=555, dport=556)
1053 self.send_and_assert_no_replies(self.pg0, [p])
1055 self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1058 # send a handshake from the peer with an invalid MAC
1059 p = peer_1.mk_handshake(self.pg1, True)
1060 p[WireguardInitiation].mac1 = b"foobar"
1061 self.send_and_assert_no_replies(self.pg1, [p])
1063 self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1066 # send a handshake from the peer but signed by the wrong key.
1067 p = peer_1.mk_handshake(
1068 self.pg1, True, X25519PrivateKey.generate().public_key()
1070 self.send_and_assert_no_replies(self.pg1, [p])
1072 self.base_peer6_in_err + 1,
1073 self.statistics.get_err_counter(self.peer6_in_err),
1076 # send a valid handshake init for which we expect a response
1077 p = peer_1.mk_handshake(self.pg1, True)
1079 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1081 peer_1.consume_response(rx[0], True)
1083 # route a packet into the wg interface
1084 # this is dropped because the peer is still not initiated
1086 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1087 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1088 / UDP(sport=555, dport=556)
1091 self.send_and_assert_no_replies(self.pg0, [p])
1093 self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1096 # send a data packet from the peer through the tunnel
1097 # this completes the handshake
1099 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1100 / UDP(sport=222, dport=223)
1103 d = peer_1.encrypt_transport(p)
1104 p = peer_1.mk_tunnel_header(self.pg1, True) / (
1105 Wireguard(message_type=4, reserved_zero=0)
1106 / WireguardTransport(
1107 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1110 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1113 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1114 self.assertEqual(rx[IP].ttl, 19)
1116 # send packets that are routed into the tunnel
1118 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1119 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1120 / UDP(sport=555, dport=556)
1124 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1127 rx = IP(peer_1.decrypt_transport(rx, True))
1129 # check the original packet is present
1130 self.assertEqual(rx[IP].dst, p[IP].dst)
1131 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1133 # send packets into the tunnel, expect to receive them on
1137 peer_1.mk_tunnel_header(self.pg1, True)
1138 / Wireguard(message_type=4, reserved_zero=0)
1139 / WireguardTransport(
1140 receiver_index=peer_1.sender,
1142 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1144 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1145 / UDP(sport=222, dport=223)
1151 for ii in range(255)
1154 rxs = self.send_and_expect(self.pg1, p, self.pg0)
1157 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1158 self.assertEqual(rx[IP].ttl, 19)
1160 r1.remove_vpp_config()
1161 peer_1.remove_vpp_config()
1162 wg0.remove_vpp_config()
1164 def test_wg_multi_peer(self):
1165 """multiple peer setup"""
1169 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1170 wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1174 # Check peer counter
1175 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1177 self.pg_enable_capture(self.pg_interfaces)
1180 # Create many peers on second interface
1182 self.pg2.generate_remote_hosts(NUM_PEERS)
1183 self.pg2.configure_ipv4_neighbors()
1184 self.pg1.generate_remote_hosts(NUM_PEERS)
1185 self.pg1.configure_ipv4_neighbors()
1191 for i in range(NUM_PEERS):
1196 self.pg1.remote_hosts[i].ip4,
1198 ["10.0.%d.4/32" % i],
1206 [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1214 self.pg2.remote_hosts[i].ip4,
1216 ["10.100.%d.4/32" % i],
1224 [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
1228 self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1230 self.logger.info(self.vapi.cli("show wireguard peer"))
1231 self.logger.info(self.vapi.cli("show wireguard interface"))
1232 self.logger.info(self.vapi.cli("show adj 37"))
1233 self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
1234 self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
1238 r.remove_vpp_config()
1240 r.remove_vpp_config()
1244 self.assertTrue(p.query_vpp_config())
1245 p.remove_vpp_config()
1247 self.assertTrue(p.query_vpp_config())
1248 p.remove_vpp_config()
1250 wg0.remove_vpp_config()
1251 wg1.remove_vpp_config()
1253 def test_wg_multi_interface(self):
1254 """Multi-tunnel on the same port"""
1257 # Create many wireguard interfaces
1259 self.pg1.generate_remote_hosts(NUM_IFS)
1260 self.pg1.configure_ipv4_neighbors()
1261 self.pg0.generate_remote_hosts(NUM_IFS)
1262 self.pg0.configure_ipv4_neighbors()
1264 # Create interfaces with a peer on each
1268 for i in range(NUM_IFS):
1269 # Use the same port for each interface
1270 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1278 self.pg1.remote_hosts[i].ip4,
1280 ["10.0.%d.0/24" % i],
1289 [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
1293 self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
1295 for i in range(NUM_IFS):
1296 # send a valid handshake init for which we expect a response
1297 p = peers[i].mk_handshake(self.pg1)
1298 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1299 peers[i].consume_response(rx[0])
1301 # send a data packet from the peer through the tunnel
1302 # this completes the handshake
1304 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
1305 / UDP(sport=222, dport=223)
1308 d = peers[i].encrypt_transport(p)
1309 p = peers[i].mk_tunnel_header(self.pg1) / (
1310 Wireguard(message_type=4, reserved_zero=0)
1311 / WireguardTransport(
1312 receiver_index=peers[i].sender,
1314 encrypted_encapsulated_packet=d,
1317 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1319 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
1320 self.assertEqual(rx[IP].ttl, 19)
1322 # send a packets that are routed into the tunnel
1323 for i in range(NUM_IFS):
1325 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1326 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
1327 / UDP(sport=555, dport=556)
1331 rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
1334 rx = IP(peers[i].decrypt_transport(rx))
1336 # check the oringial packet is present
1337 self.assertEqual(rx[IP].dst, p[IP].dst)
1338 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1340 # send packets into the tunnel
1341 for i in range(NUM_IFS):
1344 peers[i].mk_tunnel_header(self.pg1)
1345 / Wireguard(message_type=4, reserved_zero=0)
1346 / WireguardTransport(
1347 receiver_index=peers[i].sender,
1349 encrypted_encapsulated_packet=peers[i].encrypt_transport(
1352 src="10.0.%d.4" % i,
1353 dst=self.pg0.remote_hosts[i].ip4,
1356 / UDP(sport=222, dport=223)
1365 rxs = self.send_and_expect(self.pg1, p, self.pg0)
1368 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
1369 self.assertEqual(rx[IP].ttl, 19)
1372 r.remove_vpp_config()
1374 p.remove_vpp_config()
1376 i.remove_vpp_config()
1378 def test_wg_event(self):
# Exercises peer event notification: registers for events, performs
# handshakes and waits for ESTABLISHED_FLAG events, then removes the peers
# and waits for DEAD_FLAG events.
1381 ESTABLISHED_FLAG = (
1382 VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
1384 DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
# two interfaces: wg0 sourced from pg1 on `port`, wg1 sourced from pg2 on
# `port + 1`
1387 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1388 wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1392 # Check peer counter
1393 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1395 self.pg_enable_capture(self.pg_interfaces)
# NUM_PEERS remote hosts on each of pg2 and pg1; remote_hosts[i] is used
# below for peer i
1400 self.pg2.generate_remote_hosts(NUM_PEERS)
1401 self.pg2.configure_ipv4_neighbors()
1402 self.pg1.generate_remote_hosts(NUM_PEERS)
1403 self.pg1.configure_ipv4_neighbors()
1409 for i in range(NUM_PEERS):
1414 self.pg1.remote_hosts[i].ip4,
1416 ["10.0.%d.4/32" % i],
1424 [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1432 self.pg2.remote_hosts[i].ip4,
1434 ["10.100.%d.4/32" % i],
1442 [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
# NUM_PEERS peers were added to each of the two interfaces
1446 self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1448 # Want events from the first peer of wg0
1449 # and from all wg1 peers
1450 peers_0[0].want_events()
1453 for i in range(NUM_PEERS):
1454 # send a valid handshake init for which we expect a response
1455 p = peers_0[i].mk_handshake(self.pg1)
1456 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1457 peers_0[i].consume_response(rx[0])
1459 peers_0[0].wait_event(ESTABLISHED_FLAG)
# same handshake exchange for the wg1 peer, over pg2
1461 p = peers_1[i].mk_handshake(self.pg2)
1462 rx = self.send_and_expect(self.pg2, [p], self.pg2)
1463 peers_1[i].consume_response(rx[0])
# NOTE(review): the expected peer list is hard-coded to peers_1[0] and
# peers_1[1]; presumably NUM_PEERS is 2 -- confirm, or derive the list from
# peers_1 if NUM_PEERS ever changes.
1465 wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
1469 r.remove_vpp_config()
1471 r.remove_vpp_config()
1474 for i in range(NUM_PEERS):
1475 self.assertTrue(peers_0[i].query_vpp_config())
1476 peers_0[i].remove_vpp_config()
# NOTE(review): wait_event(0) presumably waits for an event carrying no flags
# before the DEAD one -- confirm against wait_event's implementation.
1478 peers_0[i].wait_event(0)
1479 peers_0[i].wait_event(DEAD_FLAG)
1481 self.assertTrue(p.query_vpp_config())
1482 p.remove_vpp_config()
1484 p.wait_event(DEAD_FLAG)
# finally remove both interfaces
1486 wg0.remove_vpp_config()
1487 wg1.remove_vpp_config()
1490 class WireguardHandoffTests(TestWg):
1491 """Wireguard Tests in multi worker setup"""
# presumably read by the test framework to start VPP with two worker
# threads; the tests below address them as worker=0 and worker=1 -- confirm
1493 vpp_worker_count = 2
1495 def test_wg_peer_init(self):
# Exercises a single peer across two workers: traffic in each direction is
# first sent on one worker and then on the other (the worker= argument of
# send_and_expect), and must still be decapsulated / encapsulated correctly.
1501 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1506 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
# exactly one peer is configured at this point
1508 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1511 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1514 # send a valid handshake init for which we expect a response
1515 p = peer_1.mk_handshake(self.pg1)
1517 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1519 peer_1.consume_response(rx[0])
1521 # send a data packet from the peer through the tunnel
1522 # this completes the handshake and pins the peer to worker 0
1524 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1525 / UDP(sport=222, dport=223)
1528 d = peer_1.encrypt_transport(p)
1529 p = peer_1.mk_tunnel_header(self.pg1) / (
1530 Wireguard(message_type=4, reserved_zero=0)
1531 / WireguardTransport(
1532 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1535 rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
# the inner packet (sent with ttl=20) must leave pg0 with ttl decremented
1538 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1539 self.assertEqual(rx[IP].ttl, 19)
1541 # send packets that are routed into the tunnel
1542 # and pin the peer to worker 1
1544 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1545 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1546 / UDP(sport=555, dport=556)
# 255 copies of pe in on pg0 via worker 1, encapsulated output expected on pg1
1549 rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
1550 peer_1.validate_encapped(rxs, pe)
1552 # send packets into the tunnel, from the other worker
1555 peer_1.mk_tunnel_header(self.pg1)
1556 / Wireguard(message_type=4, reserved_zero=0)
1557 / WireguardTransport(
1558 receiver_index=peer_1.sender,
1560 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1562 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1563 / UDP(sport=222, dport=223)
1569 for ii in range(255)
1572 rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
1575 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1576 self.assertEqual(rx[IP].ttl, 19)
1578 # send packets that are routed into the tunnel
# (the same pe packets as before, this time injected on worker 0)
1580 rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
1582 peer_1.validate_encapped(rxs, pe)
# cleanup: route, peer, then the interface
1584 r1.remove_vpp_config()
1585 peer_1.remove_vpp_config()
1586 wg0.remove_vpp_config()
# Unconditionally skipped via unittest.skip. Presumably this overrides the
# inherited TestWg test of the same name so that it does not run in the
# multi-worker setup -- confirm the reason before re-enabling.
1588 @unittest.skip("test disabled")
1589 def test_wg_multi_interface(self):
1590 """Multi-tunnel on the same port"""