1:
37:
38:
39: package ;
40:
41: import ;
42:
43: import ;
44: import ;
45: import ;
46: import ;
47: import ;
48: import ;
49: import ;
50: import ;
51: import ;
52: import ;
53: import ;
54: import ;
55: import ;
56:
57:
63: public class EpollSelectorImpl extends AbstractSelector
64: {
65:
66: private static final int DEFAULT_EPOLL_SIZE = 128;
67: private static final int sizeof_struct_epoll_event;
68:
69: private static final int OP_ACCEPT = SelectionKey.OP_ACCEPT;
70: private static final int OP_CONNECT = SelectionKey.OP_CONNECT;
71: private static final int OP_READ = SelectionKey.OP_READ;
72: private static final int OP_WRITE = SelectionKey.OP_WRITE;
73:
74:
75: private int epoll_fd;
76:
77: private final HashMap keys;
78: private Set selectedKeys;
79: private Thread waitingThread;
80: private ByteBuffer events;
81:
82: private static final int INITIAL_CAPACITY;
83: private static final int MAX_DOUBLING_CAPACITY;
84: private static final int CAPACITY_INCREMENT;
85:
86: static
87: {
88: if (Configuration.INIT_LOAD_LIBRARY)
89: System.loadLibrary("javanio");
90:
91: if (epoll_supported())
92: sizeof_struct_epoll_event = sizeof_struct();
93: else
94: sizeof_struct_epoll_event = -1;
95:
96: INITIAL_CAPACITY = 64 * sizeof_struct_epoll_event;
97: MAX_DOUBLING_CAPACITY = 1024 * sizeof_struct_epoll_event;
98: CAPACITY_INCREMENT = 128 * sizeof_struct_epoll_event;
99: }
100:
101: public EpollSelectorImpl(SelectorProvider provider)
102: throws IOException
103: {
104: super(provider);
105: epoll_fd = epoll_create(DEFAULT_EPOLL_SIZE);
106: keys = new HashMap();
107: selectedKeys = null;
108: events = ByteBuffer.allocateDirect(INITIAL_CAPACITY);
109: }
110:
111:
114: public Set keys()
115: {
116: return new HashSet(keys.values());
117: }
118:
119:
122: public int select() throws IOException
123: {
124: return doSelect(-1);
125: }
126:
127:
130: public int select(long timeout) throws IOException
131: {
132: if (timeout > Integer.MAX_VALUE)
133: throw new IllegalArgumentException("timeout is too large");
134: if (timeout < 0)
135: throw new IllegalArgumentException("invalid timeout");
136: return doSelect((int) timeout);
137: }
138:
139: private int doSelect(int timeout) throws IOException
140: {
141: synchronized (keys)
142: {
143: Set cancelledKeys = cancelledKeys();
144: synchronized (cancelledKeys)
145: {
146: for (Iterator it = cancelledKeys.iterator(); it.hasNext(); )
147: {
148: EpollSelectionKeyImpl key = (EpollSelectionKeyImpl) it.next();
149: epoll_delete(epoll_fd, key.fd);
150: key.valid = false;
151: keys.remove(Integer.valueOf(key.fd));
152: it.remove();
153: deregister(key);
154: }
155:
156:
157:
158: for (Iterator it = keys.values().iterator(); it.hasNext(); )
159: {
160: EpollSelectionKeyImpl key = (EpollSelectionKeyImpl) it.next();
161: SelectableChannel ch = key.channel();
162: if (ch instanceof VMChannelOwner)
163: {
164: if (!((VMChannelOwner) ch).getVMChannel().getState().isValid())
165: it.remove();
166: }
167: }
168:
169:
170: if (keys.isEmpty())
171: return 0;
172:
173: int ret;
174: try
175: {
176: begin();
177: waitingThread = Thread.currentThread();
178: ret = epoll_wait(epoll_fd, events, keys.size(), timeout);
179: }
180: finally
181: {
182: Thread.interrupted();
183: waitingThread = null;
184: end();
185: }
186:
187: HashSet s = new HashSet(ret);
188: for (int i = 0; i < ret; i++)
189: {
190: events.position(i * sizeof_struct_epoll_event);
191: ByteBuffer b = events.slice();
192: int fd = selected_fd(b);
193: EpollSelectionKeyImpl key
194: = (EpollSelectionKeyImpl) keys.get(Integer.valueOf(fd));
195: if (key == null)
196: throw new IOException("fd was selected, but no key found");
197: key.selectedOps = selected_ops(b) & key.interestOps;
198: s.add(key);
199: }
200:
201: reallocateBuffer();
202:
203: selectedKeys = s;
204: return ret;
205: }
206: }
207: }
208:
209:
212: public Set selectedKeys()
213: {
214: if (selectedKeys == null)
215: return Collections.EMPTY_SET;
216: return selectedKeys;
217: }
218:
219:
222: public int selectNow() throws IOException
223: {
224: return doSelect(0);
225: }
226:
227:
230: public Selector wakeup()
231: {
232: try
233: {
234: waitingThread.interrupt();
235: }
236: catch (NullPointerException npe)
237: {
238:
239: }
240: return this;
241: }
242:
243:
246: protected void implCloseSelector() throws IOException
247: {
248: VMChannel.close(epoll_fd);
249: }
250:
251:
254: protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att)
255: {
256: if (!(ch instanceof VMChannelOwner))
257: throw new IllegalArgumentException("unsupported channel type");
258:
259: VMChannel channel = ((VMChannelOwner) ch).getVMChannel();
260: try
261: {
262: int native_fd = channel.getState().getNativeFD();
263: synchronized (keys)
264: {
265: if (keys.containsKey(Integer.valueOf(native_fd)))
266: throw new IllegalArgumentException("channel already registered");
267: EpollSelectionKeyImpl result =
268: new EpollSelectionKeyImpl(this, ch, native_fd);
269: if ((ops & ~(ch.validOps())) != 0)
270: throw new IllegalArgumentException("invalid ops for channel");
271: result.interestOps = ops;
272: result.selectedOps = 0;
273: result.valid = true;
274: result.attach(att);
275: result.key = System.identityHashCode(result);
276: epoll_add(epoll_fd, result.fd, ops);
277: keys.put(Integer.valueOf(native_fd), result);
278: reallocateBuffer();
279: return result;
280: }
281: }
282: catch (IOException ioe)
283: {
284: throw new IllegalArgumentException(ioe);
285: }
286: }
287:
288: private void reallocateBuffer()
289: {
290:
291:
292: if (events.capacity() < keys.size() * sizeof_struct_epoll_event)
293: {
294: int cap = events.capacity();
295: if (cap < MAX_DOUBLING_CAPACITY)
296: cap <<= 1;
297: else
298: cap += CAPACITY_INCREMENT;
299: events = ByteBuffer.allocateDirect(cap);
300: }
301:
302:
303: else if (events.capacity() > keys.size() * sizeof_struct_epoll_event * 2 + 1
304: && events.capacity() > INITIAL_CAPACITY)
305: {
306: int cap = events.capacity() >>> 1;
307: events = ByteBuffer.allocateDirect(cap);
308: }
309: }
310:
311: void epoll_modify(EpollSelectionKeyImpl key, int ops) throws IOException
312: {
313: epoll_modify(epoll_fd, key.fd, ops);
314: }
315:
316:
321: public static native boolean epoll_supported();
322:
323:
324:
329: private static native int sizeof_struct();
330:
331:
332:
339: private static native int epoll_create(int size) throws IOException;
340:
341:
348: private static native void epoll_add(int efd, int fd, int ops)
349: throws IOException;
350:
351:
359: private static native void epoll_modify(int efd, int fd, int ops)
360: throws IOException;
361:
362:
369: private static native void epoll_delete(int efd, int fd) throws IOException;
370:
371:
381: private static native int epoll_wait(int efd, ByteBuffer state, int n, int timeout)
382: throws IOException;
383:
384:
390: private static native int selected_fd(ByteBuffer struct);
391:
392:
398: private static native int selected_ops(ByteBuffer struct);
399: }