HONEYCOMB-106 - Support for generic cache management
[honeycomb.git] / infra / notification / impl / src / main / java / io / fd / honeycomb / v3po / notification / impl / NotificationProducerTracker.java
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.fd.honeycomb.v3po.notification.impl;
17
18 import static com.google.common.base.Preconditions.checkState;
19
20 import com.google.common.collect.Sets;
21 import io.fd.honeycomb.v3po.notification.ManagedNotificationProducer;
22 import io.fd.honeycomb.v3po.notification.NotificationCollector;
23 import java.util.HashSet;
24 import java.util.Set;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.annotation.concurrent.ThreadSafe;
28 import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener;
29 import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Starts & stops notification producer dependencies on demand.
38  * Uses {@link DOMNotificationSubscriptionListenerRegistry} to receive subscription change notifications.
39  */
40 @ThreadSafe
41 public final class NotificationProducerTracker
42     implements DOMNotificationSubscriptionListener, AutoCloseable {
43
44     private static final Logger LOG = LoggerFactory.getLogger(NotificationProducerTracker.class);
45
46     private final ListenerRegistration<NotificationProducerTracker> subscriptionListener;
47     private final NotificationProducerRegistry registry;
48     private final NotificationCollector collector;
49
50     private final Set<ManagedNotificationProducer> alreadyStartedProducers = new HashSet<>();
51
52     public NotificationProducerTracker(@Nonnull final NotificationProducerRegistry registry,
53                                        @Nonnull final NotificationCollector collector,
54                                        @Nonnull final DOMNotificationSubscriptionListenerRegistry notificationRouter) {
55         this.registry = registry;
56         this.collector = collector;
57         this.subscriptionListener = notificationRouter.registerSubscriptionListener(this);
58     }
59
60     @Override
61     public synchronized void onSubscriptionChanged(final Set<SchemaPath> set) {
62         LOG.debug("Subscriptions changed. Current subscriptions: {}", set);
63         final Set<QName> currentSubscriptions = set.stream().map(SchemaPath::getLastComponent).collect(Collectors.toSet());
64         final Set<QName> startedQNames = getStartedQNames(alreadyStartedProducers);
65         final Sets.SetView<QName> newSubscriptions = Sets.difference(currentSubscriptions, startedQNames);
66         LOG.debug("Subscriptions changed. New subscriptions: {}", newSubscriptions);
67         final Sets.SetView<QName> deletedSubscriptions = Sets.difference(startedQNames, currentSubscriptions);
68         LOG.debug("Subscriptions changed. Deleted subscriptions: {}", deletedSubscriptions);
69
70         newSubscriptions.stream().forEach(newSub -> {
71             if(!registry.getNotificationQNameToProducer().containsKey(newSub)) {
72                 return;
73             }
74             final ManagedNotificationProducer producer = registry.getNotificationQNameToProducer().get(newSub);
75             if(alreadyStartedProducers.contains(producer)) {
76                 return;
77             }
78             LOG.debug("Starting notification producer: {}", producer);
79             producer.start(collector);
80             alreadyStartedProducers.add(producer);
81         });
82
83         deletedSubscriptions.stream().forEach(newSub -> {
84             checkState(registry.getNotificationQNameToProducer().containsKey(newSub));
85             final ManagedNotificationProducer producer = registry.getNotificationQNameToProducer().get(newSub);
86             checkState(alreadyStartedProducers.contains(producer));
87             LOG.debug("Stopping notification producer: {}", producer);
88             producer.stop();
89             alreadyStartedProducers.remove(producer);
90         });
91
92     }
93
94     private Set<QName> getStartedQNames(final Set<ManagedNotificationProducer> alreadyStartedProducers) {
95         return alreadyStartedProducers.stream()
96             .flatMap(p -> registry.getNotificationProducerQNames().get(p).stream())
97             .collect(Collectors.toSet());
98     }
99
100     @Override
101     public synchronized void close() throws Exception {
102         LOG.trace("Closing");
103         subscriptionListener.close();
104         // Stop all producers
105         LOG.debug("Stopping all producers: {}", alreadyStartedProducers);
106         alreadyStartedProducers.forEach(ManagedNotificationProducer::stop);
107         alreadyStartedProducers.clear();
108     }
109 }