001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.ClosedChannelException;
030import java.nio.channels.SelectionKey;
031import java.nio.channels.Selector;
032import java.nio.channels.ServerSocketChannel;
033import java.nio.channels.SocketChannel;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import javax.net.ServerSocketFactory;
043import javax.net.ssl.SSLServerSocket;
044
045import org.apache.activemq.Service;
046import org.apache.activemq.ThreadPriorities;
047import org.apache.activemq.TransportLoggerSupport;
048import org.apache.activemq.command.BrokerInfo;
049import org.apache.activemq.openwire.OpenWireFormatFactory;
050import org.apache.activemq.transport.Transport;
051import org.apache.activemq.transport.TransportFactory;
052import org.apache.activemq.transport.TransportServer;
053import org.apache.activemq.transport.TransportServerThreadSupport;
054import org.apache.activemq.util.IOExceptionSupport;
055import org.apache.activemq.util.InetAddressUtil;
056import org.apache.activemq.util.IntrospectionSupport;
057import org.apache.activemq.util.ServiceListener;
058import org.apache.activemq.util.ServiceStopper;
059import org.apache.activemq.util.ServiceSupport;
060import org.apache.activemq.wireformat.WireFormat;
061import org.apache.activemq.wireformat.WireFormatFactory;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A TCP based implementation of {@link TransportServer}
067 */
068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
069
    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);

    // Listening socket; assigned by bind() and cleared again by doStop().
    protected volatile ServerSocket serverSocket;
    // Selector used by the channel based accept loop; closed and cleared by doStop().
    protected volatile Selector selector;
    // Backlog value handed to the ServerSocketFactory when the listening socket is created in bind().
    protected int backlog = 5000;
    // Produces the WireFormat given to every transport created by configureTransport().
    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
    // Factory stored in each TransportInfo and used to server-configure accepted transports.
    protected final TcpTransportFactory transportFactory;
    // The following three values are copied into the options map of every accepted transport.
    protected long maxInactivityDuration = 30000;
    protected long maxInactivityDurationInitalDelay = 10000;
    protected int minmumWireFormatVersion;
    // true: accepted sockets are queued and handled by socketHandlerThread;
    // false: they are handled directly on the accepting thread.
    protected boolean useQueueForAccept = true;
    protected boolean allowLinkStealing;

    /**
     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean trace = false;

    // Copied into the options map of every accepted transport (see doHandleSocket()).
    protected int soTimeout = 0;
    protected int socketBufferSize = 64 * 1024;
    protected int connectionTimeout = 30000;

    /**
     * Name of the LogWriter implementation to use. Names are mapped to classes in the
     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
     * set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;

    /**
     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;

    /**
     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean startLogging = true;
    // Creates the listening socket in bind().
    protected final ServerSocketFactory serverSocketFactory;
    // Hand-off queue between the accept loop and socketHandlerThread when useQueueForAccept is true.
    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
    // Thread draining socketQueue; created in doStart() and interrupted in doStop().
    protected Thread socketHandlerThread;

    /**
     * The maximum number of sockets allowed for this server
     */
    protected int maximumConnections = Integer.MAX_VALUE;
    // Number of connection slots currently taken; incremented in doHandleSocket(), decremented in stopped().
    protected final AtomicInteger currentTransportCount = new AtomicInteger();
124
125    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
126        URISyntaxException {
127        super(location);
128        this.transportFactory = transportFactory;
129        this.serverSocketFactory = serverSocketFactory;
130    }
131
132    public void bind() throws IOException {
133        URI bind = getBindLocation();
134
135        String host = bind.getHost();
136        host = (host == null || host.length() == 0) ? "localhost" : host;
137        InetAddress addr = InetAddress.getByName(host);
138
139        try {
140            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
141            configureServerSocket(serverSocket);
142        } catch (IOException e) {
143            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
144        }
145        try {
146            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
147                bind.getQuery(), bind.getFragment()));
148        } catch (URISyntaxException e) {
149            // it could be that the host name contains invalid characters such
150            // as _ on unix platforms so lets try use the IP address instead
151            try {
152                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
153                    bind.getQuery(), bind.getFragment()));
154            } catch (URISyntaxException e2) {
155                throw IOExceptionSupport.create(e2);
156            }
157        }
158    }
159
160    private void configureServerSocket(ServerSocket socket) throws SocketException {
161        socket.setSoTimeout(2000);
162        if (transportOptions != null) {
163
164            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
165            // to SSLServerSocket to configure it has a side effect on the socket rendering it
166            // useless as all suites are enabled many of which are considered as insecure.  We
167            // instead trap that option here and throw an exception.  We should really consider
168            // all invalid options as breaking and not start the transport but the current design
169            // doesn't really allow for this.
170            //
171            //  see: https://issues.apache.org/jira/browse/AMQ-4582
172            //
173            if (socket instanceof SSLServerSocket) {
174                if (transportOptions.containsKey("enabledCipherSuites")) {
175                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
176
177                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
178                        throw new SocketException(String.format(
179                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
180                    }
181                }
182            }
183
184            IntrospectionSupport.setProperties(socket, transportOptions);
185        }
186    }
187
188    /**
189     * @return Returns the wireFormatFactory.
190     */
191    public WireFormatFactory getWireFormatFactory() {
192        return wireFormatFactory;
193    }
194
195    /**
196     * @param wireFormatFactory
197     *            The wireFormatFactory to set.
198     */
199    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
200        this.wireFormatFactory = wireFormatFactory;
201    }
202
203    /**
204     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
205     * broker.
206     *
207     * @param brokerInfo
208     */
209    @Override
210    public void setBrokerInfo(BrokerInfo brokerInfo) {
211    }
212
213    public long getMaxInactivityDuration() {
214        return maxInactivityDuration;
215    }
216
217    public void setMaxInactivityDuration(long maxInactivityDuration) {
218        this.maxInactivityDuration = maxInactivityDuration;
219    }
220
221    public long getMaxInactivityDurationInitalDelay() {
222        return this.maxInactivityDurationInitalDelay;
223    }
224
225    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
226        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
227    }
228
229    public int getMinmumWireFormatVersion() {
230        return minmumWireFormatVersion;
231    }
232
233    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
234        this.minmumWireFormatVersion = minmumWireFormatVersion;
235    }
236
237    public boolean isTrace() {
238        return trace;
239    }
240
241    public void setTrace(boolean trace) {
242        this.trace = trace;
243    }
244
245    public String getLogWriterName() {
246        return logWriterName;
247    }
248
249    public void setLogWriterName(String logFormat) {
250        this.logWriterName = logFormat;
251    }
252
253    public boolean isDynamicManagement() {
254        return dynamicManagement;
255    }
256
257    public void setDynamicManagement(boolean useJmx) {
258        this.dynamicManagement = useJmx;
259    }
260
261    public boolean isStartLogging() {
262        return startLogging;
263    }
264
265    public void setStartLogging(boolean startLogging) {
266        this.startLogging = startLogging;
267    }
268
269    /**
270     * @return the backlog
271     */
272    public int getBacklog() {
273        return backlog;
274    }
275
276    /**
277     * @param backlog
278     *            the backlog to set
279     */
280    public void setBacklog(int backlog) {
281        this.backlog = backlog;
282    }
283
284    /**
285     * @return the useQueueForAccept
286     */
287    public boolean isUseQueueForAccept() {
288        return useQueueForAccept;
289    }
290
291    /**
292     * @param useQueueForAccept
293     *            the useQueueForAccept to set
294     */
295    public void setUseQueueForAccept(boolean useQueueForAccept) {
296        this.useQueueForAccept = useQueueForAccept;
297    }
298
299    /**
300     * pull Sockets from the ServerSocket
301     */
302    @Override
303    public void run() {
304        if (!isStopped() && !isStopping()) {
305            final ServerSocket serverSocket = this.serverSocket;
306            if (serverSocket == null) {
307                onAcceptError(new IOException("Server started without a valid ServerSocket"));
308            }
309
310            final ServerSocketChannel channel = serverSocket.getChannel();
311            if (channel != null) {
312                doRunWithServerSocketChannel(channel);
313            } else {
314                doRunWithServerSocket(serverSocket);
315            }
316        }
317    }
318
319    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
320        try {
321            channel.configureBlocking(false);
322            final Selector selector = Selector.open();
323
324            try {
325                channel.register(selector, SelectionKey.OP_ACCEPT);
326            } catch (ClosedChannelException ex) {
327                try {
328                    selector.close();
329                } catch (IOException ignore) {}
330
331                throw ex;
332            }
333
334            // Update object instance for later cleanup.
335            this.selector = selector;
336
337            while (!isStopped()) {
338                int count = selector.select(10);
339
340                if (count == 0) {
341                    continue;
342                }
343
344                Set<SelectionKey> keys = selector.selectedKeys();
345
346                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
347                    final SelectionKey key = i.next();
348                    if (key.isAcceptable()) {
349                        try {
350                            SocketChannel sc = channel.accept();
351                            if (sc != null) {
352                                if (isStopped() || getAcceptListener() == null) {
353                                    sc.close();
354                                } else {
355                                    if (useQueueForAccept) {
356                                        socketQueue.put(sc.socket());
357                                    } else {
358                                        handleSocket(sc.socket());
359                                    }
360                                }
361                            }
362
363                        } catch (SocketTimeoutException ste) {
364                            // expect this to happen
365                        } catch (Exception e) {
366                            e.printStackTrace();
367                            if (!isStopping()) {
368                                onAcceptError(e);
369                            } else if (!isStopped()) {
370                                LOG.warn("run()", e);
371                                onAcceptError(e);
372                            }
373                        }
374                    }
375                    i.remove();
376                }
377            }
378        } catch (IOException ex) {
379            if (!isStopping()) {
380                onAcceptError(ex);
381            } else if (!isStopped()) {
382                LOG.warn("run()", ex);
383                onAcceptError(ex);
384            }
385        }
386    }
387
388    private void doRunWithServerSocket(final ServerSocket serverSocket) {
389        while (!isStopped()) {
390            Socket socket = null;
391            try {
392                socket = serverSocket.accept();
393                if (socket != null) {
394                    if (isStopped() || getAcceptListener() == null) {
395                        socket.close();
396                    } else {
397                        if (useQueueForAccept) {
398                            socketQueue.put(socket);
399                        } else {
400                            handleSocket(socket);
401                        }
402                    }
403                }
404            } catch (SocketTimeoutException ste) {
405                // expect this to happen
406            } catch (Exception e) {
407                if (!isStopping()) {
408                    onAcceptError(e);
409                } else if (!isStopped()) {
410                    LOG.warn("run()", e);
411                    onAcceptError(e);
412                }
413            }
414        }
415    }
416
417    /**
418     * Allow derived classes to override the Transport implementation that this transport server creates.
419     *
420     * @param socket
421     * @param format
422     *
423     * @return a new Transport instance.
424     *
425     * @throws IOException
426     */
427    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
428        return new TcpTransport(format, socket);
429    }
430
431    /**
432     * @return pretty print of this
433     */
434    @Override
435    public String toString() {
436        return "" + getBindLocation();
437    }
438
439    /**
440     * @param socket
441     * @param bindAddress
442     * @return real hostName
443     * @throws UnknownHostException
444     */
445    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
446        String result = null;
447        if (socket.isBound()) {
448            if (socket.getInetAddress().isAnyLocalAddress()) {
449                // make it more human readable and useful, an alternative to 0.0.0.0
450                result = InetAddressUtil.getLocalHostName();
451            } else {
452                result = socket.getInetAddress().getCanonicalHostName();
453            }
454        } else {
455            result = bindAddress.getCanonicalHostName();
456        }
457        return result;
458    }
459
460    @Override
461    protected void doStart() throws Exception {
462        if (useQueueForAccept) {
463            Runnable run = new Runnable() {
464                @Override
465                public void run() {
466                    try {
467                        while (!isStopped() && !isStopping()) {
468                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
469                            if (sock != null) {
470                                try {
471                                    handleSocket(sock);
472                                } catch (Throwable thrown) {
473                                    if (!isStopping()) {
474                                        onAcceptError(new Exception(thrown));
475                                    } else if (!isStopped()) {
476                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
477                                        onAcceptError(new Exception(thrown));
478                                    }
479                                }
480                            }
481                        }
482
483                    } catch (InterruptedException e) {
484                        if (!isStopped() || !isStopping()) {
485                            LOG.info("socketQueue interrupted - stopping");
486                            onAcceptError(e);
487                        }
488                    }
489                }
490            };
491            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
492            socketHandlerThread.setDaemon(true);
493            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
494            socketHandlerThread.start();
495        }
496        super.doStart();
497    }
498
499    @Override
500    protected void doStop(ServiceStopper stopper) throws Exception {
501        Exception firstFailure = null;
502
503        try {
504            if (selector != null) {
505                selector.close();
506                selector = null;
507            }
508        } catch (Exception error) {
509        }
510
511        try {
512            final ServerSocket serverSocket = this.serverSocket;
513            if (serverSocket != null) {
514                this.serverSocket = null;
515                serverSocket.close();
516            }
517        } catch (Exception error) {
518            firstFailure = error;
519        }
520
521        if (socketHandlerThread != null) {
522            socketHandlerThread.interrupt();
523            socketHandlerThread = null;
524        }
525
526        try {
527            super.doStop(stopper);
528        } catch (Exception error) {
529            if (firstFailure != null) {
530                firstFailure = error;
531            }
532        }
533
534        if (firstFailure != null) {
535            throw firstFailure;
536        }
537    }
538
539    @Override
540    public InetSocketAddress getSocketAddress() {
541        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
542    }
543
544    protected void handleSocket(Socket socket) {
545        doHandleSocket(socket);
546    }
547
548    final protected void doHandleSocket(Socket socket) {
549        boolean closeSocket = true;
550        boolean countIncremented = false;
551        try {
552            int currentCount;
553            do {
554                currentCount = currentTransportCount.get();
555                if (currentCount >= this.maximumConnections) {
556                     throw new ExceededMaximumConnectionsException(
557                         "Exceeded the maximum number of allowed client connections. See the '" +
558                         "maximumConnections' property on the TCP transport configuration URI " +
559                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
560                 }
561
562            //Increment this value before configuring the transport
563            //This is necessary because some of the transport servers must read from the
564            //socket during configureTransport() so we want to make sure this value is
565            //accurate as the transport server could pause here waiting for data to be sent from a client
566            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
567            countIncremented = true;
568
569            HashMap<String, Object> options = new HashMap<String, Object>();
570            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
571            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
572            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
573            options.put("trace", Boolean.valueOf(trace));
574            options.put("soTimeout", Integer.valueOf(soTimeout));
575            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
576            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
577            options.put("logWriterName", logWriterName);
578            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
579            options.put("startLogging", Boolean.valueOf(startLogging));
580            options.putAll(transportOptions);
581
582            TransportInfo transportInfo = configureTransport(this, socket);
583            closeSocket = false;
584
585            if (transportInfo.transport instanceof ServiceSupport) {
586                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
587            }
588
589            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
590                    transportInfo.transport, transportInfo.format, options);
591
592            getAcceptListener().onAccept(configuredTransport);
593
594        } catch (SocketTimeoutException ste) {
595            // expect this to happen
596        } catch (Exception e) {
597            if (closeSocket) {
598                try {
599                    //if closing the socket, only decrement the count it was actually incremented
600                    //where it was incremented
601                    if (countIncremented) {
602                        currentTransportCount.decrementAndGet();
603                    }
604                    socket.close();
605                } catch (Exception ignore) {
606                }
607            }
608
609            if (!isStopping()) {
610                onAcceptError(e);
611            } else if (!isStopped()) {
612                LOG.warn("run()", e);
613                onAcceptError(e);
614            }
615        }
616    }
617
618    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
619        WireFormat format = wireFormatFactory.createWireFormat();
620        Transport transport = createTransport(socket, format);
621        return new TransportInfo(format, transport, transportFactory);
622    }
623
624    protected class TransportInfo {
625        final WireFormat format;
626        final Transport transport;
627        final TransportFactory transportFactory;
628
629        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
630            this.format = format;
631            this.transport = transport;
632            this.transportFactory = transportFactory;
633        }
634    }
635
636    public int getSoTimeout() {
637        return soTimeout;
638    }
639
640    public void setSoTimeout(int soTimeout) {
641        this.soTimeout = soTimeout;
642    }
643
644    public int getSocketBufferSize() {
645        return socketBufferSize;
646    }
647
648    public void setSocketBufferSize(int socketBufferSize) {
649        this.socketBufferSize = socketBufferSize;
650    }
651
652    public int getConnectionTimeout() {
653        return connectionTimeout;
654    }
655
656    public void setConnectionTimeout(int connectionTimeout) {
657        this.connectionTimeout = connectionTimeout;
658    }
659
660    /**
661     * @return the maximumConnections
662     */
663    public int getMaximumConnections() {
664        return maximumConnections;
665    }
666
667    /**
668     * @param maximumConnections
669     *            the maximumConnections to set
670     */
671    public void setMaximumConnections(int maximumConnections) {
672        this.maximumConnections = maximumConnections;
673    }
674
675    public AtomicInteger getCurrentTransportCount() {
676        return currentTransportCount;
677    }
678
679    @Override
680    public void started(Service service) {
681    }
682
683    @Override
684    public void stopped(Service service) {
685        this.currentTransportCount.decrementAndGet();
686    }
687
688    @Override
689    public boolean isSslServer() {
690        return false;
691    }
692
693    @Override
694    public boolean isAllowLinkStealing() {
695        return allowLinkStealing;
696    }
697
698    @Override
699    public void setAllowLinkStealing(boolean allowLinkStealing) {
700        this.allowLinkStealing = allowLinkStealing;
701    }
702}