X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_urpf.py;h=e0dc1210bfc5e3d2ed92949c28594458b267d7c6;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=8f4e563f8bc27feb88a346439312fbbc2071e3d2;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/test_urpf.py b/test/test_urpf.py index 8f4e563f8bc..e0dc1210bfc 100644 --- a/test/test_urpf.py +++ b/test/test_urpf.py @@ -15,7 +15,7 @@ N_PKTS = 63 class TestURPF(VppTestCase): - """ Unicast Reverse Path Forwarding Test Case """ + """Unicast Reverse Path Forwarding Test Case""" @classmethod def setUpClass(cls): @@ -47,26 +47,27 @@ class TestURPF(VppTestCase): super(TestURPF, self).tearDown() def test_urpf4(self): - """ uRPF IP4 """ + """uRPF IP4""" e = VppEnum - p_spoof_loose = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src="3.3.3.3", dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg2.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS - p_good = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_loose = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src="3.3.3.3", dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg2.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_good = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS # # before adding the uRPF, ensure all packets are forwarded @@ -78,10 +79,12 @@ class TestURPF(VppTestCase): # # apply loose uRPF check on pg0 rx # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -90,16 +93,17 @@ class TestURPF(VppTestCase): # packets from address to which there is no route are dropped self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", N_PKTS) # # crank it up to strict mode # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -107,16 +111,17 @@ class TestURPF(VppTestCase): self.send_and_assert_no_replies(self.pg0, p_spoof_strict) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", 2 * N_PKTS) # # disable uRPF, all traffic should pass # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg0.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) @@ -129,22 +134,25 @@ class TestURPF(VppTestCase): # for strict they should not be forwarded if they would be # forwarded thru that interface. # - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", N_PKTS) - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) # the strict packet, from a peer is allowed, since it does @@ -152,48 +160,49 @@ class TestURPF(VppTestCase): self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", N_PKTS) # change the strict packet so that it would forward through pg1 - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg1.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * N_PKTS self.send_and_assert_no_replies(self.pg0, p_spoof_strict) - self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", 2 * N_PKTS) # cleanup - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP4, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP4, + sw_if_index=self.pg1.sw_if_index, + ) def test_urpf6(self): - """ uRPF IP6 """ + """uRPF IP6""" e = VppEnum - p_spoof_loose = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src="3::3", dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg2.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS - p_good = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_loose = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src="3::3", dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg2.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS + p_good = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS # # before adding the uRPF, ensure all packets are forwarded @@ -205,10 +214,12 @@ class TestURPF(VppTestCase): # # apply loose uRPF check on pg0 rx # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -217,16 +228,17 @@ class TestURPF(VppTestCase): # packets from address to which there is no route are dropped self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", N_PKTS) # # crank it up to strict mode # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) # good packets still pass self.send_and_expect(self.pg0, p_good, self.pg1) @@ -234,16 +246,17 @@ class TestURPF(VppTestCase): self.send_and_assert_no_replies(self.pg0, p_spoof_strict) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", 2 * N_PKTS) # # disable uRPF, all traffic should pass # - self.vapi.urpf_update(is_input=True, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg0.sw_if_index) + self.vapi.urpf_update( + is_input=True, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg0.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) @@ -256,22 +269,25 @@ class TestURPF(VppTestCase): # for strict they should not be forwarded if they would be # forwarded thru that interface. # - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", N_PKTS) - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) self.send_and_expect(self.pg0, p_good, self.pg1) # the strict packet, from a peer is allowed, since it does @@ -279,27 +295,27 @@ class TestURPF(VppTestCase): self.send_and_expect(self.pg0, p_spoof_strict, self.pg1) self.send_and_assert_no_replies(self.pg0, p_spoof_loose) - self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", - N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", N_PKTS) # change the strict packet so that it would forward through pg1 - p_spoof_strict = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IPv6(src=self.pg1.remote_ip6, - dst=self.pg1.remote_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) * N_PKTS + p_spoof_strict = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) * N_PKTS self.send_and_assert_no_replies(self.pg0, p_spoof_strict) - self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", - 2 * N_PKTS) + self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", 2 * N_PKTS) # cleanup - self.vapi.urpf_update(is_input=False, - mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, - af=e.vl_api_address_family_t.ADDRESS_IP6, - sw_if_index=self.pg1.sw_if_index) + self.vapi.urpf_update( + is_input=False, + mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF, + af=e.vl_api_address_family_t.ADDRESS_IP6, + sw_if_index=self.pg1.sw_if_index, + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)