from vpp_papi import mac_pton
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
-from vpp_papi_provider import SYSLOG_SEVERITY
from io import BytesIO
from vpp_papi import VppEnum
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
def config_flags(self):
return VppEnum.vl_api_nat_config_flags_t
+ @property
+ def SYSLOG_SEVERITY(self):
+ return VppEnum.vl_api_syslog_severity_t
+
def clear_nat44(self):
"""
Clear NAT44 configuration.
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.EMERG)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
self.vapi.nat_ha_set_listener(ip_address='0.0.0.0', port=0,
path_mtu=512)
is_add=1)
# in2out
- tcpn = self.statistics.get_counter(
+ tcpn = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/TCP packets')
- udpn = self.statistics.get_counter(
+ udpn = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/UDP packets')
- icmpn = self.statistics.get_counter(
+ icmpn = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/ICMP packets')
- totaln = self.statistics.get_counter(
+ totaln = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/good in2out packets processed')
pkts = self.create_stream_in(self.pg0, self.pg1)
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/TCP packets')
self.assertEqual(err - tcpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/ICMP packets')
self.assertEqual(err - icmpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-in2out-slowpath/good in2out packets processed')
self.assertEqual(err - totaln, 3)
# out2in
- tcpn = self.statistics.get_counter('/err/nat44-out2in/TCP packets')
- udpn = self.statistics.get_counter('/err/nat44-out2in/UDP packets')
- icmpn = self.statistics.get_counter('/err/nat44-out2in/ICMP packets')
- totaln = self.statistics.get_counter(
+ tcpn = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
+ udpn = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat44-out2in/ICMP packets')
+ totaln = self.statistics.get_err_counter(
'/err/nat44-out2in/good out2in packets processed')
pkts = self.create_stream_out(self.pg1)
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
- err = self.statistics.get_counter('/err/nat44-out2in/TCP packets')
+ err = self.statistics.get_err_counter('/err/nat44-out2in/TCP packets')
self.assertEqual(err - tcpn, 1)
- err = self.statistics.get_counter('/err/nat44-out2in/UDP packets')
+ err = self.statistics.get_err_counter('/err/nat44-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
- err = self.statistics.get_counter('/err/nat44-out2in/ICMP packets')
+ err = self.statistics.get_err_counter('/err/nat44-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 3)
def test_syslog_apmap(self):
""" Test syslog address and port mapping creation and deletion """
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg3.local_ip4n, self.pg3.remote_ip4n)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- err = self.statistics.get_counter('/err/nat44-classify/next in2out')
+ err = self.statistics.get_err_counter(
+ '/err/nat44-classify/next in2out')
self.assertEqual(err, 1)
- err = self.statistics.get_counter('/err/nat44-classify/next out2in')
+ err = self.statistics.get_err_counter(
+ '/err/nat44-classify/next out2in')
self.assertEqual(err, 1)
def test_del_session(self):
self.verify_capture_in(capture, self.pg0)
finally:
+ self.nat44_add_address(self.nat_addr, is_add=0)
self.pg1.unconfig_ip4()
self.pg2.unconfig_ip4()
self.pg1.set_table_ip4(0)
stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
self.assertEqual(stats[0][0], 1)
- stats = self.statistics.get_counter('/err/nat-ha/pkts-processed')
+ stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
self.assertEqual(stats, 2)
# send HA session refresh event to failover/passive
stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
self.assertEqual(stats[0][0], 1)
- stats = self.statistics.get_counter('/err/nat-ha/pkts-processed')
+ stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
self.assertEqual(stats, 3)
# send packet to test session created by HA
self.assertEqual(1, nat_config.endpoint_dependent)
# in2out
- tcpn = self.statistics.get_counter(
+ tcpn = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/TCP packets')
- udpn = self.statistics.get_counter(
+ udpn = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
- icmpn = self.statistics.get_counter(
+ icmpn = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/ICMP packets')
- totaln = self.statistics.get_counter(
+ totaln = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
pkts = self.create_stream_in(self.pg0, self.pg1)
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/TCP packets')
self.assertEqual(err - tcpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/UDP packets')
self.assertEqual(err - udpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/ICMP packets')
self.assertEqual(err - icmpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-ed-in2out-slowpath/good in2out packets processed')
self.assertEqual(err - totaln, 3)
# out2in
- tcpn = self.statistics.get_counter('/err/nat44-ed-out2in/TCP packets')
- udpn = self.statistics.get_counter('/err/nat44-ed-out2in/UDP packets')
- icmpn = self.statistics.get_counter(
+ tcpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/TCP packets')
+ udpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/UDP packets')
+ icmpn = self.statistics.get_err_counter(
'/err/nat44-ed-out2in-slowpath/ICMP packets')
- totaln = self.statistics.get_counter(
+ totaln = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
pkts = self.create_stream_out(self.pg1)
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
- err = self.statistics.get_counter('/err/nat44-ed-out2in/TCP packets')
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/TCP packets')
self.assertEqual(err - tcpn, 1)
- err = self.statistics.get_counter('/err/nat44-ed-out2in/UDP packets')
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in-slowpath/ICMP packets')
self.assertEqual(err - icmpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat44-ed-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 2)
def test_syslog_sess(self):
""" Test syslog session creation and deletion """
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg2.local_ip4n, self.pg2.remote_ip4n)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
sw_if_index=self.pg1.sw_if_index)
# in2out
- tcpn = self.statistics.get_counter('/err/nat64-in2out/TCP packets')
- udpn = self.statistics.get_counter('/err/nat64-in2out/UDP packets')
- icmpn = self.statistics.get_counter('/err/nat64-in2out/ICMP packets')
- totaln = self.statistics.get_counter(
+ tcpn = self.statistics.get_err_counter('/err/nat64-in2out/TCP packets')
+ udpn = self.statistics.get_err_counter('/err/nat64-in2out/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat64-in2out/ICMP packets')
+ totaln = self.statistics.get_err_counter(
'/err/nat64-in2out/good in2out packets processed')
pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
self.verify_capture_out(capture, nat_ip=self.nat_addr,
dst_ip=self.pg1.remote_ip4)
- err = self.statistics.get_counter('/err/nat64-in2out/TCP packets')
+ err = self.statistics.get_err_counter('/err/nat64-in2out/TCP packets')
self.assertEqual(err - tcpn, 1)
- err = self.statistics.get_counter('/err/nat64-in2out/UDP packets')
+ err = self.statistics.get_err_counter('/err/nat64-in2out/UDP packets')
self.assertEqual(err - udpn, 1)
- err = self.statistics.get_counter('/err/nat64-in2out/ICMP packets')
+ err = self.statistics.get_err_counter('/err/nat64-in2out/ICMP packets')
self.assertEqual(err - icmpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat64-in2out/good in2out packets processed')
self.assertEqual(err - totaln, 3)
# out2in
- tcpn = self.statistics.get_counter('/err/nat64-out2in/TCP packets')
- udpn = self.statistics.get_counter('/err/nat64-out2in/UDP packets')
- icmpn = self.statistics.get_counter('/err/nat64-out2in/ICMP packets')
- totaln = self.statistics.get_counter(
+ tcpn = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets')
+ udpn = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat64-out2in/ICMP packets')
+ totaln = self.statistics.get_err_counter(
'/err/nat64-out2in/good out2in packets processed')
pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
- err = self.statistics.get_counter('/err/nat64-out2in/TCP packets')
+ err = self.statistics.get_err_counter('/err/nat64-out2in/TCP packets')
self.assertEqual(err - tcpn, 1)
- err = self.statistics.get_counter('/err/nat64-out2in/UDP packets')
+ err = self.statistics.get_err_counter('/err/nat64-out2in/UDP packets')
self.assertEqual(err - udpn, 1)
- err = self.statistics.get_counter('/err/nat64-out2in/ICMP packets')
+ err = self.statistics.get_err_counter('/err/nat64-out2in/ICMP packets')
self.assertEqual(err - icmpn, 1)
- err = self.statistics.get_counter(
+ err = self.statistics.get_err_counter(
'/err/nat64-out2in/good out2in packets processed')
self.assertEqual(err - totaln, 3)
sw_if_index=self.pg0.sw_if_index)
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg3.local_ip4n, self.pg3.remote_ip4n)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.EMERG)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
tcp_transitory=240, icmp=60)