[HICN-17] Add possibility to destroy connection directly from stopEventsLoop with... 42/17142/2
authorMauro Sardara <msardara@cisco.com>
Tue, 29 Jan 2019 14:49:15 +0000 (15:49 +0100)
committerMichele Papalini <micpapal@cisco.com>
Tue, 29 Jan 2019 17:30:27 +0000 (17:30 +0000)
Change-Id: I869a079a7b2f436768a62de66fd9281a7d1243cd
Signed-off-by: Mauro Sardara <msardara@cisco.com>
libtransport/src/hicn/transport/core/memif_connector.cc
libtransport/src/hicn/transport/core/portal.h
libtransport/src/hicn/transport/core/socket_connector.cc
libtransport/src/hicn/transport/core/socket_connector.h
libtransport/src/hicn/transport/interfaces/socket_consumer.cc
libtransport/src/hicn/transport/interfaces/socket_consumer.h
libtransport/src/hicn/transport/interfaces/socket_producer.cc
utils/src/ping_client.cc

index b3785e5..6c5f2ff 100644 (file)
@@ -38,7 +38,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
       timer_set_(false),
       send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
       io_service_(io_service),
-      work_(std::make_unique<asio::io_service::work>(io_service_)),
       packet_counter_(0),
       memif_connection_({}),
       tx_buf_counter_(0),
@@ -74,6 +73,8 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
 
   createMemif(memif_id, memif_mode, nullptr);
 
+  work_ = std::make_unique<asio::io_service::work>(io_service_);
+
   while (is_connecting_) {
     MemifConnector::main_event_reactor_.runOneEvent();
   }
@@ -402,7 +403,7 @@ void MemifConnector::close() {
   if (!closed_) {
     closed_ = true;
     event_reactor_.stop();
-    io_service_.stop();
+    work_.reset();
 
     if (memif_worker_ && memif_worker_->joinable()) {
       memif_worker_->join();
index 52a721a..58406bb 100644 (file)
@@ -97,9 +97,7 @@ class Portal {
     virtual void onInterest(Interest::Ptr &&i) = 0;
   };
 
-  Portal() : Portal(internal_io_service_) {
-    internal_work_ = std::make_unique<asio::io_service::work>(io_service_);
-  }
+  Portal() : Portal(internal_io_service_) {}
 
   Portal(asio::io_service &io_service)
       : io_service_(io_service),
@@ -130,8 +128,7 @@ class Portal {
   }
 
   ~Portal() {
-    connector_.close();
-    stopEventsLoop();
+    stopEventsLoop(true);
   }
 
   TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
@@ -227,15 +224,17 @@ class Portal {
     forwarder_interface_.send(content_object);
   }
 
-  TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
-    internal_work_.reset();
-
+  TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
     for (auto &pend_interest : pending_interest_hash_table_) {
       pend_interest.second->cancelTimer();
     }
 
     clear();
 
+    if(kill_connection) {
+      connector_.close();
+    }
+
     io_service_.post([this]() { io_service_.stop(); });
   }
 
@@ -340,7 +339,6 @@ class Portal {
  private:
   asio::io_service &io_service_;
   asio::io_service internal_io_service_;
-  std::unique_ptr<asio::io_service::work> internal_work_;
 
   std::string app_name_;
 
index 332b87e..704d3ba 100644 (file)
@@ -62,6 +62,7 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
       is_connecting_(false),
       is_reconnection_(false),
       data_available_(false),
+      is_closed_(false),
       receive_callback_(receive_callback),
       on_reconnect_callback_(on_reconnect_callback),
       app_name_(app_name) {}
@@ -102,7 +103,11 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) {
 }
 
 void SocketConnector::close() {
-  io_service_.post([this]() { socket_.close(); });
+  io_service_.dispatch([this]() {
+    is_closed_ = true;
+    socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+    socket_.close();
+  });
 }
 
 void SocketConnector::doWrite() {
@@ -125,6 +130,9 @@ void SocketConnector::doWrite() {
           if (!output_buffer_.empty()) {
             doWrite();
           }
+        } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+          // The connection has been closed by the application.
+          return;
         } else {
           TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
           tryReconnect();
@@ -141,6 +149,9 @@ void SocketConnector::doReadBody(std::size_t body_length) {
         if (TRANSPORT_EXPECT_TRUE(!ec)) {
           receive_callback_(std::move(read_msg_));
           doReadHeader();
+        } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+          // The connection has been closed by the application.
+          return;
         } else {
           TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
           tryReconnect();
@@ -165,6 +176,9 @@ void SocketConnector::doReadHeader() {
           } else {
             TRANSPORT_LOGE("Decoding error. Ignoring packet.");
           }
+        } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+          // The connection has been closed by the application.
+          return;
         } else {
           TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
           tryReconnect();
@@ -173,11 +187,12 @@ void SocketConnector::doReadHeader() {
 }
 
 void SocketConnector::tryReconnect() {
-  if (!is_connecting_) {
+  if (!is_connecting_ && !is_closed_) {
     TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
     is_connecting_ = true;
     is_reconnection_ = true;
     io_service_.post([this]() {
+      socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
       socket_.close();
       startConnectionTimer();
       doConnect();
index b1757e5..e014111 100644 (file)
@@ -79,6 +79,7 @@ class SocketConnector : public Connector {
   bool is_connecting_;
   bool is_reconnection_;
   bool data_available_;
+  bool is_closed_;
 
   PacketReceivedCallback receive_callback_;
   OnReconnect on_reconnect_callback_;
index 27ed4e6..89411e9 100644 (file)
@@ -99,7 +99,7 @@ int ConsumerSocket::consume(const Name &name,
 
   transport_protocol_->start(receive_buffer);
 
-  return CONSUMER_READY;
+  return CONSUMER_FINISHED;
 }
 
 int ConsumerSocket::asyncConsume(
@@ -115,7 +115,7 @@ int ConsumerSocket::asyncConsume(
     });
   }
 
-  return CONSUMER_READY;
+  return CONSUMER_RUNNING;
 }
 
 void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
index 9e309aa..536d2fd 100755 (executable)
@@ -26,8 +26,9 @@
 #include <hicn/transport/utils/event_thread.h>
 #include <hicn/transport/utils/sharable_vector.h>
 
-#define CONSUMER_READY 0
+#define CONSUMER_FINISHED 0
 #define CONSUMER_BUSY 1
+#define CONSUMER_RUNNING 2
 
 namespace transport {
 
index 69adc2b..d9204f1 100755 (executable)
@@ -54,9 +54,9 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
 }
 
 ProducerSocket::~ProducerSocket() {
-  TRANSPORT_LOGI("Destroying the ProducerSocket");
+
   processing_thread_stop_ = true;
-  portal_->stopEventsLoop();
+  portal_->stopEventsLoop(true);
 
   if (processing_thread_.joinable()) {
     processing_thread_.join();
@@ -79,8 +79,6 @@ void ProducerSocket::serveForever() {
 }
 
 void ProducerSocket::stop() {
-  TRANSPORT_LOGI("Calling stop for ProducerSocket");
-  portal_->killConnection();
   portal_->stopEventsLoop();
 }
 
index e98a8b4..24f7bd7 100755 (executable)
@@ -76,13 +76,15 @@ class Configuration {
 
 class Client : interface::BasePortal::ConsumerCallback {
  public:
-  Client(Configuration *c) : portal_() {
+  Client(Configuration *c) 
+    : portal_(),
+      signals_(portal_.getIoService(), SIGINT, SIGQUIT) {
     // Let the main thread to catch SIGINT and SIGQUIT
-    // asio::signal_set signals(io_service, SIGINT, SIGQUIT);
-    // signals.async_wait(std::bind(&Client::afterSignal, this));
-
     portal_.connect();
     portal_.setConsumerCallback(this);
+
+    signals_.async_wait(std::bind(&Client::afterSignal, this));
+
     timer_.reset(new asio::steady_timer(portal_.getIoService()));
     config_ = c;
     sequence_number_ = config_->first_suffix_;
@@ -272,7 +274,7 @@ class Client : interface::BasePortal::ConsumerCallback {
     std::cout << "Stop ping" << std::endl;
     std::cout << "Sent: " << sent_ << " Received: " << received_
               << " Timeouts: " << timedout_ << std::endl;
-    portal_.stopEventsLoop();
+    portal_.stopEventsLoop(true);
   }
 
   void reset() {
@@ -289,6 +291,7 @@ class Client : interface::BasePortal::ConsumerCallback {
  private:
   SendTimeMap send_timestamps_;
   interface::BasePortal portal_;
+  asio::signal_set signals_;
   uint64_t sequence_number_;
   uint64_t last_jump_;
   uint64_t processed_;