package org.eclipse.gyrex.eventbus.websocket.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.gyrex.cloud.admin.ICloudManager;
import org.eclipse.gyrex.cloud.admin.INodeDescriptor;
import org.eclipse.gyrex.cloud.admin.INodeListener;
import org.eclipse.gyrex.cloud.services.events.EventMessage;
import org.eclipse.gyrex.cloud.services.events.IEventReceiver;
import org.eclipse.gyrex.cloud.services.events.IEventTransport;
import org.eclipse.gyrex.eventbus.websocket.internal.EventMessageReceiver;
import org.eclipse.gyrex.server.Platform;
import org.eclipse.gyrex.server.settings.SystemSetting;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/gyrex/eventbus/websocket/internal/WebsocketEventTransport.class */
public class WebsocketEventTransport implements IEventTransport {
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketEventTransport.class);
    private static final SystemSetting<Integer> eventsPort = SystemSetting.newIntegerSetting("gyrex.event.websocket.port", "Default port for web socket based event transport.").usingDefault(Integer.valueOf(Platform.getInstancePort(3111))).create();
    private static final AtomicReference<Server> serverRef = new AtomicReference<>();
    private static final AtomicReference<WebSocketClient> clientRef = new AtomicReference<>();
    private volatile ICloudManager cloudManager;
    volatile Job connectionMonitor;
    private volatile Map<String, EventMessageSender> connectedNodesByNodeId = Collections.emptyMap();
    private final INodeListener reconnectListener = new INodeListener() { // from class: org.eclipse.gyrex.eventbus.websocket.internal.WebsocketEventTransport.1
        public void nodesChanged() {
            Job job = WebsocketEventTransport.this.connectionMonitor;
            if (job != null) {
                job.schedule();
            }
        }
    };
    private final ConcurrentMap<String, CopyOnWriteArrayList<IEventReceiver>> eventReceiverListsByTopicId = new ConcurrentHashMap();

    public void activate(ComponentContext componentContext) {
        LOG.info("Activating WebsocketEventTransport.");
        startWebSocketServer();
        try {
            startWebSocketClient();
            try {
                getCloudManager().addNodeListener(this.reconnectListener);
                this.connectionMonitor = new Job("Event Bus Connection Monitor") { // from class: org.eclipse.gyrex.eventbus.websocket.internal.WebsocketEventTransport.2
                    protected IStatus run(IProgressMonitor iProgressMonitor) {
                        try {
                            WebsocketEventTransport.LOG.debug("Connecting online nodes for the event transport.");
                            WebsocketEventTransport.this.connectAllOnlineNodes();
                        } catch (Exception e) {
                            WebsocketEventTransport.LOG.warn("An error occured while connecting online nodes for the event transport. Operation will be retried.");
                        }
                        if (!iProgressMonitor.isCanceled()) {
                            schedule(120000L);
                        }
                        return Status.OK_STATUS;
                    }
                };
                this.connectionMonitor.setSystem(true);
                this.connectionMonitor.schedule();
            } catch (LinkageError | RuntimeException e) {
                stopWebSocketClient();
                throw e;
            }
        } catch (LinkageError | RuntimeException e2) {
            stopWebSocketServer();
            throw e2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connectAllOnlineNodes() {
        INodeDescriptor iNodeDescriptor;
        HashMap hashMap = new HashMap(this.connectedNodesByNodeId);
        HashMap hashMap2 = new HashMap();
        ImmutableMap uniqueIndex = Maps.uniqueIndex(getCloudManager().getApprovedNodes(), new Function<INodeDescriptor, String>() { // from class: org.eclipse.gyrex.eventbus.websocket.internal.WebsocketEventTransport.3
            public String apply(INodeDescriptor iNodeDescriptor2) {
                return iNodeDescriptor2.getId();
            }
        });
        WebSocketClient webSocketClient = clientRef.get();
        if (webSocketClient == null) {
            return;
        }
        for (String str : getCloudManager().getOnlineNodes()) {
            if (!str.equals(getCloudManager().getLocalInfo().getNodeId()) && (iNodeDescriptor = (INodeDescriptor) uniqueIndex.get(str)) != null) {
                EventMessageSender eventMessageSender = (EventMessageSender) hashMap.remove(str);
                if (eventMessageSender == null || !eventMessageSender.isConnected()) {
                    EventMessageSender eventMessageSender2 = new EventMessageSender(getCloudManager().getLocalInfo().getNodeId());
                    if (connectToNextAvailableAddress(new ArrayList(iNodeDescriptor.getAddresses()).iterator(), eventMessageSender2, webSocketClient, str)) {
                        hashMap2.put(str, eventMessageSender2);
                    }
                } else {
                    hashMap2.put(str, eventMessageSender);
                }
            }
        }
        this.connectedNodesByNodeId = hashMap2;
        for (EventMessageSender eventMessageSender3 : hashMap.values()) {
            if (eventMessageSender3.isConnected()) {
                eventMessageSender3.getSession().close();
            }
        }
    }

    private boolean connectToNextAvailableAddress(Iterator<String> it, EventMessageSender eventMessageSender, WebSocketClient webSocketClient, String str) {
        if (!it.hasNext()) {
            LOG.error("No routes available to node ({}). Unable to connect event transport. Please check node addresses configuration.", str);
            return false;
        }
        String next = it.next();
        it.remove();
        try {
            webSocketClient.connect(eventMessageSender, new URI("ws://" + next + ":" + eventsPort.get() + "/eventbus/")).get();
            return true;
        } catch (IOException | URISyntaxException | ExecutionException e) {
            LOG.warn("Unable to connect to ({}). {}", new Object[]{next, e.getMessage(), e});
            return connectToNextAvailableAddress(it, eventMessageSender, webSocketClient, str);
        } catch (InterruptedException e2) {
            LOG.debug("Interrupted while connecting to node ({}) at ({}).", str, next);
            Thread.currentThread().interrupt();
            return false;
        } catch (CancellationException e3) {
            LOG.debug("Aborted while connecting to node ({}) at ({}).", str, next);
            return false;
        }
    }

    public void deactivate(ComponentContext componentContext) {
        LOG.info("Deactivating WebsocketEventTransport.");
        getCloudManager().removeNodeListener(this.reconnectListener);
        Job job = this.connectionMonitor;
        if (job != null) {
            job.cancel();
        }
        Map<String, EventMessageSender> map = this.connectedNodesByNodeId;
        this.connectedNodesByNodeId = Collections.emptyMap();
        Iterator<EventMessageSender> it = map.values().iterator();
        while (it.hasNext()) {
            it.next().getSession().close();
        }
        stopWebSocketServer();
        stopWebSocketClient();
    }

    @VisibleForTesting
    void distributeEventToSubscribers(String str, EventMessage eventMessage) {
        CopyOnWriteArrayList<IEventReceiver> copyOnWriteArrayList = this.eventReceiverListsByTopicId.get(str);
        if (copyOnWriteArrayList != null) {
            Iterator<IEventReceiver> it = copyOnWriteArrayList.iterator();
            while (it.hasNext()) {
                it.next().receiveEvent(eventMessage);
            }
        }
    }

    public ICloudManager getCloudManager() {
        ICloudManager iCloudManager = this.cloudManager;
        Preconditions.checkState(iCloudManager != null, "inactive");
        return iCloudManager;
    }

    public void sendEvent(String str, EventMessage eventMessage, Map<String, ?> map) {
        distributeEventToSubscribers(str, eventMessage);
        for (Map.Entry<String, EventMessageSender> entry : this.connectedNodesByNodeId.entrySet()) {
            LOG.trace("Sending event ({}) to ({})", eventMessage.getId(), entry.getKey());
            if (entry.getValue().isConnected()) {
                entry.getValue().sendEvent(str, eventMessage);
            } else {
                LOG.warn("Dead connection to node ({}).", entry.getKey());
            }
        }
    }

    public void setCloudManager(ICloudManager iCloudManager) {
        this.cloudManager = iCloudManager;
    }

    private void startWebSocketClient() {
        try {
            WebSocketClient webSocketClient = new WebSocketClient();
            Preconditions.checkState(clientRef.compareAndSet(null, webSocketClient), "Only one active transport allowed!");
            webSocketClient.setAsyncWriteTimeout(5000L);
            webSocketClient.setConnectTimeout(5000L);
            webSocketClient.start();
        } catch (Exception e) {
            throw new IllegalStateException("Error starting web socket client for cluster event websocket", e);
        }
    }

    private void startWebSocketServer() {
        try {
            Server server = new Server();
            Preconditions.checkState(serverRef.compareAndSet(null, server), "Only one active transport allowed!");
            HttpConfiguration httpConfiguration = new HttpConfiguration();
            httpConfiguration.setSendServerVersion(false);
            httpConfiguration.setSendDateHeader(false);
            ServerConnector serverConnector = new ServerConnector(server, new ConnectionFactory[]{new HttpConnectionFactory(httpConfiguration)});
            serverConnector.setPort(((Integer) eventsPort.get()).intValue());
            serverConnector.setIdleTimeout(60000L);
            server.addConnector(serverConnector);
            server.setStopAtShutdown(true);
            server.setStopTimeout(5000L);
            HandlerCollection handlerCollection = new HandlerCollection();
            final EventMessageReceiver eventMessageReceiver = new EventMessageReceiver(getCloudManager().getLocalInfo().getNodeId(), new EventMessageReceiver.IEventMessageCallback() { // from class: org.eclipse.gyrex.eventbus.websocket.internal.WebsocketEventTransport.4
                @Override // org.eclipse.gyrex.eventbus.websocket.internal.EventMessageReceiver.IEventMessageCallback
                public void onEventMessage(String str, EventMessage eventMessage) {
                    WebsocketEventTransport.this.distributeEventToSubscribers(str, eventMessage);
                }
            });
            WebSocketHandler webSocketHandler = new WebSocketHandler() { // from class: org.eclipse.gyrex.eventbus.websocket.internal.WebsocketEventTransport.5
                public void configure(WebSocketServletFactory webSocketServletFactory) {
                    final EventMessageReceiver eventMessageReceiver2 = eventMessageReceiver;
                    webSocketServletFactory.setCreator(new WebSocketCreator() { // from class: org.eclipse.gyrex.eventbus.websocket.internal.WebsocketEventTransport.5.1
                        public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
                            return eventMessageReceiver2;
                        }
                    });
                }
            };
            ContextHandler contextHandler = new ContextHandler();
            contextHandler.setContextPath("/eventbus");
            contextHandler.setHandler(webSocketHandler);
            handlerCollection.addHandler(contextHandler);
            server.setHandler(handlerCollection);
            server.start();
        } catch (Exception e) {
            throw new IllegalStateException("Error starting jetty for cluster event websocket", e);
        }
    }

    private void stopWebSocketClient() {
        try {
            WebSocketClient andSet = clientRef.getAndSet(null);
            if (andSet != null) {
                andSet.stop();
            }
        } catch (Exception e) {
            LOG.error("Error stopping websocket client.", e);
        }
    }

    private void stopWebSocketServer() {
        try {
            Server andSet = serverRef.getAndSet(null);
            if (andSet != null) {
                andSet.stop();
            }
        } catch (Exception e) {
            LOG.error("Error stopping websocket server.", e);
        }
    }

    public void subscribeTopic(String str, IEventReceiver iEventReceiver, Map<String, ?> map) {
        CopyOnWriteArrayList<IEventReceiver> copyOnWriteArrayList = this.eventReceiverListsByTopicId.get(str);
        while (true) {
            CopyOnWriteArrayList<IEventReceiver> copyOnWriteArrayList2 = copyOnWriteArrayList;
            if (copyOnWriteArrayList2 != null) {
                copyOnWriteArrayList2.add(iEventReceiver);
                return;
            }
            copyOnWriteArrayList = this.eventReceiverListsByTopicId.putIfAbsent(str, new CopyOnWriteArrayList<>());
        }
    }

    public void unsubscribeTopic(String str, IEventReceiver iEventReceiver, Map<String, ?> map) {
        CopyOnWriteArrayList<IEventReceiver> copyOnWriteArrayList = this.eventReceiverListsByTopicId.get(str);
        if (copyOnWriteArrayList != null) {
            copyOnWriteArrayList.remove(iEventReceiver);
        }
    }
}
