001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.UnsupportedEncodingException;
020
021import org.fusesource.mqtt.codec.CONNECT;
022import org.fusesource.mqtt.codec.DISCONNECT;
023import org.fusesource.mqtt.codec.PINGREQ;
024import org.fusesource.mqtt.codec.PUBACK;
025import org.fusesource.mqtt.codec.PUBCOMP;
026import org.fusesource.mqtt.codec.PUBLISH;
027import org.fusesource.mqtt.codec.PUBREC;
028import org.fusesource.mqtt.codec.PUBREL;
029import org.fusesource.mqtt.codec.SUBSCRIBE;
030import org.fusesource.mqtt.codec.UNSUBSCRIBE;
031
032/**
033 * A set of static methods useful for handling MQTT based client connections.
034 */
035public class MQTTProtocolSupport {
036
037    private static final int TOPIC_NAME_MIN_LENGTH = 1;
038    private static final int TOPIC_NAME_MAX_LENGTH = 65535;
039
040    private static final String MULTI_LEVEL_WILDCARD = "#";
041    private static final String SINGLE_LEVEL_WILDCARD = "+";
042
043    private static final char MULTI_LEVEL_WILDCARD_CHAR = '#';
044    private static final char SINGLE_LEVEL_WILDCARD_CHAR = '+';
045    private static final char TOPIC_LEVEL_SEPERATOR_CHAR = '/';
046
047    /**
048     * Converts an MQTT formatted Topic name into a suitable ActiveMQ Destination
049     * name string.
050     *
051     * @param name
052     *        the MQTT formatted topic name.
053     *
054     * @return an destination name that fits the ActiveMQ conventions.
055     */
056    public static String convertMQTTToActiveMQ(String name) {
057        char[] chars = name.toCharArray();
058        for (int i = 0; i < chars.length; i++) {
059            switch(chars[i]) {
060                case '#':
061                    chars[i] = '>';
062                    break;
063                case '>':
064                    chars[i] = '#';
065                    break;
066                case '+':
067                    chars[i] = '*';
068                    break;
069                case '*':
070                    chars[i] = '+';
071                    break;
072                case '/':
073                    chars[i] = '.';
074                    break;
075                case '.':
076                    chars[i] = '/';
077                    break;
078            }
079        }
080        String rc = new String(chars);
081        return rc;
082    }
083
084    /**
085     * Converts an ActiveMQ destination name into a correctly formatted
086     * MQTT destination name.
087     *
088     * @param destinationName
089     *        the ActiveMQ destination name to process.
090     *
091     * @return a destination name formatted for MQTT.
092     */
093    public static String convertActiveMQToMQTT(String destinationName) {
094        char[] chars = destinationName.toCharArray();
095        for (int i = 0; i < chars.length; i++) {
096            switch(chars[i]) {
097                case '>':
098                    chars[i] = '#';
099                    break;
100                case '#':
101                    chars[i] = '>';
102                    break;
103                case '*':
104                    chars[i] = '+';
105                    break;
106                case '+':
107                    chars[i] = '*';
108                    break;
109                case '.':
110                    chars[i] = '/';
111                    break;
112                case '/':
113                    chars[i] = '.';
114                    break;
115            }
116        }
117        String rc = new String(chars);
118        return rc;
119    }
120
121    /**
122     * Given an MQTT header byte, determine the command type that the header
123     * represents.
124     *
125     * @param header
126     *        the byte value for the MQTT frame header.
127     *
128     * @return a string value for the given command type.
129     */
130    public static String commandType(byte header) {
131        byte messageType = (byte) ((header & 0xF0) >>> 4);
132        switch (messageType) {
133            case PINGREQ.TYPE:
134                return "PINGREQ";
135            case CONNECT.TYPE:
136                return "CONNECT";
137            case DISCONNECT.TYPE:
138                return "DISCONNECT";
139            case SUBSCRIBE.TYPE:
140                return "SUBSCRIBE";
141            case UNSUBSCRIBE.TYPE:
142                return "UNSUBSCRIBE";
143            case PUBLISH.TYPE:
144                return "PUBLISH";
145            case PUBACK.TYPE:
146                return "PUBACK";
147            case PUBREC.TYPE:
148                return "PUBREC";
149            case PUBREL.TYPE:
150                return "PUBREL";
151            case PUBCOMP.TYPE:
152                return "PUBCOMP";
153            default:
154                return "UNKNOWN";
155        }
156    }
157
158    /**
159     * Validate that the Topic names given by client commands are valid
160     * based on the MQTT protocol specification.
161     *
162     * @param topicName
163     *      the given Topic name provided by the client.
164     *
165     * @throws MQTTProtocolException if the value given is invalid.
166     */
167    public static void validate(String topicName) throws MQTTProtocolException {
168        int topicLen = 0;
169        try {
170            topicLen = topicName.getBytes("UTF-8").length;
171        } catch (UnsupportedEncodingException e) {
172            throw new MQTTProtocolException("Topic name contained invalid UTF-8 encoding.");
173        }
174
175        // Spec: Unless stated otherwise all UTF-8 encoded strings can have any length in
176        //       the range 0 to 65535 bytes.
177        if (topicLen < TOPIC_NAME_MIN_LENGTH || topicLen > TOPIC_NAME_MAX_LENGTH) {
178            throw new MQTTProtocolException("Topic name given had invliad length.");
179        }
180
181        // 4.7.1.2 and 4.7.1.3 these can stand alone
182        if (MULTI_LEVEL_WILDCARD.equals(topicName) || SINGLE_LEVEL_WILDCARD.equals(topicName)) {
183            return;
184        }
185
186        // Spec: 4.7.1.2
187        //  The multi-level wildcard character MUST be specified either on its own or following a
188        //  topic level separator. In either case it MUST be the last character specified in the
189        //  Topic Filter [MQTT-4.7.1-2].
190        int numWildCards = 0;
191        for (int i = 0; i < topicName.length(); ++i) {
192            if (topicName.charAt(i) == MULTI_LEVEL_WILDCARD_CHAR) {
193                numWildCards++;
194
195                // If prev exists it must be a separator
196                if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
197                    throw new MQTTProtocolException("The multi level wildcard must stand alone: " + topicName);
198                }
199            }
200
201            if (numWildCards > 1) {
202                throw new MQTTProtocolException("Topic Filter can only have one multi-level filter: " + topicName);
203            }
204        }
205
206        if (topicName.contains(MULTI_LEVEL_WILDCARD) && !topicName.endsWith(MULTI_LEVEL_WILDCARD)) {
207            throw new MQTTProtocolException("The multi-level filter must be at the end of the Topic name: " + topicName);
208        }
209
210        // Spec: 4.7.1.3
211        // The single-level wildcard can be used at any level in the Topic Filter, including
212        // first and last levels. Where it is used it MUST occupy an entire level of the filter
213        //
214        // [MQTT-4.7.1-3]. It can be used at more than one level in the Topic Filter and can be
215        // used in conjunction with the multilevel wildcard.
216        for (int i = 0; i < topicName.length(); ++i) {
217            if (topicName.charAt(i) != SINGLE_LEVEL_WILDCARD_CHAR) {
218                continue;
219            }
220
221            // If prev exists it must be a separator
222            if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
223                throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName);
224            }
225
226            // If next exists it must be a separator
227            if (i < topicName.length() - 1 && topicName.charAt(i + 1) != TOPIC_LEVEL_SEPERATOR_CHAR) {
228                throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName);
229            }
230        }
231    }
232}