gbp: An Endpoint can change sclass
[vpp.git] / test / test_gbp.py
index 9cf1817..f6bded6 100644 (file)
@@ -35,7 +35,8 @@ except NameError:
 NUM_PKTS = 67
 
 
-def find_gbp_endpoint(test, sw_if_index=None, ip=None, mac=None, tep=None):
+def find_gbp_endpoint(test, sw_if_index=None, ip=None, mac=None,
+                      tep=None, sclass=None):
     if ip:
         vip = VppIpAddress(ip)
     if mac:
@@ -52,6 +53,9 @@ def find_gbp_endpoint(test, sw_if_index=None, ip=None, mac=None, tep=None):
         if sw_if_index:
             if ep.endpoint.sw_if_index != sw_if_index:
                 continue
+        if sclass:
+            if ep.endpoint.sclass != sclass:
+                continue
         if ip:
             for eip in ep.endpoint.ips:
                 if vip == eip:
@@ -2089,6 +2093,7 @@ class TestGBP(VppTestCase):
             self.assertTrue(find_gbp_endpoint(
                 self,
                 vx_tun_l2_1.sw_if_index,
+                sclass=113,
                 mac=l['mac'],
                 tep=[self.pg2.local_ip4,
                      self.pg2.remote_hosts[2].ip4]))
@@ -2117,6 +2122,65 @@ class TestGBP(VppTestCase):
                 self.assertFalse(rx[VXLAN].gpflags.D)
                 self.assertEqual(rx[IPv6].dst, l['ip6'])
 
+        #
+        # EP changes sclass
+        #
+        for l in learnt:
+            # a packet with an sclass from a known EPG
+            p = (Ether(src=self.pg2.remote_mac,
+                       dst=self.pg2.local_mac) /
+                 IP(src=self.pg2.remote_hosts[2].ip4,
+                    dst=self.pg2.local_ip4) /
+                 UDP(sport=1234, dport=48879) /
+                 VXLAN(vni=99, gpid=112, flags=0x88) /
+                 Ether(src=l['mac'], dst=ep.mac) /
+                 IPv6(src=l['ip6'], dst=ep.ip6.address) /
+                 UDP(sport=1234, dport=1234) /
+                 Raw('\xa5' * 100))
+
+            rx = self.send_and_expect(self.pg2, p * 1, self.pg0)
+            rx = self.send_and_expect(self.pg2, p * NUM_PKTS, self.pg0)
+
+            self.assertTrue(find_gbp_endpoint(
+                self,
+                vx_tun_l2_1.sw_if_index,
+                mac=l['mac'],
+                sclass=112,
+                tep=[self.pg2.local_ip4,
+                     self.pg2.remote_hosts[2].ip4]))
+
+        #
+        # check reachability and contract intra-epg
+        #
+        allow_intra_class = self.statistics.get_err_counter(
+            '/err/gbp-policy-mac/allow-intra-sclass')
+
+        for l in learnt:
+            p = (Ether(src=ep.mac, dst=l['mac']) /
+                 IPv6(dst=l['ip6'], src=ep.ip6.address) /
+                 UDP(sport=1234, dport=1234) /
+                 Raw('\xa5' * 100))
+
+            rxs = self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg2)
+
+            for rx in rxs:
+                self.assertEqual(rx[IP].src, self.pg2.local_ip4)
+                self.assertEqual(rx[IP].dst, self.pg2.remote_hosts[2].ip4)
+                self.assertEqual(rx[UDP].dport, 48879)
+                self.assertEqual(rx[VXLAN].gpid, 112)
+                self.assertEqual(rx[VXLAN].vni, 99)
+                self.assertTrue(rx[VXLAN].flags.G)
+                self.assertTrue(rx[VXLAN].flags.Instance)
+                self.assertTrue(rx[VXLAN].gpflags.A)
+                self.assertFalse(rx[VXLAN].gpflags.D)
+                self.assertEqual(rx[IPv6].dst, l['ip6'])
+
+            allow_intra_class += NUM_PKTS
+
+        self.assert_error_counter_equal(
+            '/err/gbp-policy-mac/allow-intra-sclass',
+            allow_intra_class)
+
         #
         # clean up
         #