import unittest
import socket
+import struct
from scapy.layers.inet import IP, ICMP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation
IPSEC_API_CRYPTO_ALG_AES_CBC_128)
self.crypt_algo = 'AES-CBC' # scapy name
self.crypt_key = 'JPjyOWBeVEQiMe7h'
+ self.salt = 0
self.flags = 0
self.nat_header = None
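Review bot: The new `self.salt = 0` line carries no comment saying what the value is or which algorithm consumes it, and the identical line is added again in the second parameter class (hunk ending at L17). Only the later hunks reveal that it is packed big-endian into four bytes and appended to `crypt_key` when `crypt_algo` is `"AES-GCM"`, so a reader of this class cannot tell that from here; I would write `self.salt = 0  # 4-byte AES-GCM salt; appended to crypt_key when building the scapy SA`. The enclosing `__init__` starts before the hunk.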
IPSEC_API_CRYPTO_ALG_AES_CBC_128)
self.crypt_algo = 'AES-CBC' # scapy name
self.crypt_key = 'JPjyOWBeVEQiMe7h'
+ self.salt = 0
self.flags = 0
self.nat_header = None
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
IPSEC_API_SAD_FLAG_USE_ESN))
+ if p.crypt_algo == "AES-GCM":
+ crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+ else:
+ crypt_key = p.crypt_key
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
- crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ crypt_algo=p.crypt_algo,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
src=tun_if.remote_addr[p.addr_type],
use_esn=use_esn)
p.vpp_tun_sa = SecurityAssociation(
encryption_type, spi=p.scapy_tun_spi,
- crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ crypt_algo=p.crypt_algo,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
dst=tun_if.remote_addr[p.addr_type],
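Review bot: `crypt_key = p.crypt_key + struct.pack("!I", p.salt)` concatenates whatever type `p.crypt_key` holds with a `bytes` value; the parameter classes earlier in this patch initialise `crypt_key` as a text literal (`'JPjyOWBeVEQiMe7h'`), which works on Python 2 but raises `TypeError` under Python 3 unless the AES-GCM parameter objects are given a bytes key — worth confirming, or coercing explicitly before the concatenation. The same four-line prologue is repeated verbatim in `config_tra_params` (hunk ending at L57), so a module-level helper such as `def scapy_crypt_key(p): return p.crypt_key + struct.pack("!I", p.salt) if p.crypt_algo == "AES-GCM" else p.crypt_key` would keep the rule in one place and is the natural spot for a comment explaining why the salt is appended (scapy's AES-GCM, as far as I recall, takes the trailing four key bytes as the salt). `config_tun_params` starts before the hunk and the second `SecurityAssociation` call continues after it.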
def config_tra_params(p, encryption_type):
use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
IPSEC_API_SAD_FLAG_USE_ESN))
+ if p.crypt_algo == "AES-GCM":
+ crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+ else:
+ crypt_key = p.crypt_key
p.scapy_tra_sa = SecurityAssociation(
encryption_type,
spi=p.vpp_tra_spi,
crypt_algo=p.crypt_algo,
- crypt_key=p.crypt_key,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
nat_t_header=p.nat_header,
encryption_type,
spi=p.scapy_tra_spi,
crypt_algo=p.crypt_algo,
- crypt_key=p.crypt_key,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
nat_t_header=p.nat_header,
IPSEC_API_PROTO_AH)
self.config_interfaces()
+
self.ipsec_select_backend()
def unconfig_interfaces(self):
self.unconfig_interfaces()
- if not self.vpp_dead:
- self.vapi.cli("show hardware")
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show hardware"))
def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
payload_size=54):
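Review bot: The `if not self.vpp_dead:` guard around the `show hardware` CLI call is dropped when the code moves into `show_commands_at_teardown`; the new method issues the CLI unconditionally, so it is only safe if whatever calls this hook (not visible here, presumably the base-class teardown) already skips it once VPP has died. If that is not guaranteed, keep the guard inside the method, `if not self.vpp_dead:` ahead of the `self.logger.info(...)` line. A one-line docstring such as `"""Log 'show hardware' output at test teardown."""` would also help, since the hook name alone does not say what it reports. The surrounding class continues outside the hunk.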
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+ self.assert_packet_checksums_valid(rx)
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IP])
self.assert_packet_checksums_valid(decrypted)
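Review bot: The new assertions use `Ether` (and `IPv6` in the hunks ending at L93 and L109), but the only scapy import visible in this patch's first hunk brings in `IP, ICMP, TCP, UDP`; `IPv6` is already used by existing code at L18 so it must be in scope, whereas `Ether` may need `from scapy.layers.l2 import Ether` unless it is imported further up the file. The same two-line length/checksum check is pasted into four methods (here and the hunks ending at L93, L101 and L109); a helper such as `assert_ip_len(self, rx)` that picks `rx[IP].len` or `rx[IPv6].plen` by layer would avoid the duplication and would be the place to state the assumption that `len(rx)` contains no link-layer padding — if the capture path can pad short frames to the Ethernet minimum, the strict equality would need to become `>=`. The enclosing method starts before the hunk.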
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
+ rx[IPv6].plen)
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
self.assert_packet_checksums_valid(decrypted)
def verify_encrypted(self, p, sa, rxs):
decrypt_pkts = []
for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
if not decrypt_pkt.haslayer(IP):
count=count)
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
+ self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()),
+ recv_pkt[IPv6].plen)
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
if not decrypt_pkt.haslayer(IPv6):