/*
 * Decompiled with CFR 0.152.
 */
package de.rcenvironment.core.component.workflow.execution.api;

import de.rcenvironment.core.communication.api.CommunicationService;
import de.rcenvironment.core.communication.common.InstanceNodeSessionId;
import de.rcenvironment.core.communication.common.NodeIdentifierUtils;
import de.rcenvironment.core.communication.common.ResolvableNodeId;
import de.rcenvironment.core.communication.management.WorkflowHostService;
import de.rcenvironment.core.component.workflow.execution.api.GenericSubscriptionEventProcessor;
import de.rcenvironment.core.notification.NotificationSubscriber;
import de.rcenvironment.core.notification.SimpleNotificationService;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.rpc.RemoteOperationException;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncExceptionListener;
import de.rcenvironment.toolkit.modules.concurrency.api.CallablesGroup;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class GenericSubscriptionManager {
    private static final String NOTIFICATION_PATTERN_WILDCARD = ".*";
    private final GenericSubscriptionEventProcessor eventProcessor;
    private final WorkflowHostService workflowHostService;
    private final CommunicationService communicationService;
    private final Set<String> subscribedIds = new HashSet<String>();
    private final boolean verboseLogging = DebugSettings.getVerboseLoggingEnabled(this.getClass());
    private final Log log = LogFactory.getLog(this.getClass());

    public GenericSubscriptionManager(GenericSubscriptionEventProcessor eventProcessor, CommunicationService communicationService, WorkflowHostService workflowHostService) {
        this.eventProcessor = eventProcessor;
        this.communicationService = communicationService;
        this.workflowHostService = workflowHostService;
    }

    private Set<String> updateSubscribedIds() {
        HashSet<String> currentIdsToSubscribe = new HashSet<String>();
        Set allWorkflowNodes = this.workflowHostService.getWorkflowHostNodesAndSelf();
        for (InstanceNodeSessionId wfNode : allWorkflowNodes) {
            String id = wfNode.getInstanceNodeSessionIdString();
            currentIdsToSubscribe.add(id);
        }
        HashSet<String> missingSubscribed = new HashSet<String>(currentIdsToSubscribe);
        missingSubscribed.removeAll(this.subscribedIds);
        this.subscribedIds.retainAll(currentIdsToSubscribe);
        return missingSubscribed;
    }

    public synchronized void updateSubscriptionsForPrefixes(String[] notificationIdPrefixes) {
        Set<String> missingSubscribedIds = this.updateSubscribedIds();
        final SimpleNotificationService sns = new SimpleNotificationService();
        CallablesGroup callablesGroup = ConcurrencyUtils.getFactory().createCallablesGroup(Void.class);
        for (final String missingId : missingSubscribedIds) {
            final InstanceNodeSessionId targetWorkflowHostNode = NodeIdentifierUtils.parseInstanceNodeSessionIdStringWithExceptionWrapping((String)missingId);
            String[] stringArray = notificationIdPrefixes;
            int n = notificationIdPrefixes.length;
            int n2 = 0;
            while (n2 < n) {
                final String notificationIdPrefix = stringArray[n2];
                callablesGroup.add((Callable)new Callable<Void>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    @TaskDescription(value="Distributed console/input model notification subscriptions")
                    public Void call() throws Exception {
                        Map lastMissedNumbers = sns.subscribe(StringUtils.format((String)"%s%s:.*", (Object[])new Object[]{notificationIdPrefix, targetWorkflowHostNode.getInstanceNodeIdString()}), (NotificationSubscriber)GenericSubscriptionManager.this.eventProcessor, (ResolvableNodeId)targetWorkflowHostNode);
                        GenericSubscriptionManager.this.retrieveMissedNotifications(sns, targetWorkflowHostNode, lastMissedNumbers);
                        Set set = GenericSubscriptionManager.this.subscribedIds;
                        synchronized (set) {
                            GenericSubscriptionManager.this.subscribedIds.add(missingId);
                        }
                        return null;
                    }
                });
                ++n2;
            }
        }
        callablesGroup.executeParallel(new AsyncExceptionListener(){

            public void onAsyncException(Exception e1) {
                Throwable e = e1.getClass() == ExecutionException.class && e1.getCause() != null ? e1.getCause() : e1;
                if (e.getCause() == null) {
                    GenericSubscriptionManager.this.log.warn((Object)("Asynchronous exception during parallel console/input model notification subscriptions: " + e.toString()));
                } else {
                    GenericSubscriptionManager.this.log.error((Object)"Asynchronous exception during parallel console/input model notification subscriptions", e);
                }
            }
        });
    }

    private void retrieveMissedNotifications(SimpleNotificationService sns, InstanceNodeSessionId targetNode, Map<String, Long> lastMissedNumbers) throws RemoteOperationException {
        for (String notifId : lastMissedNumbers.keySet()) {
            Long lastMissedNumber = lastMissedNumbers.get(notifId);
            if (lastMissedNumber == -1L) continue;
            this.eventProcessor.setNumberOfLastMissingNotification(notifId, targetNode.getInstanceNodeSessionIdString(), lastMissedNumber);
            if (this.verboseLogging) {
                this.log.debug((Object)StringUtils.format((String)"Starting to fetch stored notifications for id %s from node %s", (Object[])new Object[]{notifId, targetNode}));
            }
            Map storedNotifications = sns.getNotifications(notifId, (ResolvableNodeId)targetNode);
            if (this.verboseLogging) {
                this.log.debug((Object)StringUtils.format((String)"Received %d stored notification entries for id %s from node %s", (Object[])new Object[]{storedNotifications.size(), notifId, targetNode}));
                for (Map.Entry entry : storedNotifications.entrySet()) {
                    this.log.debug((Object)StringUtils.format((String)"  Received %d notifications for topic %s", (Object[])new Object[]{((List)entry.getValue()).size(), entry.getKey()}));
                }
            }
            for (List list : storedNotifications.values()) {
                this.eventProcessor.receiveBatchedNotifications(list);
            }
        }
    }
}

