/*
 * Decompiled with CFR 0.152.
 */
package de.rcenvironment.core.component.execution.internal;

import de.rcenvironment.core.communication.api.CommunicationService;
import de.rcenvironment.core.communication.api.PlatformService;
import de.rcenvironment.core.communication.common.LogicalNodeId;
import de.rcenvironment.core.communication.common.ResolvableNodeId;
import de.rcenvironment.core.component.execution.api.ComponentExecutionController;
import de.rcenvironment.core.component.execution.api.EndpointDatumSerializer;
import de.rcenvironment.core.component.execution.api.ExecutionControllerException;
import de.rcenvironment.core.component.execution.api.LocalExecutionControllerUtilsService;
import de.rcenvironment.core.component.execution.api.RemotableComponentExecutionControllerService;
import de.rcenvironment.core.component.execution.api.RemotableEndpointDatumDispatcher;
import de.rcenvironment.core.component.execution.internal.ComponentExecutionUtils;
import de.rcenvironment.core.component.execution.internal.EndpointDatumDispatcher;
import de.rcenvironment.core.component.model.endpoint.api.EndpointDatum;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.rpc.RemoteOperationException;
import de.rcenvironment.core.utils.common.security.AllowRemoteAccess;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncCallbackExceptionPolicy;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncOrderedExecutionQueue;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import java.lang.ref.WeakReference;
import java.util.Map;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;

/**
 * Default {@link EndpointDatumDispatcher} implementation that also exposes itself to remote nodes via
 * {@link RemotableEndpointDatumDispatcher}.
 *
 * <p>Each {@link EndpointDatum} is handled on an ordered execution queue and is either
 * <ul>
 * <li>handed to the local {@link ComponentExecutionController} if its inputs node is the local instance,</li>
 * <li>forwarded to the inputs node if that node is currently reachable,</li>
 * <li>forwarded to the workflow node if that is not the local instance, or</li>
 * <li>retried a bounded number of times before the sending side is notified about the failure.</li>
 * </ul>
 *
 * <p>The collaborating services are injected through the {@code bind*} methods and the bundle context through
 * {@link #activate(BundleContext)}; none of them is null-checked here.
 */
public class EndpointDatumDispatcherImpl implements EndpointDatumDispatcher, RemotableEndpointDatumDispatcher {

    private static final String FAILED_TO_SEND_ENDPOINT_DATUM = "Failed to send endpoint datum %s";

    private static final Log LOG = LogFactory.getLog(EndpointDatumDispatcherImpl.class);

    /** Maximum number of component execution controller references kept in the LRU cache. */
    private static final int CACHE_SIZE = 20;

    /**
     * Number of reachability checks performed for an unreachable target before giving up.
     * NOTE(review): this value was a bare literal in the decompiled code; it may originally have been a shared
     * constant defined elsewhere - confirm against the original sources.
     */
    private static final int MAX_REACHABILITY_CHECKS = 5;

    // Not used within this class; kept because it is initialized as part of object construction.
    private AsyncTaskService threadPool = ConcurrencyUtils.getAsyncTaskService();

    private AsyncOrderedExecutionQueue executionQueue =
        ConcurrencyUtils.getFactory().createAsyncOrderedExecutionQueue(AsyncCallbackExceptionPolicy.LOG_AND_PROCEED);

    /**
     * Cache of execution id -> weakly referenced controller. Declared final because the map instance itself is
     * used as the monitor guarding all accesses (see {@link #getComponentExecutionController(String)}).
     */
    private final Map<String, WeakReference<ComponentExecutionController>> compExeCtrls = new LRUMap<>(CACHE_SIZE);

    private BundleContext bundleContext;

    private CommunicationService communicationService;

    private LocalExecutionControllerUtilsService exeCtrlUtilsService;

    private PlatformService platformService;

    private EndpointDatumSerializer endpointDatumSerializer;

    /**
     * Stores the bundle context; it is later passed on when looking up execution controllers.
     *
     * @param context the bundle context of this component
     */
    protected void activate(BundleContext context) {
        this.bundleContext = context;
    }

    /**
     * Enqueues the given endpoint datum for asynchronous dispatching. The routing decision (process locally,
     * forward to the inputs node, forward to the workflow node, or retry) is made when the queued task runs.
     *
     * @param endpointDatum the endpoint datum to dispatch
     */
    @Override
    public void dispatchEndpointDatum(final EndpointDatum endpointDatum) {
        final String executionId = endpointDatum.getInputsComponentExecutionIdentifier();
        executionQueue.enqueue(new Runnable() {

            @Override
            public void run() {
                if (platformService.matchesLocalInstance((ResolvableNodeId) endpointDatum.getInputsNodeId())) {
                    processEndpointDatum(executionId, endpointDatum);
                } else if (isInputsNodeReachable(endpointDatum)) {
                    forwardEndpointDatum(endpointDatum.getInputsNodeId(), endpointDatum);
                } else if (!platformService.matchesLocalInstance((ResolvableNodeId) endpointDatum.getWorkflowNodeId())) {
                    forwardEndpointDatum(endpointDatum.getWorkflowNodeId(), endpointDatum);
                } else {
                    tryToForwardEndpointDatumToActualTarget();
                }
            }

            /**
             * Re-checks the reachability of the inputs node up to MAX_REACHABILITY_CHECKS times, waiting between
             * the checks, and forwards the datum as soon as the node is reachable. If it never becomes
             * reachable, the failure is logged and reported to the sending component's execution controller.
             */
            private void tryToForwardEndpointDatumToActualTarget() {
                final String notReachableMessage = "Target node not reachable: " + endpointDatum.getInputsNodeId();
                int failureCount = 0;
                while (failureCount < MAX_REACHABILITY_CHECKS) {
                    if (isInputsNodeReachable(endpointDatum)) {
                        forwardEndpointDatum(endpointDatum.getInputsNodeId(), endpointDatum);
                        ComponentExecutionUtils.logCallbackSuccessAfterFailure(LOG,
                            StringUtils.format("Sending endpoint datum %s", endpointDatum), failureCount);
                        return;
                    }
                    failureCount++;
                    // no point in waiting after the final check; fall through to the failure handling instead
                    if (failureCount < MAX_REACHABILITY_CHECKS) {
                        ComponentExecutionUtils.waitForRetryAfterCallbackFailure(LOG, failureCount,
                            StringUtils.format(FAILED_TO_SEND_ENDPOINT_DATUM, endpointDatum), notReachableMessage);
                    }
                }
                RemoteOperationException e = new RemoteOperationException(notReachableMessage);
                ComponentExecutionUtils.logCallbackFailureAfterRetriesExceeded(LOG,
                    StringUtils.format(FAILED_TO_SEND_ENDPOINT_DATUM, endpointDatum), e);
                callbackComponentExecutionController(endpointDatum, e);
            }
        });
    }

    /**
     * Remote entry point: deserializes the given endpoint datum and dispatches it like a local one.
     *
     * @param serializedEndpointDatum the endpoint datum in the format of the {@link EndpointDatumSerializer}
     */
    @Override
    @AllowRemoteAccess
    public void dispatchEndpointDatum(String serializedEndpointDatum) {
        dispatchEndpointDatum(endpointDatumSerializer.deserializeEndpointDatum(serializedEndpointDatum));
    }

    /**
     * Checks whether the node hosting the datum's inputs is among the currently reachable logical nodes.
     * The id is converted to its default logical node id first, so that the initial routing decision and the
     * retry loop use exactly the same criterion.
     * NOTE(review): assumes the reachable set contains default logical node ids - confirm against
     * CommunicationService.
     */
    private boolean isInputsNodeReachable(EndpointDatum endpointDatum) {
        return communicationService.getReachableLogicalNodes()
            .contains(endpointDatum.getInputsNodeId().convertToDefaultLogicalNodeId());
    }

    /**
     * Sends the serialized endpoint datum to the dispatcher service of the given node. A
     * {@link RemoteOperationException} is not propagated but reported to the sending component's execution
     * controller.
     *
     * @param node the node to forward the datum to
     * @param endpointDatum the endpoint datum to forward
     */
    protected void forwardEndpointDatum(LogicalNodeId node, EndpointDatum endpointDatum) {
        RemotableEndpointDatumDispatcher dispatcher = (RemotableEndpointDatumDispatcher) communicationService
            .getRemotableService(RemotableEndpointDatumDispatcher.class, (ResolvableNodeId) node);
        try {
            dispatcher.dispatchEndpointDatum(endpointDatumSerializer.serializeEndpointDatum(endpointDatum));
        } catch (RemoteOperationException e) {
            callbackComponentExecutionController(endpointDatum, e);
        }
    }

    /**
     * Notifies the execution controller of the component that sent the endpoint datum that sending failed;
     * done with a direct call if the outputs node is the local instance, via remote service call otherwise.
     *
     * @param endpointDatum the endpoint datum that could not be sent
     * @param e the failure to report
     */
    protected void callbackComponentExecutionController(EndpointDatum endpointDatum, RemoteOperationException e) {
        if (platformService.matchesLocalInstance((ResolvableNodeId) endpointDatum.getOutputsNodeId())) {
            callbackComponentExecutionControllerLocally(endpointDatum, e);
        } else {
            callbackComponentExecutionControllerRemotely(endpointDatum, e);
        }
    }

    /**
     * Reports the failure to the local execution controller of the sending component. If that controller
     * cannot be obtained, a warning is logged and the failure is not reported.
     */
    private void callbackComponentExecutionControllerLocally(EndpointDatum endpointDatum, RemoteOperationException e) {
        final String executionId = endpointDatum.getOutputsComponentExecutionIdentifier();
        final ComponentExecutionController compExeCtrl;
        try {
            compExeCtrl = getComponentExecutionController(executionId);
        } catch (ExecutionControllerException lookupFailure) {
            LOG.warn(StringUtils.format("Failed to announce that sending endpoint datum '%s' failed; cause: %s",
                endpointDatum.toString(), lookupFailure.toString()));
            return;
        }
        compExeCtrl.onSendingEndointDatumFailed(endpointDatum, e);
    }

    /**
     * Reports the failure to the execution controller service on the outputs node. If that call fails as
     * well, only a warning is logged.
     */
    private void callbackComponentExecutionControllerRemotely(EndpointDatum endpointDatum, RemoteOperationException e) {
        final String outputCompExeId = endpointDatum.getOutputsComponentExecutionIdentifier();
        RemotableComponentExecutionControllerService compExeCtrlService =
            (RemotableComponentExecutionControllerService) communicationService.getRemotableService(
                RemotableComponentExecutionControllerService.class, (ResolvableNodeId) endpointDatum.getOutputsNodeId());
        try {
            compExeCtrlService.onSendingEndointDatumFailed(outputCompExeId,
                endpointDatumSerializer.serializeEndpointDatum(endpointDatum), e);
        } catch (ExecutionControllerException | RemoteOperationException announceFailure) {
            LOG.warn(StringUtils.format("Failed to announce that sending endpoint datum '%s' failed; cause: %s",
                endpointDatum, announceFailure.toString()));
        }
    }

    /**
     * Hands the endpoint datum to the local execution controller of the receiving component. If that
     * controller cannot be obtained, a warning is logged and the datum is dropped.
     *
     * @param executionId execution identifier of the receiving component
     * @param endpointDatum the endpoint datum to process
     */
    protected void processEndpointDatum(String executionId, EndpointDatum endpointDatum) {
        final ComponentExecutionController compExeCtrl;
        try {
            compExeCtrl = getComponentExecutionController(executionId);
        } catch (ExecutionControllerException e) {
            LOG.warn(StringUtils.format("Endpoint datum '%s' not processed; cause: %s",
                endpointDatum.toString(), e.toString()));
            return;
        }
        compExeCtrl.onEndpointDatumReceived(endpointDatum);
    }

    /**
     * Returns the execution controller for the given execution id, preferring the cached reference and falling
     * back to a lookup via the {@link LocalExecutionControllerUtilsService}. The result of a lookup is cached
     * as a weak reference. Lookup and cache update happen while holding the cache's monitor.
     *
     * @param executionId execution identifier of the component
     * @return the execution controller as cached or as returned by the lookup
     * @throws ExecutionControllerException if the lookup of the controller fails
     */
    private ComponentExecutionController getComponentExecutionController(String executionId)
        throws ExecutionControllerException {
        synchronized (compExeCtrls) {
            ComponentExecutionController compExeCtrl = null;
            WeakReference<ComponentExecutionController> cachedReference = compExeCtrls.get(executionId);
            if (cachedReference != null) {
                // yields null if the referent has been garbage collected in the meantime
                compExeCtrl = cachedReference.get();
            }
            if (compExeCtrl == null) {
                compExeCtrl = exeCtrlUtilsService.getExecutionController(ComponentExecutionController.class,
                    executionId, bundleContext);
                compExeCtrls.put(executionId, new WeakReference<ComponentExecutionController>(compExeCtrl));
            }
            return compExeCtrl;
        }
    }

    /**
     * Injects the communication service used for reachability checks and remote service access.
     *
     * @param newService the service instance to use
     */
    protected void bindCommunicationService(CommunicationService newService) {
        this.communicationService = newService;
    }

    /**
     * Injects the service used to look up local component execution controllers.
     *
     * @param newService the service instance to use
     */
    protected void bindLocalExecutionControllerUtilsService(LocalExecutionControllerUtilsService newService) {
        this.exeCtrlUtilsService = newService;
    }

    /**
     * Injects the platform service used to decide whether a node id refers to the local instance.
     *
     * @param newService the service instance to use
     */
    protected void bindPlatformService(PlatformService newService) {
        this.platformService = newService;
    }

    /**
     * Injects the serializer used to convert endpoint datums to and from their string form.
     *
     * @param newService the service instance to use
     */
    protected void bindEndpointDatumSerializer(EndpointDatumSerializer newService) {
        this.endpointDatumSerializer = newService;
    }
}

