1:
37:
38:
39: package ;
40:
41:
42: import ;
43: import ;
44: import ;
45: import ;
46: import ;
47: import ;
48: import ;
49: import ;
50: import ;
51: import ;
52: import ;
53: import ;
54: import ;
55: import ;
56: import ;
57:
58:
64: public class KqueueSelectorImpl extends AbstractSelector
65: {
66:
67:
68: private static final int _sizeof_struct_kevent;
69:
70: private static final int MAX_DOUBLING_CAPACITY = 16384;
71: private static final int CAP_INCREMENT = 1024;
72: private static final int INITIAL_CAPACITY;
73:
74: static
75: {
76: try
77: {
78: System.loadLibrary("javanio");
79: }
80: catch (Exception x)
81: {
82: x.printStackTrace();
83: }
84:
85: if (kqueue_supported ())
86: _sizeof_struct_kevent = sizeof_struct_kevent();
87: else
88: _sizeof_struct_kevent = -1;
89: INITIAL_CAPACITY = 16 * _sizeof_struct_kevent;
90: }
91:
92:
98: public static native boolean kqueue_supported();
99:
100:
101: private int kq;
102:
103: private HashMap keys;
104: private HashSet selected;
105: private Thread blockedThread;
106: private ByteBuffer events;
107:
108: private static final int OP_ACCEPT = SelectionKey.OP_ACCEPT;
109: private static final int OP_CONNECT = SelectionKey.OP_CONNECT;
110: private static final int OP_READ = SelectionKey.OP_READ;
111: private static final int OP_WRITE = SelectionKey.OP_WRITE;
112:
113: public KqueueSelectorImpl(SelectorProvider provider) throws IOException
114: {
115: super(provider);
116: kq = implOpen();
117: keys = new HashMap();
118: events = ByteBuffer.allocateDirect(INITIAL_CAPACITY);
119: }
120:
121: protected void implCloseSelector() throws IOException
122: {
123: implClose(kq);
124: kq = -1;
125: }
126:
127:
130: public Set keys()
131: {
132: if (!isOpen())
133: throw new ClosedSelectorException();
134:
135: return new HashSet(keys.values());
136: }
137:
138:
141: public int select() throws IOException
142: {
143: return doSelect(-1);
144: }
145:
146:
149: public int select(long timeout) throws IOException
150: {
151: if (timeout == 0)
152: timeout = -1;
153: return doSelect(timeout);
154: }
155:
156:
159: public Set selectedKeys()
160: {
161: if (!isOpen())
162: throw new ClosedSelectorException();
163:
164: return selected;
165: }
166:
167:
170: public int selectNow() throws IOException
171: {
172: return doSelect(0);
173: }
174:
175:
178: public Selector wakeup()
179: {
180: if (blockedThread != null)
181: blockedThread.interrupt();
182: return this;
183: }
184:
185: public String toString()
186: {
187: return super.toString() + " [ fd: " + kq + " ]";
188: }
189:
190: public boolean equals(Object o)
191: {
192: if (!(o instanceof KqueueSelectorImpl))
193: return false;
194:
195: return ((KqueueSelectorImpl) o).kq == kq;
196: }
197:
198: int doSelect(long timeout) throws IOException
199: {
200: Set cancelled = cancelledKeys();
201: synchronized (cancelled)
202: {
203: synchronized (keys)
204: {
205: for (Iterator it = cancelled.iterator(); it.hasNext(); )
206: {
207: KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
208: key.interestOps = 0;
209: }
210:
211: int events_size = (2 * _sizeof_struct_kevent) * keys.size();
212: int num_events = 0;
213:
214: for (Iterator it = keys.entrySet().iterator(); it.hasNext(); )
215: {
216: Map.Entry e = (Map.Entry) it.next();
217: KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) e.getValue();
218:
219: SelectableChannel ch = key.channel();
220: if (ch instanceof VMChannelOwner)
221: {
222: if (!((VMChannelOwner) ch).getVMChannel().getState().isValid())
223: {
224:
225: it.remove();
226: continue;
227: }
228: }
229:
230:
231: if (key.needCommitRead())
232: {
233: kevent_set(events, num_events, key.fd,
234: key.interestOps & (OP_READ | OP_ACCEPT),
235: key.activeOps & (OP_READ | OP_ACCEPT), key.key);
236: num_events++;
237: }
238:
239:
240: if (key.needCommitWrite())
241: {
242: kevent_set(events, num_events, key.fd,
243: key.interestOps & (OP_WRITE | OP_CONNECT),
244: key.activeOps & (OP_WRITE | OP_CONNECT), key.key);
245: num_events++;
246: }
247: }
248: events.rewind().limit(events.capacity());
249:
250:
251:
252:
253: int n = 0;
254: try
255: {
256:
257: begin();
258: blockedThread = Thread.currentThread();
259: if (blockedThread.isInterrupted())
260: timeout = 0;
261: n = kevent(kq, events, num_events,
262: events.capacity() / _sizeof_struct_kevent, timeout);
263: }
264: finally
265: {
266: end();
267: blockedThread = null;
268: Thread.interrupted();
269:
270: }
271:
272:
273:
274:
275:
276: for (Iterator it = keys.values().iterator(); it.hasNext(); )
277: {
278: KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
279: key.activeOps = key.interestOps;
280: }
281:
282: selected = new HashSet(n);
283: int x = 0;
284: for (int i = 0; i < n; i++)
285: {
286: events.position(x).limit(x + _sizeof_struct_kevent);
287: x += _sizeof_struct_kevent;
288: int y = fetch_key(events.slice());
289: KqueueSelectionKeyImpl key =
290: (KqueueSelectionKeyImpl) keys.get(new Integer(y));
291:
292: if (key == null)
293: {
294: System.out.println("WARNING! no key found for selected key " + y);
295: continue;
296: }
297:
298:
299: if (!key.isValid())
300: continue;
301: key.readyOps = ready_ops(events.slice(), key.interestOps);
302: selected.add(key);
303: }
304:
305:
306: for (Iterator it = cancelled.iterator(); it.hasNext(); )
307: {
308: KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
309: keys.remove(new Integer(key.key));
310: deregister(key);
311: it.remove();
312: }
313:
314: reallocateBuffer();
315:
316: return selected.size();
317: }
318: }
319: }
320:
321: protected SelectionKey register(AbstractSelectableChannel channel,
322: int interestOps,
323: Object attachment)
324: {
325: int native_fd = -1;
326: try
327: {
328: if (channel instanceof VMChannelOwner)
329: native_fd = ((VMChannelOwner) channel).getVMChannel()
330: .getState().getNativeFD();
331: else
332: throw new IllegalArgumentException("cannot handle channel type " +
333: channel.getClass().getName());
334: }
335: catch (IOException ioe)
336: {
337: throw new IllegalArgumentException("channel is closed or invalid");
338: }
339:
340: KqueueSelectionKeyImpl result = new KqueueSelectionKeyImpl(this, channel);
341: result.interestOps = interestOps;
342: result.attach(attachment);
343: result.fd = native_fd;
344: result.key = System.identityHashCode(result);
345: synchronized (keys)
346: {
347: while (keys.containsKey(new Integer(result.key)))
348: result.key++;
349: keys.put(new Integer(result.key), result);
350: reallocateBuffer();
351: }
352: return result;
353: }
354:
355: void setInterestOps(KqueueSelectionKeyImpl key, int ops)
356: {
357: synchronized (keys)
358: {
359: key.interestOps = ops;
360: }
361: }
362:
363:
372: private void reallocateBuffer()
373: {
374: synchronized (keys)
375: {
376: if (events.capacity() < (2 * _sizeof_struct_kevent) * keys.size())
377: {
378: int cap = events.capacity();
379: if (cap >= MAX_DOUBLING_CAPACITY)
380: cap += CAP_INCREMENT;
381: else
382: cap = cap << 1;
383:
384: events = ByteBuffer.allocateDirect(cap);
385: }
386: else if (events.capacity() > 4 * (_sizeof_struct_kevent) * keys.size() + 1
387: && events.capacity() > INITIAL_CAPACITY)
388: {
389: int cap = events.capacity();
390: cap = cap >>> 1;
391: events = ByteBuffer.allocateDirect(cap);
392: }
393: }
394: }
395:
396:
397:
398:
399:
400:
401:
415:
416: private void dump_selection_keys(ByteBuffer keys)
417: {
418:
419:
420:
421: int i = 0;
422: keys.order(ByteOrder.nativeOrder());
423: while (keys.hasRemaining())
424: {
425: System.out.println("struct kevent { ident: "
426: + Integer.toString(keys.getInt())
427: + " filter: "
428: + Integer.toHexString(keys.getShort() & 0xFFFF)
429: + " flags: "
430: + Integer.toHexString(keys.getShort() & 0xFFFF)
431: + " fflags: "
432: + Integer.toHexString(keys.getInt())
433: + " data: "
434: + Integer.toHexString(keys.getInt())
435: + " udata: "
436: + Integer.toHexString(keys.getInt())
437: + " }");
438: }
439: }
440:
441:
446: private static native int sizeof_struct_kevent();
447:
448:
454: private static native int implOpen() throws IOException;
455:
456:
462: private static native void implClose(int kq) throws IOException;
463:
464:
475: private static native void kevent_set(ByteBuffer nstate, int i, int fd,
476: int interestOps, int activeOps, int key);
477:
478:
491: private static native int kevent(int kq, ByteBuffer events, int nevents,
492: int nout, long timeout);
493:
494:
505: private static native int fetch_key(ByteBuffer nstate);
506:
507:
518: private static native int ready_ops(ByteBuffer nstate, int interestOps);
519:
520:
526: private static native boolean check_eof(ByteBuffer nstate);
527: }