Source for gnu.java.nio.KqueueSelectorImpl

   1: /* KqueueSelectorImpl.java -- Selector for systems with kqueue event notification.
   2:    Copyright (C) 2006 Free Software Foundation, Inc.
   3: 
   4: This file is part of GNU Classpath.
   5: 
   6: GNU Classpath is free software; you can redistribute it and/or modify
   7: it under the terms of the GNU General Public License as published by
   8: the Free Software Foundation; either version 2, or (at your option)
   9: any later version.
  10: 
  11: GNU Classpath is distributed in the hope that it will be useful, but
  12: WITHOUT ANY WARRANTY; without even the implied warranty of
  13: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14: General Public License for more details.
  15: 
  16: You should have received a copy of the GNU General Public License
  17: along with GNU Classpath; see the file COPYING.  If not, write to the
  18: Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
  19: 02110-1301 USA.
  20: 
  21: Linking this library statically or dynamically with other modules is
  22: making a combined work based on this library.  Thus, the terms and
  23: conditions of the GNU General Public License cover the whole
  24: combination.
  25: 
  26: As a special exception, the copyright holders of this library give you
  27: permission to link this library with independent modules to produce an
  28: executable, regardless of the license terms of these independent
  29: modules, and to copy and distribute the resulting executable under
  30: terms of your choice, provided that you also meet, for each linked
  31: independent module, the terms and conditions of the license of that
  32: module.  An independent module is a module which is not derived from
  33: or based on this library.  If you modify this library, you may extend
  34: this exception to your version of the library, but you are not
  35: obligated to do so.  If you do not wish to do so, delete this
  36: exception statement from your version. */
  37: 
  38: 
  39: package gnu.java.nio;
  40: 
  41: 
  42: import java.io.IOException;
  43: import java.nio.ByteBuffer;
  44: import java.nio.ByteOrder;
  45: import java.nio.channels.ClosedSelectorException;
  46: import java.nio.channels.SelectableChannel;
  47: import java.nio.channels.SelectionKey;
  48: import java.nio.channels.Selector;
  49: import java.nio.channels.spi.AbstractSelectableChannel;
  50: import java.nio.channels.spi.AbstractSelector;
  51: import java.nio.channels.spi.SelectorProvider;
  52: import java.util.HashMap;
  53: import java.util.HashSet;
  54: import java.util.Iterator;
  55: import java.util.Map;
  56: import java.util.Set;
  57: 
  58: /**
  59:  * A {@link Selector} implementation that uses the <code>kqueue</code>
  60:  * event notification facility.
  61:  *
  62:  * @author Casey Marshall (csm@gnu.org)
  63:  */
  64: public class KqueueSelectorImpl extends AbstractSelector
  65: {
  66:   // Size in bytes of the native struct kevent, or -1 when kqueue is not
        // supported (assigned in the static initializer below).
        // Prepended underscore to field name to make it distinct
  67:   // from the method with the similar name.
  68:   private static final int _sizeof_struct_kevent;
  69: 
        // Growth policy applied by reallocateBuffer(): a buffer smaller than
        // MAX_DOUBLING_CAPACITY bytes is doubled, a larger one grows by
        // CAP_INCREMENT bytes.
  70:   private static final int MAX_DOUBLING_CAPACITY = 16384;
  71:   private static final int CAP_INCREMENT = 1024;
        // Initial size in bytes of the events buffer: 16 struct kevents.
  72:   private static final int INITIAL_CAPACITY;
  73: 
  74:   static
  75:   {
          // Load the native library; presumably it provides the native methods
          // declared in this class (TODO confirm).
  76:     try
  77:       {
  78:         System.loadLibrary("javanio");
  79:       }
  80:     catch (Exception x)
  81:       {
              // Best effort: report the problem and carry on.
              // NOTE(review): System.loadLibrary signals a missing library with
              // UnsatisfiedLinkError, which is an Error and is therefore not
              // caught by this handler -- confirm that is intended.
  82:         x.printStackTrace();
  83:       }
  84: 
          // Only query the struct size when kqueue is available; -1 marks the
          // unsupported case.
  85:     if (kqueue_supported ())
  86:       _sizeof_struct_kevent = sizeof_struct_kevent();
  87:     else
  88:       _sizeof_struct_kevent = -1;
          // In the unsupported case this evaluates to -16, the value the
          // constructor would then pass to ByteBuffer.allocateDirect.
  89:     INITIAL_CAPACITY = 16 * _sizeof_struct_kevent;
  90:   }
  91: 
  92:   /**
  93:    * Tell if kqueue-based selectors are supported on this system.
         * The static initializer of this class consults the result before
         * calling <code>sizeof_struct_kevent()</code>.
  94:    *
  95:    * @return True if this system has kqueue support, and support for it was
  96:    *  compiled into Classpath.
  97:    */
  98:   public static native boolean kqueue_supported();
  99: 
 100:   /* Our native file descriptor: the value returned by implOpen(), reset
         * to -1 by implCloseSelector(). */
 101:   private int kq;
 102: 
        /* Registered keys, indexed by the Integer value of each key's "key"
         * field (see register()). The map object is also the monitor that
         * register(), setInterestOps(), reallocateBuffer() and doSelect()
         * synchronize on. */
 103:   private HashMap/*<Integer,KqueueSelectionKeyImpl>*/ keys;
        /* Keys reported ready; doSelect() installs a new set on every call. */
 104:   private HashSet/*<KqueueSelectionKeyImpl>*/ selected;
        /* Thread currently inside the kevent() call made by doSelect(), or
         * null; wakeup() interrupts it. */
 105:   private Thread blockedThread;
        /* Direct buffer of struct kevents: filter changes passed to kevent()
         * and the events it returns. Replaced by reallocateBuffer(). */
 106:   private ByteBuffer events;
 107: 
        /* Local aliases for the SelectionKey operation bits. */
 108:   private static final int OP_ACCEPT  = SelectionKey.OP_ACCEPT;
 109:   private static final int OP_CONNECT = SelectionKey.OP_CONNECT;
 110:   private static final int OP_READ    = SelectionKey.OP_READ;
 111:   private static final int OP_WRITE   = SelectionKey.OP_WRITE;
 112: 
        /**
         * Creates a selector backed by a newly opened kqueue descriptor, with
         * no registered keys, an empty selected-key set and an events buffer
         * of <code>INITIAL_CAPACITY</code> bytes.
         *
         * @param provider The provider passed on to the superclass.
         * @throws IOException If the kqueue descriptor cannot be opened.
         */
 113:   public KqueueSelectorImpl(SelectorProvider provider) throws IOException
 114:   {
 115:     super(provider);
 116:     kq = implOpen();
 117:     keys = new HashMap/*<Integer,KqueueSelectionKeyImpl>*/();
          // Start with an empty set so selectedKeys() never returns null
          // before the first selection has run.
          selected = new HashSet/*<KqueueSelectionKeyImpl>*/();
 118:     events = ByteBuffer.allocateDirect(INITIAL_CAPACITY);
 119:   }
 120: 
        /**
         * Closes the native kqueue descriptor and then marks it invalid by
         * resetting <code>kq</code> to -1.
         *
         * @throws IOException Propagated from the native close.
         */
 121:   protected void implCloseSelector() throws IOException
 122:   {
 123:     implClose(kq);
          // Reached only when the native close did not throw.
 124:     kq = -1;
 125:   }
 126: 
 127:   /* (non-Javadoc)
 128:    * @see java.nio.channels.Selector#keys()
         *
         * Returns a new HashSet copied from the registered keys, so later
         * registrations or removals are not reflected in the returned set.
         * Throws ClosedSelectorException if the selector is not open.
         *
         * NOTE(review): the copy is made without synchronizing on the keys
         * map, unlike register() and doSelect() -- confirm whether a
         * registration racing with this call is a concern.
 129:    */
 130:   public Set keys()
 131:   {
 132:     if (!isOpen())
 133:       throw new ClosedSelectorException();
 134: 
 135:     return new HashSet(keys.values());
 136:   }
 137: 
 138:   /* (non-Javadoc)
 139:    * @see java.nio.channels.Selector#select()
         *
         * Delegates to doSelect() with a timeout of -1. Given the contract of
         * Selector#select(), -1 is presumably the "no time limit" value of
         * the native kevent wrapper -- TODO confirm against the native code.
 140:    */
 141:   public int select() throws IOException
 142:   {
 143:     return doSelect(-1);
 144:   }
 145: 
 146:   /* (non-Javadoc)
 147:    * @see java.nio.channels.Selector#select(long)
         *
         * A timeout of 0 is translated to -1, the value select() passes to
         * doSelect(); any positive value is handed to doSelect() unchanged.
         * Negative timeouts are rejected with IllegalArgumentException, as
         * the Selector#select(long) contract requires, instead of being
         * forwarded to the native layer.
 148:    */
 149:   public int select(long timeout) throws IOException
 150:   {
          if (timeout < 0)
            throw new IllegalArgumentException("negative timeout: " + timeout);
 151:     if (timeout == 0)
 152:       timeout = -1;
 153:     return doSelect(timeout);
 154:   }
 155: 
 156:   /* (non-Javadoc)
 157:    * @see java.nio.channels.Selector#selectedKeys()
         *
         * Returns the current value of the selected field itself, not a copy.
         * doSelect() replaces that field with a fresh set on every call, so a
         * reference obtained here is not updated by later selections.
         * Throws ClosedSelectorException if the selector is not open.
 158:    */
 159:   public Set selectedKeys()
 160:   {
 161:     if (!isOpen())
 162:       throw new ClosedSelectorException();
 163: 
 164:     return selected;
 165:   }
 166: 
 167:   /* (non-Javadoc)
 168:    * @see java.nio.channels.Selector#selectNow()
         *
         * Delegates to doSelect() with a timeout of 0. Given the contract of
         * Selector#selectNow(), 0 is presumably the "do not block" value of
         * the native kevent wrapper -- TODO confirm against the native code.
 169:    */
 170:   public int selectNow() throws IOException
 171:   {
 172:     return doSelect(0);
 173:   }
 174: 
 175:   /* (non-Javadoc)
 176:    * @see java.nio.channels.Selector#wakeup()
         *
         * Interrupts the thread that doSelect() recorded in blockedThread, if
         * there is one; otherwise does nothing. Always returns this selector.
         *
         * The field is read once into a local: doSelect() resets it to null
         * when its kevent call finishes, so testing the field and then
         * dereferencing it again could fail with a NullPointerException.
 177:    */
 178:   public Selector wakeup()
 179:   {
          Thread blocked = blockedThread;
 180:     if (blocked != null)
 181:       blocked.interrupt();
 182:     return this;
 183:   }
 184: 
        /**
         * Returns the superclass description followed by the kqueue
         * descriptor, in the form <code>" [ fd: N ]"</code>.
         *
         * @return The string representation of this selector.
         */
 185:   public String toString()
 186:   {
 187:     return super.toString() + " [ fd: " + kq + " ]";
 188:   }
 189: 
        /**
         * Compares selectors by their kqueue descriptor.
         *
         * NOTE(review): implCloseSelector() resets kq to -1, so any two closed
         * instances compare equal. This class also declares no hashCode()
         * override to go with this method -- confirm whether one is needed.
         *
         * @param o The object to compare with.
         * @return True if <code>o</code> is a KqueueSelectorImpl whose
         *  descriptor equals this one's.
         */
 190:   public boolean equals(Object o)
 191:   {
 192:     if (!(o instanceof KqueueSelectorImpl))
 193:       return false;
 194: 
 195:     return ((KqueueSelectorImpl) o).kq == kq;
 196:   }
 197: 
        /**
         * Performs one selection pass; select(), select(long) and selectNow()
         * all delegate here. While holding the cancelled-key set and the key
         * map monitors, the method:
         *
         *   1. clears the interest ops of every cancelled key;
         *   2. drops keys whose channel is no longer valid and writes a
         *      change record into the events buffer for each read or write
         *      filter that needs committing;
         *   3. calls kevent, recording the calling thread in blockedThread so
         *      that wakeup() can interrupt it;
         *   4. marks each key's interest ops as active;
         *   5. builds a new selected set from the returned events;
         *   6. removes and deregisters the cancelled keys;
         *   7. resizes the events buffer if necessary.
         *
         * @param timeout The timeout handed to kevent: select() passes -1,
         *  selectNow() passes 0, select(long) passes a positive value. It is
         *  forced to 0 if the calling thread is already interrupted.
         * @return The number of keys in the selected set built by this call.
         * @throws IOException Propagated from the native kevent call.
         * @throws ClosedSelectorException If this selector is closed.
         */
 198:   int doSelect(long timeout) throws IOException
 199:   {
          // Same guard as keys() and selectedKeys(): a closed selector is
          // rejected here rather than handing the stale descriptor to kevent.
          if (!isOpen())
            throw new ClosedSelectorException();

 200:     Set cancelled = cancelledKeys();
 201:     synchronized (cancelled)
 202:     {
 203:       synchronized (keys)
 204:       {
              // Cancelled keys lose all interest ops; presumably this makes
              // the filter updates below unregister them (TODO confirm).
 205:         for (Iterator it = cancelled.iterator(); it.hasNext(); )
 206:           {
 207:             KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
 208:             key.interestOps = 0;
 209:           }
 210: 
              // Number of change records written into the events buffer.
 212:         int num_events = 0;
 213: 
 214:         for (Iterator it = keys.entrySet().iterator(); it.hasNext(); )
 215:           {
 216:             Map.Entry e = (Map.Entry) it.next();
 217:             KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) e.getValue();
 218: 
 219:             SelectableChannel ch = key.channel();
 220:             if (ch instanceof VMChannelOwner)
 221:               {
 222:                 if (!((VMChannelOwner) ch).getVMChannel().getState().isValid())
 223:                   {
 224:                     // closed channel; removed from kqueue automatically.
 225:                     it.remove();
 226:                     continue;
 227:                   }
 228:               }
 229: 
 230:             // If this key is registering a read filter, add it to the buffer.
 231:             if (key.needCommitRead())
 232:               {
 233:                 kevent_set(events, num_events, key.fd,
 234:                            key.interestOps & (OP_READ | OP_ACCEPT),
 235:                            key.activeOps & (OP_READ | OP_ACCEPT), key.key);
 236:                 num_events++;
 237:               }
 238: 
 239:             // If this key is registering a write filter, add it to the buffer.
 240:             if (key.needCommitWrite())
 241:               {
 242:                 kevent_set(events, num_events, key.fd,
 243:                            key.interestOps & (OP_WRITE | OP_CONNECT),
 244:                            key.activeOps & (OP_WRITE | OP_CONNECT), key.key);
 245:                 num_events++;
 246:               }
 247:           }
              // Reset position and limit so the whole buffer is exposed.
 248:         events.rewind().limit(events.capacity());
 249: 
 250:         //System.out.println("dump of keys to select:");
 251:         //dump_selection_keys(events.duplicate());
 252: 
 253:         int n = 0;
 254:         try
 255:           {
 256:             //System.out.println("[" + kq + "] kevent enter selecting from " + keys.size());
 257:             begin();
 258:             blockedThread = Thread.currentThread();
                  // An interrupt that is already pending must not be lost in
                  // a blocking wait, so use the selectNow() timeout instead.
 259:             if (blockedThread.isInterrupted())
 260:               timeout = 0;
                  // The output limit is the number of whole structs that fit
                  // in the buffer.
 261:             n = kevent(kq, events, num_events,
 262:                        events.capacity() / _sizeof_struct_kevent, timeout);
 263:           }
 264:         finally
 265:           {
 266:             end();
 267:             blockedThread = null;
                  // Clears the calling thread's interrupt status, which is the
                  // signal wakeup() uses. NOTE(review): this also discards
                  // interrupts that did not come from wakeup().
 268:             Thread.interrupted();
 269:             //System.out.println("[" + kq + "kevent exit selected " + n);
 270:           }
 271: 
 272:         //System.out.println("dump of keys selected:");
 273:         //dump_selection_keys((ByteBuffer) events.duplicate().limit(n * _sizeof_struct_kevent));
 274: 
 275:         // Commit the operations we've just added in the call to kevent.
 276:         for (Iterator it = keys.values().iterator(); it.hasNext(); )
 277:           {
 278:             KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
 279:             key.activeOps = key.interestOps;
 280:           }
 281: 
 282:         selected = new HashSet/*<KqueueSelectionKeyImpl>*/(n);
              // Byte offset of the struct kevent being examined.
 283:         int x = 0;
 284:         for (int i = 0; i < n; i++)
 285:           {
 286:             events.position(x).limit(x + _sizeof_struct_kevent);
 287:             x += _sizeof_struct_kevent;
 288:             int y = fetch_key(events.slice());
 289:             KqueueSelectionKeyImpl key =
 290:               (KqueueSelectionKeyImpl) keys.get(new Integer(y));
 291: 
 292:             if (key == null)
 293:               {
 294:                 System.out.println("WARNING! no key found for selected key " + y);
 295:                 continue;
 296:               }
 297:             // Keys that have been cancelled may be returned here; don't
 298:             // add them to the selected set.
 299:             if (!key.isValid())
 300:               continue;
                  // Read and write filters are registered separately for a
                  // key, so one key can show up in two events of the same
                  // pass. Merge the ops of a second event instead of letting
                  // it overwrite the first.
 301:             int ops = ready_ops(events.slice(), key.interestOps);
 302:             if (selected.add(key))
                    key.readyOps = ops;
                  else
                    key.readyOps |= ops;
 303:           }
 304: 
 305:         // Finally, remove the cancelled keys.
 306:         for (Iterator it = cancelled.iterator(); it.hasNext(); )
 307:           {
 308:             KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
 309:             keys.remove(new Integer(key.key));
 310:             deregister(key);
 311:             it.remove();
 312:           }
 313: 
 314:         reallocateBuffer();
 315: 
 316:         return selected.size();
 317:       }
 318:     }
 319:   }
 320: 
        /**
         * Registers a channel with this selector and returns its new key.
         * Only channels implementing VMChannelOwner are accepted, because the
         * native file descriptor is obtained through their VMChannel.
         *
         * The key's identifier starts out as its identity hash code and is
         * incremented until it is unused in the key map; that identifier is
         * what doSelect() later passes to kevent_set and looks up from
         * fetch_key.
         *
         * @param channel The channel to register.
         * @param interestOps The initial interest set of the key.
         * @param attachment The object to attach to the key.
         * @return The key representing the registration.
         * @throws IllegalArgumentException If the channel is not a
         *  VMChannelOwner, or if its native descriptor cannot be obtained (the
         *  underlying IOException is attached as the cause).
         */
 321:   protected SelectionKey register(AbstractSelectableChannel channel,
 322:                                   int interestOps,
 323:                                   Object attachment)
 324:   {
 325:     int native_fd = -1;
 326:     try
 327:       {
 328:         if (channel instanceof VMChannelOwner)
 329:           native_fd = ((VMChannelOwner) channel).getVMChannel()
 330:             .getState().getNativeFD();
 331:         else
 332:           throw new IllegalArgumentException("cannot handle channel type " +
 333:                                              channel.getClass().getName());
 334:       }
 335:     catch (IOException ioe)
 336:       {
              // Keep the underlying I/O failure attached for diagnosis.
              IllegalArgumentException iae =
 337:           new IllegalArgumentException("channel is closed or invalid");
              iae.initCause(ioe);
              throw iae;
 338:       }
 339: 
 340:     KqueueSelectionKeyImpl result = new KqueueSelectionKeyImpl(this, channel);
 341:     result.interestOps = interestOps;
 342:     result.attach(attachment);
 343:     result.fd = native_fd;
 344:     result.key = System.identityHashCode(result);
 345:     synchronized (keys)
 346:     {
            // Identity hash codes may collide; probe upwards for a free id.
 347:       while (keys.containsKey(new Integer(result.key)))
 348:         result.key++;
 349:       keys.put(new Integer(result.key), result);
            // Make sure the events buffer has room for the new key's filters.
 350:       reallocateBuffer();
 351:     }
 352:     return result;
 353:   }
 354: 
        /**
         * Stores a new interest set on the given key while holding the key
         * map monitor, the same monitor doSelect() holds while it reads and
         * commits interest ops.
         *
         * @param key The key to update.
         * @param ops The new interest set.
         */
 355:   void setInterestOps(KqueueSelectionKeyImpl key, int ops)
 356:   {
 357:     synchronized (keys)
 358:     {
 359:       key.interestOps = ops;
 360:     }
 361:   }
 362: 
 363:   /**
 364:    * Reallocate the events buffer. This is the destination buffer for
 365:    * events returned by kevent. This method will:
 366:    *
 367:    *   * Grow the buffer if there is insufficient space for all registered
 368:    *     events.
 369:    *   * Shrink the buffer if it is more than twice the size needed.
 370:    *
         * The size needed is two <code>struct kevent</code>s per registered
         * key. Each call changes the capacity by at most one step, and the
         * replacement buffer is freshly allocated: the contents of the old
         * buffer are not copied.
 371:    */
 372:   private void reallocateBuffer()
 373:   {
 374:     synchronized (keys)
 375:     {
 376:       if (events.capacity() < (2 * _sizeof_struct_kevent) * keys.size())
 377:         {
              // Too small: double it, or add a fixed increment once the
              // buffer has reached MAX_DOUBLING_CAPACITY.
 378:           int cap = events.capacity();
 379:           if (cap >= MAX_DOUBLING_CAPACITY)
 380:             cap += CAP_INCREMENT;
 381:           else
 382:             cap = cap << 1;
 383: 
 384:           events = ByteBuffer.allocateDirect(cap);
 385:         }
 386:       else if (events.capacity() > 4 * (_sizeof_struct_kevent) * keys.size() + 1
 387:                && events.capacity() > INITIAL_CAPACITY)
 388:         {
              // More than twice the needed size and above the initial
              // size: halve it.
 389:           int cap = events.capacity();
 390:           cap = cap >>> 1;
 391:           events = ByteBuffer.allocateDirect(cap);
 392:         }
 393:     }
 394:   }
 395: 
 396:   //synchronized void updateOps(KqueueSelectionKeyImpl key, int interestOps)
 397:   //{
 398:   //  updateOps(key, interestOps, 0, false);
 399:   //}
 400: 
 401:   /*void updateOps(KqueueSelectionKeyImpl key, int interestOps,
 402:                  int activeOps, int fd)
 403:   {
 404:     //System.out.println(">> updating kqueue selection key:");
 405:     //dump_selection_keys(key.nstate.duplicate());
 406:     //System.out.println("<<");
 407:     synchronized (keys)
 408:     {
 409:       kevent_set(key.nstate, fd, interestOps, activeOps, key.key);
 410:     }
 411:     //System.out.println(">> updated kqueue selection key:");
 412:     //dump_selection_keys(key.nstate.duplicate());
 413:     //System.out.println("<<");
 414:   }*/
 415: 
        /**
         * Debugging aid: prints every <code>struct kevent</code> remaining in
         * the given buffer to standard output, one line per structure. The
         * only calls to it in this class are commented out.
         *
         * The buffer is consumed: its byte order is switched to the native
         * order and its position advances by 20 bytes per line printed. A
         * remainder that is not a multiple of 20 bytes ends in a
         * BufferUnderflowException from the relative reads.
         *
         * @param keys The buffer of structures to print; note that this
         *  parameter hides the <code>keys</code> field inside the method.
         */
 416:   private void dump_selection_keys(ByteBuffer keys)
 417:   {
 418:     // WARNING! This method is not guaranteed to be portable! This works
 419:     // on darwin/x86, but the sizeof and offsetof these fields may be
 420:     // different on other platforms!
 422:     keys.order(ByteOrder.nativeOrder());
 423:     while (keys.hasRemaining())
 424:       {
 425:         System.out.println("struct kevent { ident: "
 426:                            + Integer.toString(keys.getInt())
 427:                            + " filter: "
 428:                            + Integer.toHexString(keys.getShort() & 0xFFFF)
 429:                            + " flags: "
 430:                            + Integer.toHexString(keys.getShort() & 0xFFFF)
 431:                            + " fflags: "
 432:                            + Integer.toHexString(keys.getInt())
 433:                            + " data: "
 434:                            + Integer.toHexString(keys.getInt())
 435:                            + " udata: "
 436:                            + Integer.toHexString(keys.getInt())
 437:                            + " }");
 438:       }
 439:   }
 440: 
 441:   /**
 442:    * Return the size of a <code>struct kevent</code> on this system.
         * The static initializer calls this only when
         * <code>kqueue_supported()</code> returns true, and caches the result
         * in <code>_sizeof_struct_kevent</code>.
 443:    *
 444:    * @return The size of <code>struct kevent</code>, in bytes.
 445:    */
 446:   private static native int sizeof_struct_kevent();
 447: 
 448:   /**
 449:    * Opens a kqueue descriptor. Called once, from the constructor.
 450:    *
 451:    * @return The new kqueue descriptor.
 452:    * @throws IOException If opening fails.
 453:    */
 454:   private static native int implOpen() throws IOException;
 455: 
 456:   /**
 457:    * Closes the kqueue file descriptor. Called from implCloseSelector().
 458:    *
 459:    * @param kq The kqueue file descriptor.
 460:    * @throws IOException Presumably if the native close fails (TODO confirm).
 461:    */
 462:   private static native void implClose(int kq) throws IOException;
 463: 
 464:   /**
 465:    * Initialize the specified native state for the given interest ops.
         * doSelect() calls this once per read filter and once per write filter
         * that a key needs to commit.
 466:    *
 467:    * @param nstate The native state structures; doSelect() passes the
 468:    *  shared events buffer.
         * @param i doSelect() passes the number of records written so far;
         *  presumably the index of the <code>struct kevent</code> slot to
         *  fill (TODO confirm against the native code).
 469:    * @param fd The file descriptor. If 0, the native FD is unmodified.
 470:    * @param interestOps The operations to enable.
         * @param activeOps doSelect() passes the key's currently active
         *  operations, masked to the same filter as <code>interestOps</code>.
         *  Presumably the difference between the two decides whether the
         *  filter is added or deleted (TODO confirm).
 471:    * @param key A unique key that will reference the associated key later.
 472:    *  doSelect() looks keys up again by the value that
 473:    *  <code>fetch_key</code> returns.
 474:    */
 475:   private static native void kevent_set(ByteBuffer nstate, int i, int fd,
 476:                                         int interestOps, int activeOps, int key);
 477: 
 478:   /**
 479:    * Poll for events. The source events are stored in <code>events</code>,
 480:    * which is also where polled events will be placed.
 481:    *
         * @param kq The kqueue file descriptor.
 482:    * @param events The events to poll. This buffer is also the destination
 483:    *  for events read from the queue.
 484:    * @param nevents The number of events to poll (that is, the number of
 485:    *  events in the <code>events</code> buffer).
 486:    * @param nout The maximum number of events that may be returned.
 487:    * @param timeout The timeout. NOTE(review): select() passes -1 and
 488:    *  selectNow() passes 0, so -1 presumably waits indefinitely and 0
         *  returns immediately; confirm against the native implementation.
 489:    * @return The number of events read.
 490:    */
 491:   private static native int kevent(int kq, ByteBuffer events, int nevents,
 492:                                    int nout, long timeout);
 493: 
 494:   /**
 495:    * Fetch the identifier of a polled key from a native state buffer.
 496:    * doSelect() passes a slice covering a single <code>struct
 497:    *  kevent</code> returned by <code>kevent</code> and looks the result
 498:    * up in the key map. Presumably the value is the <code>key</code>
 499:    * argument given to <code>kevent_set</code>, stored in the
 500:    * <code>udata</code> field of that structure (TODO confirm).
 501:    *
 502:    * @param nstate The buffer containing the <code>struct kevent</code> to read.
 503:    * @return The integer identifier of the key.
 504:    */
 505:   private static native int fetch_key(ByteBuffer nstate);
 506: 
 507:   /**
 508:    * Fetch the ready ops of the associated native state. That is, this
 509:    * inspects the first argument as a <code>struct kevent</code>, looking
 510:    * at its operation (the input is assumed to have been returned via a
 511:    * previous call to <code>kevent</code>), and translating that to the
 512:    * appropriate Java bit set, based on the second argument.
         * doSelect() calls this once for each returned event, with a slice
         * covering that single structure.
 513:    *
 514:    * @param nstate The native state.
 515:    * @param interestOps The enabled operations for the key.
 516:    * @return The bit set representing the ready operations.
 517:    */
 518:   private static native int ready_ops(ByteBuffer nstate, int interestOps);
 519: 
 520:   /**
 521:    * Check if kevent returned EV_EOF for a selection key.
         * No Java code in this class calls this method.
 522:    *
 523:    * @param nstate The native state.
 524:    * @return True if the kevent call returned EOF.
 525:    */
 526:   private static native boolean check_eof(ByteBuffer nstate);
 527: }