+ r = self.cli("trace add pg-input %i filter" % len(packets))
+ self.assertTrue(r.retval == 0)
+ self.pg0.add_stream(packets)
+ self.cli("pa en")
+ self.assert_hits(n if n is not None else len(packets))
+ self.cli("clear trace")
+ self.cli(
+ "classify filter trace del mask hex %s match hex %s" %
+ (mask, match))
+
+ def test_encap(self):
+ """ Packet Tracer Filter Test with encap """
+
+ # the packet we are trying to match
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+ UDP() /
+ VXLAN() /
+ Ether() /
+ IP() /
+ UDP() /
+ GENEVE(vni=1234) /
+ Ether() /
+ IP(src='192.168.4.167') /
+ UDP() /
+ Raw('\xa5' * 100))
+
+ #
+ # compute filter mask & value
+ # we compute it by XOR'ing a template packet with a modified packet
+ # we need to set checksums to 0 to make sure scapy will not recompute
+ # them
+ #
+ tmpl = (Ether() /
+ IP(chksum=0) /
+ UDP(chksum=0) /
+ VXLAN() /
+ Ether() /
+ IP(chksum=0) /
+ UDP(chksum=0) /
+ GENEVE(vni=0) /
+ Ether() /
+ IP(src='0.0.0.0', chksum=0))
+ ori = raw(tmpl)
+
+ # the mask
+ tmpl[GENEVE].vni = 0xffffff
+ user = tmpl[GENEVE].payload
+ user[IP].src = '255.255.255.255'
+ new = raw(tmpl)
+ mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
+
+ # this does not match (wrong vni)
+ tmpl[GENEVE].vni = 1
+ user = tmpl[GENEVE].payload
+ user[IP].src = '192.168.4.167'
+ new = raw(tmpl)
+ match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
+ self.assert_classify(mask, match, [p] * 11, 0)
+
+ # this must match
+ tmpl[GENEVE].vni = 1234
+ new = raw(tmpl)
+ match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
+ self.assert_classify(mask, match, [p] * 17)
+