/*
 * Decompiled with CFR 0.152.
 */
package de.rcenvironment.core.communication.transport.jms.common;

import de.rcenvironment.core.communication.channel.MessageChannelState;
import de.rcenvironment.core.communication.common.InstanceNodeSessionId;
import de.rcenvironment.core.communication.model.NetworkRequest;
import de.rcenvironment.core.communication.model.NetworkResponse;
import de.rcenvironment.core.communication.protocol.NetworkResponseFactory;
import de.rcenvironment.core.communication.transport.jms.common.JmsMessageChannel;
import de.rcenvironment.core.communication.transport.jms.common.JmsProtocolUtils;
import de.rcenvironment.core.communication.transport.jms.common.NonBlockingResponseInboxConsumer;
import de.rcenvironment.core.communication.transport.spi.AbstractMessageChannel;
import de.rcenvironment.core.communication.transport.spi.MessageChannel;
import de.rcenvironment.core.communication.transport.spi.MessageChannelResponseHandler;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.toolkitbridge.transitional.StatsCounter;
import de.rcenvironment.core.utils.common.LogUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Common base for message channels that exchange {@link NetworkRequest}s and
 * {@link NetworkResponse}s over a JMS {@link Connection}.
 *
 * <p>Outgoing requests are handed to a single {@link RequestSender} task which sends them one
 * after another on its own JMS session. Responses are received through a
 * {@link NonBlockingResponseInboxConsumer} listening on a shared response queue and are matched
 * to their request by JMS message id.
 *
 * <p>{@link #setupNonBlockingRequestSending(String, String)} must have been called before
 * {@link #sendRequest(NetworkRequest, MessageChannelResponseHandler, int)} is used, as it creates
 * the request sender and the response consumer.
 */
public abstract class AbstractJmsMessageChannel
extends AbstractMessageChannel
implements JmsMessageChannel {
    /** Payload size (1 MiB) at or above which requests/responses get an additional debug log line. */
    private static final int LARGE_PAYLOAD_THRESHOLD_BYTES = 0x100000;

    protected final AsyncTaskService threadPool = ConcurrencyUtils.getAsyncTaskService();
    protected Connection connection;
    protected InstanceNodeSessionId localNodeId;
    protected final Log log = LogFactory.getLog(this.getClass());
    private String outgoingRequestQueueName;
    private String shutdownSecurityToken;
    private String sharedResponseQueueName;
    private RequestSender requestSender;
    private NonBlockingResponseInboxConsumer responseInboxConsumer;
    private final boolean verboseRequestLoggingEnabled = DebugSettings.getVerboseLoggingEnabled("NetworkRequests");

    /**
     * @param localNodeId the id of the local node; used as the sender of locally generated
     *        failure/timeout responses
     */
    public AbstractJmsMessageChannel(InstanceNodeSessionId localNodeId) {
        this.localNodeId = localNodeId;
    }

    /**
     * Queues a request for asynchronous delivery by the request sender task.
     *
     * @param request the request to send
     * @param responseHandler the handler that receives the response (or a generated failure response)
     * @param timeoutMsec the maximum time to wait for the response, in milliseconds
     */
    public void sendRequest(NetworkRequest request, MessageChannelResponseHandler responseHandler, int timeoutMsec) {
        this.requestSender.enqueue(request, responseHandler, timeoutMsec);
    }

    /**
     * Synchronously sends a "channel shutdown" message to the outgoing request queue. This is a
     * best-effort operation: a {@link JMSException} is only logged at debug level.
     */
    protected void sendShutdownMessageToRemoteRequestInbox() {
        try (Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            Queue requestInbox = session.createQueue(this.outgoingRequestQueueName);
            Message shutdownMessage =
                JmsProtocolUtils.createChannelShutdownMessage(session, this.getChannelId(), this.shutdownSecurityToken);
            session.createProducer(requestInbox).send(shutdownMessage);
        } catch (JMSException e) {
            this.log.debug("Failed to send shutdown message while closing channel " + this.getChannelId(), e);
        }
    }

    /**
     * Asynchronously sends a "queue shutdown" message to the outgoing request queue, using the
     * shared thread pool. Failures are logged at debug level and never propagated.
     *
     * @throws JMSException declared for compatibility with existing callers; not thrown by this
     *         implementation itself
     */
    protected void asyncSendShutdownMessageToB2CJmsQueue() throws JMSException {
        this.threadPool.execute(new Runnable() {

            @Override
            @TaskDescription(value="JMS Network Transport: Send shutdown signal to Client-to-Broker queue")
            public void run() {
                try (Session session =
                    AbstractJmsMessageChannel.this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                    Queue targetQueue = session.createQueue(AbstractJmsMessageChannel.this.outgoingRequestQueueName);
                    Message shutdownMessage = JmsProtocolUtils.createQueueShutdownMessage(session,
                        AbstractJmsMessageChannel.this.shutdownSecurityToken);
                    session.createProducer(targetQueue).send(shutdownMessage);
                } catch (JMSException e) {
                    // The previous implementation filtered on message.contains(""), which is always
                    // true, so every failure was swallowed without a trace. Report it at debug level
                    // instead, consistent with sendShutdownMessageToRemoteRequestInbox().
                    AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                        "Exception on sending shutdown signal to Client-to-Broker JMS queue %s: %s",
                        AbstractJmsMessageChannel.this.outgoingRequestQueueName, e.toString()));
                }
            }
        });
    }

    @Override
    public String getOutgoingRequestQueueName() {
        return this.outgoingRequestQueueName;
    }

    /**
     * Stores the queue names and starts the two background tasks of this channel: first the
     * request sender, then the response inbox consumer.
     *
     * @param outgoingRequestQueue name of the queue that requests are sent to
     * @param incomingResponseQueue name of the shared queue that responses arrive on
     * @throws JMSException if starting either background task fails
     */
    @Override
    public void setupNonBlockingRequestSending(String outgoingRequestQueue, String incomingResponseQueue) throws JMSException {
        this.log.debug(StringUtils.format("Setting outgoing request queue for channel %s to %s",
            this.getChannelId(), outgoingRequestQueue));
        this.outgoingRequestQueueName = outgoingRequestQueue;
        this.startRequestSender(StringUtils.format("Request Sender for channel %s @ %s",
            this.getChannelId(), outgoingRequestQueue));

        this.log.debug(StringUtils.format("Setting incoming response queue for channel %s to %s",
            this.getChannelId(), incomingResponseQueue));
        this.sharedResponseQueueName = incomingResponseQueue;
        this.startResponseConsumer(StringUtils.format("Response Inbox Consumer for channel %s @ %s",
            this.getChannelId(), incomingResponseQueue));
    }

    /**
     * Stops the request sender and the response inbox consumer, if they were started. A failure to
     * stop the response consumer is logged as a warning.
     */
    protected void onClosedOrBroken() {
        if (this.requestSender != null) {
            this.requestSender.shutdown();
        }
        try {
            if (this.responseInboxConsumer != null) {
                this.responseInboxConsumer.triggerShutDown();
            }
        } catch (JMSException e) {
            this.log.warn("Error while shutting down response consumer for channel " + this.getChannelId(), e);
        }
    }

    /** Creates the request sender and schedules it on the thread pool under the given task name. */
    private void startRequestSender(String taskName) throws JMSException {
        this.requestSender = new RequestSender(this.outgoingRequestQueueName, this.connection);
        this.threadPool.execute(this.requestSender, taskName);
    }

    /** Creates the response inbox consumer and schedules it on the thread pool under the given task name. */
    private void startResponseConsumer(String taskName) throws JMSException {
        this.responseInboxConsumer = new NonBlockingResponseInboxConsumer(this.sharedResponseQueueName, this.connection);
        this.threadPool.execute(this.responseInboxConsumer, taskName);
    }

    @Override
    public void setShutdownSecurityToken(String shutdownSecurityToken) {
        this.shutdownSecurityToken = shutdownSecurityToken;
    }

    protected String getShutdownSecurityToken() {
        return this.shutdownSecurityToken;
    }

    /**
     * Runs a blocking request/response exchange on a pool thread. If the channel is not ready to
     * use when the task starts, a "closed or broken channel" response is delivered instead.
     * Not referenced from within this class at present.
     */
    private void spawnBlockingRequestResponseTask(final NetworkRequest request, final MessageChannelResponseHandler responseHandler, final int timeoutMsec) {
        this.threadPool.execute(new Runnable() {

            @Override
            @TaskDescription(value="JMS Network Transport: blocking request/response")
            public void run() {
                if (AbstractJmsMessageChannel.this.isReadyToUse()) {
                    AbstractJmsMessageChannel.this.performBlockingRequestResponse(request, responseHandler, timeoutMsec);
                    return;
                }
                NetworkResponse failureResponse =
                    NetworkResponseFactory.generateResponseForCloseOrBrokenChannelDuringRequestDelivery(request,
                        AbstractJmsMessageChannel.this.localNodeId, null);
                responseHandler.onResponseAvailable(failureResponse);
            }
        }, request.getRequestId());
    }

    /**
     * Sends the request on a temporary session and blocks until the response arrives or the
     * timeout expires. Every outcome (response, timeout, JMS error) is reported to the handler as
     * a {@link NetworkResponse}; on a JMS error the handler is additionally told that the channel
     * is broken.
     */
    private void performBlockingRequestResponse(NetworkRequest request, MessageChannelResponseHandler responseHandler, int timeoutMsec) {
        try (Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            Queue destinationQueue = session.createQueue(this.outgoingRequestQueueName);
            Message jmsRequest = JmsProtocolUtils.createMessageFromNetworkRequest(request, session);
            Message jmsResponse = this.performBlockingJmsRequestResponse(session, jmsRequest, destinationQueue, timeoutMsec);
            responseHandler.onResponseAvailable(JmsProtocolUtils.createNetworkResponseFromMessage(jmsResponse, request));
        } catch (TimeoutException e) {
            this.log.debug(StringUtils.format("Timeout while waiting for response to request '%s' of type '%s': %s",
                request.getRequestId(), request.getMessageType(), e.getMessage()));
            responseHandler.onResponseAvailable(
                NetworkResponseFactory.generateResponseForTimeoutWaitingForResponse(request, this.localNodeId));
        } catch (JMSException e) {
            this.reportDeliveryFailure(request, responseHandler, e);
        }
    }

    /**
     * Sends a JMS message and blocks for its reply on a freshly created temporary queue, which is
     * deleted again before this method returns or throws.
     *
     * @param session the session to create the temporary queue on and to send with
     * @param message the request message; its reply-to destination is overwritten
     * @param destinationQueue the queue to send the request to
     * @param timeoutMsec the maximum time to wait for the reply, in milliseconds
     * @return the reply message, never {@code null}
     * @throws JMSException on JMS failures while sending or receiving
     * @throws TimeoutException if no reply arrived within the timeout
     */
    protected final Message performBlockingJmsRequestResponse(Session session, Message message, Queue destinationQueue, int timeoutMsec) throws JMSException, TimeoutException {
        TemporaryQueue tempResponseQueue = session.createTemporaryQueue();
        try {
            message.setJMSReplyTo(tempResponseQueue);
            this.sendRequest(session, message, destinationQueue);
            return this.receiveResponse(session, timeoutMsec, tempResponseQueue);
        } finally {
            // Deleting the temporary queue is best-effort; a failure must not mask the result.
            try {
                tempResponseQueue.delete();
            } catch (JMSException e) {
                this.log.debug(StringUtils.format(
                    "Exception on deleting a temporary response queue for channel %s (%s - %s): %s",
                    this.getChannelId(), tempResponseQueue.getQueueName(), this.getState(), e.toString()));
            }
        }
    }

    /**
     * Sends a non-blocking request using a session that only lives for the duration of this call.
     * If the session or destination cannot be created, the error is logged and nothing is sent.
     * Not referenced from within this class at present.
     */
    private void sendNonBlockingRequestInTempSession(NetworkRequest request, MessageChannelResponseHandler responseHandler, int timeoutMsec) {
        Session session = null;
        try {
            Queue destinationQueue;
            try {
                session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destinationQueue = session.createQueue(this.outgoingRequestQueueName);
            } catch (JMSException e) {
                this.log.error("Error creating JMS session or destination for message sending", e);
                return;
            }
            this.sendNonBlockingRequest(session, destinationQueue, request, responseHandler, timeoutMsec);
        } finally {
            // Single cleanup point; runs for the early return above as well.
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    this.log.error("Error closing JMS session after message sending", e);
                }
            }
        }
    }

    /**
     * Sends the request to the destination queue with the shared response queue as reply-to, then
     * registers a listener for the response under the JMS message id assigned on sending.
     *
     * <p>The listener reports exactly one of: the parsed response, a generated error response (if
     * parsing fails), a timeout response, or a channel-closed response. A {@link JMSException}
     * while sending or registering is reported as a broken channel plus an error response.
     */
    private void sendNonBlockingRequest(Session session, Queue destinationQueue, final NetworkRequest request, final MessageChannelResponseHandler responseHandler, int timeoutMsec) {
        try {
            final int requestPayloadSize = request.getContentBytes().length;
            if (this.verboseRequestLoggingEnabled) {
                this.log.debug(StringUtils.format("Sending request   %s: type %s, payload length %d",
                    request.getRequestId(), request.getMessageType(), requestPayloadSize));
            }
            if (requestPayloadSize >= LARGE_PAYLOAD_THRESHOLD_BYTES) {
                this.log.debug(StringUtils.format(
                    "Sending large network request %s towards recipient %s: type %s, payload length %d",
                    request.getRequestId(), request.accessMetaData().getFinalRecipient(), request.getMessageType(),
                    requestPayloadSize));
            }

            Message jmsRequest = JmsProtocolUtils.createMessageFromNetworkRequest(request, session);
            jmsRequest.setJMSReplyTo(session.createQueue(this.sharedResponseQueueName));
            this.sendRequest(session, jmsRequest, destinationQueue);

            // The message id is read after sending and is the key used to correlate the response.
            String messageId = jmsRequest.getJMSMessageID();
            this.responseInboxConsumer.registerResponseListener(messageId, new NonBlockingResponseInboxConsumer.JmsResponseCallback() {

                @Override
                public void onResponseReceived(Message jmsResponse) {
                    NetworkResponse response;
                    try {
                        response = JmsProtocolUtils.createNetworkResponseFromMessage(jmsResponse, request);
                        int responsePayloadSize = response.getContentBytes().length;
                        if (AbstractJmsMessageChannel.this.verboseRequestLoggingEnabled) {
                            AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                                "Received response %s from %s: response payload length is %d",
                                response.getRequestId(), request.accessMetaData().getFinalRecipient(), responsePayloadSize));
                        }
                        if (responsePayloadSize >= LARGE_PAYLOAD_THRESHOLD_BYTES) {
                            AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                                "Received large network response %s from %s: request type was %s, response payload length is %d",
                                response.getRequestId(), request.accessMetaData().getFinalRecipient(),
                                request.getMessageType(), responsePayloadSize));
                        } else if (requestPayloadSize >= LARGE_PAYLOAD_THRESHOLD_BYTES) {
                            AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                                "Received network response %s for a large request sent to %s: request type was %s, response payload length is %d",
                                response.getRequestId(), request.accessMetaData().getFinalRecipient(),
                                request.getMessageType(), responsePayloadSize));
                        }
                        responseHandler.onResponseAvailable(response);
                    } catch (JMSException e) {
                        String errorId = LogUtils.logExceptionWithStacktraceAndAssignUniqueMarker(
                            AbstractJmsMessageChannel.this.log, "JMS exception while parsing response message", e);
                        response = NetworkResponseFactory.generateResponseForErrorDuringDelivery(request,
                            AbstractJmsMessageChannel.this.localNodeId, errorId);
                        responseHandler.onResponseAvailable(response);
                    }
                }

                @Override
                public void onTimeoutReached() {
                    AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                        "Timeout reached while waiting for response to request '%s' of type '%s'",
                        request.getRequestId(), request.getMessageType()));
                    responseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForTimeoutWaitingForResponse(
                        request, AbstractJmsMessageChannel.this.localNodeId));
                }

                @Override
                public void onChannelClosed() {
                    AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                        "Message channel closed while waiting for response to request '%s' of type '%s'",
                        request.getRequestId(), request.getMessageType()));
                    responseHandler.onResponseAvailable(
                        NetworkResponseFactory.generateResponseForChannelCloseWhileWaitingForResponse(request,
                            AbstractJmsMessageChannel.this.localNodeId, null));
                }
            }, timeoutMsec);
        } catch (JMSException e) {
            this.reportDeliveryFailure(request, responseHandler, e);
        }
    }

    /**
     * Shared handling for a {@link JMSException} during request delivery: notifies the handler
     * that the channel is broken, logs the error under a unique marker, and delivers an error
     * response carrying that marker.
     */
    private void reportDeliveryFailure(NetworkRequest request, MessageChannelResponseHandler responseHandler, JMSException cause) {
        responseHandler.onChannelBroken(request, this);
        String errorId = LogUtils.logErrorAndAssignUniqueMarker(this.log, StringUtils.format(
            "Error sending JMS message via channel %s; channel will be marked as broken (exception: %s) ",
            this.getChannelId(), cause.toString()));
        responseHandler.onResponseAvailable(
            NetworkResponseFactory.generateResponseForErrorDuringDelivery(request, this.localNodeId, errorId));
    }

    /** Sends a single JMS message to the given queue via a short-lived producer. */
    private void sendRequest(Session session, Message message, Queue destinationQueue) throws JMSException {
        JmsProtocolUtils.sendWithTransientProducer(session, message, destinationQueue);
    }

    /**
     * Waits for a message on the temporary response queue.
     *
     * @return the received message, never {@code null}
     * @throws TimeoutException if no message was received; the exception text distinguishes a
     *         channel that is already closed or marked as broken from a plain timeout
     */
    private Message receiveResponse(Session session, int timeoutMsec, TemporaryQueue tempResponseQueue) throws JMSException, TimeoutException {
        try (MessageConsumer consumer = session.createConsumer(tempResponseQueue)) {
            Message response = consumer.receive(timeoutMsec);
            if (response != null) {
                return response;
            }
            MessageChannelState currentState = this.getState();
            boolean channelGone =
                currentState == MessageChannelState.CLOSED || currentState == MessageChannelState.MARKED_AS_BROKEN;
            if (channelGone) {
                throw new TimeoutException(StringUtils.format(
                    "Received JMS exception while waiting for a response from message channel %s (on queue %s), which is already %s",
                    this.getChannelId(), tempResponseQueue.getQueueName(), currentState));
            }
            throw new TimeoutException(StringUtils.format(
                "Timeout (%d ms) exceeded while waiting for a response from message channel %s (on queue %s), which is in state %s",
                timeoutMsec, this.getChannelId(), tempResponseQueue.getQueueName(), currentState));
        }
    }

    /**
     * Background task that owns one JMS session and sends queued requests strictly one at a time.
     * Requests are wrapped into tasks by {@link #enqueue} and executed by the dispatch loop on the
     * thread running {@link #run()}.
     */
    private final class RequestSender
    implements Runnable {
        /** Divisor mapping a payload size in bytes onto the "size range" statistics category. */
        private static final int SIZE_CATEGORY_DIVISOR = 102400;

        private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private Session jmsSession;
        private Queue jmsDestinationQueue;
        private volatile boolean cancelled = false;

        /**
         * Both arguments are currently unused: the connection and queue name are read from the
         * enclosing channel when {@link #run()} starts.
         */
        RequestSender(String queueName, Connection connection) {
        }

        /**
         * Creates the session and destination, then processes queued tasks until shut down or
         * interrupted. The session is closed when the loop ends. If the setup fails, the error is
         * logged and the task ends without processing anything.
         */
        @Override
        @TaskDescription(value="JMS Network Transport: Message channel request sender")
        public void run() {
            try {
                try {
                    this.jmsSession = AbstractJmsMessageChannel.this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    this.jmsDestinationQueue = this.jmsSession.createQueue(AbstractJmsMessageChannel.this.outgoingRequestQueueName);
                } catch (JMSException e) {
                    AbstractJmsMessageChannel.this.log.error("Error creating JMS session or destination for request sender loop", e);
                    return;
                }
                this.runDispatchLoop();
            } finally {
                // Single cleanup point; also covers the early return after a failed setup.
                if (this.jmsSession != null) {
                    try {
                        this.jmsSession.close();
                    } catch (JMSException e) {
                        AbstractJmsMessageChannel.this.log.error("Error closing JMS session after running request sender loop", e);
                    }
                }
            }
        }

        /** Takes and runs tasks until {@link #shutdown()} was called or the thread is interrupted. */
        private void runDispatchLoop() {
            while (true) {
                Runnable nextTask;
                try {
                    nextTask = this.queue.take();
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt flag is not restored here; the caller still has
                    // to close the JMS session afterwards. Confirm whether re-interrupting once
                    // cleanup is done would be desirable.
                    AbstractJmsMessageChannel.this.log.warn("Request sender interrupted; shutting down");
                    return;
                }
                // Checked after take() so that the wake-up task added by shutdown() ends the loop
                // without being executed.
                if (this.cancelled) {
                    AbstractJmsMessageChannel.this.log.debug("Clean request sender shutdown");
                    return;
                }
                nextTask.run();
            }
        }

        /**
         * Queues a request for sending. When the task is eventually run, the time it spent in the
         * queue (including the send call itself) is recorded in a statistics counter, categorized
         * by payload size.
         */
        void enqueue(final NetworkRequest request, final MessageChannelResponseHandler responseHandler, final int timeoutMsec) {
            final long enqueueTime = System.currentTimeMillis();
            this.queue.add(new Runnable() {

                @Override
                public void run() {
                    AbstractJmsMessageChannel.this.sendNonBlockingRequest(RequestSender.this.jmsSession,
                        RequestSender.this.jmsDestinationQueue, request, responseHandler, timeoutMsec);
                    int rangeValue = request.getContentBytes().length / SIZE_CATEGORY_DIVISOR;
                    String categoryString = StringUtils.format("Size range: %1$s00..%1$s99 kiB", rangeValue);
                    StatsCounter.registerValue("Messaging: Outgoing request queue transit time", categoryString,
                        System.currentTimeMillis() - enqueueTime);
                }
            });
        }

        /**
         * Discards all pending requests and makes the dispatch loop terminate. Discarded requests
         * are only counted for the log message; their response handlers are not invoked here.
         */
        public void shutdown() {
            int numDiscarded = this.queue.size();
            this.queue.clear();
            this.cancelled = true;
            // Wake up a dispatch loop that is blocked in take(). The loop checks the cancelled
            // flag before running anything, so this task is never executed; it is a no-op anyway
            // so that it cannot fail if that ever changes.
            this.queue.add(new Runnable() {

                @Override
                public void run() {
                }
            });
            if (numDiscarded != 0) {
                AbstractJmsMessageChannel.this.log.debug(StringUtils.format(
                    "Discarded %d pending requests for %s as channel %s is shutting down",
                    numDiscarded, AbstractJmsMessageChannel.this.getRemoteNodeInformation().getInstanceNodeSessionId(),
                    AbstractJmsMessageChannel.this.getChannelId()));
            }
        }
    }
}

