OnReconnect &&reconnect_callback)
: packet_pool_(),
receive_callback_(std::move(receive_callback)),
- on_reconnect_callback_(std::move(reconnect_callback)) {
+ on_reconnect_callback_(std::move(reconnect_callback)),
+ state_(ConnectorState::CLOSED) {
init();
}
};
class Connector {
+ protected:
+ enum class ConnectorState {
+ CLOSED,
+ CONNECTING,
+ CONNECTED,
+ };
+
public:
static constexpr std::size_t packet_size = 2048;
static constexpr std::size_t queue_size = 4096;
Connector(PacketReceivedCallback &&receive_callback,
OnReconnect &&reconnect_callback);
- virtual ~Connector() = default;
+ virtual ~Connector(){};
virtual void send(const Packet::MemBufPtr &packet) = 0;
virtual void enableBurst() = 0;
- virtual void state() = 0;
+ virtual ConnectorState state() { return state_; };
protected:
void increasePoolSize(std::size_t size = packet_pool_size);
// Connector events
PacketReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
+
+ // Connector state
+ ConnectorState state_;
};
} // end namespace core
memif_worker_(nullptr),
timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ disconnect_timer_(
+ std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
io_service_(io_service),
packet_counter_(0),
memif_connection_(std::make_unique<memif_connection_t>()),
tx_buf_counter_(0),
- is_connecting_(true),
is_reconnection_(false),
data_available_(false),
enable_burst_(false),
- closed_(false),
app_name_(app_name),
socket_filename_("") {
std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
TRANSPORT_LOGI("Creating memif");
+ state_ = ConnectorState::CONNECTING;
memif_id_ = memif_id;
socket_filename_ = "/run/vpp/memif.sock";
work_ = std::make_unique<asio::io_service::work>(io_service_);
- while (is_connecting_) {
+ while (state_ != ConnectorState::CONNECTED) {
MemifConnector::main_event_reactor_.runOneEvent();
}
args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
args.socket_filename = (uint8_t *)socket_filename_.c_str();
- TRANSPORT_LOGI("Socket filename: %s", args.socket_filename);
+ TRANSPORT_LOGD("Socket filename: %s", args.socket_filename);
args.interface_id = index;
/* last argument for memif_create (void * private_ctx) is used by user
void MemifConnector::sendCallback(const std::error_code &ec) {
timer_set_ = false;
- if (TRANSPORT_EXPECT_TRUE(!ec && !is_connecting_)) {
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) {
doSend();
}
}
int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
TRANSPORT_LOGI("memif connected!\n");
MemifConnector *connector = (MemifConnector *)private_ctx;
+ connector->state_ = ConnectorState::CONNECTED;
memif_refill_queue(conn, 0, -1, 0);
- connector->is_connecting_ = false;
return 0;
}
int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
TRANSPORT_LOGI("memif disconnected!");
MemifConnector *connector = (MemifConnector *)private_ctx;
- // TRANSPORT_LOGI ("Packet received: %u", connector->packet_counter_);
+ connector->state_ = ConnectorState::CLOSED;
TRANSPORT_LOGI("Packet to process: %u",
connector->memif_connection_->tx_buf_num);
return 0;
TRANSPORT_LOGD("freed %d buffers. %u/%u alloc/free buffers", rx, rx,
MAX_MEMIF_BUFS - rx);
-
- // if (connector->enable_burst_) {
- // connector->doSend();
- // }
} while (ret_val == MEMIF_ERR_NOBUF);
connector->io_service_.post(
}
void MemifConnector::close() {
- if (!closed_) {
- closed_ = true;
- event_reactor_.stop();
- work_.reset();
+ if (state_ != ConnectorState::CLOSED) {
+ disconnect_timer_->expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_->asyncWait([this](const std::error_code &ec) {
+ deleteMemif();
+ event_reactor_.stop();
+ work_.reset();
+ });
if (memif_worker_ && memif_worker_->joinable()) {
memif_worker_->join();
TRANSPORT_LOGD("Memif worker joined");
- deleteMemif();
} else {
TRANSPORT_LOGD("Memif worker not joined");
}
return 0;
}
-void MemifConnector::state() {
- TRANSPORT_LOGD("Event reactor map: %zu", event_reactor_.mapSize());
- TRANSPORT_LOGD("Output buffer %zu", output_buffer_.size());
-}
-
void MemifConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {}
void enableBurst() override;
- void state() override;
-
TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
private:
utils::EpollEventReactor event_reactor_;
std::atomic_bool timer_set_;
std::unique_ptr<utils::FdDeadlineTimer> send_timer_;
+ std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_;
asio::io_service &io_service_;
std::unique_ptr<asio::io_service::work> work_;
uint32_t packet_counter_;
uint16_t tx_buf_counter_;
PacketRing input_buffer_;
- volatile bool is_connecting_;
- volatile bool is_reconnection_;
+ bool is_reconnection_;
bool data_available_;
bool enable_burst_;
- bool closed_;
uint32_t memif_id_;
uint8_t memif_mode_;
std::string app_name_;
void RawSocketConnector::connect(const std::string &interface_name,
const std::string &mac_address_str) {
+ state_ = ConnectorState::CONNECTING;
memset(ðernet_header_, 0, sizeof(ethernet_header_));
struct ifreq ifr;
struct ifreq if_mac;
doRecvPacket();
}
-void RawSocketConnector::state() { return; }
-
void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
// asio::async_write(socket_, asio::buffer(packet, len),
}
void RawSocketConnector::send(const Packet::MemBufPtr &packet) {
- // Packet &p = const_cast<Packet &>(packet);
- // p.setTcpChecksum();
-
- // if (!p.checkIntegrity()) {
- // TRANSPORT_LOGW("Sending message with wrong checksum!!!");
- // }
-
- // std::shared_ptr<const Packet> ptr;
- // try {
- // ptr = packet.shared_from_this();
- // } catch (std::bad_weak_ptr& exc) {
- // TRANSPORT_LOGW("Sending interest which has not been created using a
- // shared PTR! A copy will be made."); ptr =
- // std::shared_ptr<Packet>(packet.clone());
- // }
-
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
- if (!write_in_progress) {
- doSendPacket();
- } else {
- // Tell the handle connect it has data to write
- data_available_ = true;
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
+ if (!write_in_progress) {
+ doSendPacket();
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
}
});
}
}
void RawSocketConnector::doConnect() {
+ state_ = ConnectorState::CONNECTED;
socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_)));
}
void connect(const std::string &interface_name,
const std::string &mac_address_str);
- void state() override;
-
private:
void doConnect();
resolver_(io_service_),
timer_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
- is_connecting_(false),
is_reconnection_(false),
data_available_(false),
- is_closed_(false),
app_name_(app_name) {}
TcpSocketConnector::~TcpSocketConnector() {}
endpoint_iterator_ = resolver_.resolve(
{ip_address, port, asio::ip::resolver_query_base::numeric_service});
+ state_ = ConnectorState::CONNECTING;
doConnect();
}
-void TcpSocketConnector::state() { return; }
-
void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
asio::async_write(socket_, asio::buffer(packet, len),
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
- if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
if (!write_in_progress) {
doWrite();
}
}
void TcpSocketConnector::close() {
- io_service_.dispatch([this]() {
- is_closed_ = true;
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- });
+ if (state_ != ConnectorState::CLOSED) {
+ state_ = ConnectorState::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ }
}
void TcpSocketConnector::doWrite() {
}
void TcpSocketConnector::tryReconnect() {
- if (!is_connecting_ && !is_closed_) {
+ if (state_ == ConnectorState::CONNECTED) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
- is_connecting_ = true;
+ state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
io_service_.post([this]() {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
startConnectionTimer();
doConnect();
});
[this](std::error_code ec, tcp::resolver::iterator) {
if (!ec) {
timer_.cancel();
- is_connecting_ = false;
+ state_ = ConnectorState::CONNECTED;
asio::ip::tcp::no_delay noDelayOption(true);
socket_.set_option(noDelayOption);
doReadHeader();
});
}
-bool TcpSocketConnector::checkConnected() { return !is_connecting_; }
+bool TcpSocketConnector::checkConnected() {
+ return state_ == ConnectorState::CONNECTED;
+}
void TcpSocketConnector::enableBurst() { return; }
void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
- void state() override;
-
private:
void doConnect();
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
- bool is_connecting_;
bool is_reconnection_;
bool data_available_;
- bool is_closed_;
std::string app_name_;
};
connection_timer_(io_service_),
connection_timeout_(io_service_),
read_msg_(packet_pool_.makePtr(nullptr)),
- is_connecting_(false),
is_reconnection_(false),
data_available_(false),
- is_closed_(false),
app_name_(app_name) {}
UdpSocketConnector::~UdpSocketConnector() {}
endpoint_iterator_ = resolver_.resolve(
{ip_address, port, asio::ip::resolver_query_base::numeric_service});
+ state_ = ConnectorState::CONNECTING;
doConnect();
}
-void UdpSocketConnector::state() { return; }
-
void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
socket_.async_send(asio::buffer(packet, len),
io_service_.post([this, packet]() {
bool write_in_progress = !output_buffer_.empty();
output_buffer_.push_back(std::move(packet));
- if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
if (!write_in_progress) {
doWrite();
}
}
void UdpSocketConnector::close() {
- io_service_.dispatch([this]() {
- is_closed_ = true;
- socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
- socket_.close();
- });
+ if (state_ != ConnectorState::CLOSED) {
+ state_ = ConnectorState::CLOSED;
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
+ }
}
void UdpSocketConnector::doWrite() {
}
void UdpSocketConnector::tryReconnect() {
- if (!is_connecting_ && !is_closed_) {
+ if (state_ == ConnectorState::CONNECTED) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
- is_connecting_ = true;
+ state_ = ConnectorState::CONNECTING;
is_reconnection_ = true;
connection_timer_.expires_from_now(std::chrono::seconds(1));
connection_timer_.async_wait([this](const std::error_code &ec) {
if (!ec) {
- socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
- socket_.close();
+ if (socket_.is_open()) {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ }
startConnectionTimer();
doConnect();
}
[this](std::error_code ec, udp::resolver::iterator) {
if (!ec) {
connection_timeout_.cancel();
- is_connecting_ = false;
+ state_ = ConnectorState::CONNECTED;
doRead();
if (data_available_) {
});
}
-bool UdpSocketConnector::checkConnected() { return !is_connecting_; }
+bool UdpSocketConnector::checkConnected() {
+ return state_ == ConnectorState::CONNECTED;
+}
void UdpSocketConnector::enableBurst() { return; }
void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
- void state() override;
-
private:
void doConnect();
utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
- bool is_connecting_;
bool is_reconnection_;
bool data_available_;
- bool is_closed_;
std::string app_name_;
};
reactor_.addFileDescriptor(
timer_fd_, events,
- [callback{move(callback)}](const Event &event) -> int {
+ [callback = std::forward<WaitHandler &&>(callback)](const Event &event) -> int {
uint64_t s = 0;
std::error_code ec;