/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.californium.scandium;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.security.GeneralSecurityException;
import java.security.cert.Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import org.eclipse.californium.elements.Connector;
import org.eclipse.californium.elements.RawData;
import org.eclipse.californium.elements.RawDataChannel;
import org.eclipse.californium.scandium.DTLSConnectorConfig;
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
import org.eclipse.californium.scandium.dtls.AlertMessage;
import org.eclipse.californium.scandium.dtls.ApplicationMessage;
import org.eclipse.californium.scandium.dtls.ClientHandshaker;
import org.eclipse.californium.scandium.dtls.ClientHello;
import org.eclipse.californium.scandium.dtls.CompressionMethod;
import org.eclipse.californium.scandium.dtls.ContentType;
import org.eclipse.californium.scandium.dtls.Cookie;
import org.eclipse.californium.scandium.dtls.DTLSFlight;
import org.eclipse.californium.scandium.dtls.DTLSSession;
import org.eclipse.californium.scandium.dtls.FragmentedHandshakeMessage;
import org.eclipse.californium.scandium.dtls.HandshakeException;
import org.eclipse.californium.scandium.dtls.HandshakeMessage;
import org.eclipse.californium.scandium.dtls.Handshaker;
import org.eclipse.californium.scandium.dtls.HelloVerifyRequest;
import org.eclipse.californium.scandium.dtls.InMemorySessionStore;
import org.eclipse.californium.scandium.dtls.ProtocolVersion;
import org.eclipse.californium.scandium.dtls.Record;
import org.eclipse.californium.scandium.dtls.ResumingClientHandshaker;
import org.eclipse.californium.scandium.dtls.ResumingServerHandshaker;
import org.eclipse.californium.scandium.dtls.ServerHandshaker;
import org.eclipse.californium.scandium.dtls.SessionId;
import org.eclipse.californium.scandium.dtls.SessionListener;
import org.eclipse.californium.scandium.dtls.SessionStore;
import org.eclipse.californium.scandium.dtls.cipher.CipherSuite;
import org.eclipse.californium.scandium.dtls.cipher.InvalidMacException;
import org.eclipse.californium.scandium.util.ByteArrayUtils;
import org.eclipse.californium.scandium.util.LeastRecentlyUsedCache;

public class DTLSConnector
implements Connector {
    private static final Logger LOGGER = Logger.getLogger(DTLSConnector.class.getCanonicalName());
    private SecretKey cookieMacKey = new SecretKeySpec("generate cookie".getBytes(), "MAC");
    private final DtlsConnectorConfig config;
    private DatagramSocket socket;
    private Timer timer;
    private Worker receiver;
    private Worker sender;
    private final SessionStore sessionStore;
    private final BlockingQueue<RawData> outboundMessages;
    private LeastRecentlyUsedCache<InetSocketAddress, Handshaker> handshakers = new LeastRecentlyUsedCache(10000, 250L);
    private Map<InetSocketAddress, DTLSFlight> flights = new ConcurrentHashMap<InetSocketAddress, DTLSFlight>();
    private boolean running;
    private RawDataChannel messageHandler;
    private SessionListener sessionListener;

    public DTLSConnector(DtlsConnectorConfig configuration) {
        this(configuration, null);
    }

    public DTLSConnector(DtlsConnectorConfig configuration, SessionStore sessionStore) {
        if (configuration == null) {
            throw new NullPointerException("Configuration must not be null");
        }
        this.config = configuration;
        this.outboundMessages = new LinkedBlockingQueue<RawData>(this.config.getOutboundMessageBufferSize());
        this.sessionStore = sessionStore != null ? sessionStore : new InMemorySessionStore();
        this.setSessionListener();
    }

    public DTLSConnector(InetSocketAddress address) {
        this(address, null);
    }

    public DTLSConnector(InetSocketAddress address, Certificate[] rootCertificates) {
        this(address, rootCertificates, null, null);
    }

    public DTLSConnector(InetSocketAddress address, Certificate[] rootCertificates, SessionStore sessionStore, DTLSConnectorConfig config) {
        DtlsConnectorConfig.Builder builder = new DtlsConnectorConfig.Builder(address);
        if (config != null) {
            builder.setMaxFragmentLength(config.getMaxFragmentLength());
            builder.setMaxPayloadSize(config.getMaxPayloadSize());
            builder.setMaxRetransmissions(config.getMaxRetransmit());
            builder.setRetransmissionTimeout(config.getRetransmissionTimeout());
            if (rootCertificates != null) {
                builder.setTrustStore(rootCertificates);
            }
            if (config.pskStore != null) {
                builder.setPskStore(config.pskStore);
            } else if (config.certChain != null) {
                builder.setIdentity(config.privateKey, config.certChain, config.sendRawKey);
            } else {
                builder.setIdentity(config.privateKey, config.publicKey);
            }
        }
        this.config = builder.build();
        this.outboundMessages = new LinkedBlockingQueue<RawData>();
        this.sessionStore = sessionStore != null ? sessionStore : new InMemorySessionStore();
        this.setSessionListener();
    }

    private void setSessionListener() {
        this.sessionListener = new SessionListener(){

            @Override
            public void handshakeStarted(Handshaker handshaker) throws HandshakeException {
                if (handshaker != null) {
                    if (!DTLSConnector.this.handshakers.put(handshaker.getPeerAddress(), handshaker)) {
                        throw new HandshakeException("Maximum number of simultanous handshakes in progress", new AlertMessage(AlertMessage.AlertLevel.FATAL, AlertMessage.AlertDescription.INTERNAL_ERROR));
                    }
                    LOGGER.log(Level.FINE, "Handshake with [{0}] has been started", handshaker.getPeerAddress());
                }
            }

            @Override
            public void sessionEstablished(Handshaker handshaker, DTLSSession session) throws HandshakeException {
                if (handshaker != null && session != null && session.isActive()) {
                    if (!DTLSConnector.this.sessionStore.put(session)) {
                        DTLSConnector.this.handshakers.remove(handshaker.getPeerAddress());
                        throw new HandshakeException("Maximum number of sessions has been established", new AlertMessage(AlertMessage.AlertLevel.FATAL, AlertMessage.AlertDescription.INTERNAL_ERROR));
                    }
                    LOGGER.log(Level.FINE, "Session with [{0}] has been established", session.getPeer());
                }
            }

            @Override
            public void handshakeCompleted(InetSocketAddress peer) {
                Handshaker completedHandshaker;
                if (peer != null && (completedHandshaker = (Handshaker)DTLSConnector.this.handshakers.remove(peer)) != null) {
                    LOGGER.log(Level.FINE, "Handshake with [{0}] has been completed", peer);
                }
            }
        };
    }

    private DTLSFlight getFlight(InetSocketAddress peerAddress) {
        return this.flights.get(peerAddress);
    }

    private DTLSFlight storeFlight(DTLSFlight flight) {
        if (flight != null) {
            return this.flights.put(flight.getPeerAddress(), flight);
        }
        return null;
    }

    private DTLSFlight removeFlight(InetSocketAddress peerAddress) {
        if (peerAddress != null) {
            return this.flights.remove(peerAddress);
        }
        return null;
    }

    public final void close(InetSocketAddress peerAddress) {
        AlertMessage closeNotify = new AlertMessage(AlertMessage.AlertLevel.WARNING, AlertMessage.AlertDescription.CLOSE_NOTIFY);
        this.terminateConnection(peerAddress, closeNotify);
    }

    public final synchronized void start() throws IOException {
        if (this.running) {
            return;
        }
        this.timer = new Timer(true);
        this.socket = new DatagramSocket(null);
        this.socket.setReuseAddress(true);
        this.socket.bind(this.config.getAddress());
        this.running = true;
        this.sender = new Worker("DTLS-Sender-" + this.config.getAddress()){

            @Override
            public void doWork() throws Exception {
                DTLSConnector.this.sendNextMessageOverNetwork();
            }
        };
        this.receiver = new Worker("DTLS-Receiver-" + this.config.getAddress()){

            @Override
            public void doWork() throws Exception {
                DTLSConnector.this.receiveNextDatagramFromNetwork();
            }
        };
        this.receiver.start();
        this.sender.start();
        LOGGER.log(Level.CONFIG, "DLTS connector listening on [{0}]", this.config.getAddress());
    }

    final synchronized void releaseSocket() {
        this.running = false;
        this.sender.interrupt();
        this.receiver.interrupt();
        this.outboundMessages.clear();
        if (this.socket != null) {
            this.socket.close();
        }
    }

    public final synchronized void stop() {
        if (!this.running) {
            return;
        }
        this.timer.cancel();
        this.flights.clear();
        this.releaseSocket();
    }

    public final synchronized void destroy() {
        this.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void receiveNextDatagramFromNetwork() throws IOException {
        byte[] buffer = new byte[this.config.getMaxPayloadSize()];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        DatagramSocket datagramSocket = this.socket;
        synchronized (datagramSocket) {
            this.socket.receive(packet);
        }
        if (packet.getLength() == 0) {
            return;
        }
        InetSocketAddress peerAddress = new InetSocketAddress(packet.getAddress(), packet.getPort());
        byte[] data = Arrays.copyOfRange(packet.getData(), packet.getOffset(), packet.getLength());
        List<Record> records = Record.fromByteArray(data);
        for (Record record : records) {
            try {
                LOGGER.log(Level.FINEST, "Received DTLS record of type [{0}]", (Object)record.getType());
                switch (record.getType()) {
                    case APPLICATION_DATA: {
                        this.processApplicationDataRecord(peerAddress, record);
                        break;
                    }
                    case ALERT: {
                        this.processAlertRecord(peerAddress, record);
                        break;
                    }
                    case CHANGE_CIPHER_SPEC: {
                        this.processChangeCipherSpecRecord(peerAddress, record);
                        break;
                    }
                    case HANDSHAKE: {
                        this.processHandshakeRecord(peerAddress, record);
                    }
                }
            }
            catch (InvalidMacException e) {
                LOGGER.log(Level.FINE, "MAC validation failed for [{0}] record from peer [{1}], discarding ...", new Object[]{record.getType(), peerAddress});
            }
            catch (GeneralSecurityException e) {
                LOGGER.log(Level.FINE, "Cannot process [{0}] record from peer [{1}] due to [{2}], discarding ...", new Object[]{record.getType(), peerAddress, e.getMessage()});
            }
            catch (HandshakeException e) {
                if (AlertMessage.AlertLevel.FATAL.equals((Object)e.getAlert().getLevel())) {
                    LOGGER.log(Level.FINE, "Cannot process [{0}] record from peer [{1}] due to [{2}], aborting handshake ...", new Object[]{record.getType(), peerAddress, e.getMessage()});
                    this.terminateConnection(peerAddress, e.getAlert());
                    break;
                }
                LOGGER.log(Level.FINE, "Cannot process [{0}] record from peer [{1}] due to [{2}], discarding ...", new Object[]{record.getType(), peerAddress, e.getMessage()});
            }
        }
    }

    private void terminateConnection(InetSocketAddress peerAddress, AlertMessage alert) {
        LOGGER.log(Level.FINE, "Terminating connection with peer [{0}], reason [{1}]", new Object[]{peerAddress, alert.getDescription()});
        this.cancelPreviousFlight(peerAddress);
        DTLSSession session = this.sessionStore.get(peerAddress);
        if (session != null) {
            session.setActive(false);
            if (alert != null) {
                try {
                    DTLSFlight flight = new DTLSFlight(session);
                    flight.setRetransmissionNeeded(false);
                    flight.addMessage(new Record(ContentType.ALERT, session.getWriteEpoch(), session.getSequenceNumber(), alert, session));
                    this.sendFlight(flight);
                }
                catch (GeneralSecurityException e) {
                    LOGGER.log(Level.FINE, "Cannot create ALERT message for peer [{0}] due to [{1}]", new Object[]{peerAddress, e.getMessage()});
                }
            }
        }
        this.connectionClosed(peerAddress);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processApplicationDataRecord(InetSocketAddress peerAddress, Record record) throws GeneralSecurityException, HandshakeException {
        DTLSSession session = this.sessionStore.get(peerAddress);
        if (session != null && session.isActive()) {
            DTLSSession dTLSSession = session;
            synchronized (dTLSSession) {
                if (session.isRecordProcessable(record.getEpoch(), record.getSequenceNumber())) {
                    record.setSession(session);
                    ApplicationMessage message = (ApplicationMessage)record.getFragment();
                    this.sessionListener.handshakeCompleted(peerAddress);
                    session.markRecordAsRead(record.getEpoch(), record.getSequenceNumber());
                    this.sessionStore.update(session);
                    if (this.messageHandler != null) {
                        this.messageHandler.receiveData(new RawData(message.getData(), peerAddress, session.getPeerIdentity()));
                    }
                } else {
                    LOGGER.log(Level.FINER, "Discarding duplicate APPLICATION_DATA record received from peer [{0}]", peerAddress);
                }
            }
        } else {
            LOGGER.log(Level.FINER, "Discarding APPLICATION_DATA record received from peer [{0}] without an active session", new Object[]{peerAddress});
        }
    }

    private void processAlertRecord(InetSocketAddress peerAddress, Record record) throws GeneralSecurityException, HandshakeException {
        DTLSSession session = this.sessionStore.get(peerAddress);
        if (session != null) {
            record.setSession(session);
            AlertMessage alert = (AlertMessage)record.getFragment();
            LOGGER.log(Level.FINER, "Received ALERT record [{0}] from [{1}]", new Object[]{alert, peerAddress});
            if (AlertMessage.AlertLevel.FATAL.equals((Object)alert.getLevel())) {
                AlertMessage bye = null;
                switch (alert.getDescription()) {
                    case CLOSE_NOTIFY: {
                        bye = new AlertMessage(AlertMessage.AlertLevel.WARNING, AlertMessage.AlertDescription.CLOSE_NOTIFY);
                    }
                }
                this.terminateConnection(peerAddress, bye);
            }
        } else {
            LOGGER.log(Level.FINER, "Received ALERT record from [{0}] without existing session, discarding ...", peerAddress);
        }
    }

    private void processChangeCipherSpecRecord(InetSocketAddress peerAddress, Record record) throws HandshakeException {
        Handshaker handshaker = this.handshakers.get(peerAddress);
        if (handshaker == null) {
            LOGGER.log(Level.FINE, "Discarding CHANGE_CIPHER_SPEC record from peer [{0}], no handshake in progress...", peerAddress);
        } else {
            handshaker.processMessage(record);
        }
    }

    private void processHandshakeRecord(InetSocketAddress peerAddress, Record record) throws HandshakeException, GeneralSecurityException {
        LOGGER.log(Level.FINER, "Received HANDSHAKE record from peer [{0}]", peerAddress);
        Handshaker handshaker = this.handshakers.get(peerAddress);
        DTLSFlight flight = null;
        if (handshaker != null) {
            flight = handshaker.processMessage(record);
        } else {
            HandshakeMessage handshake = (HandshakeMessage)record.getFragment();
            switch (handshake.getMessageType()) {
                case HELLO_REQUEST: {
                    flight = this.processHelloRequest(peerAddress, record);
                    break;
                }
                case CLIENT_HELLO: {
                    flight = this.processClientHello(peerAddress, record);
                    break;
                }
                default: {
                    LOGGER.log(Level.FINER, "Discarding unexpected handshake message of type [{0}] from peer [{1}]", new Object[]{handshake.getMessageType(), peerAddress});
                }
            }
        }
        if (flight != null) {
            this.cancelPreviousFlight(peerAddress);
            if (flight.isRetransmissionNeeded()) {
                this.storeFlight(flight);
                this.scheduleRetransmission(flight);
            }
            this.sendFlight(flight);
        }
    }

    private DTLSFlight processHelloRequest(InetSocketAddress peerAddress, Record record) throws HandshakeException, GeneralSecurityException {
        DTLSSession session = this.sessionStore.get(peerAddress);
        if (session == null) {
            session = new DTLSSession(peerAddress, true);
        }
        ClientHandshaker handshaker = new ClientHandshaker(null, session, this.sessionListener, this.config);
        return ((Handshaker)handshaker).getStartHandshakeMessage();
    }

    private DTLSFlight processClientHello(InetSocketAddress peerAddress, Record record) throws HandshakeException, GeneralSecurityException {
        DTLSFlight nextFlight = null;
        HandshakeMessage handshake = (HandshakeMessage)record.getFragment();
        if (handshake instanceof FragmentedHandshakeMessage) {
            LOGGER.log(Level.INFO, "Discarding fragmented CLIENT_HELLO message from peer [{0}]", peerAddress);
            return null;
        }
        ClientHello clientHello = (ClientHello)handshake;
        if (record.getEpoch() == 0) {
            Cookie expectedCookie = this.generateCookie(peerAddress, clientHello);
            if (!expectedCookie.equals(clientHello.getCookie())) {
                LOGGER.log(Level.FINE, "Processing CLIENT_HELLO from peer [{0}]:\n{1}", new Object[]{peerAddress, record});
                LOGGER.log(Level.FINER, "Verifying client IP address [{0}] using HELLO_VERIFY_REQUEST", peerAddress);
                HelloVerifyRequest msg = new HelloVerifyRequest(new ProtocolVersion(), expectedCookie);
                msg.setMessageSeq(clientHello.getMessageSeq());
                Record helloVerify = new Record(ContentType.HANDSHAKE, 0, record.getSequenceNumber(), msg, null);
                nextFlight = new DTLSFlight(peerAddress);
                nextFlight.addMessage(helloVerify);
            } else {
                SessionId sessionId;
                LOGGER.log(Level.FINER, "Successfully verified client IP address [{0}] using cookie exchange", peerAddress);
                SessionId sessionId2 = sessionId = clientHello.getSessionId().length() > 0 ? clientHello.getSessionId() : null;
                if (sessionId == null) {
                    DTLSSession newSession = new DTLSSession(peerAddress, false, record.getSequenceNumber());
                    ServerHandshaker handshaker = new ServerHandshaker(clientHello.getMessageSeq(), newSession, this.sessionListener, this.config);
                    nextFlight = handshaker.processMessage(record);
                } else {
                    LOGGER.log(Level.FINER, "Client [{0}] wants to resume session with ID [{1}]", new Object[]{peerAddress, ByteArrayUtils.toHexString(sessionId.getSessionId())});
                    DTLSSession session = this.sessionStore.find(sessionId);
                    if (session != null) {
                        ResumingServerHandshaker handshaker = new ResumingServerHandshaker(session, this.sessionListener, this.config);
                        nextFlight = handshaker.processMessage(record);
                    } else {
                        LOGGER.log(Level.FINER, "Client [{0}] tries to resume non-existing session with ID [{1}], starting new handshake...", new Object[]{peerAddress, ByteArrayUtils.toHexString(sessionId.getSessionId())});
                        DTLSSession newSession = new DTLSSession(peerAddress, false, record.getSequenceNumber());
                        ServerHandshaker handshaker = new ServerHandshaker(1, newSession, this.sessionListener, this.config);
                        nextFlight = handshaker.processMessage(record);
                    }
                }
            }
        } else {
            DTLSSession session = this.sessionStore.get(peerAddress);
            if (session != null) {
                ServerHandshaker handshaker = new ServerHandshaker(session, this.sessionListener, this.config);
                nextFlight = handshaker.processMessage(record);
            }
        }
        return nextFlight;
    }

    private SecretKey getMacKeyForCookies() {
        return this.cookieMacKey;
    }

    private Cookie generateCookie(InetSocketAddress peerAddress, ClientHello clientHello) throws HandshakeException {
        try {
            Mac hmac = Mac.getInstance("HmacSHA256");
            hmac.init(this.getMacKeyForCookies());
            hmac.update(peerAddress.toString().getBytes());
            hmac.update((byte)clientHello.getClientVersion().getMajor());
            hmac.update((byte)clientHello.getClientVersion().getMinor());
            hmac.update(clientHello.getRandom().getRandomBytes());
            hmac.update(clientHello.getSessionId().getSessionId());
            hmac.update(CipherSuite.listToByteArray(clientHello.getCipherSuites()));
            hmac.update(CompressionMethod.listToByteArray(clientHello.getCompressionMethods()));
            return new Cookie(hmac.doFinal());
        }
        catch (GeneralSecurityException e) {
            LOGGER.log(Level.SEVERE, "Could not instantiate MAC algorithm for cookie creation", e);
            throw new HandshakeException("Internal error", new AlertMessage(AlertMessage.AlertLevel.FATAL, AlertMessage.AlertDescription.INTERNAL_ERROR));
        }
    }

    public final void send(RawData msg) {
        if (msg == null) {
            LOGGER.finest("Ignoring NULL msg ...");
        } else {
            boolean queueFull;
            boolean bl = queueFull = !this.outboundMessages.offer(msg);
            if (queueFull) {
                LOGGER.log(Level.WARNING, "Outbound message queue is full! Dropping outbound message to peer [{0}]", msg.getInetSocketAddress());
            }
        }
    }

    private void sendNextMessageOverNetwork() throws HandshakeException {
        RawData message;
        try {
            message = this.outboundMessages.take();
        }
        catch (InterruptedException e) {
            return;
        }
        InetSocketAddress peerAddress = message.getInetSocketAddress();
        LOGGER.log(Level.FINER, "Sending application layer message to peer [{0}]", peerAddress);
        DTLSSession session = this.sessionStore.get(peerAddress);
        ClientHandshaker handshaker = null;
        DTLSFlight flight = null;
        try {
            if (session == null) {
                session = new DTLSSession(peerAddress, true);
                handshaker = new ClientHandshaker(message, session, this.sessionListener, this.config);
            } else if (session.isActive()) {
                ApplicationMessage fragment = new ApplicationMessage(message.getBytes());
                Record record = new Record(ContentType.APPLICATION_DATA, session.getWriteEpoch(), session.getSequenceNumber(), fragment, session);
                flight = new DTLSFlight(session);
                flight.addMessage(record);
            } else {
                handshaker = new ResumingClientHandshaker(message, session, this.sessionListener, this.config);
            }
            if (handshaker != null) {
                flight = ((Handshaker)handshaker).getStartHandshakeMessage();
                this.storeFlight(flight);
                this.scheduleRetransmission(flight);
            }
            this.sessionStore.update(session);
            this.sendFlight(flight);
        }
        catch (GeneralSecurityException e) {
            LOGGER.log(Level.FINE, "Cannot send record to peer [{0}] due to [{1}]", new Object[]{peerAddress, e.getMessage()});
        }
    }

    public final DTLSSession getSessionByAddress(InetSocketAddress address) {
        if (address == null) {
            return null;
        }
        return this.sessionStore.get(address);
    }

    private void sendFlight(DTLSFlight flight) {
        byte[] payload = new byte[]{};
        LOGGER.log(Level.FINER, "Sending flight of [{0}] messages to peer[{1}]", new Object[]{flight.getMessages().size(), flight.getPeerAddress()});
        ArrayList<DatagramPacket> datagrams = new ArrayList<DatagramPacket>();
        try {
            for (Record record : flight.getMessages()) {
                if (flight.getTries() > 0) {
                    int epoch = record.getEpoch();
                    record.setSequenceNumber(flight.getSession().getSequenceNumber(epoch));
                }
                LOGGER.log(Level.FINEST, "Sending record to peer [{0}]:\n{1}", new Object[]{flight.getPeerAddress(), record});
                byte[] recordBytes = record.toByteArray();
                if (payload.length + recordBytes.length > this.config.getMaxPayloadSize()) {
                    DatagramPacket datagram = new DatagramPacket(payload, payload.length, flight.getPeerAddress().getAddress(), flight.getPeerAddress().getPort());
                    datagrams.add(datagram);
                    payload = new byte[]{};
                }
                payload = ByteArrayUtils.concatenate(payload, recordBytes);
            }
            if (flight.getTries() > 0) {
                this.sessionStore.update(flight.getSession());
            }
            DatagramPacket datagram = new DatagramPacket(payload, payload.length, flight.getPeerAddress().getAddress(), flight.getPeerAddress().getPort());
            datagrams.add(datagram);
            for (DatagramPacket datagramPacket : datagrams) {
                if (!this.socket.isClosed()) {
                    this.socket.send(datagramPacket);
                    continue;
                }
                LOGGER.log(Level.FINE, "Socket [{0}] is closed, discarding packet ...", this.config.getAddress());
            }
        }
        catch (IOException e) {
            LOGGER.log(Level.WARNING, "Could not send datagram", e);
        }
        catch (GeneralSecurityException e) {
            LOGGER.log(Level.INFO, "Cannot send flight to peer [{0}] due to [{1}]", new Object[]{flight.getPeerAddress(), e.getMessage()});
        }
    }

    private void handleTimeout(DTLSFlight flight) {
        int max = this.config.getMaxRetransmissions();
        if (flight.getTries() < max) {
            LOGGER.log(Level.FINE, "Re-transmitting flight for [{0}], [{1}] retransmissions left", new Object[]{flight.getPeerAddress(), max - flight.getTries() - 1});
            flight.incrementTries();
            this.sendFlight(flight);
            this.scheduleRetransmission(flight);
        } else {
            LOGGER.log(Level.FINE, "Flight for [{0}] has reached maximum no. [{1}] of retransmissions", new Object[]{flight.getPeerAddress(), max});
        }
    }

    private void scheduleRetransmission(DTLSFlight flight) {
        if (flight.getRetransmitTask() != null) {
            flight.getRetransmitTask().cancel();
        }
        if (flight.isRetransmissionNeeded()) {
            flight.setRetransmitTask(new RetransmitTask(flight));
            if (flight.getTimeout() == 0) {
                flight.setTimeout(this.config.getRetransmissionTimeout());
            } else {
                flight.incrementTimeout();
            }
            this.timer.schedule(flight.getRetransmitTask(), flight.getTimeout());
        }
    }

    private void cancelPreviousFlight(InetSocketAddress peerAddress) {
        DTLSFlight previousFlight = this.getFlight(peerAddress);
        if (previousFlight != null) {
            previousFlight.getRetransmitTask().cancel();
            previousFlight.setRetransmitTask(null);
            this.removeFlight(peerAddress);
        }
    }

    public final InetSocketAddress getAddress() {
        if (this.socket == null) {
            return this.config.getAddress();
        }
        return new InetSocketAddress(this.socket.getLocalAddress(), this.socket.getLocalPort());
    }

    public final boolean isRunning() {
        return this.running;
    }

    public void setRawDataReceiver(RawDataChannel messageHandler) {
        this.messageHandler = messageHandler;
    }

    private void connectionClosed(InetSocketAddress peerAddress) {
        if (peerAddress != null) {
            this.sessionStore.remove(peerAddress);
            this.handshakers.remove(peerAddress);
        }
    }

    private abstract class Worker
    extends Thread {
        private Worker(String name) {
            super(name);
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                LOGGER.log(Level.CONFIG, "Starting worker thread [{0}]", this.getName());
                while (DTLSConnector.this.running) {
                    try {
                        this.doWork();
                    }
                    catch (ClosedByInterruptException e) {
                        LOGGER.log(Level.CONFIG, "Worker thread [{0}] has been interrupted", this.getName());
                    }
                    catch (Exception e) {
                        if (!DTLSConnector.this.running) continue;
                        LOGGER.log(Level.FINE, "Exception thrown by worker thread [" + this.getName() + "]", e);
                    }
                }
            }
            finally {
                LOGGER.log(Level.CONFIG, "Worker thread [{0}] has terminated", this.getName());
            }
        }

        protected abstract void doWork() throws Exception;
    }

    private class RetransmitTask
    extends TimerTask {
        private DTLSFlight flight;

        RetransmitTask(DTLSFlight flight) {
            this.flight = flight;
        }

        @Override
        public void run() {
            DTLSConnector.this.handleTimeout(this.flight);
        }
    }
}

