JMS mit Oracle Advanced Queuing / Oracle Streams AQ Christian Hartmann Freiberufler Hamburg Schlüsselworte: JMS, Oracle Advanced Queuing, Java, PL/SQL, Oracle Streams AQ, Oracle Database 10g 1. Einleitung JMS ist der Standard zum Austausch von Nachrichten zwischen unterschiedlichen (entfernten) Systemen in Java-Umfeld. Es ermöglicht eine lose Kopplung und ist damit Grundlage für eine serviceorientierte Architektur. In jeder Installation einer Oracle-Datenbank ist seit der Version 9i, Release 2, ein so genanntes "Advanced Queueing" mit installiert. Mit "Advanced Queueing" (kurz AQ) wird das interne Messaging Verfahren von auf der Oracle-Datenbankebene bezeichnet. Oracle betreibt dafür intern einen sog. ESB (Enterprise Service Bus). Anhand eines Praxisbeispiels wird der Einsatz eines Oracle Datenbanksystems 10g als Oracle JMS Provider gezeigt, so dass ohne weitere Systeme ein tragfähiges, hoch performantes JMSSystem aufgebaut werden kann. 2. Einführung in JMS JMS (Java Message System) stellt eine komfortable Möglichkeit dar, ein sog. "Fire-AndForget-Konzept" umzusetzen, d.h. eine asynchrone Abarbeitung von Befehlen wird möglich. Dadurch kann beispielsweise erreicht werden, dass langlaufende Prozesse im Hintergrund ablaufen und die GUI für den Benutzer sofort wieder zur Verfügung steht. Als Beispiel sei hier eine Bestellung genannt. Der Kunde erhält sofort eine Bestätigung, während der komplexe Prozess der Bestellabwicklung parallel und asynchron angestoßen wird. Des Weiteren wird dadurch die lose Kopplung von Systemen möglich. Während das GUI des Bestellsystems beispielsweise in einer "Tomcat-Umgebung" läuft, kann das Bestellsystem auf einem anderen weit entfernten System laufen. Abb. 1: JMS Prinzip: Loose Kopplung / Fire + Forget Mittels JMS können nun Nachrichten über einen sog. "JMS-Provider" (im Prinzip eine Art Server) ausgetauscht werden. Dabei schickt das aufrufende System seine Nachricht an diesen Server. Das aufrufende System muss sich dabei nicht darum kümmern, wie und welches System die Nachricht wieder abholt. Für das sendende System ist die Prozessbearbeitung durch Absenden der Nachricht beendet. Das sendende System muss nur den "JMS-Provider" kennen und wissen, wie es die Nachricht an diesen versendet. Das empfangene System wiederum verbindet sich gegen den "JMS-Provider" und konsumiert die Nachricht. Auch hier muss das empfangende System nicht wissen, wer der Absender der Nachricht war und wie er diesen erreichen kann. Er muss lediglich die Adresse des "JMS Providers" kennen und wissen, wie er sich gegen diesen verbindet. Das empfangene oder das absendende System müssen dabei jeweils nicht gleichzeitig verfügbar sein. Der "JMS-Provider" sorgt für die Persistierung der entsprechenden Nachricht. Dabei ist einstellbar, wie lange dieser eine Nachricht verfügbar halten soll bevor diese als nicht mehr verfügbar angesehen werden soll. Grundsätzlich gibt es bei JMS zwei Prinzipen der Nachrichtenübermittlung. Zum einen das sog. "Punkt-zu-Punkt (Point-To-Point) Verfahren", bei dem es einen Sender und einen Empfänger gibt. Abb. 2: JMS Queue Bei diesem Verfahren erhält derjenige Empfänger die Nachricht, der dieser als erstes anfordert. Danach wird die Nachricht innerhalb der Queue (also auf dem JMS-Provider) gelöscht. Eine weitere Zustellung an einen weiteren Empfänger derselben Nachricht ist dann nicht mehr möglich. Bei dem "Publish-Subscribe-Modell" kann eine gesendete Nachricht mehrere Empfänger erreichen. Nachrichten werden dabei an ein sog. "Topic" versendet. Dabei können nur vorher an das Topic registrierte Empfänger die Nachricht auch erhalten. Sind diese nicht mit dem Topic verbunden, so erhalten diese die Nachricht nicht. Das interessante dabei ist, dass die Nachricht dabei solange im Topic (Queue / JMS-Provider) bestehen bleibt bis entweder das eingestellte Timeout erreicht wurde, oder aber die Nachricht an alle zuvor registrierten Empfänger verteilt wurde. Abb. 3: JMS Topic In dem hier vorliegenden Praxisbeispiel wird nur das „Point-To-Point“ Verfahren angewendet. 3. Die Oracle-Datenbank als JMS-Provider In jeder Installation einer Oracle-Datenbank ist seit der Version 9i, Release 2, ein so genanntes "Advanced Queuing" mit installiert. Abb. 4: Das Oracle-Datenbanksystem als JMS-Provider Das interessante dabei ist, dass Oracle dabei intern alle Nachrichten mit etablierten Standardverfahren über Datenbanktabellen und Datenbankusern abwickelt. Dies bedeutet, dass es sich hierbei um ein robustes, stabiles und seit Jahren etabliertes Verfahren handelt. Des Weiteren hat man, sofern man eine Oracle Datenbank betreibt, immer auch gleich einen JMS-Provider zur Hand, so dass man kein weiteres System aufsetzen muss um JMS nutzen zu können. Grundsätzlich gibt es zwei Verfahren um über das Advanced Queuing von Oracle Nachrichten zu versenden. Zum einen können die Nachrichten über sog. "PL/SQL-Packages" versendet werden, welches jedoch kein „echtes“ JMS darstellt. Eine andere Möglichkeit ist der Versand per Nachrichten über den sog. "JMS-Adapter". 4. Use-Case des Praxissystems Von einem Vorsystem werden Daten per FTP im XML-Format auf den Datenbankserver geliefert. Diese werden gemäß einer vorgegebenen Fachlogik verarbeitet. Sowohl die Verarbeitung im Ladeprozess als auch die Änderung von Daten in einer vom Benutzer zu bedienenden Anwendung führen zu Trigger-Auslösungen auf der Datenbank, welche dann eine Nachricht vom Typ XMLType (ein PL/SQL Type) in eine Queue stellen. Diese Information wird mit Hilfe eines externen Java-Programms entsprechend ausgelesen, bewertet und verschlüsselt als serialisiertes Java-Objekt über eine Queue einem externen Zielsystem zur Verfügung gestellt. Dies führt zu der folgenden Architektur: Abb. 5: Zielarchitektur 5. Einrichtung der Datenbank zur Nutzung als JMS-Provider 5.1 Datenbank-User-Anlage Da alle notwendigen Packages zum Betrieb der Oracle Datenbank als JMS-Provider bereits in der Grundinstallation einer Oracle Datenbank installiert sind, werden nur noch die notwendigen User und die entsprechenden Queues benötigt. Wie bereits beschrieben, so erfolgt die Verbindung zum Oracle-JMS-Providers mittels normaler Datenbankbenutzer. Oracle benötigt hierfür lediglich zwei dedizierte Datenbank-Benutzer. Einen für die Verwaltung der Queues und ihrer darunter liegenden Tabellen und einen für die eigentliche Nutzung des JMS-Providers, d.h. das Versenden und Empfangen der Nachrichten. Wir legen nun einen User "atc_aqadm" als Verwalter und einen User "atc_aquser" als Nutzer an und statten diese User mit den notwendigen Rechten aus: connect system/<YOUR_PASSWORD> -- Die Rolle für den AQ Admin User CREATE ROLE atc_aq_adm_role; GRANT CONNECT, RESOURCE, aq_administrator_role, create any procedure TO atc_aq_adm_role; -- Die Rolle für den normalen AQ User, der die Nachrichten in die Queue stellt CREATE ROLE atc_aq_user_role; GRANT CREATE SESSION, aq_user_role TO atc_aq_user_role; -- Berechtigung zum Stellen einer Nachricht in die Queue exec dbms_aqadm.grant_system_privilege(privilege => 'ENQUEUE_ANY', grantee => 'atc_aq_user_role', admin_option => TRUE); -- Berechtigung zum sog. "Dequeuen", d.h. nach Erhalt der Nachricht wird diese aus der Queue gelöscht exec dbms_aqadm.grant_system_privilege(privilege => 'DEQUEUE_ANY', grantee => 'atc_aq_user_role', admin_option => TRUE); -- Der Admin-User für die Queues create user atc_aqadm identified by atc_aqadm; grant atc_aq_adm_role to atc_aqadm; grant execute on dbms_aq to atc_aqadm; -- Der Nutzer der Queues create user atc_aquser identified by atc_aquser; grant connect, resource, create session to atc_aquser; alter user atc_aquser; grant atc_aq_user_role to atc_aquser; grant execute on dbms_aq to atc_aquser; 5.2 Anlegen der Queues Zum Versenden der Nachrichten werden nun die entsprechenden Queues definiert. Diese beiden Queues unterscheiden sich durch die Angabe ihrer Nutzdaten (Payload). Für die JavaObjekte wird als Nutzart ,AQ$_JMS_OBJECT_ESSAGE‘ und für die Nachricht mit dem PL/SQL Datentyp aus den PL/SQL-Routinen die Nutzart ,SYS.XMLTYPE‘ gewählt. Da wie zuvor beschrieben die gesamte Persistierung der Nachrichten über Tabellen geschieht, wird zunächst mittels ,DBMS_AQADM.CREATE_QUEUE_TABLE‘ eine entsprechende Tabelle erzeugt, die dann für das Halten der entsprechenden Nutzdaten vorbereitet wird. Danach wird die entsprechende Queue mit dem gewünschten Namen erzeugt (z. Bsp. ,DEV_ATCBPPMESSAGEQ‘) und diese dann auf die entsprechend zuvor angelegte Tabelle gemappt. Danach wird die eigentliche Queue noch gestartet. Wird die entsprechende Queue nicht gestartet, so kann an diese keine Nachricht gesendet werden bzw. keine Nachricht von dieser abgeholt werden. Der entsprechende Name der Queue ist wichtig, da hierüber aus den jeweiligen Applikationen der Zugriff auf diese erfolgt. Nachfolgend die Anlage der Queue zur Aufnahme von serialisierten Java-Objekten: EXEC DBMS_AQADM.CREATE_QUEUE_TABLE(QUEUE_TABLE => 'DEV_ATCBPPMESSAGET', QUEUE_PAYLOAD_TYPE => 'SYS.AQ$_JMS_OBJECT_MESSAGE', MULTIPLE_CONSUMERS => FALSE); EXEC DBMS_AQADM.CREATE_QUEUE(QUEUE_NAME => 'DEV_ATCBPPMESSAGEQ', QUEUE_TABLE => 'DEV_ATCBPPMESSAGET'); EXEC DBMS_AQADM.START_QUEUE(QUEUE_NAME => 'DEV_ATCBPPMESSAGEQ'); -- Grant the aq_user the needed rights to inserts and modify -- lobs (this is needed only for JMS-Queues) grant select, delete, insert, update on DEV_ATCBPPMESSAGET to atc_aq_user_role; Als Nachrichten-Typ gibt es im JMS-Umfeld noch unter anderem die folgenden wichtigen Typen: • AQ$_JMS_TEXT_MESSAGE - für den Versand von reinen Strings • AQ$_JMS_MAP_MESSAGE - für den Versand von Maps • AQ$_JMS_BYTES_MESSAGE - für den Versand von Bytecode Nachfogend nun die Anlage der Queue für den Versand einer PL/SQL-Nachricht aus einer PL/SQL-Routine heraus: EXEC DBMS_AQADM.CREATE_QUEUE_TABLE(QUEUE_TABLE => 'DEV_ATCBPPINT', QUEUE_PAYLOAD_TYPE => 'SYS.XMLType', MULTIPLE_CONSUMERS => FALSE); EXEC DBMS_AQADM.CREATE_QUEUE(QUEUE_NAME => 'DEV_ATCBPPINT', QUEUE_TABLE => 'DEV_ATCBPPINT'); EXEC DBMS_AQADM.START_QUEUE(QUEUE_NAME => 'DEV_ATCBPPINT'); Für eine sog. „PL/SQL-Queue“ kann jeder beliebige Datentyp angegeben werden, der auch im PL/SQL-Umfeld möglich ist (beispielsweise VARCHAR2, NUMBER, etc.) 6. Nutzung der Datenbank als JMS-Provider zum Senden und Empfangen von Nachrichten 6.1 Versand von Nachrichten in eine PL/SQL-Queue aus PL/SQL Gemäß des oben genannten User-Cases wird eine Nachricht nach der Modifikation eines Datensatzes in eine PL/SQL-Queue gesendet. Zunächst wird das als VARCHAR empfangene XML in ein XMLType gewandelt. Danach wird dieser XMLType in die zuvor definierte Queue abgelegt: [...] ENQUEUE_OPTIONS DBMS_AQ.ENQUEUE_OPTIONS_T; MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T; MESSAGE_HANDLE RAW(16); [...] v_XML_T := XMLType.createXML(v_XML); DBMS_AQ.ENQUEUE( queue_name=>'ATC_AQADM.DEV_ATCBPPINT', enqueue_options=>ENQUEUE_OPTIONS, message_properties=> MESSAGE_PROPERTIES, PAYLOAD=>v_XML_T, MSGID=>MESSAGE_HANDLE); 6.2 Abholen der Nachricht aus der PL/SQL-Queue mittels Java Die Nachricht wird nun wieder mittels eines Java-Programmes aus einer PL/SQL-Queue abgeholt. Da der Zugriff auf eine Queue wie ein normaler Datenbank-Connect über den JDBC-Thin-Driver erfolgt, reicht es aus, sich mit dem zuvor angelegten User atc_aquser gegen die entsprechenden Datenbank zu konnektieren und die Nachricht abzuholen. Hierfür werden nur die folgenden externen JARs benötigt: • Oracle AQ-Libaries: aq-x.x.x.jar • Normaler JDBC-Thin-Driver • JMS-Libaries von Sun (siehe http://java.sun.com/products/jms) Für die Verwendung des Oracle XMLTypes in einem Java-Programm wird zusätzlich noch die Java-Bibliothek xdb.jar benötigt. Als ,connectString‘ wird ein normaler JDBC-Thin-Connection-String verwendet (z. Bsp. jdbc:oracle:thin:@<SERVER_NAME>:1521:ATC) Nachfolgend der entsprechende Code - stark vereinfacht und ohne Fehlerhandling: Class.forName("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection(connectString, userName, userPassword); Class.forName("oracle.AQ.AQOracleDriver"); AQSession aq_sess = AQDriverManager.createAQSession(conn); AQQueue aqQueue = aq_sess.getQueue(DB_AQ_ADMIN_NAME, queueName); AQDequeueOption dqOption = new AQDequeueOption(); dqOption.setNavigationMode (AQDequeueOption.NAVIGATION_NEXT_MESSAGE); AQMessage msg = ((AQOracleQueue)aqQueue).dequeue(dqOption,XMLType.getORADataFa ctory()); AQObjectPayload payload = msg.getObjectPayload(); XMLType xmlType = (XMLType) payload.getPayloadData(); 6.3 Versand von Nachrichten in eine JMS-Queue Nachdem die Nachricht durch das Java-Programm abgeholt wurde und fachliche Routinen ausgeführt wurden, erzeugt das Java-Programm ein verschlüsseltes Java-Objekt, welches dann auf eine JMS-Queue gelegt wird, die zuvor - wie oben dargestellt - entsprechend definiert wurde. Auch hier erfolgt der Zugriff auf die Queue mittels einer normalen JDBCConnection und dem zuvor definierten Usernamen ,atc_aquser‘. Diese wird im unten stehenden Codebeispiel über den ,connectString‘ bereitgesellt (z. Bsp. jdbc:oracle:thin:@<SERVER_NAME>:1521:ATC) // Übergabe: Ein „Serializable“ JMSQueue jmsQueue = new JMSQueue(); Properties info = new Properties(); info.put(userName, userPassword); // ConnectString : JDBC-ConnectString QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(connectString, info); jmsQueue.connection = queueConnectionFactory.createQueueConnection(userName, userPassword); // If a session is transacted, message acknowledgment is handled automatically // by commit and recovery is handled automatically by rollback jmsQueue.session = aq.connection.createQueueSession( true, // Session is transacted Session.CLIENT_ACKNOWLEDGE); // Acknowledges by commit and rollback jmsQueue.connection.start(); Queue queue = ((AQjmsSession)jmsQueue.session).getQueue(DB_AQ_ADMIN_NAME, “DEV_ATCBPPMESSAGEQ“); AQjmsObjectMessage objectData = new AQjmsObjectMessage(); objectData.setObject(serializable); QueueSender queueSender = jmsQueue.session.createSender(queue); queueSender.send(objectData); jmsQueue.session.commit(); jmsQueue.session.close(); jmsQueue.connection.close(); 6.4 Abholen einer Nachricht aus einer JMS-Queue Das externe Drittsystem holt die Daten aus der JMS-Queue dann entsprechend wie nachfolgende dargestellt ab. Auch hier wird wieder über einen normalen JDBC-Connect gearbeitet. JMSQueue jmsQueue = new JMSQueue(); Properties info = new Properties(); info.put(userName, userPassword); QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(connectString, info); jmsQueue.connection = queueConnectionFactory.createQueueConnection(userName, userPassword); jmsQueue.session = jmsQueue.connection.createQueueSession( true, Session.CLIENT_ACKNOWLEDGE); rollback // Session is transacted // Acknowledges by commit and jmsQueue.connection.start(); Queue queue = ((AQjmsSession) jmsQueue.session).getQueue(DB_AQ_ADMIN_NAME, "DEV_ATCBPPMESSAGEQ“); QueueReceiver queueReceiver = jmsQueue.session.createReceiver(queue); AQjmsObjectMessage objectMessage = (AQjmsObjectMessage) queueReceiver.receive(5000); Serializable serializable = objectMessage.getObject(); jmsQueue.session.commit(); jmsQueue.session.close(); jmsQueue.connection.close(); Das abholende System hat danach das empfangende Java-Objekt als ,Serializable‘ entsprechend zur Verfügung. 7. Fazit Mit Hilfe der Möglichkeiten von "Oracle Advanced Queueing" lässt sich auf einfache Art und Weise ein JMS-Service etablieren. In jedem Umfeld in dem sich eine Oracle Datenbank befindet, ist diese Funktionalität im Prinzip "Out-of-the-Box" vorhanden und einsatzbereit. Die hier vorliegende Abhandlung zeigt natürlich nur einen kleinen Ausschnitt aus den mächtigen Möglichkeiten, die Oracle in Zusammenhang mit dem Austausch von Nachrichten bietet. Entsprechend komplexer wird die Thematik beispielsweise bei dem sog. "Publish Subscribe Messaging Modell", bei dem es mehrere Sender und Empfänger geben kann. Auch die Thematik der Fehlerbehandlung ist selbstverständlich nicht zu vernachlässigen. Oracle bietet hier ein durchdachtes Konzept bei auftretenden Fehlern in der Nachrichtenübertragung. Kombiniert mit dem "Java Exception Handling" ergeben sich etablierte Möglichkeiten der Fehlerbehandlung, bis hin zum Zurückrollen einer Transaktion auf der sendenden Seite. Mit sog. "Message Driven Beans" im Rahmen der "Enterprise Java Beans" stehen dem Entwickler noch weitere komplexe Möglichkeiten zur Verfügung. Die Erwartungen an das Advanced Queuing wurden voll uns ganz erfüllt. Derzeit arbeitet das System mit ca. 10.000 Nachrichten täglich ohne Probleme. Für den Einsatz von Oracle als JMS-Provider sprechen folgende Gründe: vollwertiger JMS-Provider Out-Of-The-Box Einsatz Nutzung von etablierten Technologien zur Persistierung, Backup und Recovery einfache Administration mit bewährten Tools (TOAD, SQL*Plus, etc.) keine Administration eines weiteren Systems notwendig leichtes Ansprechen aus Drittprogrammen heraus über den Standard-Port 1521, der inhouse in den meisten Firewalls freigegeben ist • Zugriff mittels Standard-Treibern (keine weitere Connectivity-Software notwendig) • • • • • • Kontaktadresse: Christian Hartmann Bartholomäusstraße 14 D-22083 Hamburg Telefon: Fax: E-Mail Internet: +49(0) 160-94727557 +49(0) 40-64230071 [email protected] http://www.christian-hartmann.de