[HICN-232] fix concurrency problem on rtc producer socket 71/20471/1
authormichele papalini <micpapal@cisco.com>
Wed, 3 Jul 2019 07:53:14 +0000 (09:53 +0200)
committermichele papalini <micpapal@cisco.com>
Wed, 3 Jul 2019 07:53:14 +0000 (09:53 +0200)
Change-Id: Ia873aa3c9b6ef4825df88fa05cc1d6dc40bb73a1
Signed-off-by: michele papalini <micpapal@cisco.com>
libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h

index 495f8c8..c726dfd 100644 (file)
 #define INIT_PACKET_PRODUCTION_RATE 100  // pps random value (almost 1Mbps)
 #define STATS_INTERVAL_DURATION 500      // ms
 #define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
-#define INACTIVE_TIME \
-  100  // ms opus generates ~50 packets per seocnd, one
-       // every
-// 20ms. to be safe we use 20ms*5 as timer for an
-// inactive socket
+#define INACTIVE_TIME 500 //ms without producing before the socket
+                          //is considered inactive
 #define MILLI_IN_A_SEC 1000  // ms in a second
 
 // NACK HEADER
 //   +-----------------------------------------+
 //   | 4 bytes: production rate (bytes x sec)  |
 //   +-----------------------------------------+
-//   may require additional field (Rate for multiple qualities, ...)
 //
 
+// PACKET HEADER
+//   +-----------------------------------------+
+//   | 8 bytes: TIMESTAMP                      |
+//   +-----------------------------------------+
+//   | packet                                  |
+//   +-----------------------------------------+
+
 namespace transport {
 
 namespace interface {
@@ -120,12 +123,15 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
     return;
   }
 
-  active_ = true;
   uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now().time_since_epoch())
                      .count();
 
-  lastProduced_ = now;
+  {
+    utils::SpinLock::Acquire locked(lock_);
+    active_ = true;
+    lastProduced_ = now;
+  }
 
   updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
 
@@ -154,39 +160,44 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
 void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
   uint32_t interestSeg = interest->getName().getSuffix();
   uint32_t lifetime = interest->getLifetime();
-  uint32_t max_gap;
 
   if (on_interest_input_ != VOID_HANDLER) {
     on_interest_input_(*this, *interest);
   }
 
-  if (active_.load()) {
-    uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+  bool isActive;
+  {
+    utils::SpinLock::Acquire locked(lock_);
+    isActive = active_;
+    if(isActive){
+      uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
                        std::chrono::steady_clock::now().time_since_epoch())
                        .count();
-    uint64_t lastProduced = lastProduced_.load();
-    if (now - lastProduced >= INACTIVE_TIME) {
-      active_ = false;
+      if ((now - lastProduced_) > INACTIVE_TIME) {
+        //socket is inactive
+        active_ = false;
+        isActive =  false;
+      }
     }
   }
 
-  if (TRANSPORT_EXPECT_FALSE(!active_.load())) {
-    sendNack(*interest);
+  if (TRANSPORT_EXPECT_FALSE(!isActive)) {
+    sendNack(*interest, false);
     return;
   }
 
-  max_gap = (uint32_t)floor(
+  uint32_t max_gap = (uint32_t)floor(
       (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
                         1000.0) *
                (double)packetsProductionRate_.load()));
 
   if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
-    sendNack(*interest);
+    sendNack(*interest, true);
   }
   // else drop packet
 }
 
-void RTCProducerSocket::sendNack(const Interest &interest) {
+void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) {
   auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
   nack_payload->append(NACK_HEADER_SIZE);
   ContentObject nack;
@@ -197,7 +208,7 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
   uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
   *payload_ptr = currentSeg_;
 
-  if (active_.load()) {
+  if (isActive) {
     *(++payload_ptr) = bytesProductionRate_;
   } else {
     *(++payload_ptr) = 0;
@@ -215,4 +226,4 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
 
 }  // namespace interface
 
-}  // end namespace transport
\ No newline at end of file
+}  // end namespace transport
index be39d2b..408ce3f 100644 (file)
@@ -41,26 +41,25 @@ class RTCProducerSocket : public ProducerSocket {
   void onInterest(Interest::Ptr &&interest) override;
 
  private:
-  void sendNack(const Interest &interest);
+  void sendNack(const Interest &interest, bool isActvie);
   void updateStats(uint32_t packet_size, uint64_t now);
 
-  // std::map<uint32_t, uint64_t> pendingInterests_;
   uint32_t currentSeg_;
   uint32_t prodLabel_;
   uint16_t headerSize_;
   Name flowName_;
-  // bool produceInSynch_;
   uint32_t producedBytes_;
   uint32_t producedPackets_;
   uint32_t bytesProductionRate_;
   std::atomic<uint32_t> packetsProductionRate_;
   uint32_t perSecondFactor_;
   uint64_t lastStats_;
-  // std::chrono::steady_clock::time_point lastProduced_;
-  std::atomic<uint64_t> lastProduced_;
-  std::atomic<bool> active_;
+
+  uint64_t lastProduced_;
+  bool active_;
+  utils::SpinLock lock_;
 };
 
 }  // namespace interface
 
-}  // end namespace transport
\ No newline at end of file
+}  // end namespace transport