import datetime
import base64
+import os
from hashlib import blake2s
from scapy.packet import Packet
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
from scapy.contrib.wireguard import Wireguard, WireguardResponse, \
WireguardInitiation, WireguardTransport
from cryptography.hazmat.primitives.asymmetric.x25519 import \
from vpp_ipip_tun_interface import VppIpIpTunInterface
from vpp_interface import VppInterface
+from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_object import VppObject
+from vpp_papi import VppEnum
from framework import VppTestCase
from re import compile
import unittest
return True
return False
+ def want_events(self, peer_index=0xffffffff):
+ self.test.vapi.want_wireguard_peer_events(
+ enable_disable=1,
+ pid=os.getpid(),
+ sw_if_index=self._sw_if_index,
+ peer_index=peer_index)
+
+ def wait_events(self, expect, peers, timeout=5):
+ for i in range(len(peers)):
+ rv = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
+ self.test.assertEqual(rv.peer_index, peers[i])
+ self.test.assertEqual(rv.flags, expect)
+
def __str__(self):
return self.object_id()
return "wireguard-%d" % self._sw_if_index
-def find_route(test, prefix, table_id=0):
- routes = test.vapi.ip_route_dump(table_id, False)
+def find_route(test, prefix, is_ip6, table_id=0):
+ routes = test.vapi.ip_route_dump(table_id, is_ip6)
for e in routes:
if table_id == e.route.table_id \
self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
- def validate_routing(self):
- for a in self.allowed_ips:
- self._test.assertTrue(find_route(self._test, a))
-
- def validate_no_routing(self):
- for a in self.allowed_ips:
- self._test.assertFalse(find_route(self._test, a))
-
- def add_vpp_config(self):
+ def add_vpp_config(self, is_ip6=False):
rv = self._test.vapi.wireguard_peer_add(
peer={
'public_key': self.public_key_bytes(),
self.index = rv.peer_index
self.receiver_index = self.index + 1
self._test.registry.register(self, self._test.logger)
- self.validate_routing()
return self
def remove_vpp_config(self):
self._test.vapi.wireguard_peer_remove(peer_index=self.index)
- self.validate_no_routing()
def object_id(self):
return ("wireguard-peer-%s" % self.index)
def set_responder(self):
self.noise.set_as_responder()
- def mk_tunnel_header(self, tx_itf):
- return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
- IP(src=self.endpoint, dst=self.itf.src) /
- UDP(sport=self.port, dport=self.itf.port))
+ def mk_tunnel_header(self, tx_itf, is_ip6=False):
+ if is_ip6 is False:
+ return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
+ IP(src=self.endpoint, dst=self.itf.src) /
+ UDP(sport=self.port, dport=self.itf.port))
+ else:
+ return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
+ IPv6(src=self.endpoint, dst=self.itf.src) /
+ UDP(sport=self.port, dport=self.itf.port))
def noise_init(self, public_key=None):
self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
self.noise.start_handshake()
- def mk_handshake(self, tx_itf, public_key=None):
+ def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
self.noise.set_as_initiator()
self.noise_init(public_key)
key=mac_key).digest()
p[WireguardInitiation].mac2 = bytearray(16)
- p = (self.mk_tunnel_header(tx_itf) / p)
+ p = (self.mk_tunnel_header(tx_itf, is_ip6) / p)
return p
- def verify_header(self, p):
- self._test.assertEqual(p[IP].src, self.itf.src)
- self._test.assertEqual(p[IP].dst, self.endpoint)
+ def verify_header(self, p, is_ip6=False):
+ if is_ip6 is False:
+ self._test.assertEqual(p[IP].src, self.itf.src)
+ self._test.assertEqual(p[IP].dst, self.endpoint)
+ else:
+ self._test.assertEqual(p[IPv6].src, self.itf.src)
+ self._test.assertEqual(p[IPv6].dst, self.endpoint)
self._test.assertEqual(p[UDP].sport, self.itf.port)
self._test.assertEqual(p[UDP].dport, self.port)
self._test.assert_packet_checksums_valid(p)
- def consume_init(self, p, tx_itf):
+ def consume_init(self, p, tx_itf, is_ip6=False):
self.noise.set_as_responder()
self.noise_init(self.itf.public_key)
- self.verify_header(p)
+ self.verify_header(p, is_ip6)
init = Wireguard(p[Raw])
key=mac_key).digest()
resp[WireguardResponse].mac1 = mac1
- resp = (self.mk_tunnel_header(tx_itf) / resp)
+ resp = (self.mk_tunnel_header(tx_itf, is_ip6) / resp)
self._test.assertTrue(self.noise.handshake_finished)
return resp
- def consume_response(self, p):
- self.verify_header(p)
+ def consume_response(self, p, is_ip6=False):
+ self.verify_header(p, is_ip6)
resp = Wireguard(p[Raw])
self._test.assertEqual(payload, b'')
self._test.assertTrue(self.noise.handshake_finished)
- def decrypt_transport(self, p):
- self.verify_header(p)
+ def decrypt_transport(self, p, is_ip6=False):
+ self.verify_header(p, is_ip6)
p = Wireguard(p[Raw])
self._test.assertEqual(p[Wireguard].message_type, 4)
def encrypt_transport(self, p):
return self.noise.encrypt(bytes(p))
- def validate_encapped(self, rxs, tx):
+ def validate_encapped(self, rxs, tx, is_ip6=False):
for rx in rxs:
- rx = IP(self.decrypt_transport(rx))
+ if is_ip6 is False:
+ rx = IP(self.decrypt_transport(rx))
- # chech the oringial packet is present
- self._test.assertEqual(rx[IP].dst, tx[IP].dst)
- self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1)
+ # chech the oringial packet is present
+ self._test.assertEqual(rx[IP].dst, tx[IP].dst)
+ self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1)
+ else:
+ rx = IPv6(self.decrypt_transport(rx))
+
+ # chech the oringial packet is present
+ self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
+ self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl-1)
+
+ def want_events(self):
+ self._test.vapi.want_wireguard_peer_events(
+ enable_disable=1,
+ pid=os.getpid(),
+ peer_index=self.index,
+ sw_if_index=self.itf.sw_if_index)
+
+ def wait_event(self, expect, timeout=5):
+ rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
+ self._test.assertEqual(rv.flags, expect)
+ self._test.assertEqual(rv.peer_index, self.index)
class TestWg(VppTestCase):
error_str = compile(r"Error")
+ wg4_output_node_name = '/err/wg4-output-tun/'
+ wg4_input_node_name = '/err/wg4-input/'
+ wg6_output_node_name = '/err/wg6-output-tun/'
+ wg6_input_node_name = '/err/wg6-input/'
+ kp4_error = wg4_output_node_name + "Keypair error"
+ mac4_error = wg4_input_node_name + "Invalid MAC handshake"
+ peer4_error = wg4_input_node_name + "Peer error"
+ kp6_error = wg6_output_node_name + "Keypair error"
+ mac6_error = wg6_input_node_name + "Invalid MAC handshake"
+ peer6_error = wg6_input_node_name + "Peer error"
+
@classmethod
def setUpClass(cls):
super(TestWg, cls).setUpClass()
for i in cls.pg_interfaces:
i.admin_up()
i.config_ip4()
+ i.config_ip6()
i.resolve_arp()
+ i.resolve_ndp()
except Exception:
super(TestWg, cls).tearDownClass()
def tearDownClass(cls):
super(TestWg, cls).tearDownClass()
+ def setUp(self):
+ super(VppTestCase, self).setUp()
+ self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
+ self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
+ self.base_peer4_err = self.statistics.get_err_counter(self.peer4_error)
+ self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
+ self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
+ self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error)
+
def test_wg_interface(self):
""" Simple interface creation """
port = 12312
def test_wg_peer_resp(self):
""" Send handshake response """
- wg_output_node_name = '/err/wg-output-tun/'
- wg_input_node_name = '/err/wg-input/'
-
port = 12323
# Create interfaces
wg0,
self.pg1.remote_ip4,
port+1,
- ["10.11.2.0/24",
- "10.11.3.0/24"]).add_vpp_config()
+ ["10.11.3.0/24"]).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+ r1 = VppIpRoute(self, "10.11.3.0", 24,
+ [VppRoutePath("10.11.3.1",
+ wg0.sw_if_index)]).add_vpp_config()
+
# wait for the peer to send a handshake
rx = self.pg1.get_capture(1, timeout=2)
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
self.assertEqual(rx[IP].ttl, 19)
- def test_wg_peer_init(self):
- """ Send handshake init """
- wg_output_node_name = '/err/wg-output-tun/'
- wg_input_node_name = '/err/wg-input/'
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v4o4(self):
+ """ Test v4o4"""
port = 12333
wg0,
self.pg1.remote_ip4,
port+1,
- ["10.11.2.0/24",
- "10.11.3.0/24"]).add_vpp_config()
+ ["10.11.3.0/24"]).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+ r1 = VppIpRoute(self, "10.11.3.0", 24,
+ [VppRoutePath("10.11.3.1",
+ wg0.sw_if_index)]).add_vpp_config()
+
# route a packet into the wg interface
# use the allowed-ip prefix
# this is dropped because the peer is not initiated
UDP(sport=555, dport=556) /
Raw())
self.send_and_assert_no_replies(self.pg0, [p])
-
- kp_error = wg_output_node_name + "Keypair error"
- self.assertEqual(1, self.statistics.get_err_counter(kp_error))
+ self.assertEqual(self.base_kp4_err + 1,
+ self.statistics.get_err_counter(self.kp4_error))
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1)
p[WireguardInitiation].mac1 = b'foobar'
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(1, self.statistics.get_err_counter(
- wg_input_node_name + "Invalid MAC handshake"))
+ self.assertEqual(self.base_mac4_err + 1,
+ self.statistics.get_err_counter(self.mac4_error))
# send a handsake from the peer but signed by the wrong key.
p = peer_1.mk_handshake(self.pg1,
+ False,
X25519PrivateKey.generate().public_key())
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(1, self.statistics.get_err_counter(
- wg_input_node_name + "Peer error"))
+ self.assertEqual(self.base_peer4_err + 1,
+ self.statistics.get_err_counter(self.peer4_error))
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
UDP(sport=555, dport=556) /
Raw())
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(2, self.statistics.get_err_counter(kp_error))
+ self.assertEqual(self.base_kp4_err + 2,
+ self.statistics.get_err_counter(self.kp4_error))
# send a data packet from the peer through the tunnel
# this completes the handshake
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
self.assertEqual(rx[IP].ttl, 19)
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v6o6(self):
+ """ Test v6o6"""
+
+ port = 12343
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip6,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip6()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip6,
+ port+1,
+ ["1::3:0/112"]).add_vpp_config(True)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "1::3:0", 112,
+ [VppRoutePath("1::3:1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+
+ self.assertEqual(self.base_kp6_err + 1,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1, True)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+
+ self.assertEqual(self.base_mac6_err + 1,
+ self.statistics.get_err_counter(self.mac6_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ True,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer6_err + 1,
+ self.statistics.get_err_counter(self.peer6_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1, True)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0], True)
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 2,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1, True) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IPv6(peer_1.decrypt_transport(rx, True))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1, True) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v6o4(self):
+ """ Test v6o4"""
+
+ port = 12353
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip4,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip6()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip4,
+ port+1,
+ ["1::3:0/112"]).add_vpp_config(True)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "1::3:0", 112,
+ [VppRoutePath("1::3:1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 1,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+
+ self.assertEqual(self.base_mac4_err + 1,
+ self.statistics.get_err_counter(self.mac4_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ False,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer4_err + 1,
+ self.statistics.get_err_counter(self.peer4_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0])
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 2,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IPv6(peer_1.decrypt_transport(rx))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v4o6(self):
+ """ Test v4o6"""
+
+ port = 12363
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip6,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip4()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip6,
+ port+1,
+ ["10.11.3.0/24"]).add_vpp_config()
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "10.11.3.0", 24,
+ [VppRoutePath("10.11.3.1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp4_err + 1,
+ self.statistics.get_err_counter(self.kp4_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1, True)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_mac6_err + 1,
+ self.statistics.get_err_counter(self.mac6_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ True,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer6_err + 1,
+ self.statistics.get_err_counter(self.peer6_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1, True)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0], True)
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp4_err + 2,
+ self.statistics.get_err_counter(self.kp4_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1, True) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IP(peer_1.decrypt_transport(rx, True))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IP].dst, p[IP].dst)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1, True) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ r1.remove_vpp_config()
peer_1.remove_vpp_config()
wg0.remove_vpp_config()
def test_wg_multi_peer(self):
""" multiple peer setup """
- port = 12343
+ port = 12373
# Create interfaces
wg0 = VppWgInterface(self,
peers_1 = []
peers_2 = []
+ routes_1 = []
+ routes_2 = []
for i in range(NUM_PEERS):
peers_1.append(VppWgPeer(self,
wg0,
self.pg1.remote_hosts[i].ip4,
port+1+i,
["10.0.%d.4/32" % i]).add_vpp_config())
+ routes_1.append(VppIpRoute(self, "10.0.%d.4" % i, 32,
+ [VppRoutePath(self.pg1.remote_hosts[i].ip4,
+ wg0.sw_if_index)]).add_vpp_config())
+
peers_2.append(VppWgPeer(self,
wg1,
self.pg2.remote_hosts[i].ip4,
port+100+i,
["10.100.%d.4/32" % i]).add_vpp_config())
+ routes_2.append(VppIpRoute(self, "10.100.%d.4" % i, 32,
+ [VppRoutePath(self.pg2.remote_hosts[i].ip4,
+ wg1.sw_if_index)]).add_vpp_config())
self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS*2)
self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
+ # remove routes
+ for r in routes_1:
+ r.remove_vpp_config()
+ for r in routes_2:
+ r.remove_vpp_config()
+
# remove peers
for p in peers_1:
self.assertTrue(p.query_vpp_config())
wg0.remove_vpp_config()
wg1.remove_vpp_config()
+ def test_wg_multi_interface(self):
+ """ Multi-tunnel on the same port """
+ port = 12500
+
+ # Create many wireguard interfaces
+ NUM_IFS = 4
+ self.pg1.generate_remote_hosts(NUM_IFS)
+ self.pg1.configure_ipv4_neighbors()
+ self.pg0.generate_remote_hosts(NUM_IFS)
+ self.pg0.configure_ipv4_neighbors()
+
+ # Create interfaces with a peer on each
+ peers = []
+ routes = []
+ wg_ifs = []
+ for i in range(NUM_IFS):
+ # Use the same port for each interface
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip4,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip4()
+ wg_ifs.append(wg0)
+ peers.append(VppWgPeer(self,
+ wg0,
+ self.pg1.remote_hosts[i].ip4,
+ port+1+i,
+ ["10.0.%d.0/24" % i]).add_vpp_config())
+
+ routes.append(VppIpRoute(self, "10.0.%d.0" % i, 24,
+ [VppRoutePath("10.0.%d.4" % i,
+ wg0.sw_if_index)]).add_vpp_config())
+
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
+
+ for i in range(NUM_IFS):
+ # send a valid handsake init for which we expect a response
+ p = peers[i].mk_handshake(self.pg1)
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+ peers[i].consume_response(rx[0])
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IP(src="10.0.%d.4" % i,
+ dst=self.pg0.remote_hosts[i].ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peers[i].encrypt_transport(p)
+ p = (peers[i].mk_tunnel_header(self.pg1) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peers[i].sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ # send a packets that are routed into the tunnel
+ for i in range(NUM_IFS):
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i) /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
+
+ for rx in rxs:
+ rx = IP(peers[i].decrypt_transport(rx))
+
+ # check the oringial packet is present
+ self.assertEqual(rx[IP].dst, p[IP].dst)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+
+ # send packets into the tunnel
+ for i in range(NUM_IFS):
+ p = [(peers[i].mk_tunnel_header(self.pg1) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peers[i].sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peers[i].encrypt_transport(
+ (IP(src="10.0.%d.4" % i,
+ dst=self.pg0.remote_hosts[i].ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(64)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ for r in routes:
+ r.remove_vpp_config()
+ for p in peers:
+ p.remove_vpp_config()
+ for i in wg_ifs:
+ i.remove_vpp_config()
+
+ def test_wg_event(self):
+ """ Test events """
+ port = 12600
+ ESTABLISHED_FLAG = VppEnum.\
+ vl_api_wireguard_peer_flags_t.\
+ WIREGUARD_PEER_ESTABLISHED
+ DEAD_FLAG = VppEnum.\
+ vl_api_wireguard_peer_flags_t.\
+ WIREGUARD_PEER_STATUS_DEAD
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip4,
+ port).add_vpp_config()
+ wg1 = VppWgInterface(self,
+ self.pg2.local_ip4,
+ port+1).add_vpp_config()
+ wg0.admin_up()
+ wg1.admin_up()
+
+ # Check peer counter
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Create peers
+ NUM_PEERS = 2
+ self.pg2.generate_remote_hosts(NUM_PEERS)
+ self.pg2.configure_ipv4_neighbors()
+ self.pg1.generate_remote_hosts(NUM_PEERS)
+ self.pg1.configure_ipv4_neighbors()
+
+ peers_0 = []
+ peers_1 = []
+ routes_0 = []
+ routes_1 = []
+ for i in range(NUM_PEERS):
+ peers_0.append(VppWgPeer(self,
+ wg0,
+ self.pg1.remote_hosts[i].ip4,
+ port+1+i,
+ ["10.0.%d.4/32" % i]).add_vpp_config())
+ routes_0.append(VppIpRoute(self, "10.0.%d.4" % i, 32,
+ [VppRoutePath(self.pg1.remote_hosts[i].ip4,
+ wg0.sw_if_index)]).add_vpp_config())
+
+ peers_1.append(VppWgPeer(self,
+ wg1,
+ self.pg2.remote_hosts[i].ip4,
+ port+100+i,
+ ["10.100.%d.4/32" % i]).add_vpp_config())
+ routes_1.append(VppIpRoute(self, "10.100.%d.4" % i, 32,
+ [VppRoutePath(self.pg2.remote_hosts[i].ip4,
+ wg1.sw_if_index)]).add_vpp_config())
+
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS*2)
+
+ # Want events from the first perr of wg0
+ # and from all wg1 peers
+ peers_0[0].want_events()
+ wg1.want_events()
+
+ for i in range(NUM_PEERS):
+ # send a valid handsake init for which we expect a response
+ p = peers_0[i].mk_handshake(self.pg1)
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+ peers_0[i].consume_response(rx[0])
+ if (i == 0):
+ peers_0[0].wait_event(ESTABLISHED_FLAG)
+
+ p = peers_1[i].mk_handshake(self.pg2)
+ rx = self.send_and_expect(self.pg2, [p], self.pg2)
+ peers_1[i].consume_response(rx[0])
+
+ wg1.wait_events(
+ ESTABLISHED_FLAG,
+ [peers_1[0].index, peers_1[1].index])
+
+ # remove routes
+ for r in routes_0:
+ r.remove_vpp_config()
+ for r in routes_1:
+ r.remove_vpp_config()
+
+ # remove peers
+ for i in range(NUM_PEERS):
+ self.assertTrue(peers_0[i].query_vpp_config())
+ peers_0[i].remove_vpp_config()
+ if (i == 0):
+ peers_0[i].wait_event(0)
+ peers_0[i].wait_event(DEAD_FLAG)
+ for p in peers_1:
+ self.assertTrue(p.query_vpp_config())
+ p.remove_vpp_config()
+ p.wait_event(0)
+ p.wait_event(DEAD_FLAG)
+
+ wg0.remove_vpp_config()
+ wg1.remove_vpp_config()
+
class WireguardHandoffTests(TestWg):
""" Wireguard Tests in multi worker setup """
def test_wg_peer_init(self):
""" Handoff """
- wg_output_node_name = '/err/wg-output-tun/'
- wg_input_node_name = '/err/wg-input/'
- port = 12353
+ port = 12383
# Create interfaces
wg0 = VppWgInterface(self,
"10.11.3.0/24"]).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+ r1 = VppIpRoute(self, "10.11.3.0", 24,
+ [VppRoutePath("10.11.3.1",
+ wg0.sw_if_index)]).add_vpp_config()
+
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
p = [(peer_1.mk_tunnel_header(self.pg1) /
Wireguard(message_type=4, reserved_zero=0) /
WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
peer_1.validate_encapped(rxs, pe)
+ r1.remove_vpp_config()
peer_1.remove_vpp_config()
wg0.remove_vpp_config()
+
+ @unittest.skip("test disabled")
+ def test_wg_multi_interface(self):
+ """ Multi-tunnel on the same port """