X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_classifier.py;h=068561230ea0db9f7afbf1011a46e8a5d2af4b0e;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=9f0fdbf587ab3fbc88bd7e68d4cc3d45c3b1e284;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/test_classifier.py b/test/test_classifier.py index 9f0fdbf587a..068561230ea 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -19,7 +19,7 @@ from vpp_papi import VppEnum # Tests split to different test case classes because of issue reported in # ticket VPP-1336 class TestClassifierIP(TestClassifier): - """ Classifier IP Test Case """ + """Classifier IP Test Case""" @classmethod def setUpClass(cls): @@ -30,7 +30,7 @@ class TestClassifierIP(TestClassifier): super(TestClassifierIP, cls).tearDownClass() def test_iacl_src_ip(self): - """ Source IP iACL test + """Source IP iACL test Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -42,11 +42,11 @@ class TestClassifierIP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'ip_src' - self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff')) + key = "ip_src" + self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(src_ip=self.pg0.remote_ip4)) + self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4) + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -60,7 +60,7 @@ class TestClassifierIP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_dst_ip(self): - """ Destination IP iACL test + """Destination IP iACL test Test scenario for basic IP ACL with destination IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -72,11 +72,11 @@ class TestClassifierIP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'ip_dst' - self.create_classify_table(key, self.build_ip_mask(dst_ip='ffffffff')) + key = "ip_dst" + self.create_classify_table(key, self.build_ip_mask(dst_ip="ffffffff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(dst_ip=self.pg1.remote_ip4)) + self.acl_tbl_idx.get(key), self.build_ip_match(dst_ip=self.pg1.remote_ip4) + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -90,7 +90,7 @@ class TestClassifierIP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_src_dst_ip(self): - """ Source and destination IP iACL test + """Source and destination IP iACL test Test scenario for basic IP ACL with source and destination IP - Create IPv4 stream for pg0 -> pg1 interface. @@ -102,13 +102,14 @@ class TestClassifierIP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'ip' + key = "ip" self.create_classify_table( - key, self.build_ip_mask(src_ip='ffffffff', dst_ip='ffffffff')) + key, self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff") + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(src_ip=self.pg0.remote_ip4, - dst_ip=self.pg1.remote_ip4)) + self.build_ip_match(src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4), + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -123,7 +124,7 @@ class TestClassifierIP(TestClassifier): class TestClassifierUDP(TestClassifier): - """ Classifier UDP proto Test Case """ + """Classifier UDP proto Test Case""" @classmethod def setUpClass(cls): @@ -134,7 +135,7 @@ class TestClassifierUDP(TestClassifier): super(TestClassifierUDP, cls).tearDownClass() def test_iacl_proto_udp(self): - """ UDP protocol iACL test + """UDP protocol iACL test Test scenario for basic protocol ACL with UDP protocol - Create IPv4 stream for pg0 -> pg1 interface. @@ -146,13 +147,12 @@ class TestClassifierUDP(TestClassifier): pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'proto_udp' - self.create_classify_table(key, self.build_ip_mask(proto='ff')) + key = "proto_udp" + self.create_classify_table(key, self.build_ip_mask(proto="ff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_UDP) + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -165,7 +165,7 @@ class TestClassifierUDP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_udp_sport(self): - """ UDP source port iACL test + """UDP source port iACL test Test scenario for basic protocol ACL with UDP and sport - Create IPv4 stream for pg0 -> pg1 interface. @@ -175,18 +175,18 @@ class TestClassifierUDP(TestClassifier): # Basic iACL testing with UDP and sport sport = 38 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=5678)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=5678) + ) self.pg0.add_stream(pkts) - key = 'proto_udp_sport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', src_port='ffff')) + key = "proto_udp_sport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -199,7 +199,7 @@ class TestClassifierUDP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_udp_dport(self): - """ UDP destination port iACL test + """UDP destination port iACL test Test scenario for basic protocol ACL with UDP and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -209,18 +209,18 @@ class TestClassifierUDP(TestClassifier): # Basic iACL testing with UDP and dport dport = 427 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=1234, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=1234, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_udp_dport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', dst_port='ffff')) + key = "proto_udp_dport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -233,7 +233,7 @@ class TestClassifierUDP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_udp_sport_dport(self): - """ UDP source and destination ports iACL test + """UDP source and destination ports iACL test Test scenario for basic protocol ACL with UDP and sport and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -244,20 +244,22 @@ class TestClassifierUDP(TestClassifier): # Basic iACL testing with UDP and sport and dport sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_udp_ports' + key = "proto_udp_ports" self.create_classify_table( - key, - self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff')) + key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff") + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport, - dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match( + proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport + ), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -271,7 +273,7 @@ class TestClassifierUDP(TestClassifier): class TestClassifierTCP(TestClassifier): - """ Classifier TCP proto Test Case """ + """Classifier TCP proto Test Case""" @classmethod def setUpClass(cls): @@ -282,7 +284,7 @@ class TestClassifierTCP(TestClassifier): super(TestClassifierTCP, cls).tearDownClass() def test_iacl_proto_tcp(self): - """ TCP protocol iACL test + """TCP protocol iACL test Test scenario for basic protocol ACL with TCP protocol - Create IPv4 stream for pg0 -> pg1 interface. @@ -291,17 +293,17 @@ class TestClassifierTCP(TestClassifier): """ # Basic iACL testing with TCP protocol - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=1234, dport=5678)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=5678) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp' - self.create_classify_table(key, self.build_ip_mask(proto='ff')) + key = "proto_tcp" + self.create_classify_table(key, self.build_ip_mask(proto="ff")) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.acl_tbl_idx.get(key), self.build_ip_match(proto=socket.IPPROTO_TCP) + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -314,7 +316,7 @@ class TestClassifierTCP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_tcp_sport(self): - """ TCP source port iACL test + """TCP source port iACL test Test scenario for basic protocol ACL with TCP and sport - Create IPv4 stream for pg0 -> pg1 interface. @@ -324,18 +326,18 @@ class TestClassifierTCP(TestClassifier): # Basic iACL testing with TCP and sport sport = 38 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=sport, dport=5678)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=5678) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp_sport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', src_port='ffff')) + key = "proto_tcp_sport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", src_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -348,7 +350,7 @@ class TestClassifierTCP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_tcp_dport(self): - """ TCP destination port iACL test + """TCP destination port iACL test Test scenario for basic protocol ACL with TCP and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -358,18 +360,18 @@ class TestClassifierTCP(TestClassifier): # Basic iACL testing with TCP and dport dport = 427 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=1234, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=1234, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp_sport' - self.create_classify_table( - key, self.build_ip_mask(proto='ff', dst_port='ffff')) + key = "proto_tcp_sport" + self.create_classify_table(key, self.build_ip_mask(proto="ff", dst_port="ffff")) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -382,7 +384,7 @@ class TestClassifierTCP(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_proto_tcp_sport_dport(self): - """ TCP source and destination ports iACL test + """TCP source and destination ports iACL test Test scenario for basic protocol ACL with TCP and sport and dport - Create IPv4 stream for pg0 -> pg1 interface. @@ -393,20 +395,22 @@ class TestClassifierTCP(TestClassifier): # Basic iACL testing with TCP and sport and dport sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - TCP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, TCP(sport=sport, dport=dport) + ) self.pg0.add_stream(pkts) - key = 'proto_tcp_ports' + key = "proto_tcp_ports" self.create_classify_table( - key, - self.build_ip_mask(proto='ff', src_port='ffff', dst_port='ffff')) + key, self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff") + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport, - dst_port=dport)) - self.input_acl_set_interface( - self.pg0, self.acl_tbl_idx.get(key)) + self.build_ip_match( + proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport + ), + ) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key self.pg_enable_capture(self.pg_interfaces) @@ -420,7 +424,7 @@ class TestClassifierTCP(TestClassifier): class TestClassifierIPOut(TestClassifier): - """ Classifier output IP Test Case """ + """Classifier output IP Test Case""" @classmethod def setUpClass(cls): @@ -431,7 +435,7 @@ class TestClassifierIPOut(TestClassifier): super(TestClassifierIPOut, cls).tearDownClass() def test_acl_ip_out(self): - """ Output IP ACL test + """Output IP ACL test Test scenario for basic IP ACL with source IP - Create IPv4 stream for pg1 -> pg0 interface. @@ -443,12 +447,13 @@ class TestClassifierIPOut(TestClassifier): pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes) self.pg1.add_stream(pkts) - key = 'ip_out' + key = "ip_out" self.create_classify_table( - key, self.build_ip_mask(src_ip='ffffffff'), data_offset=0) + key, self.build_ip_mask(src_ip="ffffffff"), data_offset=0 + ) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_ip_match(src_ip=self.pg1.remote_ip4)) + self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg1.remote_ip4) + ) self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -463,7 +468,7 @@ class TestClassifierIPOut(TestClassifier): class TestClassifierMAC(TestClassifier): - """ Classifier MAC Test Case """ + """Classifier MAC Test Case""" @classmethod def setUpClass(cls): @@ -474,7 +479,7 @@ class TestClassifierMAC(TestClassifier): super(TestClassifierMAC, cls).tearDownClass() def test_acl_mac(self): - """ MAC ACL test + """MAC ACL test Test scenario for basic MAC ACL with source MAC - Create IPv4 stream for pg0 -> pg2 interface. @@ -486,12 +491,13 @@ class TestClassifierMAC(TestClassifier): pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'mac' + key = "mac" self.create_classify_table( - key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14) + key, self.build_mac_mask(src_mac="ffffffffffff"), data_offset=-14 + ) self.create_classify_session( - self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=self.pg0.remote_mac)) + self.acl_tbl_idx.get(key), self.build_mac_match(src_mac=self.pg0.remote_mac) + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -506,7 +512,7 @@ class TestClassifierMAC(TestClassifier): class TestClassifierComplex(TestClassifier): - """ Large & Nested Classifiers Test Cases """ + """Large & Nested Classifiers Test Cases""" @classmethod def setUpClass(cls): @@ -517,7 +523,7 @@ class TestClassifierComplex(TestClassifier): super(TestClassifierComplex, cls).tearDownClass() def test_iacl_large(self): - """ Large input ACL test + """Large input ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg0 -> pg1 interface. @@ -527,7 +533,7 @@ class TestClassifierComplex(TestClassifier): # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b) # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum - msk = VarMask(offset=40, spec='ffff') + msk = VarMask(offset=40, spec="ffff") mth = VarMatch(offset=40, value=0x1234, length=2) payload_msk = self.build_payload_mask([msk]) @@ -537,38 +543,49 @@ class TestClassifierComplex(TestClassifier): dport = 9080 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH)) - packet_ex = bytes.fromhex(('0' * 36) + '1234') - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport), - packet_ex) + packet_ex = bytes.fromhex(("0" * 36) + "1234") + pkts = self.create_stream( + self.pg0, + self.pg1, + self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport), + packet_ex, + ) self.pg0.add_stream(pkts) - key = 'large_in' + key = "large_in" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff') + - payload_msk, - data_offset=-14) + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ) + + payload_msk, + data_offset=-14, + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=self.pg0.remote_mac, - dst_mac=self.pg0.local_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg0.remote_ip4, - dst_ip=self.pg1.remote_ip4, - src_port=sport, - dst_port=dport) + - payload_match + self.build_mac_match( + src_mac=self.pg0.remote_mac, + dst_mac=self.pg0.local_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg0.remote_ip4, + dst_ip=self.pg1.remote_ip4, + src_port=sport, + dst_port=dport, + ) + + payload_match, ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) @@ -584,7 +601,7 @@ class TestClassifierComplex(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_oacl_large(self): - """ Large output ACL test + """Large output ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg1 -> pg0 interface. - Create large acl matching on ethernet+ip+udp header fields @@ -593,7 +610,7 @@ class TestClassifierComplex(TestClassifier): # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b) # + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum - msk = VarMask(offset=40, spec='ffff') + msk = VarMask(offset=40, spec="ffff") mth = VarMatch(offset=40, value=0x1234, length=2) payload_msk = self.build_payload_mask([msk]) @@ -603,38 +620,50 @@ class TestClassifierComplex(TestClassifier): dport = 9080 # 36b offset = 80bytes - (sizeof(UDP/IP/ETH)) - packet_ex = bytes.fromhex(('0' * 36) + '1234') - pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport), - packet_ex) + packet_ex = bytes.fromhex(("0" * 36) + "1234") + pkts = self.create_stream( + self.pg1, + self.pg0, + self.pg_if_packet_sizes, + UDP(sport=sport, dport=dport), + packet_ex, + ) self.pg1.add_stream(pkts) - key = 'large_out' + key = "large_out" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff') + - payload_msk, - data_offset=-14) + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ) + + payload_msk, + data_offset=-14, + ) self.create_classify_session( self.acl_tbl_idx.get(key), - self.build_mac_match(src_mac=self.pg0.local_mac, - dst_mac=self.pg0.remote_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg1.remote_ip4, - dst_ip=self.pg0.remote_ip4, - src_port=sport, - dst_port=dport) + - payload_match) + self.build_mac_match( + src_mac=self.pg0.local_mac, + dst_mac=self.pg0.remote_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg1.remote_ip4, + dst_ip=self.pg0.remote_ip4, + src_port=sport, + dst_port=dport, + ) + + payload_match, + ) self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -649,7 +678,7 @@ class TestClassifierComplex(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_iacl_nested(self): - """ Nested input ACL test + """Nested input ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg0 -> pg1 interface. @@ -660,48 +689,60 @@ class TestClassifierComplex(TestClassifier): sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport) + ) self.pg0.add_stream(pkts) - subtable_key = 'subtable_in' + subtable_key = "subtable_in" self.create_classify_table( subtable_key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), - data_offset=-14) - - key = 'nested_in' + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), + data_offset=-14, + ) + + key = "nested_in" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), - next_table_index=self.acl_tbl_idx.get(subtable_key)) + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), + next_table_index=self.acl_tbl_idx.get(subtable_key), + ) self.create_classify_session( self.acl_tbl_idx.get(subtable_key), - self.build_mac_match(src_mac=self.pg0.remote_mac, - dst_mac=self.pg0.local_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg0.remote_ip4, - dst_ip=self.pg1.remote_ip4, - src_port=sport, - dst_port=dport)) + self.build_mac_match( + src_mac=self.pg0.remote_mac, + dst_mac=self.pg0.local_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg0.remote_ip4, + dst_ip=self.pg1.remote_ip4, + src_port=sport, + dst_port=dport, + ), + ) self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -716,7 +757,7 @@ class TestClassifierComplex(TestClassifier): self.pg3.assert_nothing_captured(remark="packets forwarded") def test_oacl_nested(self): - """ Nested output ACL test + """Nested output ACL test Test scenario for Large ACL matching on ethernet+ip+udp headers - Create IPv4 stream for pg1 -> pg0 interface. @@ -727,48 +768,60 @@ class TestClassifierComplex(TestClassifier): sport = 13720 dport = 9080 - pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes, - UDP(sport=sport, dport=dport)) + pkts = self.create_stream( + self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport) + ) self.pg1.add_stream(pkts) - subtable_key = 'subtable_out' + subtable_key = "subtable_out" self.create_classify_table( subtable_key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), - data_offset=-14) - - key = 'nested_out' + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), + data_offset=-14, + ) + + key = "nested_out" self.create_classify_table( key, - self.build_mac_mask(src_mac='ffffffffffff', - dst_mac='ffffffffffff', - ether_type='ffff') + - self.build_ip_mask(proto='ff', - src_ip='ffffffff', - dst_ip='ffffffff', - src_port='ffff', - dst_port='ffff'), + self.build_mac_mask( + src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff" + ) + + self.build_ip_mask( + proto="ff", + src_ip="ffffffff", + dst_ip="ffffffff", + src_port="ffff", + dst_port="ffff", + ), next_table_index=self.acl_tbl_idx.get(subtable_key), - data_offset=-14) + data_offset=-14, + ) self.create_classify_session( self.acl_tbl_idx.get(subtable_key), - self.build_mac_match(src_mac=self.pg0.local_mac, - dst_mac=self.pg0.remote_mac, - # ipv4 next header - ether_type='0800') + - self.build_ip_match(proto=socket.IPPROTO_UDP, - src_ip=self.pg1.remote_ip4, - dst_ip=self.pg0.remote_ip4, - src_port=sport, - dst_port=dport)) + self.build_mac_match( + src_mac=self.pg0.local_mac, + dst_mac=self.pg0.remote_mac, + # ipv4 next header + ether_type="0800", + ) + + self.build_ip_match( + proto=socket.IPPROTO_UDP, + src_ip=self.pg1.remote_ip4, + dst_ip=self.pg0.remote_ip4, + src_port=sport, + dst_port=dport, + ), + ) self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) self.acl_active_table = key @@ -784,7 +837,7 @@ class TestClassifierComplex(TestClassifier): class TestClassifierPBR(TestClassifier): - """ Classifier PBR Test Case """ + """Classifier PBR Test Case""" @classmethod def setUpClass(cls): @@ -795,7 +848,7 @@ class TestClassifierPBR(TestClassifier): super(TestClassifierPBR, cls).tearDownClass() def test_acl_pbr(self): - """ IP PBR test + """IP PBR test Test scenario for PBR with source IP - Create IPv4 stream for pg0 -> pg3 interface. @@ -807,19 +860,24 @@ class TestClassifierPBR(TestClassifier): pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes) self.pg0.add_stream(pkts) - key = 'pbr' - self.create_classify_table(key, self.build_ip_mask(src_ip='ffffffff')) + key = "pbr" + self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff")) pbr_option = 1 # this will create the VRF/table in which we will insert the route self.create_classify_session( self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4), - pbr_option, self.pbr_vrfid) + pbr_option, + self.pbr_vrfid, + ) self.assertTrue(self.verify_vrf(self.pbr_vrfid)) - r = VppIpRoute(self, self.pg3.local_ip4, 24, - [VppRoutePath(self.pg3.remote_ip4, - INVALID_INDEX)], - table_id=self.pbr_vrfid) + r = VppIpRoute( + self, + self.pg3.local_ip4, + 24, + [VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)], + table_id=self.pbr_vrfid, + ) r.add_vpp_config() self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) @@ -839,14 +897,17 @@ class TestClassifierPBR(TestClassifier): self.create_classify_session( self.acl_tbl_idx.get(key), self.build_ip_match(src_ip=self.pg0.remote_ip4), - pbr_option, self.pbr_vrfid, is_add=0) + pbr_option, + self.pbr_vrfid, + is_add=0, + ) # and the table should be gone. self.assertFalse(self.verify_vrf(self.pbr_vrfid)) class TestClassifierPunt(TestClassifier): - """ Classifier punt Test Case """ + """Classifier punt Test Case""" @classmethod def setUpClass(cls): @@ -857,7 +918,7 @@ class TestClassifierPunt(TestClassifier): super(TestClassifierPunt, cls).tearDownClass() def test_punt_udp(self): - """ IPv4/UDP protocol punt ACL test + """IPv4/UDP protocol punt ACL test Test scenario for basic punt ACL with UDP protocol - Create IPv4 stream for pg0 -> pg1 interface. @@ -868,13 +929,10 @@ class TestClassifierPunt(TestClassifier): sport = 6754 dport = 17923 - key = 'ip4_udp_punt' + key = "ip4_udp_punt" self.create_classify_table( - key, - self.build_ip_mask( - src_ip='ffffffff', - proto='ff', - src_port='ffff')) + key, self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff") + ) table_index = self.acl_tbl_idx.get(key) self.vapi.punt_acl_add_del(ip4_table_index=table_index) self.acl_active_table = key @@ -883,52 +941,67 @@ class TestClassifierPunt(TestClassifier): self.vapi.set_punt( is_add=1, punt={ - 'type': VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, - 'punt': { - 'l4': { - 'af': VppEnum.vl_api_address_family_t.ADDRESS_IP4, - 'protocol': VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, - 'port': dport, - }}}) - self.vapi.ip_punt_redirect(punt={ - 'rx_sw_if_index': self.pg0.sw_if_index, - 'tx_sw_if_index': self.pg1.sw_if_index, - 'nh': self.pg1.remote_ip4, - }) - - pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / - UDP(sport=sport, dport=dport) / - Raw('\x17' * 100))] * 2 + "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4, + "punt": { + "l4": { + "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4, + "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP, + "port": dport, + } + }, + }, + ) + self.vapi.ip_punt_redirect( + punt={ + "rx_sw_if_index": self.pg0.sw_if_index, + "tx_sw_if_index": self.pg1.sw_if_index, + "nh": self.pg1.remote_ip4, + } + ) + + pkts = [ + ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) + / UDP(sport=sport, dport=dport) + / Raw("\x17" * 100) + ) + ] * 2 # allow a session but not matching the stream: expect to drop self.create_classify_session( table_index, - self.build_ip_match(src_ip=self.pg0.remote_ip4, - proto=socket.IPPROTO_UDP, src_port=sport + 10)) + self.build_ip_match( + src_ip=self.pg0.remote_ip4, + proto=socket.IPPROTO_UDP, + src_port=sport + 10, + ), + ) self.send_and_assert_no_replies(self.pg0, pkts) # allow a session matching the stream: expect to pass self.create_classify_session( table_index, - self.build_ip_match(src_ip=self.pg0.remote_ip4, - proto=socket.IPPROTO_UDP, src_port=sport)) + self.build_ip_match( + src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport + ), + ) self.send_and_expect_only(self.pg0, pkts, self.pg1) # test dump api: ip4 is set, ip6 is not r = self.vapi.punt_acl_get() self.assertEqual(r.ip4_table_index, table_index) - self.assertEqual(r.ip6_table_index, 0xffffffff) + self.assertEqual(r.ip6_table_index, 0xFFFFFFFF) # cleanup - self.acl_active_table = '' + self.acl_active_table = "" self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0) # test dump api: nothing set r = self.vapi.punt_acl_get() - self.assertEqual(r.ip4_table_index, 0xffffffff) - self.assertEqual(r.ip6_table_index, 0xffffffff) + self.assertEqual(r.ip4_table_index, 0xFFFFFFFF) + self.assertEqual(r.ip6_table_index, 0xFFFFFFFF) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)