[HICN-85] Added state to connectors. 24/18124/4
authorMauro Sardara <msardara@cisco.com>
Thu, 7 Mar 2019 17:34:52 +0000 (18:34 +0100)
committerMauro Sardara <msardara@cisco.com>
Fri, 8 Mar 2019 09:33:21 +0000 (09:33 +0000)
Change-Id: I26d1b37fec4a2482b97a80fa5648f243745908f7
Signed-off-by: Mauro Sardara <msardara@cisco.com>
libtransport/src/hicn/transport/core/connector.cc
libtransport/src/hicn/transport/core/connector.h
libtransport/src/hicn/transport/core/memif_connector.cc
libtransport/src/hicn/transport/core/memif_connector.h
libtransport/src/hicn/transport/core/raw_socket_connector.cc
libtransport/src/hicn/transport/core/raw_socket_connector.h
libtransport/src/hicn/transport/core/tcp_socket_connector.cc
libtransport/src/hicn/transport/core/tcp_socket_connector.h
libtransport/src/hicn/transport/core/udp_socket_connector.cc
libtransport/src/hicn/transport/core/udp_socket_connector.h
libtransport/src/hicn/transport/utils/fd_deadline_timer.h

index e89b98f..fc27157 100644 (file)
@@ -25,7 +25,8 @@ Connector::Connector(PacketReceivedCallback &&receive_callback,
                      OnReconnect &&reconnect_callback)
     : packet_pool_(),
       receive_callback_(std::move(receive_callback)),
-      on_reconnect_callback_(std::move(reconnect_callback)) {
+      on_reconnect_callback_(std::move(reconnect_callback)),
+      state_(ConnectorState::CLOSED) {
   init();
 }
 
index 529e97b..c790f2b 100644 (file)
@@ -34,6 +34,13 @@ enum class ConnectorType : uint8_t {
 };
 
 class Connector {
+ protected:
+  enum class ConnectorState {
+    CLOSED,
+    CONNECTING,
+    CONNECTED,
+  };
+
  public:
   static constexpr std::size_t packet_size = 2048;
   static constexpr std::size_t queue_size = 4096;
@@ -48,7 +55,7 @@ class Connector {
   Connector(PacketReceivedCallback &&receive_callback,
             OnReconnect &&reconnect_callback);
 
-  virtual ~Connector() = default;
+  virtual ~Connector(){};
 
   virtual void send(const Packet::MemBufPtr &packet) = 0;
 
@@ -59,7 +66,7 @@ class Connector {
 
   virtual void enableBurst() = 0;
 
-  virtual void state() = 0;
+  virtual ConnectorState state() { return state_; };
 
  protected:
   void increasePoolSize(std::size_t size = packet_pool_size);
@@ -93,6 +100,9 @@ class Connector {
   // Connector events
   PacketReceivedCallback receive_callback_;
   OnReconnect on_reconnect_callback_;
+
+  // Connector state
+  ConnectorState state_;
 };
 }  // end namespace core
 
index 863e1aa..f969580 100644 (file)
@@ -61,15 +61,15 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
       memif_worker_(nullptr),
       timer_set_(false),
       send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+      disconnect_timer_(
+          std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
       io_service_(io_service),
       packet_counter_(0),
       memif_connection_(std::make_unique<memif_connection_t>()),
       tx_buf_counter_(0),
-      is_connecting_(true),
       is_reconnection_(false),
       data_available_(false),
       enable_burst_(false),
-      closed_(false),
       app_name_(app_name),
       socket_filename_("") {
   std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
@@ -89,6 +89,7 @@ void MemifConnector::init() {
 
 void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
   TRANSPORT_LOGI("Creating memif");
+  state_ = ConnectorState::CONNECTING;
 
   memif_id_ = memif_id;
   socket_filename_ = "/run/vpp/memif.sock";
@@ -97,7 +98,7 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
 
   work_ = std::make_unique<asio::io_service::work>(io_service_);
 
-  while (is_connecting_) {
+  while (state_ != ConnectorState::CONNECTED) {
     MemifConnector::main_event_reactor_.runOneEvent();
   }
 
@@ -140,7 +141,7 @@ int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) {
   args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
   args.socket_filename = (uint8_t *)socket_filename_.c_str();
 
-  TRANSPORT_LOGI("Socket filename: %s", args.socket_filename);
+  TRANSPORT_LOGD("Socket filename: %s", args.socket_filename);
 
   args.interface_id = index;
   /* last argument for memif_create (void * private_ctx) is used by user
@@ -297,7 +298,7 @@ int MemifConnector::txBurst(uint16_t qid) {
 void MemifConnector::sendCallback(const std::error_code &ec) {
   timer_set_ = false;
 
-  if (TRANSPORT_EXPECT_TRUE(!ec && !is_connecting_)) {
+  if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) {
     doSend();
   }
 }
@@ -315,8 +316,8 @@ void MemifConnector::processInputBuffer() {
 int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
   TRANSPORT_LOGI("memif connected!\n");
   MemifConnector *connector = (MemifConnector *)private_ctx;
+  connector->state_ = ConnectorState::CONNECTED;
   memif_refill_queue(conn, 0, -1, 0);
-  connector->is_connecting_ = false;
 
   return 0;
 }
@@ -326,7 +327,7 @@ int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
 int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
   TRANSPORT_LOGI("memif disconnected!");
   MemifConnector *connector = (MemifConnector *)private_ctx;
-  //  TRANSPORT_LOGI ("Packet received: %u", connector->packet_counter_);
+  connector->state_ = ConnectorState::CLOSED;
   TRANSPORT_LOGI("Packet to process: %u",
                  connector->memif_connection_->tx_buf_num);
   return 0;
@@ -390,10 +391,6 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
 
     TRANSPORT_LOGD("freed %d buffers. %u/%u alloc/free buffers", rx, rx,
                    MAX_MEMIF_BUFS - rx);
-
-    //    if (connector->enable_burst_) {
-    //      connector->doSend();
-    //    }
   } while (ret_val == MEMIF_ERR_NOBUF);
 
   connector->io_service_.post(
@@ -415,15 +412,17 @@ error:
 }
 
 void MemifConnector::close() {
-  if (!closed_) {
-    closed_ = true;
-    event_reactor_.stop();
-    work_.reset();
+  if (state_ != ConnectorState::CLOSED) {
+    disconnect_timer_->expiresFromNow(std::chrono::microseconds(50));
+    disconnect_timer_->asyncWait([this](const std::error_code &ec) {
+      deleteMemif();
+      event_reactor_.stop();
+      work_.reset();
+    });
 
     if (memif_worker_ && memif_worker_->joinable()) {
       memif_worker_->join();
       TRANSPORT_LOGD("Memif worker joined");
-      deleteMemif();
     } else {
       TRANSPORT_LOGD("Memif worker not joined");
     }
@@ -496,11 +495,6 @@ int MemifConnector::doSend() {
   return 0;
 }
 
-void MemifConnector::state() {
-  TRANSPORT_LOGD("Event reactor map: %zu", event_reactor_.mapSize());
-  TRANSPORT_LOGD("Output buffer %zu", output_buffer_.size());
-}
-
 void MemifConnector::send(const uint8_t *packet, std::size_t len,
                           const PacketSentCallback &packet_sent) {}
 
index 6095713..057df37 100644 (file)
@@ -69,8 +69,6 @@ class MemifConnector : public Connector {
 
   void enableBurst() override;
 
-  void state() override;
-
   TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
 
  private:
@@ -112,6 +110,7 @@ class MemifConnector : public Connector {
   utils::EpollEventReactor event_reactor_;
   std::atomic_bool timer_set_;
   std::unique_ptr<utils::FdDeadlineTimer> send_timer_;
+  std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_;
   asio::io_service &io_service_;
   std::unique_ptr<asio::io_service::work> work_;
   uint32_t packet_counter_;
@@ -119,11 +118,9 @@ class MemifConnector : public Connector {
   uint16_t tx_buf_counter_;
 
   PacketRing input_buffer_;
-  volatile bool is_connecting_;
-  volatile bool is_reconnection_;
+  bool is_reconnection_;
   bool data_available_;
   bool enable_burst_;
-  bool closed_;
   uint32_t memif_id_;
   uint8_t memif_mode_;
   std::string app_name_;
index 78241b2..12cc4e0 100644 (file)
@@ -54,6 +54,7 @@ RawSocketConnector::~RawSocketConnector() {}
 
 void RawSocketConnector::connect(const std::string &interface_name,
                                  const std::string &mac_address_str) {
+  state_ = ConnectorState::CONNECTING;
   memset(&ethernet_header_, 0, sizeof(ethernet_header_));
   struct ifreq ifr;
   struct ifreq if_mac;
@@ -112,8 +113,6 @@ void RawSocketConnector::connect(const std::string &interface_name,
   doRecvPacket();
 }
 
-void RawSocketConnector::state() { return; }
-
 void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
                               const PacketSentCallback &packet_sent) {
   // asio::async_write(socket_, asio::buffer(packet, len),
@@ -124,30 +123,16 @@ void RawSocketConnector::send(const uint8_t *packet, std::size_t len,
 }
 
 void RawSocketConnector::send(const Packet::MemBufPtr &packet) {
-  // Packet &p = const_cast<Packet &>(packet);
-  // p.setTcpChecksum();
-
-  // if (!p.checkIntegrity()) {
-  //   TRANSPORT_LOGW("Sending message with wrong checksum!!!");
-  // }
-
-  // std::shared_ptr<const Packet> ptr;
-  // try {
-  //   ptr = packet.shared_from_this();
-  // } catch (std::bad_weak_ptr& exc) {
-  //   TRANSPORT_LOGW("Sending interest which has not been created using a
-  //   shared PTR! A copy will be made."); ptr =
-  //   std::shared_ptr<Packet>(packet.clone());
-  // }
-
   io_service_.post([this, packet]() {
     bool write_in_progress = !output_buffer_.empty();
     output_buffer_.push_back(std::move(packet));
-    if (!write_in_progress) {
-      doSendPacket();
-    } else {
-      // Tell the handle connect it has data to write
-      data_available_ = true;
+    if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
+      if (!write_in_progress) {
+        doSendPacket();
+      } else {
+        // Tell the handle connect it has data to write
+        data_available_ = true;
+      }
     }
   });
 }
@@ -202,6 +187,7 @@ void RawSocketConnector::doRecvPacket() {
 }
 
 void RawSocketConnector::doConnect() {
+  state_ = ConnectorState::CONNECTED;
   socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_)));
 }
 
index 57a2bc0..a5474f7 100644 (file)
@@ -54,8 +54,6 @@ class RawSocketConnector : public Connector {
   void connect(const std::string &interface_name,
                const std::string &mac_address_str);
 
-  void state() override;
-
  private:
   void doConnect();
 
index 4c5f90d..f1fd4bb 100644 (file)
@@ -62,10 +62,8 @@ TcpSocketConnector::TcpSocketConnector(
       resolver_(io_service_),
       timer_(io_service_),
       read_msg_(packet_pool_.makePtr(nullptr)),
-      is_connecting_(false),
       is_reconnection_(false),
       data_available_(false),
-      is_closed_(false),
       app_name_(app_name) {}
 
 TcpSocketConnector::~TcpSocketConnector() {}
@@ -74,11 +72,10 @@ void TcpSocketConnector::connect(std::string ip_address, std::string port) {
   endpoint_iterator_ = resolver_.resolve(
       {ip_address, port, asio::ip::resolver_query_base::numeric_service});
 
+  state_ = ConnectorState::CONNECTING;
   doConnect();
 }
 
-void TcpSocketConnector::state() { return; }
-
 void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
                               const PacketSentCallback &packet_sent) {
   asio::async_write(socket_, asio::buffer(packet, len),
@@ -91,7 +88,7 @@ void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
   io_service_.post([this, packet]() {
     bool write_in_progress = !output_buffer_.empty();
     output_buffer_.push_back(std::move(packet));
-    if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+    if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
       if (!write_in_progress) {
         doWrite();
       }
@@ -103,11 +100,13 @@ void TcpSocketConnector::send(const Packet::MemBufPtr &packet) {
 }
 
 void TcpSocketConnector::close() {
-  io_service_.dispatch([this]() {
-    is_closed_ = true;
-    socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
-    socket_.close();
-  });
+  if (state_ != ConnectorState::CLOSED) {
+    state_ = ConnectorState::CLOSED;
+    if (socket_.is_open()) {
+      socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+      socket_.close();
+    }
+  }
 }
 
 void TcpSocketConnector::doWrite() {
@@ -208,13 +207,15 @@ void TcpSocketConnector::doReadHeader() {
 }
 
 void TcpSocketConnector::tryReconnect() {
-  if (!is_connecting_ && !is_closed_) {
+  if (state_ == ConnectorState::CONNECTED) {
     TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
-    is_connecting_ = true;
+    state_ = ConnectorState::CONNECTING;
     is_reconnection_ = true;
     io_service_.post([this]() {
-      socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
-      socket_.close();
+      if (socket_.is_open()) {
+        socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+        socket_.close();
+      }
       startConnectionTimer();
       doConnect();
     });
@@ -226,7 +227,7 @@ void TcpSocketConnector::doConnect() {
                       [this](std::error_code ec, tcp::resolver::iterator) {
                         if (!ec) {
                           timer_.cancel();
-                          is_connecting_ = false;
+                          state_ = ConnectorState::CONNECTED;
                           asio::ip::tcp::no_delay noDelayOption(true);
                           socket_.set_option(noDelayOption);
                           doReadHeader();
@@ -248,7 +249,9 @@ void TcpSocketConnector::doConnect() {
                       });
 }
 
-bool TcpSocketConnector::checkConnected() { return !is_connecting_; }
+bool TcpSocketConnector::checkConnected() {
+  return state_ == ConnectorState::CONNECTED;
+}
 
 void TcpSocketConnector::enableBurst() { return; }
 
index 465eeb9..6df2fed 100644 (file)
@@ -51,8 +51,6 @@ class TcpSocketConnector : public Connector {
 
   void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
 
-  void state() override;
-
  private:
   void doConnect();
 
@@ -79,10 +77,8 @@ class TcpSocketConnector : public Connector {
 
   utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
 
-  bool is_connecting_;
   bool is_reconnection_;
   bool data_available_;
-  bool is_closed_;
 
   std::string app_name_;
 };
index f38891e..54c0eb9 100644 (file)
@@ -38,10 +38,8 @@ UdpSocketConnector::UdpSocketConnector(
       connection_timer_(io_service_),
       connection_timeout_(io_service_),
       read_msg_(packet_pool_.makePtr(nullptr)),
-      is_connecting_(false),
       is_reconnection_(false),
       data_available_(false),
-      is_closed_(false),
       app_name_(app_name) {}
 
 UdpSocketConnector::~UdpSocketConnector() {}
@@ -50,11 +48,10 @@ void UdpSocketConnector::connect(std::string ip_address, std::string port) {
   endpoint_iterator_ = resolver_.resolve(
       {ip_address, port, asio::ip::resolver_query_base::numeric_service});
 
+  state_ = ConnectorState::CONNECTING;
   doConnect();
 }
 
-void UdpSocketConnector::state() { return; }
-
 void UdpSocketConnector::send(const uint8_t *packet, std::size_t len,
                               const PacketSentCallback &packet_sent) {
   socket_.async_send(asio::buffer(packet, len),
@@ -67,7 +64,7 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
   io_service_.post([this, packet]() {
     bool write_in_progress = !output_buffer_.empty();
     output_buffer_.push_back(std::move(packet));
-    if (TRANSPORT_EXPECT_FALSE(!is_connecting_)) {
+    if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
       if (!write_in_progress) {
         doWrite();
       }
@@ -79,11 +76,13 @@ void UdpSocketConnector::send(const Packet::MemBufPtr &packet) {
 }
 
 void UdpSocketConnector::close() {
-  io_service_.dispatch([this]() {
-    is_closed_ = true;
-    socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
-    socket_.close();
-  });
+  if (state_ != ConnectorState::CLOSED) {
+    state_ = ConnectorState::CLOSED;
+    if (socket_.is_open()) {
+      socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+      socket_.close();
+    }
+  }
 }
 
 void UdpSocketConnector::doWrite() {
@@ -136,15 +135,17 @@ void UdpSocketConnector::doRead() {
 }
 
 void UdpSocketConnector::tryReconnect() {
-  if (!is_connecting_ && !is_closed_) {
+  if (state_ == ConnectorState::CONNECTED) {
     TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
-    is_connecting_ = true;
+    state_ = ConnectorState::CONNECTING;
     is_reconnection_ = true;
     connection_timer_.expires_from_now(std::chrono::seconds(1));
     connection_timer_.async_wait([this](const std::error_code &ec) {
       if (!ec) {
-        socket_.shutdown(asio::ip::udp::socket::shutdown_type::shutdown_both);
-        socket_.close();
+        if (socket_.is_open()) {
+          socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+          socket_.close();
+        }
         startConnectionTimer();
         doConnect();
       }
@@ -157,7 +158,7 @@ void UdpSocketConnector::doConnect() {
                       [this](std::error_code ec, udp::resolver::iterator) {
                         if (!ec) {
                           connection_timeout_.cancel();
-                          is_connecting_ = false;
+                          state_ = ConnectorState::CONNECTED;
                           doRead();
 
                           if (data_available_) {
@@ -176,7 +177,9 @@ void UdpSocketConnector::doConnect() {
                       });
 }
 
-bool UdpSocketConnector::checkConnected() { return !is_connecting_; }
+bool UdpSocketConnector::checkConnected() {
+  return state_ == ConnectorState::CONNECTED;
+}
 
 void UdpSocketConnector::enableBurst() { return; }
 
index 4cde8f2..87198ef 100644 (file)
@@ -49,8 +49,6 @@ class UdpSocketConnector : public Connector {
 
   void connect(std::string ip_address = "127.0.0.1", std::string port = "9695");
 
-  void state() override;
-
  private:
   void doConnect();
 
@@ -76,10 +74,8 @@ class UdpSocketConnector : public Connector {
 
   utils::ObjectPool<utils::MemBuf>::Ptr read_msg_;
 
-  bool is_connecting_;
   bool is_reconnection_;
   bool data_available_;
-  bool is_closed_;
 
   std::string app_name_;
 };
index 3ed4590..6fb823a 100644 (file)
@@ -53,7 +53,7 @@ class FdDeadlineTimer : public DeadlineTimer<FdDeadlineTimer> {
 
     reactor_.addFileDescriptor(
         timer_fd_, events,
-        [callback{move(callback)}](const Event &event) -> int {
+        [callback = std::forward<WaitHandler &&>(callback)](const Event &event) -> int {
           uint64_t s = 0;
           std::error_code ec;