vlib: clean up r2 plugin registration relocator
[vpp.git] / test / template_bd.py
index 39f7143..07e824a 100644 (file)
@@ -1,81 +1,95 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import abc
-import six
 
-from scapy.layers.l2 import Ether, Raw
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
+from scapy.contrib.mpls import MPLS
 
-from util import ip4_range
 
-
-@six.add_metaclass(abc.ABCMeta)
-class BridgeDomain(object):
-    """ Bridge domain abstraction """
+class BridgeDomain(metaclass=abc.ABCMeta):
+    """Bridge domain abstraction"""
 
     @property
     def frame_request(self):
-        """ Ethernet frame modeling a generic request """
-        return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
-                IP(src='1.2.3.4', dst='4.3.2.1') /
-                UDP(sport=10000, dport=20000) /
-                Raw('\xa5' * 100))
+        """Ethernet frame modeling a generic request"""
+        return (
+            Ether(src="00:00:00:00:00:01", dst="00:00:00:00:00:02")
+            / IP(src="1.2.3.4", dst="4.3.2.1")
+            / UDP(sport=10000, dport=20000)
+            / Raw("\xa5" * 100)
+        )
 
     @property
     def frame_reply(self):
-        """ Ethernet frame modeling a generic reply """
-        return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
-                IP(src='4.3.2.1', dst='1.2.3.4') /
-                UDP(sport=20000, dport=10000) /
-                Raw('\xa5' * 100))
+        """Ethernet frame modeling a generic reply"""
+        return (
+            Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
+            / IP(src="4.3.2.1", dst="1.2.3.4")
+            / UDP(sport=20000, dport=10000)
+            / Raw("\xa5" * 100)
+        )
 
     @abc.abstractmethod
     def ip_range(self, start, end):
-        """ range of remote ip's """
+        """range of remote ip's"""
         pass
 
     @abc.abstractmethod
     def encap_mcast(self, pkt, src_ip, src_mac, vni):
-        """ Encapsulate mcast packet """
+        """Encapsulate mcast packet"""
         pass
 
     @abc.abstractmethod
     def encapsulate(self, pkt, vni):
-        """ Encapsulate packet """
+        """Encapsulate packet"""
         pass
 
     @abc.abstractmethod
     def decapsulate(self, pkt):
-        """ Decapsulate packet """
+        """Decapsulate packet"""
         pass
 
     @abc.abstractmethod
     def check_encapsulation(self, pkt, vni, local_only=False):
-        """ Verify the encapsulation """
+        """Verify the encapsulation"""
         pass
 
     def assert_eq_pkts(self, pkt1, pkt2):
-        """ Verify the Ether, IP, UDP, payload are equal in both
+        """Verify the Ether, IP, UDP, payload are equal in both
         packets
         """
         self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
         self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
-        self.assertEqual(pkt1[IP].src, pkt2[IP].src)
-        self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
+        if MPLS in pkt1 or MPLS in pkt2:
+            self.assertEqual(pkt1[MPLS].label, pkt2[MPLS].label)
+            self.assertEqual(pkt1[MPLS].cos, pkt2[MPLS].cos)
+            self.assertEqual(pkt1[MPLS].ttl, pkt2[MPLS].ttl)
+        if IP in pkt1 or IP in pkt2:
+            self.assertEqual(pkt1[IP].src, pkt2[IP].src)
+            self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
+        elif IPv6 in pkt1 or IPv6 in pkt2:
+            self.assertEqual(pkt1[IPv6].src, pkt2[IPv6].src)
+            self.assertEqual(pkt1[IPv6].dst, pkt2[IPv6].dst)
         self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
         self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
         self.assertEqual(pkt1[Raw], pkt2[Raw])
 
     def test_decap(self):
-        """ Decapsulation test
+        """Decapsulation test
         Send encapsulated frames from pg0
         Verify receipt of decapsulated frames on pg1
         """
 
-        encapsulated_pkt = self.encapsulate(self.frame_request,
-                                            self.single_tunnel_bd)
+        encapsulated_pkt = self.encapsulate(self.frame_request, self.single_tunnel_vni)
 
-        self.pg0.add_stream([encapsulated_pkt, ])
+        self.pg0.add_stream(
+            [
+                encapsulated_pkt,
+            ]
+        )
 
         self.pg1.enable_capture()
 
@@ -88,7 +102,7 @@ class BridgeDomain(object):
         self.assert_eq_pkts(pkt, self.frame_request)
 
     def test_encap(self):
-        """ Encapsulation test
+        """Encapsulation test
         Send frames from pg1
         Verify receipt of encapsulated frames on pg0
         """
@@ -101,13 +115,13 @@ class BridgeDomain(object):
         # Pick first received frame and check if it's correctly encapsulated.
         out = self.pg0.get_capture(1)
         pkt = out[0]
-        self.check_encapsulation(pkt, self.single_tunnel_bd)
+        self.check_encapsulation(pkt, self.single_tunnel_vni)
 
         payload = self.decapsulate(pkt)
         self.assert_eq_pkts(payload, self.frame_reply)
 
     def test_ucast_flood(self):
-        """ Unicast flood test
+        """Unicast flood test
         Send frames from pg3
         Verify receipt of encapsulated frames on pg0
         """
@@ -125,7 +139,7 @@ class BridgeDomain(object):
             self.assert_eq_pkts(payload, self.frame_reply)
 
     def test_mcast_flood(self):
-        """ Multicast flood test
+        """Multicast flood test
         Send frames from pg2
         Verify receipt of encapsulated frames on pg0
         """
@@ -138,14 +152,15 @@ class BridgeDomain(object):
         # Pick first received frame and check if it's correctly encapsulated.
         out = self.pg0.get_capture(1)
         pkt = out[0]
-        self.check_encapsulation(pkt, self.mcast_flood_bd,
-                                 local_only=False, mcast_pkt=True)
+        self.check_encapsulation(
+            pkt, self.mcast_flood_bd, local_only=False, mcast_pkt=True
+        )
 
         payload = self.decapsulate(pkt)
         self.assert_eq_pkts(payload, self.frame_reply)
 
     def test_mcast_rcv(self):
-        """ Multicast receive test
+        """Multicast receive test
         Send 20 encapsulated frames from pg0 only 10 match unicast tunnels
         Verify receipt of 10 decap frames on pg2
         """
@@ -154,7 +169,8 @@ class BridgeDomain(object):
         ip_range_end = 30
         mcast_stream = [
             self.encap_mcast(self.frame_request, ip, mac, self.mcast_flood_bd)
-            for ip in self.ip_range(ip_range_start, ip_range_end)]
+            for ip in self.ip_range(ip_range_start, ip_range_end)
+        ]
         self.pg0.add_stream(mcast_stream)
         self.pg2.enable_capture()
         self.pg_start()