1:
37:
38:
39: package ;
40:
41: import ;
42: import ;
43: import ;
44: import ;
45: import ;
46: import ;
47: import ;
48: import ;
49: import ;
50: import ;
51: import ;
52: import ;
53: import ;
54:
55: public class SelectorImpl extends AbstractSelector
56: {
57: private Set<SelectionKey> keys;
58: private Set<SelectionKey> selected;
59:
60:
64: private Object selectThreadMutex = new Object ();
65:
66:
69: private Thread selectThread;
70:
71:
80: private boolean unhandledWakeup;
81:
/**
 * Creates a selector whose key set and selected-key set start out empty.
 *
 * @param provider the provider passed on to the superclass constructor
 */
public SelectorImpl(SelectorProvider provider) {
  super(provider);

  this.keys = new HashSet<SelectionKey>();
  this.selected = new HashSet<SelectionKey>();
}
89:
90: protected void finalize() throws Throwable
91: {
92: close();
93: }
94:
95: protected final void implCloseSelector()
96: throws IOException
97: {
98:
99: wakeup();
100:
101: synchronized (keys)
102: {
103: synchronized (selected)
104: {
105: synchronized (cancelledKeys ())
106: {
107:
108: }
109: }
110: }
111: }
112:
113: public final Set<SelectionKey> keys()
114: {
115: if (!isOpen())
116: throw new ClosedSelectorException();
117:
118: return Collections.unmodifiableSet (keys);
119: }
120:
121: public final int selectNow()
122: throws IOException
123: {
124:
125:
126: return select (1);
127: }
128:
129: public final int select()
130: throws IOException
131: {
132: return select (0);
133: }
134:
135: private final int[] getFDsAsArray (int ops)
136: {
137: int[] result;
138: int counter = 0;
139: Iterator<SelectionKey> it = keys.iterator ();
140:
141:
142: while (it.hasNext ())
143: {
144: SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
145:
146: if ((key.interestOps () & ops) != 0)
147: {
148: counter++;
149: }
150: }
151:
152: result = new int[counter];
153:
154: counter = 0;
155: it = keys.iterator ();
156:
157:
158: while (it.hasNext ())
159: {
160: SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
161:
162: if ((key.interestOps () & ops) != 0)
163: {
164: result[counter] = key.getNativeFD();
165: counter++;
166: }
167: }
168:
169: return result;
170: }
171:
172: public synchronized int select (long timeout)
173: throws IOException
174: {
175: if (!isOpen())
176: throw new ClosedSelectorException();
177:
178: synchronized (keys)
179: {
180: synchronized (selected)
181: {
182: deregisterCancelledKeys();
183:
184:
185: int[] read = getFDsAsArray (SelectionKey.OP_READ
186: | SelectionKey.OP_ACCEPT);
187: int[] write = getFDsAsArray (SelectionKey.OP_WRITE
188: | SelectionKey.OP_CONNECT);
189:
190:
191: int[] except = new int [0];
192:
193:
194:
195:
196:
197:
198:
199:
200:
201:
202:
203:
204:
205:
206:
207:
208:
209: synchronized (selectThreadMutex)
210: {
211: if (unhandledWakeup)
212: {
213: unhandledWakeup = false;
214: return 0;
215: }
216: else
217: {
218: selectThread = Thread.currentThread ();
219: }
220: }
221:
222:
223: int result = 0;
224: try
225: {
226: begin();
227: result = VMSelector.select (read, write, except, timeout);
228: }
229: finally
230: {
231: end();
232: }
233:
234:
235:
236:
237:
238:
239:
240:
241:
242:
243:
244:
245:
246: synchronized (selectThreadMutex)
247: {
248: if (unhandledWakeup)
249: {
250: unhandledWakeup = false;
251: Thread.interrupted ();
252: }
253: selectThread = null;
254: }
255:
256: Iterator<SelectionKey> it = keys.iterator ();
257:
258: while (it.hasNext ())
259: {
260: int ops = 0;
261: SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
262:
263:
264: if (selected.contains (key))
265: {
266: ops = key.readyOps ();
267: }
268:
269:
270: for (int i = 0; i < read.length; i++)
271: {
272: if (key.getNativeFD() == read[i])
273: {
274: if (key.channel () instanceof ServerSocketChannelImpl)
275: {
276: ops = ops | SelectionKey.OP_ACCEPT;
277: }
278: else
279: {
280: ops = ops | SelectionKey.OP_READ;
281: }
282: }
283: }
284:
285:
286: for (int i = 0; i < write.length; i++)
287: {
288: if (key.getNativeFD() == write[i])
289: {
290: if (key.channel() instanceof SocketChannel)
291: {
292: if (((SocketChannel) key.channel ()).isConnected ())
293: ops = ops | SelectionKey.OP_WRITE;
294: else
295: ops = ops | SelectionKey.OP_CONNECT;
296: }
297: else
298: ops = ops | SelectionKey.OP_WRITE;
299: }
300: }
301:
302:
303:
304:
305: if (!selected.contains (key))
306: {
307: selected.add (key);
308: }
309:
310:
311: key.readyOps (key.interestOps () & ops);
312: }
313: deregisterCancelledKeys();
314:
315: return result;
316: }
317: }
318: }
319:
320: public final Set<SelectionKey> selectedKeys()
321: {
322: if (!isOpen())
323: throw new ClosedSelectorException();
324:
325: return selected;
326: }
327:
328: public final Selector wakeup()
329: {
330:
331:
332:
333:
334:
335:
336:
337:
338: synchronized (selectThreadMutex)
339: {
340: unhandledWakeup = true;
341:
342:
343:
344: if (selectThread != null)
345: selectThread.interrupt ();
346: }
347:
348: return this;
349: }
350:
351: private final void deregisterCancelledKeys()
352: {
353: Set<SelectionKey> ckeys = cancelledKeys ();
354: synchronized (ckeys)
355: {
356: Iterator<SelectionKey> it = ckeys.iterator();
357:
358: while (it.hasNext ())
359: {
360: keys.remove ((SelectionKeyImpl) it.next ());
361: it.remove ();
362: }
363: }
364: }
365:
366: protected SelectionKey register (SelectableChannel ch, int ops, Object att)
367: {
368: return register ((AbstractSelectableChannel) ch, ops, att);
369: }
370:
371: protected final SelectionKey register (AbstractSelectableChannel ch, int ops,
372: Object att)
373: {
374: SelectionKeyImpl result;
375:
376: if (ch instanceof SocketChannelImpl)
377: result = new SocketChannelSelectionKey (ch, this);
378: else if (ch instanceof DatagramChannelImpl)
379: result = new DatagramChannelSelectionKey (ch, this);
380: else if (ch instanceof ServerSocketChannelImpl)
381: result = new ServerSocketChannelSelectionKey (ch, this);
382: else if (ch instanceof gnu.java.nio.SocketChannelImpl)
383: result = new gnu.java.nio.SocketChannelSelectionKeyImpl((gnu.java.nio.SocketChannelImpl)ch, this);
384: else
385: throw new InternalError ("No known channel type");
386:
387: synchronized (keys)
388: {
389: keys.add (result);
390:
391: result.interestOps (ops);
392: result.attach (att);
393: }
394:
395: return result;
396: }
397: }