/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.nio;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Drives one {@link Selector} in a loop.
 * <p>
 * The constructor opens the selector and submits this worker to the
 * manager's selector executor. The loop in {@link #run()} then alternates
 * between draining queued I/O tasks and selecting ready keys, handing each
 * ready key's {@link SelectorSelection} attachment to the manager's channel
 * executor. The loop ends once the retain count reaches zero.
 */
public class SelectorWorker implements Runnable {

    /** Source of the per-worker {@link #id}. */
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    /**
     * Upper bound, in milliseconds, on one blocking select. Because the wait
     * is bounded, the release check and the task queue are revisited at least
     * this often even when nobody calls {@link Selector#wakeup()}.
     */
    private static final long SELECT_TIMEOUT_MILLIS = 10;

    final SelectorManager manager;
    final Selector selector;
    final int id = NEXT_ID.getAndIncrement();
    private final int maxChannelsPerWorker;

    /** Starts at 1; {@link #run()} stops looping once this reaches 0. */
    final AtomicInteger retainCounter = new AtomicInteger(1);

    /** Tasks queued by {@link #addIoTask(Runnable)}, executed on the selector thread. */
    private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();

    /**
     * Opens a selector, reads the per-worker channel limit from the manager
     * and schedules this worker on the manager's selector executor.
     *
     * @param manager the owning manager; supplies the executors and receives
     *                the full / not-full / empty notifications
     * @throws IOException if the selector cannot be opened
     */
    public SelectorWorker(SelectorManager manager) throws IOException {
        this.manager = manager;
        selector = Selector.open();
        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
        manager.getSelectorExecutor().execute(this);
    }

    /**
     * Increments the retain count and tells the manager this worker is full
     * when the new count equals the per-worker channel limit.
     */
    void retain() {
        if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
            manager.onWorkerFullEvent(this);
        }
    }

    /**
     * Decrements the retain count. Reports "empty" to the manager when the
     * count reaches zero, or "not full" when it drops to one below the
     * per-worker channel limit.
     */
    void release() {
        int use = retainCounter.decrementAndGet();
        if (use == 0) {
            manager.onWorkerEmptyEvent(this);
        } else if (use == maxChannelsPerWorker - 1) {
            manager.onWorkerNotFullEvent(this);
        }
    }

    /**
     * @return {@code true} when the retain count is zero
     */
    boolean isReleased() {
        return retainCounter.get() == 0;
    }

    /**
     * Queues a task for execution on the selector thread and wakes the
     * selector so a blocked select returns promptly.
     *
     * @param work the task to run
     */
    public void addIoTask(Runnable work) {
        ioTasks.add(work);
        selector.wakeup();
    }

    /**
     * Runs every queued task in FIFO order. A task that throws has its stack
     * trace printed and does not prevent the remaining tasks from running.
     */
    private void processIoTasks() {
        Runnable task;
        while ((task = ioTasks.poll()) != null) {
            try {
                task.run();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Selector loop. Renames the current thread to
     * {@code "Selector Worker: <id>"} for the duration of the loop, then
     * repeatedly drains the I/O task queue, selects, and dispatches every
     * selected key. If the loop fails, every registered selection is told
     * about the failure. On exit the manager's empty-worker callback is
     * invoked, the selector is closed and the thread name is restored.
     */
    @Override
    public void run() {
        final Thread thread = Thread.currentThread();
        final String origName = thread.getName();
        try {
            thread.setName("Selector Worker: " + id);
            while (!isReleased()) {

                processIoTasks();

                if (selector.select(SELECT_TIMEOUT_MILLIS) == 0) {
                    continue;
                }

                // Walk the keys that are ready for I/O, removing each from the
                // selected set as it is handed off.
                Set<SelectionKey> keys = selector.selectedKeys();
                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
                    final SelectionKey key = i.next();
                    i.remove();
                    dispatch(key);
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            notifySelectionsOfError(e);
        } finally {
            shutdown(thread, origName);
        }
    }

    /**
     * Hands one ready key to the channel executor. Any failure while
     * preparing or submitting the work is reported to the key's selection.
     */
    private void dispatch(SelectionKey key) {
        final SelectorSelection s = (SelectorSelection) key.attachment();
        try {
            if (key.isValid()) {
                // Empty the interest set before handing off.
                // NOTE(review): s.enable() below presumably restores it - confirm in SelectorSelection.
                key.interestOps(0);
            }

            // Process the selection on another thread so this thread can go
            // back to finding newly selected keys.
            manager.getChannelExecutor().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.onSelect();
                        s.enable();
                    } catch (Throwable e) {
                        s.onError(e);
                    }
                }
            });

        } catch (Throwable e) {
            s.onError(e);
        }
    }

    /**
     * Reports a fatal loop error to every selection registered with the
     * selector. Each notification is isolated so one misbehaving or missing
     * attachment cannot stop the others from being notified.
     */
    private void notifySelectionsOfError(Throwable cause) {
        if (!selector.isOpen()) {
            // keys() would throw ClosedSelectorException; nothing to notify.
            return;
        }
        for (SelectionKey key : selector.keys()) {
            try {
                SelectorSelection s = (SelectorSelection) key.attachment();
                if (s != null) {
                    s.onError(cause);
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    /**
     * End-of-loop cleanup. The selector is closed and the thread name is
     * restored even if the manager callback throws; such an exception still
     * propagates to the caller afterwards.
     */
    private void shutdown(Thread thread, String origName) {
        try {
            manager.onWorkerEmptyEvent(this);
        } finally {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                thread.setName(origName);
            }
        }
    }
}