1:
37:
38:
39: package ;
40:
41: import ;
42: import ;
43: import ;
44: import ;
45: import ;
46: import ;
47: import ;
48: import ;
49: import ;
50: import ;
51: import ;
52: import ;
53:
54: import ;
55: import ;
56: import ;
57: import ;
58: import ;
59: import ;
60: import ;
61: import ;
62: import ;
63: import ;
64: import ;
65: import ;
66: import ;
67: import ;
68: import ;
69: import ;
70: import ;
71:
72: import ;
73: import ;
74: import ;
75: import ;
76: import ;
77: import ;
78: import ;
79: import ;
80: import ;
81: import ;
82: import ;
83: import ;
84: import ;
85: import ;
86: import ;
87: import ;
88: import ;
89: import ;
90: import ;
91:
92:
100: public class OrbFunctional extends OrbRestricted
101: {
102:
107: protected class portServer
108: extends Thread
109: {
110:
113: int running_threads;
114:
115:
118: int s_port;
119:
120:
123: ServerSocket service;
124:
125:
128: boolean terminated;
129:
130:
133: portServer(int _port)
134: {
135: s_port = _port;
136: setDaemon(true);
137: try
138: {
139: service = socketFactory.createServerSocket(s_port);
140: }
141: catch (IOException ex)
142: {
143: BAD_OPERATION bad = new BAD_OPERATION(
144: "Unable to open the server socket at " + s_port);
145: bad.minor = Minor.Socket;
146: bad.initCause(ex);
147: throw bad;
148: }
149: }
150:
151:
155: public void run()
156: {
157: while (running)
158: {
159: try
160: {
161: tick();
162: }
163: catch (SocketException ex)
164: {
165:
166:
167: if (terminated)
168: return;
169: }
170: catch (Exception iex)
171: {
172:
173:
174: try
175: {
176: Thread.sleep(TWAIT_SERVER_ERROR_PAUSE);
177: }
178: catch (InterruptedException ex)
179: {
180: }
181: }
182: }
183: }
184:
185:
190: void tick()
191: throws Exception
192: {
193: serve(this, service);
194: }
195:
196:
199: public void close_now()
200: {
201: try
202: {
203: terminated = true;
204: service.close();
205: }
206: catch (Exception ex)
207: {
208:
209:
210: }
211: }
212:
213:
216: protected void finalize()
217: {
218: close_now();
219: }
220: }
221:
222:
227: protected class sharedPortServer extends portServer
228: {
229:
232: sharedPortServer(int _port)
233: {
234: super(_port);
235: }
236:
237:
242: void tick() throws Exception
243: {
244: Socket request = service.accept();
245: serveStep(request, false);
246: }
247: }
248:
249:
253: public static int DEFAULT_INITIAL_PORT = 1126;
254:
255:
259: public static int RANDOM_PORT_FROM = 1024;
260:
261:
265: public static int RANDOM_PORT_TO = 4024;
266:
267:
270: public static int RANDOM_PORT_ATTEMPTS = 64;
271:
272:
276: public static final String LISTEN_ON = "gnu.classpath.CORBA.ListenOn";
277:
278:
281: public static final String REFERENCE = "org.omg.CORBA.ORBInitRef";
282:
283:
287: public static final String NS_PORT = "org.omg.CORBA.ORBInitialPort";
288:
289:
293: public static final String NS_HOST = "org.omg.CORBA.ORBInitialHost";
294:
295:
298: public static final String NAME_SERVICE = "NameService";
299:
300:
303: public static final String ORB_ID = "org.omg.CORBA.ORBid";
304:
305:
306:
309: public static final String SERVER_ID = "org.omg.CORBA.ServerId";
310:
311:
317: public static String START_READING_MESSAGE =
318: "gnu.classpath.CORBA.TOUT_START_READING_MESSAGE";
319:
320:
324: public static String WHILE_READING =
325: "gnu.classpath.CORBA.TOUT_WHILE_READING";
326:
327:
332: public static String AFTER_RECEIVING =
333: "gnu.classpath.CORBA.TOUT_AFTER_RECEIVING";
334:
335:
339: public static String SERVER_ERROR_PAUSE =
340: "gnu.classpath.CORBA.SERVER_ERROR_PAUSE";
341:
342:
345: public final String LOCAL_HOST;
346:
347:
353: public int TOUT_START_READING_MESSAGE = 20 * 1000;
354:
355:
356:
357:
361: public int TOUT_WHILE_READING = 2 * 60 * 1000;
362:
363:
368: public int TOUT_AFTER_RECEIVING = 40 * 60 * 1000;
369:
370:
374: public int TWAIT_SERVER_ERROR_PAUSE = 5000;
375:
376:
383: public static int TANDEM_REQUESTS = 7000;
384:
385:
388: public String orb_id = "orb_"+hashCode();
389:
390:
394: public static String server_id = "server_"+OrbFunctional.class.hashCode();
395:
396:
399: protected final Connected_objects connected_objects =
400: new Connected_objects();
401:
402:
406: protected Version max_version;
407:
408:
412: protected boolean running;
413:
414:
417: protected Map initial_references = new TreeMap();
418:
419:
422: protected ArrayList portServers = new ArrayList();
423:
424:
427: private String ns_host;
428:
429:
437: private static int Port = DEFAULT_INITIAL_PORT;
438:
439:
442: private int ns_port = 900;
443:
444:
447: NameParser nameParser = new NameParser();
448:
449:
453: protected Asynchron asynchron = new Asynchron();
454:
455:
458: protected LinkedList freed_ports = new LinkedList();
459:
460:
463: protected Hashtable identities = new Hashtable();
464:
465:
470: private int MAX_RUNNING_THREADS = 256;
471:
472:
475: public SocketFactory socketFactory = DefaultSocketFactory.Singleton;
476:
477:
480: public OrbFunctional()
481: {
482: try
483: {
484: LOCAL_HOST = ns_host = InetAddress.getLocalHost().getHostAddress();
485: initial_references.put("CodecFactory", new gnuCodecFactory(this));
486: }
487: catch (UnknownHostException ex)
488: {
489: BAD_OPERATION bad =
490: new BAD_OPERATION("Unable to open the server socket.");
491: bad.initCause(ex);
492: throw bad;
493: }
494: }
495:
496:
502: public void setMaxVersion(Version max_supported)
503: {
504: max_version = max_supported;
505: }
506:
507:
511: public Version getMaxVersion()
512: {
513: return max_version;
514: }
515:
516:
525: public int getFreePort()
526: throws BAD_OPERATION
527: {
528: ServerSocket s;
529: int a_port;
530:
531: try
532: {
533:
534: if (!freed_ports.isEmpty())
535: {
536: Integer free = (Integer) freed_ports.getLast();
537: freed_ports.removeLast();
538: s = socketFactory.createServerSocket(free.intValue());
539: s.close();
540: return free.intValue();
541: }
542: }
543: catch (Exception ex)
544: {
545:
546:
547:
548: }
549:
550: for (a_port = Port; a_port < Port + 20; a_port++)
551: {
552: try
553: {
554: s = socketFactory.createServerSocket(a_port);
555: s.close();
556: Port = a_port + 1;
557: return a_port;
558: }
559: catch (IOException ex)
560: {
561:
562: }
563: }
564:
565: Random rand = new Random();
566:
567: int range = RANDOM_PORT_TO - RANDOM_PORT_FROM;
568: IOException ioex = null;
569: for (int i = 0; i < RANDOM_PORT_ATTEMPTS; i++)
570: {
571: try
572: {
573: a_port = RANDOM_PORT_FROM + rand.nextInt(range);
574: s = socketFactory.createServerSocket(a_port);
575: s.close();
576: return a_port;
577: }
578: catch (IOException ex)
579: {
580:
581: ioex = ex;
582: }
583: }
584:
585: NO_RESOURCES bad = new NO_RESOURCES("Unable to open the server socket.");
586: bad.minor = Minor.Ports;
587: if (ioex != null)
588: bad.initCause(ioex);
589: throw bad;
590: }
591:
592:
601: public static void setPort(int a_Port)
602: {
603: Port = a_Port;
604: }
605:
606:
621: public void connect(org.omg.CORBA.Object object)
622: {
623: int a_port = getFreePort();
624:
625: Connected_objects.cObject ref = connected_objects.add(object, a_port);
626: IOR ior = createIOR(ref);
627: prepareObject(object, ior);
628: if (running)
629: startService(ior);
630: }
631:
632:
649: public void connect(org.omg.CORBA.Object object, byte[] key)
650: {
651: int a_port = getFreePort();
652:
653: Connected_objects.cObject ref =
654: connected_objects.add(key, object, a_port, null);
655: IOR ior = createIOR(ref);
656: prepareObject(object, ior);
657: if (running)
658: startService(ior);
659: }
660:
661:
683: public void connect_1_thread(org.omg.CORBA.Object object, byte[] key,
684: java.lang.Object identity
685: )
686: {
687: sharedPortServer shared = (sharedPortServer) identities.get(identity);
688: if (shared == null)
689: {
690: int a_port = getFreePort();
691: shared = new sharedPortServer(a_port);
692: identities.put(identity, shared);
693: if (running)
694: {
695: portServers.add(shared);
696: shared.start();
697: }
698: }
699:
700: Connected_objects.cObject ref =
701: connected_objects.add(key, object, shared.s_port, identity);
702: IOR ior = createIOR(ref);
703: prepareObject(object, ior);
704: }
705:
706:
711: public void startService(IOR ior)
712: {
713: portServer p = new portServer(ior.Internet.port);
714: portServers.add(p);
715: p.start();
716: }
717:
718:
721: public void destroy()
722: {
723: portServer p;
724: for (int i = 0; i < portServers.size(); i++)
725: {
726: p = (portServer) portServers.get(i);
727: p.close_now();
728: }
729: super.destroy();
730: }
731:
732:
741: public void disconnect(org.omg.CORBA.Object object)
742: {
743: Connected_objects.cObject rmKey = null;
744:
745:
746:
747: if (object instanceof ObjectImpl)
748: {
749: Delegate delegate = ((ObjectImpl) object)._get_delegate();
750: if (delegate instanceof SimpleDelegate)
751: {
752: byte[] key = ((SimpleDelegate) delegate).getIor().key;
753: rmKey = connected_objects.get(key);
754: }
755: }
756:
757:
758:
759: if (rmKey == null)
760: rmKey = connected_objects.getKey(object);
761: if (rmKey != null)
762: {
763:
764: portServer p;
765: StopService:
766: for (int i = 0; i < portServers.size(); i++)
767: {
768: p = (portServer) portServers.get(i);
769: if (p.s_port == rmKey.port && !(p instanceof sharedPortServer))
770: {
771: p.close_now();
772: freed_ports.addFirst(new Integer(rmKey.port));
773: break StopService;
774: }
775: connected_objects.remove(rmKey.key);
776: }
777: }
778: }
779:
780:
788: public void identityDestroyed(java.lang.Object identity)
789: {
790: if (identity == null)
791: return;
792:
793: sharedPortServer ise = (sharedPortServer) identities.get(identity);
794: if (ise != null)
795: {
796: synchronized (connected_objects)
797: {
798: ise.close_now();
799: identities.remove(identity);
800:
801: Connected_objects.cObject obj;
802: Map.Entry m;
803: Iterator iter = connected_objects.entrySet().iterator();
804: while (iter.hasNext())
805: {
806: m = (Map.Entry) iter.next();
807: obj = (Connected_objects.cObject) m.getValue();
808: if (obj.identity == identity)
809: iter.remove();
810: }
811: }
812: }
813: }
814:
815:
823: public org.omg.CORBA.Object find_local_object(IOR ior)
824: {
825:
826: if (!ior.Internet.host.equals(LOCAL_HOST))
827: return null;
828:
829: return find_connected_object(ior.key, ior.Internet.port);
830: }
831:
832:
839: public String[] list_initial_services()
840: {
841: String[] refs = new String[ initial_references.size() ];
842: int p = 0;
843:
844: Iterator iter = initial_references.keySet().iterator();
845: while (iter.hasNext())
846: {
847: refs [ p++ ] = (String) iter.next();
848: }
849: return refs;
850: }
851:
852:
869: public String object_to_string(org.omg.CORBA.Object forObject)
870: {
871:
872: if (forObject instanceof ObjectImpl)
873: {
874: Delegate delegate = ((ObjectImpl) forObject)._get_delegate();
875: if (delegate instanceof SimpleDelegate)
876: return ((SimpleDelegate) delegate).getIor().toStringifiedReference();
877: }
878:
879:
880: Connected_objects.cObject rec = connected_objects.getKey(forObject);
881:
882: if (rec == null)
883: throw new BAD_PARAM("The object " + forObject +
884: " has not been previously connected to this ORB"
885: );
886:
887: IOR ior = createIOR(rec);
888:
889: return ior.toStringifiedReference();
890: }
891:
892:
895: public IOR getLocalIor(org.omg.CORBA.Object forObject)
896: {
897: Connected_objects.cObject rec = connected_objects.getKey(forObject);
898: if (rec == null)
899: return null;
900: else
901: return createIOR(rec);
902: }
903:
904:
913: public org.omg.CORBA.Object resolve_initial_references(String name)
914: throws InvalidName
915: {
916: org.omg.CORBA.Object object = null;
917: try
918: {
919: object = (org.omg.CORBA.Object) initial_references.get(name);
920: if (object == null && name.equals(NAME_SERVICE))
921: {
922: object = getDefaultNameService();
923: if (object != null)
924: initial_references.put(NAME_SERVICE, object);
925: }
926: }
927: catch (Exception ex)
928: {
929: InvalidName err = new InvalidName(name);
930: err.initCause(ex);
931: throw err;
932: }
933: if (object != null)
934: return object;
935: else
936: throw new InvalidName("Not found: '" + name + "'");
937: }
938:
939:
944: public void run()
945: {
946: CollocatedOrbs.registerOrb(this);
947: try
948: {
949: running = true;
950:
951:
952: Iterator iter = connected_objects.entrySet().iterator();
953: Map.Entry m;
954: Connected_objects.cObject obj;
955:
956: while (iter.hasNext())
957: {
958: m = (Map.Entry) iter.next();
959: obj = (Connected_objects.cObject) m.getValue();
960:
961: portServer subserver;
962:
963: if (obj.identity == null)
964: {
965: subserver = new portServer(obj.port);
966: portServers.add(subserver);
967: }
968: else
969: subserver = (portServer) identities.get(obj.identity);
970:
971: if (! subserver.isAlive())
972: {
973:
974: if (! iter.hasNext())
975: {
976:
977: iter = null;
978: subserver.run();
979: return;
980: }
981: else
982: subserver.start();
983: }
984: }
985: }
986: finally
987: {
988: CollocatedOrbs.unregisterOrb(this);
989: }
990: }
991:
992:
998: public void ensureRunning()
999: {
1000: final OrbFunctional THIS = this;
1001:
1002: if (!running)
1003: {
1004: Thread t = new Thread()
1005: {
1006: public void run()
1007: {
1008: THIS.run();
1009: }
1010: };
1011: t.setDaemon(true);
1012: t.start();
1013: }
1014: }
1015:
1016:
1022: public void shutdown(boolean wait_for_completion)
1023: {
1024: super.shutdown(wait_for_completion);
1025: running = false;
1026:
1027: if (!wait_for_completion)
1028: {
1029: for (int i = 0; i < portServers.size(); i++)
1030: {
1031: portServer p = (portServer) portServers.get(i);
1032: p.close_now();
1033: }
1034: }
1035: }
1036:
1037:
1048: public org.omg.CORBA.Object string_to_object(String an_ior)
1049: {
1050: return nameParser.corbaloc(an_ior, this);
1051: }
1052:
1053:
1056: public org.omg.CORBA.Object ior_to_object(IOR ior)
1057: {
1058: org.omg.CORBA.Object object = find_local_object(ior);
1059: if (object == null)
1060: {
1061:
1062: object = CollocatedOrbs.searchLocalObject(ior);
1063: if (object == null)
1064: {
1065:
1066: ObjectImpl impl = StubLocator.search(this, ior);
1067: try
1068: {
1069: if (impl._get_delegate() == null)
1070: impl._set_delegate(new IorDelegate(this, ior));
1071: }
1072: catch (BAD_OPERATION ex)
1073: {
1074:
1075:
1076: impl._set_delegate(new IorDelegate(this, ior));
1077: }
1078:
1079: object = impl;
1080: }
1081: }
1082: return object;
1083: }
1084:
1085:
1089: protected org.omg.CORBA.Object getDefaultNameService()
1090: {
1091: if (initial_references.containsKey(NAME_SERVICE))
1092: return (org.omg.CORBA.Object) initial_references.get(NAME_SERVICE);
1093:
1094: IOR ior = new IOR();
1095: ior.Id = NamingContextExtHelper.id();
1096: ior.Internet.host = ns_host;
1097: ior.Internet.port = ns_port;
1098: ior.key = NamingServiceTransient.getDefaultKey();
1099:
1100: IorObject iorc = new IorObject(this, ior);
1101: NamingContextExt namer = NamingContextExtHelper.narrow(iorc);
1102: initial_references.put(NAME_SERVICE, namer);
1103: return namer;
1104: }
1105:
1106:
1115: protected org.omg.CORBA.Object find_connected_object(byte[] key, int port)
1116: {
1117: Connected_objects.cObject ref = connected_objects.get(key);
1118: if (ref == null)
1119: return null;
1120: if (port >= 0 && ref.port != port)
1121: return null;
1122: else
1123: return ref.object;
1124: }
1125:
1126:
1135: protected void set_parameters(Applet app, Properties props)
1136: {
1137: useProperties(props);
1138:
1139: String[][] para = app.getParameterInfo();
1140: if (para != null)
1141: {
1142: for (int i = 0; i < para.length; i++)
1143: {
1144: if (para[i][0].equals(LISTEN_ON))
1145: Port = Integer.parseInt(para[i][1]);
1146: if (para[i][0].equals(REFERENCE))
1147: {
1148: StringTokenizer st = new StringTokenizer(para[i][1], "=");
1149: initial_references.put(st.nextToken(),
1150: string_to_object(st.nextToken()));
1151: }
1152:
1153: if (para[i][0].equals(ORB_ID))
1154: orb_id = para[i][1];
1155:
1156: if (para[i][0].equals(SERVER_ID))
1157: server_id = para[i][1];
1158:
1159: if (para[i][0].equals(NS_HOST))
1160: ns_host = para[i][1];
1161: if (para[i][0].equals(START_READING_MESSAGE))
1162: TOUT_START_READING_MESSAGE = Integer.parseInt(para[i][1]);
1163: if (para[i][0].equals(WHILE_READING))
1164: TOUT_WHILE_READING = Integer.parseInt(para[i][1]);
1165: if (para[i][0].equals(AFTER_RECEIVING))
1166: TOUT_AFTER_RECEIVING = Integer.parseInt(para[i][1]);
1167: try
1168: {
1169: if (para[i][0].equals(NS_PORT))
1170: ns_port = Integer.parseInt(para[i][1]);
1171: }
1172: catch (NumberFormatException ex)
1173: {
1174: BAD_PARAM bad = new BAD_PARAM("Invalid " + NS_PORT
1175: + "property, unable to parse '" + props.getProperty(NS_PORT)
1176: + "'");
1177: bad.initCause(ex);
1178: throw bad;
1179: }
1180: }
1181: }
1182: }
1183:
1184:
1195: protected void set_parameters(String[] para, Properties props)
1196: {
1197: if ((para != null) && para.length > 1)
1198: {
1199: for (int i = 0; i < para.length - 1; i++)
1200: {
1201: if (para[i].endsWith("ListenOn"))
1202: Port = Integer.parseInt(para[i + 1]);
1203: if (para[i].endsWith("ORBInitRef"))
1204: {
1205: StringTokenizer st = new StringTokenizer(para[i + 1], "=");
1206: initial_references.put(st.nextToken(),
1207: string_to_object(st.nextToken()));
1208: }
1209:
1210: if (para[i].endsWith("ORBInitialHost"))
1211: ns_host = para[i + 1];
1212:
1213: if (para[i].endsWith("ServerId"))
1214: server_id = para[i++];
1215: else if (para[i].endsWith("ORBid"))
1216: orb_id = para[i++];
1217:
1218: try
1219: {
1220: if (para[i].endsWith("ORBInitialPort"))
1221: ns_port = Integer.parseInt(para[i + 1]);
1222: }
1223: catch (NumberFormatException ex)
1224: {
1225: throw new BAD_PARAM("Invalid " + para[i]
1226: + "parameter, unable to parse '"
1227: + props.getProperty(para[i + 1]) + "'");
1228: }
1229: }
1230: }
1231:
1232: useProperties(props);
1233: }
1234:
1235:
1238: protected IOR createIOR(Connected_objects.cObject ref)
1239: throws BAD_OPERATION
1240: {
1241: IOR ior = new IOR();
1242: ior.key = ref.key;
1243: ior.Internet.port = ref.port;
1244:
1245: if (ref.object instanceof ObjectImpl)
1246: {
1247: ObjectImpl imp = (ObjectImpl) ref.object;
1248: if (imp._ids().length > 0)
1249: ior.Id = imp._ids() [ 0 ];
1250: }
1251: if (ior.Id == null)
1252: ior.Id = ref.object.getClass().getName();
1253:
1254: ior.Internet.host = CollocatedOrbs.localHost;
1255: ior.Internet.port = ref.port;
1256:
1257: return ior;
1258: }
1259:
1260:
1268: protected void prepareObject(org.omg.CORBA.Object object, IOR ior)
1269: throws BAD_PARAM
1270: {
1271:
1276:
1277:
1278: if (object instanceof ObjectImpl)
1279: {
1280: ObjectImpl impl = (ObjectImpl) object;
1281: try
1282: {
1283: if (impl._get_delegate() == null)
1284: impl._set_delegate(new SimpleDelegate(this, ior));
1285: }
1286: catch (BAD_OPERATION ex)
1287: {
1288:
1289: impl._set_delegate(new SimpleDelegate(this, ior));
1290: }
1291: }
1292: }
1293:
1294:
1307: private void respond_to_client(OutputStream net_out,
1308: MessageHeader msh_request, RequestHeader rh_request,
1309: ResponseHandlerImpl handler, SystemException sysEx
1310: ) throws IOException
1311: {
1312:
1313: ReplyHeader reply = handler.reply_header;
1314:
1315: if (sysEx != null)
1316: reply.reply_status = ReplyHeader.SYSTEM_EXCEPTION;
1317: else if (handler.isExceptionReply())
1318: reply.reply_status = ReplyHeader.USER_EXCEPTION;
1319: else
1320: reply.reply_status = ReplyHeader.NO_EXCEPTION;
1321: reply.request_id = rh_request.request_id;
1322:
1323: BufferedCdrOutput out =
1324: new BufferedCdrOutput(50 + handler.getBuffer().buffer.size());
1325: out.setOrb(this);
1326:
1327: out.setOffset(msh_request.getHeaderSize());
1328:
1329: reply.write(out);
1330:
1331: if (msh_request.version.since_inclusive(1, 2))
1332: {
1333: out.align(8);
1334:
1335:
1336:
1337: }
1338: handler.getBuffer().buffer.writeTo(out);
1339:
1340: MessageHeader msh_reply = new MessageHeader();
1341:
1342: msh_reply.version = msh_request.version;
1343: msh_reply.message_type = MessageHeader.REPLY;
1344: msh_reply.message_size = out.buffer.size();
1345:
1346:
1347: msh_reply.write(net_out);
1348: out.buffer.writeTo(net_out);
1349: net_out.flush();
1350: }
1351:
1352:
1355: private void forward_request(OutputStream net_out,
1356: MessageHeader msh_request, RequestHeader rh_request, gnuForwardRequest info
1357: ) throws IOException
1358: {
1359: MessageHeader msh_forward = new MessageHeader();
1360: msh_forward.version = msh_request.version;
1361:
1362: ReplyHeader rh_forward = msh_forward.create_reply_header();
1363: msh_forward.message_type = MessageHeader.REPLY;
1364: rh_forward.reply_status = info.forwarding_code;
1365: rh_forward.request_id = rh_request.request_id;
1366:
1367:
1368: BufferedCdrOutput out = new BufferedCdrOutput();
1369: out.setOrb(this);
1370: out.setOffset(msh_forward.getHeaderSize());
1371:
1372: rh_forward.write(out);
1373:
1374: if (msh_forward.version.since_inclusive(1, 2))
1375: out.align(8);
1376: out.write_Object(info.forward_reference);
1377:
1378: msh_forward.message_size = out.buffer.size();
1379:
1380:
1381: msh_forward.write(net_out);
1382: out.buffer.writeTo(net_out);
1383: net_out.flush();
1384: }
1385:
1386:
1398: void serve(final portServer p, ServerSocket serverSocket)
1399: throws MARSHAL, IOException
1400: {
1401: final Socket service;
1402: service = serverSocket.accept();
1403:
1404:
1405: if (p.running_threads >= MAX_RUNNING_THREADS)
1406: {
1407: serveStep(service, true);
1408: return;
1409: }
1410:
1411: new Thread()
1412: {
1413: public void run()
1414: {
1415: try
1416: {
1417: synchronized (p)
1418: {
1419: p.running_threads++;
1420: }
1421: serveStep(service, false);
1422: }
1423: finally
1424: {
1425: synchronized (p)
1426: {
1427: p.running_threads--;
1428: }
1429: }
1430: }
1431: }.start();
1432: }
1433:
1434:
1445: void serveStep(Socket service, boolean no_resources)
1446: {
1447: try
1448: {
1449: Serving: while (true)
1450: {
1451: InputStream in = service.getInputStream();
1452: service.setSoTimeout(TOUT_START_READING_MESSAGE);
1453:
1454: MessageHeader msh_request = new MessageHeader();
1455:
1456: try
1457: {
1458: msh_request.read(in);
1459: }
1460: catch (MARSHAL ex)
1461: {
1462:
1463: return;
1464: }
1465:
1466: if (max_version != null)
1467: {
1468: if (!msh_request.version.until_inclusive(max_version.major,
1469: max_version.minor))
1470: {
1471: OutputStream out = service.getOutputStream();
1472: new ErrorMessage(max_version).write(out);
1473: return;
1474: }
1475: }
1476:
1477: byte[] r = msh_request.readMessage(in, service, TOUT_WHILE_READING,
1478: TOUT_AFTER_RECEIVING);
1479:
1480: if (msh_request.message_type == MessageHeader.REQUEST)
1481: {
1482: RequestHeader rh_request;
1483:
1484: BufferredCdrInput cin = new BufferredCdrInput(r);
1485: cin.setOrb(this);
1486: cin.setVersion(msh_request.version);
1487: cin.setOffset(msh_request.getHeaderSize());
1488: cin.setBigEndian(msh_request.isBigEndian());
1489:
1490: rh_request = msh_request.create_request_header();
1491:
1492:
1493: rh_request.read(cin);
1494:
1495:
1496:
1497: if (msh_request.version.since_inclusive(1, 2))
1498: {
1499: cin.align(8);
1500:
1501:
1502: }
1503:
1504: InvokeHandler target = (InvokeHandler) find_connected_object(
1505: rh_request.object_key, -1);
1506:
1507:
1508:
1509:
1510: ReplyHeader rh_reply = msh_request.create_reply_header();
1511:
1512:
1513: ResponseHandlerImpl handler = new ResponseHandlerImpl(
1514: this, msh_request, rh_reply, rh_request);
1515:
1516: SystemException sysEx = null;
1517:
1518: try
1519: {
1520: if (no_resources)
1521: {
1522: NO_RESOURCES no = new NO_RESOURCES("Too many parallel calls");
1523: no.minor = Minor.Threads;
1524: throw no;
1525: }
1526: if (target == null)
1527: throw new OBJECT_NOT_EXIST();
1528: target._invoke(rh_request.operation, cin, handler);
1529: }
1530: catch (gnuForwardRequest forwarded)
1531: {
1532: OutputStream sou = service.getOutputStream();
1533: forward_request(sou, msh_request, rh_request, forwarded);
1534: if (service != null && !service.isClosed())
1535: {
1536:
1537:
1538: service.setSoTimeout(TANDEM_REQUESTS);
1539: continue Serving;
1540: }
1541: }
1542: catch (UnknownException uex)
1543: {
1544: sysEx = new UNKNOWN("Unknown", 2,
1545: CompletionStatus.COMPLETED_MAYBE);
1546: sysEx.initCause(uex.originalEx);
1547:
1548: org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply();
1549:
1550: rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext(
1551: rh_reply.service_context, uex.originalEx, ech);
1552:
1553: ObjectCreator.writeSystemException(ech, sysEx);
1554: }
1555: catch (SystemException ex)
1556: {
1557: sysEx = ex;
1558:
1559: org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply();
1560:
1561: rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext(
1562: rh_reply.service_context, ex, ech);
1563:
1564: ObjectCreator.writeSystemException(ech, ex);
1565: }
1566: catch (Exception except)
1567: {
1568:
1569:
1570:
1571: except.printStackTrace();
1572:
1573: sysEx = new UNKNOWN("Unknown", 2,
1574: CompletionStatus.COMPLETED_MAYBE);
1575: sysEx.initCause(except);
1576:
1577: org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply();
1578:
1579: rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext(
1580: rh_reply.service_context, except, ech);
1581:
1582: ObjectCreator.writeSystemException(ech, sysEx);
1583: }
1584:
1585:
1586: if (rh_request.isResponseExpected())
1587: {
1588: OutputStream sou = service.getOutputStream();
1589: respond_to_client(sou, msh_request, rh_request, handler,
1590: sysEx);
1591: }
1592: }
1593: else if (msh_request.message_type == MessageHeader.CLOSE_CONNECTION
1594: || msh_request.message_type == MessageHeader.MESSAGE_ERROR)
1595: {
1596: CloseMessage.close(service.getOutputStream());
1597: service.close();
1598: return;
1599: }
1600:
1601: if (service != null && !service.isClosed())
1602:
1603:
1604:
1605: service.setSoTimeout(TANDEM_REQUESTS);
1606: else
1607: return;
1608: }
1609: }
1610: catch (SocketException ex)
1611: {
1612:
1613: return;
1614: }
1615: catch (IOException ioex)
1616: {
1617:
1618:
1619: return;
1620: }
1621: finally
1622: {
1623: try
1624: {
1625: if (service!=null && !service.isClosed())
1626: service.close();
1627: }
1628: catch (IOException ioex)
1629: {
1630:
1631: }
1632: }
1633: }
1634:
1635:
1639: protected void useProperties(Properties props)
1640: {
1641: if (props != null)
1642: {
1643: if (props.containsKey(LISTEN_ON))
1644: Port = Integer.parseInt(props.getProperty(LISTEN_ON));
1645: if (props.containsKey(NS_HOST))
1646: ns_host = props.getProperty(NS_HOST);
1647: try
1648: {
1649: if (props.containsKey(NS_PORT))
1650: ns_port = Integer.parseInt(props.getProperty(NS_PORT));
1651: if (props.containsKey(START_READING_MESSAGE))
1652: TOUT_START_READING_MESSAGE =
1653: Integer.parseInt(props.getProperty(START_READING_MESSAGE));
1654: if (props.containsKey(WHILE_READING))
1655: TOUT_WHILE_READING =
1656: Integer.parseInt(props.getProperty(WHILE_READING));
1657: if (props.containsKey(AFTER_RECEIVING))
1658: TOUT_AFTER_RECEIVING =
1659: Integer.parseInt(props.getProperty(AFTER_RECEIVING));
1660: if (props.containsKey(SERVER_ERROR_PAUSE))
1661: TWAIT_SERVER_ERROR_PAUSE =
1662: Integer.parseInt(props.getProperty(SERVER_ERROR_PAUSE));
1663: }
1664: catch (NumberFormatException ex)
1665: {
1666: throw new BAD_PARAM("Invalid " + NS_PORT +
1667: "property, unable to parse '" + props.getProperty(NS_PORT) +
1668: "'"
1669: );
1670: }
1671:
1672: if (props.containsKey(SocketFactory.PROPERTY))
1673: {
1674: String factory = null;
1675: try
1676: {
1677: factory = props.getProperty(SocketFactory.PROPERTY);
1678: if (factory!=null)
1679: socketFactory = (SocketFactory)
1680: ObjectCreator.forName(factory).newInstance();
1681: }
1682: catch (Exception ex)
1683: {
1684: BAD_PARAM p = new BAD_PARAM("Bad socket factory "+factory);
1685: p.initCause(ex);
1686: throw p;
1687: }
1688: }
1689:
1690: if (props.containsKey(ORB_ID))
1691: orb_id = props.getProperty(ORB_ID);
1692:
1693: if (props.containsKey(SERVER_ID))
1694: server_id = props.getProperty(SERVER_ID);
1695:
1696: Enumeration en = props.elements();
1697: while (en.hasMoreElements())
1698: {
1699: String item = (String) en.nextElement();
1700: if (item.equals(REFERENCE))
1701: initial_references.put(item,
1702: string_to_object(props.getProperty(item))
1703: );
1704: }
1705: }
1706: }
1707:
1708:
1726: public Request get_next_response() throws org.omg.CORBA.WrongTransaction
1727: {
1728: return asynchron.get_next_response();
1729: }
1730:
1731:
1738: public boolean poll_next_response()
1739: {
1740: return asynchron.poll_next_response();
1741: }
1742:
1743:
1757: public void send_multiple_requests_deferred(Request[] requests)
1758: {
1759: asynchron.send_multiple_requests_deferred(requests);
1760: }
1761:
1762:
1771: public void send_multiple_requests_oneway(Request[] requests)
1772: {
1773: asynchron.send_multiple_requests_oneway(requests);
1774: }
1775:
1776:
1779: protected void finalize() throws java.lang.Throwable
1780: {
1781: running = false;
1782: super.finalize();
1783: }
1784:
1785:
1790: public int countConnectedObjects()
1791: {
1792: return connected_objects.size();
1793: }
1794: }