#!/usr/bin/env python
import socket
-from util import ip4n_range, ip4_range
+from util import ip4_range, reassemble4_ether
import unittest
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain
+from vpp_ip import VppIpAddress
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP
from scapy.layers.vxlan import VXLAN
from scapy.utils import atol
-import StringIO
-
-
-def reassemble(listoffragments):
- buffer = StringIO.StringIO()
- first = listoffragments[0]
- buffer.seek(20)
- for pkt in listoffragments:
- buffer.seek(pkt[IP].frag*8)
- buffer.write(pkt[IP].payload)
- first.len = len(buffer.getvalue()) + 20
- first.flags = 0
- del(first.chksum)
- header = str(first[Ether])[:34]
- return first[Ether].__class__(header + buffer.getvalue())
-
class TestVxlanGbp(VppTestCase):
""" VXLAN GBP Test Case """
ip_range_start = 10
ip_range_end = ip_range_start + n_ucast_tunnels
next_hop_address = cls.pg0.remote_ip4n
- for dest_ip4n in ip4n_range(next_hop_address, ip_range_start,
- ip_range_end):
+ for dest_ip4 in ip4_range(cls.pg0.remote_ip4,
+ ip_range_start,
+ ip_range_end):
# add host route so dest_ip4n will not be resolved
- cls.vapi.ip_add_del_route(dest_ip4n, 32, next_hop_address)
- r = cls.vapi.vxlan_gbp_add_del_tunnel(
- src_addr=cls.pg0.local_ip4n,
- dst_addr=dest_ip4n,
+ vip = VppIpAddress(dest_ip4)
+ cls.vapi.ip_add_del_route(vip.bytes, 32, next_hop_address)
+ r = cls.vapi.vxlan_gbp_tunnel_add_del(
+ VppIpAddress(cls.pg0.local_ip4).encode(),
+ vip.encode(),
vni=vni)
cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
# Create VXLAN GBP VTEP on VPP pg0, and put vxlan_gbp_tunnel0 and
# pg1 into BD.
cls.single_tunnel_bd = 1
- r = cls.vapi.vxlan_gbp_add_del_tunnel(
- src_addr=cls.pg0.local_ip4n,
- dst_addr=cls.pg0.remote_ip4n,
+ r = cls.vapi.vxlan_gbp_tunnel_add_del(
+ VppIpAddress(cls.pg0.local_ip4).encode(),
+ VppIpAddress(cls.pg0.remote_ip4).encode(),
vni=cls.single_tunnel_bd)
cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
bd_id=cls.single_tunnel_bd)
# Pick first received frame and check if it's correctly encapsulated.
out = self.pg0.get_capture(2)
- pkt = reassemble(out)
+ pkt = reassemble4_ether(out)
self.check_encapsulation(pkt, self.single_tunnel_bd)
payload = self.decapsulate(pkt)