+
+ return req.ch.receiveReplyInternal(msg, req.seqNum)
+}
+
+func (sub *subscriptionCtx) Unsubscribe() error {
+ log.WithFields(logrus.Fields{
+ "msg_name": sub.event.GetMessageName(),
+ "msg_id": sub.msgID,
+ }).Debug("Removing notification subscription.")
+
+ // remove the subscription from the map
+ sub.ch.conn.subscriptionsLock.Lock()
+ defer sub.ch.conn.subscriptionsLock.Unlock()
+
+ for i, item := range sub.ch.conn.subscriptions[sub.msgID] {
+ if item == sub {
+ // remove i-th item in the slice
+ sub.ch.conn.subscriptions[sub.msgID] = append(sub.ch.conn.subscriptions[sub.msgID][:i], sub.ch.conn.subscriptions[sub.msgID][i+1:]...)
+ return nil
+ }
+ }
+
+ return fmt.Errorf("subscription for %q not found", sub.event.GetMessageName())