timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
io_service_(io_service),
- work_(std::make_unique<asio::io_service::work>(io_service_)),
packet_counter_(0),
memif_connection_({}),
tx_buf_counter_(0),
createMemif(memif_id, memif_mode, nullptr);
+ work_ = std::make_unique<asio::io_service::work>(io_service_);
+
while (is_connecting_) {
MemifConnector::main_event_reactor_.runOneEvent();
}
if (!closed_) {
closed_ = true;
event_reactor_.stop();
- io_service_.stop();
+ work_.reset();
if (memif_worker_ && memif_worker_->joinable()) {
memif_worker_->join();
virtual void onInterest(Interest::Ptr &&i) = 0;
};
- Portal() : Portal(internal_io_service_) {
- internal_work_ = std::make_unique<asio::io_service::work>(io_service_);
- }
+ Portal() : Portal(internal_io_service_) {}
Portal(asio::io_service &io_service)
: io_service_(io_service),
}
~Portal() {
- connector_.close();
- stopEventsLoop();
+ stopEventsLoop(true);
}
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
forwarder_interface_.send(content_object);
}
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
- internal_work_.reset();
-
+ TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
for (auto &pend_interest : pending_interest_hash_table_) {
pend_interest.second->cancelTimer();
}
clear();
+ if(kill_connection) {
+ connector_.close();
+ }
+
io_service_.post([this]() { io_service_.stop(); });
}
private:
asio::io_service &io_service_;
asio::io_service internal_io_service_;
- std::unique_ptr<asio::io_service::work> internal_work_;
std::string app_name_;
is_connecting_(false),
is_reconnection_(false),
data_available_(false),
+ is_closed_(false),
receive_callback_(receive_callback),
on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {}
}
void SocketConnector::close() {
- io_service_.post([this]() { socket_.close(); });
+ io_service_.dispatch([this]() {
+ is_closed_ = true;
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ });
}
void SocketConnector::doWrite() {
if (!output_buffer_.empty()) {
doWrite();
}
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
if (TRANSPORT_EXPECT_TRUE(!ec)) {
receive_callback_(std::move(read_msg_));
doReadHeader();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
} else {
TRANSPORT_LOGE("Decoding error. Ignoring packet.");
}
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
}
void SocketConnector::tryReconnect() {
- if (!is_connecting_) {
+ if (!is_connecting_ && !is_closed_) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
is_connecting_ = true;
is_reconnection_ = true;
io_service_.post([this]() {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
startConnectionTimer();
doConnect();
bool is_connecting_;
bool is_reconnection_;
bool data_available_;
+ bool is_closed_;
PacketReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
transport_protocol_->start(receive_buffer);
- return CONSUMER_READY;
+ return CONSUMER_FINISHED;
}
int ConsumerSocket::asyncConsume(
});
}
- return CONSUMER_READY;
+ return CONSUMER_RUNNING;
}
void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/sharable_vector.h>
-#define CONSUMER_READY 0
+#define CONSUMER_FINISHED 0
#define CONSUMER_BUSY 1
+#define CONSUMER_RUNNING 2
namespace transport {
}
ProducerSocket::~ProducerSocket() {
- TRANSPORT_LOGI("Destroying the ProducerSocket");
+
processing_thread_stop_ = true;
- portal_->stopEventsLoop();
+ portal_->stopEventsLoop(true);
if (processing_thread_.joinable()) {
processing_thread_.join();
}
void ProducerSocket::stop() {
- TRANSPORT_LOGI("Calling stop for ProducerSocket");
- portal_->killConnection();
portal_->stopEventsLoop();
}
class Client : interface::BasePortal::ConsumerCallback {
public:
- Client(Configuration *c) : portal_() {
+ Client(Configuration *c)
+ : portal_(),
+ signals_(portal_.getIoService(), SIGINT, SIGQUIT) {
// Let the main thread to catch SIGINT and SIGQUIT
- // asio::signal_set signals(io_service, SIGINT, SIGQUIT);
- // signals.async_wait(std::bind(&Client::afterSignal, this));
-
portal_.connect();
portal_.setConsumerCallback(this);
+
+ signals_.async_wait(std::bind(&Client::afterSignal, this));
+
timer_.reset(new asio::steady_timer(portal_.getIoService()));
config_ = c;
sequence_number_ = config_->first_suffix_;
std::cout << "Stop ping" << std::endl;
std::cout << "Sent: " << sent_ << " Received: " << received_
<< " Timeouts: " << timedout_ << std::endl;
- portal_.stopEventsLoop();
+ portal_.stopEventsLoop(true);
}
void reset() {
private:
SendTimeMap send_timestamps_;
interface::BasePortal portal_;
+ asio::signal_set signals_;
uint64_t sequence_number_;
uint64_t last_jump_;
uint64_t processed_;