/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.simp.stomp;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompReactorNettyTcpClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;

public class StompBrokerRelayMessageHandler
extends AbstractBrokerMessageHandler {
    /** Shared zero-length payload used for the header-only CONNECT and ERROR frames built in this class. */
    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    /** Heartbeat message sent to the broker on write inactivity; assigned in the static initializer. */
    private static final Message<byte[]> HEARTBEAT_MESSAGE;
    /** Multiplier for the broker read-inactivity timeout (initHeartbeats multiplies the negotiated interval by 3). */
    private static final long HEARTBEAT_MULTIPLIER = 3L;
    /** Channel this handler subscribes to in startInternal() and unsubscribes from in stopInternal(). */
    private final SubscribableChannel clientInboundChannel;
    /** Channel on which messages for remote client sessions are sent (see sendMessageToClient). */
    private final MessageChannel clientOutboundChannel;
    /** Second channel this handler subscribes to in startInternal() and unsubscribes from in stopInternal(). */
    private final SubscribableChannel brokerChannel;
    /** Broker host passed to the default TCP client when none was configured. */
    private String relayHost = "127.0.0.1";
    /** Broker port passed to the default TCP client when none was configured. */
    private int relayPort = 61613;
    /** Login written into the CONNECT frame of every client session. */
    private String clientLogin = "guest";
    /** Passcode written into the CONNECT frame of every client session. */
    private String clientPasscode = "guest";
    /** Login written into the CONNECT frame of the "system" connection. */
    private String systemLogin = "guest";
    /** Passcode written into the CONNECT frame of the "system" connection. */
    private String systemPasscode = "guest";
    /** First value of the heart-beat header on the "system" CONNECT frame (interval values are logged as ms). */
    private long systemHeartbeatSendInterval = 10000L;
    /** Second value of the heart-beat header on the "system" CONNECT frame (interval values are logged as ms). */
    private long systemHeartbeatReceiveInterval = 10000L;
    /** Optional value for the CONNECT "host" header; null unless configured. */
    private String virtualHost;
    /** TCP client used for all broker connections; created in startInternal() when left null. */
    private TcpOperations<byte[]> tcpClient;
    /** Connection handlers keyed by session id, including the "system" handler registered in startInternal(). */
    private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<String, StompConnectionHandler>();

    /**
     * Create a relay handler bound to the given channels.
     *
     * @param clientInChannel channel this handler subscribes to on start; must not be null
     * @param clientOutChannel channel used to send messages to remote clients; must not be null
     * @param brokerChannel second channel this handler subscribes to on start; must not be null
     * @param destinationPrefixes prefixes handed to the superclass constructor
     */
    public StompBrokerRelayMessageHandler(SubscribableChannel clientInChannel, MessageChannel clientOutChannel, SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
        super(destinationPrefixes);

        Assert.notNull(clientInChannel, "'clientInChannel' must not be null");
        this.clientInboundChannel = clientInChannel;

        Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null");
        this.clientOutboundChannel = clientOutChannel;

        Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
        this.brokerChannel = brokerChannel;
    }

    /**
     * Set the broker host used when the default TCP client is created.
     *
     * @param relayHost the host; checked with {@code Assert.hasText}
     */
    public void setRelayHost(String relayHost) {
        Assert.hasText(relayHost, "relayHost must not be empty");
        this.relayHost = relayHost;
    }

    /**
     * @return the configured broker host
     */
    public String getRelayHost() {
        return relayHost;
    }

    /**
     * Set the broker port used when the default TCP client is created.
     * No range validation is performed here.
     *
     * @param relayPort the port
     */
    public void setRelayPort(int relayPort) {
        this.relayPort = relayPort;
    }

    /**
     * @return the configured broker port
     */
    public int getRelayPort() {
        return relayPort;
    }

    /**
     * Set the first heart-beat value placed on the "system" connection's CONNECT frame.
     * No validation is performed here.
     *
     * @param systemHeartbeatSendInterval the send interval (presumably milliseconds; the value is logged with an "ms" suffix elsewhere in this class)
     */
    public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
        this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
    }

    /**
     * @return the heartbeat send interval configured for the "system" connection
     */
    public long getSystemHeartbeatSendInterval() {
        return systemHeartbeatSendInterval;
    }

    /**
     * Set the second heart-beat value placed on the "system" connection's CONNECT frame.
     * No validation is performed here.
     *
     * @param heartbeatReceiveInterval the receive interval
     */
    public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
        systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
    }

    /**
     * @return the heartbeat receive interval configured for the "system" connection
     */
    public long getSystemHeartbeatReceiveInterval() {
        return systemHeartbeatReceiveInterval;
    }

    /**
     * Set the login written into the CONNECT frame of client sessions.
     *
     * @param clientLogin the login; checked with {@code Assert.hasText}
     */
    public void setClientLogin(String clientLogin) {
        Assert.hasText(clientLogin, "clientLogin must not be empty");
        this.clientLogin = clientLogin;
    }

    /**
     * @return the login used for client sessions
     */
    public String getClientLogin() {
        return clientLogin;
    }

    /**
     * Set the passcode written into the CONNECT frame of client sessions.
     *
     * @param clientPasscode the passcode; checked with {@code Assert.hasText}
     */
    public void setClientPasscode(String clientPasscode) {
        Assert.hasText(clientPasscode, "clientPasscode must not be empty");
        this.clientPasscode = clientPasscode;
    }

    /**
     * @return the passcode used for client sessions
     */
    public String getClientPasscode() {
        return clientPasscode;
    }

    /**
     * Set the login written into the CONNECT frame of the "system" connection.
     *
     * @param systemLogin the login; checked with {@code Assert.hasText}
     */
    public void setSystemLogin(String systemLogin) {
        Assert.hasText(systemLogin, "systemLogin must not be empty");
        this.systemLogin = systemLogin;
    }

    /**
     * @return the login used for the "system" connection
     */
    public String getSystemLogin() {
        return systemLogin;
    }

    /**
     * Set the passcode written into the CONNECT frame of the "system" connection.
     * The value is stored as given; no null or empty check is applied here.
     *
     * @param systemPasscode the passcode
     */
    public void setSystemPasscode(String systemPasscode) {
        this.systemPasscode = systemPasscode;
    }

    /**
     * @return the passcode used for the "system" connection
     */
    public String getSystemPasscode() {
        return systemPasscode;
    }

    /**
     * Set the value used for the "host" header of CONNECT frames sent to the broker.
     * The value is stored as given and may be null.
     *
     * @param virtualHost the host header value
     */
    public void setVirtualHost(String virtualHost) {
        this.virtualHost = virtualHost;
    }

    /**
     * @return the configured "host" header value, or {@code null} when none was set
     */
    public String getVirtualHost() {
        return virtualHost;
    }

    /**
     * Supply the TCP client used for broker connections. When this is left
     * {@code null}, startInternal() creates a client from the relay host and port.
     *
     * @param tcpClient the TCP client to use
     */
    public void setTcpClient(TcpOperations<byte[]> tcpClient) {
        this.tcpClient = tcpClient;
    }

    /**
     * @return the TCP client in use, or {@code null} if none has been set or created yet
     */
    public TcpOperations<byte[]> getTcpClient() {
        return tcpClient;
    }

    /**
     * Subscribes to the client inbound and broker channels, creates the default
     * TCP client if none was configured, and opens the shared "system" connection
     * to the broker with a fixed 5000 ms reconnect interval.
     *
     * <p>The "host" header is only set when a virtual host has been configured,
     * matching how client CONNECT frames are handled in handleMessageInternal.
     */
    @Override
    protected void startInternal() {
        this.clientInboundChannel.subscribe(this);
        this.brokerChannel.subscribe(this);
        if (this.tcpClient == null) {
            this.tcpClient = new StompReactorNettyTcpClient(this.relayHost, this.relayPort);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Initializing \"system\" TCP connection");
        }
        StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
        headers.setAcceptVersion("1.1,1.2");
        headers.setLogin(this.systemLogin);
        headers.setPasscode(this.systemPasscode);
        headers.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
        // Never write a null "host" header: leave it out when no virtual host is configured.
        String host = this.getVirtualHost();
        if (host != null) {
            headers.setHost(host);
        }
        SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers);
        this.connectionHandlers.put(handler.getSessionId(), handler);
        this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000L));
    }

    /**
     * Unsubscribes from both channels, resets every registered connection
     * handler, and shuts down the TCP client. Failures are logged with their
     * cause and never propagated, so one bad handler cannot prevent the
     * remaining ones (or the TCP client) from being shut down.
     */
    @Override
    protected void stopInternal() {
        this.clientInboundChannel.unsubscribe(this);
        this.brokerChannel.unsubscribe(this);
        for (StompConnectionHandler handler : this.connectionHandlers.values()) {
            try {
                handler.resetTcpConnection();
            }
            catch (Throwable t) {
                // Pass the Throwable itself so the stack trace is not lost.
                this.logger.error("Failed to close STOMP connection " + t.getMessage(), t);
            }
        }
        try {
            this.tcpClient.shutdown();
        }
        catch (Throwable t) {
            this.logger.error("Error while shutting down TCP client", t);
        }
    }

    /**
     * Routes one message to the broker connection that belongs to its session.
     *
     * <ul>
     * <li>MESSAGE-type messages get a session id (the system session id when
     * none is present), have their STOMP command updated for sending from the
     * client side, and are rebuilt with the updated headers.</li>
     * <li>Messages without a session id, or whose command requires a
     * destination that does not match the configured prefixes, are dropped.</li>
     * <li>CONNECT registers a new connection handler and opens a TCP connection.</li>
     * <li>DISCONNECT unregisters the handler and forwards the frame.</li>
     * <li>Everything else is forwarded through the existing handler.</li>
     * </ul>
     *
     * @param message the message to relay
     */
    @Override
    protected void handleMessageInternal(Message<?> message) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        String sessionId = accessor.getSessionId();
        String destination = accessor.getDestination();
        StompCommand command = accessor.getCommand();
        SimpMessageType type = accessor.getMessageType();

        if (type == SimpMessageType.MESSAGE) {
            if (sessionId == null) {
                sessionId = SystemStompConnectionHandler.SESSION_ID;
            }
            accessor.setSessionId(sessionId);
            command = accessor.updateStompCommandAsClientMessage();
            message = MessageBuilder.withPayload(message.getPayload()).setHeaders(accessor).build();
        }

        if (sessionId == null) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("No sessionId, ignoring message: " + message);
            }
            return;
        }

        boolean needsDestination = command != null && command.requiresDestination();
        if (needsDestination && !this.checkDestinationPrefix(destination)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Ignoring message to destination=" + destination);
            }
            return;
        }

        if (type == SimpMessageType.CONNECT) {
            this.logger.debug("Processing CONNECT in session=" + sessionId);
            accessor.setLogin(this.clientLogin);
            accessor.setPasscode(this.clientPasscode);
            if (this.getVirtualHost() != null) {
                accessor.setHost(this.getVirtualHost());
            }
            StompConnectionHandler created = new StompConnectionHandler(sessionId, accessor);
            this.connectionHandlers.put(sessionId, created);
            this.tcpClient.connect(created);
            return;
        }

        if (type == SimpMessageType.DISCONNECT) {
            StompConnectionHandler removed = this.removeConnectionHandler(sessionId);
            if (removed != null) {
                removed.forward(message);
            } else if (this.logger.isTraceEnabled()) {
                this.logger.trace("Connection already removed for sessionId=" + sessionId);
            }
            return;
        }

        StompConnectionHandler existing = this.connectionHandlers.get(sessionId);
        if (existing != null) {
            existing.forward(message);
        } else {
            this.logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message");
        }
    }

    /**
     * Unregisters the handler for a session. The system session's handler is
     * never removed.
     *
     * @param sessionId the session whose handler should be removed
     * @return the removed handler, or {@code null} for the system session or when no handler was registered
     */
    private StompConnectionHandler removeConnectionHandler(String sessionId) {
        if ("stompRelaySystemSessionId".equals(sessionId)) {
            return null;
        }
        return this.connectionHandlers.remove(sessionId);
    }

    static {
        SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
        HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[]{10}).setHeaders(headers).build();
    }

    private class StompConnectionHandler
    implements TcpConnectionHandler<byte[]> {
        /** Session this handler relays for; never null (asserted in the constructor). */
        private final String sessionId;
        /** True for handlers created for client sessions; false for the "system" handler. Gates all sends to the client outbound channel. */
        private final boolean isRemoteClientSession;
        /** Headers of the CONNECT frame sent to the broker once the TCP connection is up. */
        private final StompHeaderAccessor connectHeaders;
        /** Current TCP connection; set in afterConnected and cleared in resetTcpConnection. */
        private volatile TcpConnection<byte[]> tcpConnection;
        /** Set once a CONNECTED frame has been received; cleared in resetTcpConnection. */
        private volatile boolean isStompConnected;

        /**
         * Create a handler for a remote client session.
         *
         * @param sessionId the client session id; must not be null
         * @param connectHeaders headers for the CONNECT frame; must not be null
         */
        private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
            this(sessionId, connectHeaders, true);
        }

        /**
         * Create a handler, stating explicitly whether it serves a remote client.
         *
         * @param sessionId the session id; must not be null
         * @param connectHeaders headers for the CONNECT frame; must not be null
         * @param isRemoteClientSession whether messages and errors are passed on to the client outbound channel
         */
        private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isRemoteClientSession) {
            Assert.notNull(sessionId, "SessionId must not be null");
            Assert.notNull(connectHeaders, "ConnectHeaders must not be null");
            this.isRemoteClientSession = isRemoteClientSession;
            this.connectHeaders = connectHeaders;
            this.sessionId = sessionId;
        }

        /**
         * @return the id of the session this handler relays for
         */
        public String getSessionId() {
            return sessionId;
        }

        /**
         * Stores the new TCP connection and immediately sends the CONNECT
         * frame (stored headers, empty payload) over it.
         *
         * @param connection the freshly established TCP connection
         */
        @Override
        public void afterConnected(TcpConnection<byte[]> connection) {
            this.tcpConnection = connection;
            Message<byte[]> connectFrame = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build();
            connection.send(connectFrame);
        }

        /**
         * Reports a failed TCP connect attempt through the common failure path.
         *
         * @param ex the cause of the connect failure
         */
        @Override
        public void afterConnectFailure(Throwable ex) {
            handleTcpConnectionFailure("Failed to connect to message broker", ex);
        }

        /**
         * Common failure path: logs the error, notifies the client with an
         * ERROR frame, and tears down the broker connection.
         *
         * <p>The client is notified <em>before</em> the connection is reset so
         * that a close notification triggered by the reset cannot unregister
         * this handler first and replace the specific error text with the
         * generic "Connection to broker closed". The reset runs in a
         * {@code finally} block so it still happens if notifying the client throws.
         *
         * @param errorMessage description of the failure, also sent to the client
         * @param ex the cause; may be {@code null}
         */
        protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
            if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error(errorMessage + ", sessionId=" + this.sessionId, ex);
            }
            try {
                this.sendStompErrorToClient(errorMessage);
            }
            finally {
                this.resetTcpConnection();
            }
        }

        /**
         * Sends a STOMP ERROR frame to the client, at most once per session:
         * the frame is only sent when this call is the one that unregisters
         * the handler. Does nothing for non-client (system) sessions.
         *
         * @param errorText text for the ERROR frame's message header
         */
        private void sendStompErrorToClient(String errorText) {
            if (!this.isRemoteClientSession) {
                return;
            }
            if (removeConnectionHandler(this.sessionId) == null) {
                // Already unregistered elsewhere; nothing to report.
                return;
            }
            StompHeaderAccessor errorHeaders = StompHeaderAccessor.create(StompCommand.ERROR);
            errorHeaders.setSessionId(this.sessionId);
            errorHeaders.setMessage(errorText);
            Message<byte[]> errorFrame = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(errorHeaders).build();
            sendMessageToClient(errorFrame);
        }

        /**
         * Passes a message to the client outbound channel. Messages on
         * non-client (system) sessions are silently dropped.
         *
         * @param message the message to deliver
         */
        protected void sendMessageToClient(Message<?> message) {
            if (!this.isRemoteClientSession) {
                return;
            }
            clientOutboundChannel.send(message);
        }

        /**
         * Handles a message received from the broker: logs it, runs the
         * post-connect hook when it is a CONNECTED frame, stamps it with this
         * handler's session id, and passes it on towards the client.
         *
         * @param message the message received from the broker
         */
        @Override
        public void handleMessage(Message<byte[]> message) {
            StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
            boolean heartbeat = SimpMessageType.HEARTBEAT == accessor.getMessageType();
            if (heartbeat) {
                logger.trace("Received broker heartbeat");
            } else if (logger.isDebugEnabled()) {
                logger.debug("Received broker message in session=" + this.sessionId);
            }

            if (accessor.getCommand() == StompCommand.CONNECTED) {
                afterStompConnected(accessor);
            }

            accessor.setSessionId(this.sessionId);
            Message<byte[]> outbound = MessageBuilder.withPayload(message.getPayload()).setHeaders(accessor).build();
            sendMessageToClient(outbound);
        }

        /**
         * Hook run when the broker's CONNECTED frame arrives: marks the
         * session as connected, then sets up heartbeats.
         *
         * @param connectedHeaders headers of the CONNECTED frame
         */
        protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
            isStompConnected = true;
            initHeartbeats(connectedHeaders);
        }

        /**
         * Registers heartbeat handling on the TCP connection. Remote client
         * sessions are skipped, so only the "system" connection gets heartbeats.
         *
         * <p>Outgoing: when both this side's send interval and the broker's
         * receive interval are positive, a heartbeat is written after the larger
         * of the two has passed without a write. Incoming: when both this side's
         * receive interval and the broker's send interval are positive, the
         * connection is treated as failed after {@code HEARTBEAT_MULTIPLIER}
         * times the larger of the two has passed without a read.
         *
         * @param connectedHeaders headers of the broker's CONNECTED frame
         */
        private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
            if (this.isRemoteClientSession) {
                return;
            }
            // Read the volatile field once; a concurrent reset may have cleared it.
            TcpConnection<byte[]> connection = this.tcpConnection;
            if (connection == null) {
                return;
            }

            long[] requested = this.connectHeaders.getHeartbeat();
            long[] granted = connectedHeaders.getHeartbeat();
            long clientSendInterval = requested[0];
            long clientReceiveInterval = requested[1];
            long serverSendInterval = granted[0];
            long serverReceiveInterval = granted[1];

            if (clientSendInterval > 0L && serverReceiveInterval > 0L) {
                long writeInterval = Math.max(clientSendInterval, serverReceiveInterval);
                connection.onWriteInactivity(new Runnable() {

                    @Override
                    public void run() {
                        TcpConnection<byte[]> conn = StompConnectionHandler.this.tcpConnection;
                        if (conn != null) {
                            conn.send(HEARTBEAT_MESSAGE).addCallback(new ListenableFutureCallback<Void>() {

                                @Override
                                public void onFailure(Throwable t) {
                                    StompConnectionHandler.this.handleTcpConnectionFailure("Failed to send heartbeat", t);
                                }

                                @Override
                                public void onSuccess(Void result) {
                                    // nothing to do
                                }
                            });
                        }
                    }
                }, writeInterval);
            }

            if (clientReceiveInterval > 0L && serverSendInterval > 0L) {
                // A separate final local per callback: the anonymous class may only capture a final variable.
                final long readTimeout = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
                connection.onReadInactivity(new Runnable() {

                    @Override
                    public void run() {
                        StompConnectionHandler.this.handleTcpConnectionFailure("No heartbeat from broker for more than " + readTimeout + "ms, closing connection", null);
                    }
                }, readTimeout);
            }
        }

        /**
         * Notifies the client with an ERROR frame when the broker connection
         * is closed (only if this handler is still registered).
         */
        @Override
        public void afterConnectionClosed() {
            sendStompErrorToClient("Connection to broker closed");
        }

        /**
         * Forwards a message to the broker over this handler's TCP connection.
         *
         * <p>If the STOMP session is not (or no longer) connected, the message
         * is dropped with a warning and an <em>already completed</em> future is
         * returned, so callers that block on {@code get()} return immediately
         * instead of waiting on a task nobody runs.
         *
         * <p>After a DISCONNECT frame is sent successfully the TCP connection
         * is reset; a failed send goes through the common failure path.
         *
         * @param message the message to forward
         * @return a future that completes when the send completes (or immediately, when the message was dropped)
         */
        @SuppressWarnings("unchecked")
        public ListenableFuture<Void> forward(final Message<?> message) {
            // Read the volatile field once; a concurrent reset may clear it after the connected check.
            TcpConnection<byte[]> conn = this.isStompConnected ? this.tcpConnection : null;
            if (conn == null) {
                if (StompBrokerRelayMessageHandler.this.logger.isWarnEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.warn("Connection to broker inactive or not ready. Ignoring message");
                }
                ListenableFutureTask<Void> dropped = new ListenableFutureTask<Void>(new Callable<Void>() {

                    @Override
                    public Void call() throws Exception {
                        return null;
                    }
                });
                // Complete the task now; an un-run task would block get() forever.
                dropped.run();
                return dropped;
            }
            if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
                if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) {
                    StompBrokerRelayMessageHandler.this.logger.trace("Forwarding heartbeat to broker");
                } else {
                    StompBrokerRelayMessageHandler.this.logger.debug("Forwarding message to broker");
                }
            }
            // Unchecked: the connection is typed for byte[] payloads while callers pass Message<?>.
            ListenableFuture<Void> future = conn.send((Message<byte[]>) message);
            future.addCallback(new ListenableFutureCallback<Void>() {

                @Override
                public void onSuccess(Void result) {
                    StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
                    if (command == StompCommand.DISCONNECT) {
                        StompConnectionHandler.this.resetTcpConnection();
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    StompConnectionHandler.this.handleTcpConnectionFailure("Failed to send message " + message, t);
                }
            });
            return future;
        }

        /**
         * Marks the STOMP session as disconnected, clears the stored TCP
         * connection, and closes the connection that was stored.
         *
         * <p>The connection is captured in a local before the field is cleared
         * and closed through that local; closing through the field after
         * nulling it would always fail and leave the socket open. Closing is
         * best-effort: a failure is logged at debug level and not propagated.
         */
        public void resetTcpConnection() {
            TcpConnection<byte[]> conn = this.tcpConnection;
            this.isStompConnected = false;
            this.tcpConnection = null;
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (Throwable ex) {
                    // Best-effort: the connection may already be broken, which is often why we are here.
                    if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                        StompBrokerRelayMessageHandler.this.logger.debug("Failed to close TCP connection, sessionId=" + this.sessionId, ex);
                    }
                }
            }
        }

        /**
         * @return a short description containing the session id
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("StompConnectionHandler{sessionId=");
            text.append(this.sessionId);
            text.append("}");
            return text.toString();
        }
    }

    /**
     * Connection handler for the shared "system" session. It is created as a
     * non-client session, publishes broker availability events as the
     * connection comes and goes, and makes forwarding synchronous.
     */
    private class SystemStompConnectionHandler
    extends StompConnectionHandler {
        /** Session id under which the system handler is registered. */
        public static final String SESSION_ID = "stompRelaySystemSessionId";

        /**
         * @param connectHeaders headers for the system connection's CONNECT frame
         */
        public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
            super(SESSION_ID, connectHeaders, false);
        }

        /** Runs the inherited post-connect logic, then publishes the broker-available event. */
        @Override
        protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
            super.afterStompConnected(connectedHeaders);
            publishBrokerAvailableEvent();
        }

        /** Runs the inherited failure handling, then publishes the broker-unavailable event. */
        @Override
        protected void handleTcpConnectionFailure(String errorMessage, Throwable t) {
            super.handleTcpConnectionFailure(errorMessage, t);
            publishBrokerUnavailableEvent();
        }

        /** Runs the inherited close handling, then publishes the broker-unavailable event. */
        @Override
        public void afterConnectionClosed() {
            super.afterConnectionClosed();
            publishBrokerUnavailableEvent();
        }

        /**
         * Forwards the message and waits for the send to finish.
         *
         * @param message the message to forward
         * @return the completed send future
         * @throws MessageDeliveryException wrapping any Throwable raised while forwarding or waiting
         */
        @Override
        public ListenableFuture<Void> forward(Message<?> message) {
            ListenableFuture<Void> pending;
            try {
                pending = super.forward(message);
                pending.get();
            }
            catch (Throwable failure) {
                throw new MessageDeliveryException(message, failure);
            }
            return pending;
        }
    }
}

