- def test_gtp4_reply(self):
- # TESTS:
- # trace add af-packet-input 10
- # pg interface on c1 172.20.0.1
- # pg interface on c4 B::1/120
-
- self.start_containers()
-
- c1 = self.containers.get(self.get_name(self.instance_names[0]))
- c2 = self.containers.get(self.get_name(self.instance_names[1]))
- c3 = self.containers.get(self.get_name(self.instance_names[2]))
- c4 = self.containers.get(self.get_name(self.instance_names[-1]))
-
- c1.pg_create_interface4(
- local_ip="172.16.0.1/30",
- remote_ip="172.16.0.2/30",
- local_mac="aa:bb:cc:dd:ee:01",
- remote_mac="aa:bb:cc:dd:ee:02")
- c4.pg_create_interface4(
- local_ip="1.0.0.2/30",
- remote_ip="1.0.0.1",
- local_mac="aa:bb:cc:dd:ee:11",
- remote_mac="aa:bb:cc:dd:ee:22")
-
- c1.vppctl_exec("set sr encaps source addr A1::1")
- c1.vppctl_exec("sr policy add bsid D4:: next D2:: next D3::")
- c1.vppctl_exec("sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 v6src_prefix C1::/64 nhtype ipv4")
- c1.vppctl_exec("sr steer l3 172.20.0.1/32 via bsid D5::")
-
- c2.vppctl_exec("sr localsid address D2:: behavior end")
-
- c3.vppctl_exec("sr localsid address D3:: behavior end")
-
- c4.vppctl_exec(
- "sr localsid prefix D4::/32 "
- "behavior end.m.gtp4.e v4src_position 64")
-
- c2.set_ipv6_route("eth2", "A2::2", "D3::/128")
- c2.set_ipv6_route("eth1", "A1::1", "C::/120")
- c3.set_ipv6_route("eth2", "A3::2", "D4::/32")
- c3.set_ipv6_route("eth1", "A2::1", "C::/120")
- c4.set_ip_pgroute("pg0", "1.0.0.1", "172.20.0.1/32")
-
- p = (Ether(src="aa:bb:cc:dd:ee:02", dst="aa:bb:cc:dd:ee:01") /
- IP(src="172.20.0.2", dst="172.20.0.1") /
- UDP(sport=2152, dport=2152) /
- GTP_U_Header(gtp_type="echo_response", S=1, teid=200, seq=200))
-
- print("Sending packet on {}:".format(c1.name))
- p.show2()
-
- c1.enable_trace(10)
- c4.enable_trace(10)
-
- c4.pg_start_capture()
-
- c1.pg_create_stream(p)
- c1.pg_enable()
-
- # timeout (sleep) if needed
- print("Sleeping")
- time.sleep(5)
-
- print("Receiving packet on {}:".format(c4.name))
- for p in c4.pg_read_packets():
- p.show2()
-
- def test_gtp4_error(self):
- # TESTS:
- # trace add af-packet-input 10
- # pg interface on c1 172.20.0.1
- # pg interface on c4 B::1/120
-
- self.start_containers()
-
- c1 = self.containers.get(self.get_name(self.instance_names[0]))
- c2 = self.containers.get(self.get_name(self.instance_names[1]))
- c3 = self.containers.get(self.get_name(self.instance_names[2]))
- c4 = self.containers.get(self.get_name(self.instance_names[-1]))
-
- c1.pg_create_interface4(
- local_ip="172.16.0.1/30",
- remote_ip="172.16.0.2/30",
- local_mac="aa:bb:cc:dd:ee:01",
- remote_mac="aa:bb:cc:dd:ee:02")
- c4.pg_create_interface4(
- local_ip="1.0.0.2/30",
- remote_ip="1.0.0.1",
- local_mac="aa:bb:cc:dd:ee:11",
- remote_mac="aa:bb:cc:dd:ee:22")
-
- c1.vppctl_exec("set sr encaps source addr A1::1")
- c1.vppctl_exec("sr policy add bsid D4:: next D2:: next D3::")
- c1.vppctl_exec("sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 v6src_prefix C1::/64 nhtype ipv4")
- c1.vppctl_exec("sr steer l3 172.20.0.1/32 via bsid D5::")
-
- c2.vppctl_exec("sr localsid address D2:: behavior end")
-
- c3.vppctl_exec("sr localsid address D3:: behavior end")
-
- c4.vppctl_exec(
- "sr localsid prefix D4::/32 "
- "behavior end.m.gtp4.e v4src_position 64")
-
- c2.set_ipv6_route("eth2", "A2::2", "D3::/128")
- c2.set_ipv6_route("eth1", "A1::1", "C::/120")
- c3.set_ipv6_route("eth2", "A3::2", "D4::/32")
- c3.set_ipv6_route("eth1", "A2::1", "C::/120")
- c4.set_ip_pgroute("pg0", "1.0.0.1", "172.20.0.1/32")
-
- p = (Ether(src="aa:bb:cc:dd:ee:02", dst="aa:bb:cc:dd:ee:01") /
- IP(src="172.20.0.2", dst="172.20.0.1") /
- UDP(sport=2152, dport=2152) /
- GTP_U_Header(gtp_type="error_indication", S=1, teid=200, seq=200)/
- IE_TEIDI(TEIDI=65535)/IE_GSNAddress(address="1.1.1.1")/
- IE_PrivateExtension(extention_value="z"))
-
- print("Sending packet on {}:".format(c1.name))
- p.show2()
-
- c1.enable_trace(10)
- c4.enable_trace(10)
-
- c4.pg_start_capture()
-
- c1.pg_create_stream(p)
- c1.pg_enable()
-
- # timeout (sleep) if needed
- print("Sleeping")
- time.sleep(5)
-
- print("Receiving packet on {}:".format(c4.name))
- for p in c4.pg_read_packets():
- p.show2()
-