2 * Copyright (c) 2016 Cisco and/or its affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.fd.honeycomb.notification.impl;
18 import static com.google.common.base.Preconditions.checkState;
20 import com.google.common.collect.Sets;
21 import io.fd.honeycomb.notification.ManagedNotificationProducer;
22 import io.fd.honeycomb.notification.NotificationCollector;
23 import java.util.HashSet;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.annotation.concurrent.ThreadSafe;
28 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
29 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Starts & stops notification producer dependencies on demand.
38 * Uses {@link DOMNotificationSubscriptionListenerRegistry} to receive subscription change notifications.
41 public final class NotificationProducerTracker
42 implements DOMNotificationSubscriptionListener, AutoCloseable {
44 private static final Logger LOG = LoggerFactory.getLogger(NotificationProducerTracker.class);
46 private final ListenerRegistration<NotificationProducerTracker> subscriptionListener;
47 private final NotificationProducerRegistry registry;
48 private final NotificationCollector collector;
50 private final Set<ManagedNotificationProducer> alreadyStartedProducers = new HashSet<>();
52 public NotificationProducerTracker(@Nonnull final NotificationProducerRegistry registry,
53 @Nonnull final NotificationCollector collector,
54 @Nonnull final DOMNotificationSubscriptionListenerRegistry notificationRouter) {
55 this.registry = registry;
56 this.collector = collector;
57 this.subscriptionListener = notificationRouter.registerSubscriptionListener(this);
61 public synchronized void onSubscriptionChanged(final Set<SchemaPath> set) {
62 LOG.debug("Subscriptions changed. Current subscriptions: {}", set);
63 final Set<QName> currentSubscriptions = set.stream().map(SchemaPath::getLastComponent).collect(Collectors.toSet());
64 final Set<QName> startedQNames = getStartedQNames(alreadyStartedProducers);
65 final Sets.SetView<QName> newSubscriptions = Sets.difference(currentSubscriptions, startedQNames);
66 LOG.debug("Subscriptions changed. New subscriptions: {}", newSubscriptions);
67 final Sets.SetView<QName> deletedSubscriptions = Sets.difference(startedQNames, currentSubscriptions);
68 LOG.debug("Subscriptions changed. Deleted subscriptions: {}", deletedSubscriptions);
70 newSubscriptions.stream().forEach(newSub -> {
71 if(!registry.getNotificationQNameToProducer().containsKey(newSub)) {
74 final ManagedNotificationProducer producer = registry.getNotificationQNameToProducer().get(newSub);
75 if(alreadyStartedProducers.contains(producer)) {
78 LOG.debug("Starting notification producer: {}", producer);
79 producer.start(collector);
80 alreadyStartedProducers.add(producer);
83 deletedSubscriptions.stream().forEach(newSub -> {
84 checkState(registry.getNotificationQNameToProducer().containsKey(newSub));
85 final ManagedNotificationProducer producer = registry.getNotificationQNameToProducer().get(newSub);
86 checkState(alreadyStartedProducers.contains(producer));
87 LOG.debug("Stopping notification producer: {}", producer);
89 alreadyStartedProducers.remove(producer);
94 private Set<QName> getStartedQNames(final Set<ManagedNotificationProducer> alreadyStartedProducers) {
95 return alreadyStartedProducers.stream()
96 .flatMap(p -> registry.getNotificationProducerQNames().get(p).stream())
97 .collect(Collectors.toSet());
101 public synchronized void close() throws Exception {
102 LOG.trace("Closing");
103 subscriptionListener.close();
104 // Stop all producers
105 LOG.debug("Stopping all producers: {}", alreadyStartedProducers);
106 alreadyStartedProducers.forEach(ManagedNotificationProducer::stop);
107 alreadyStartedProducers.clear();