1)oracle dbにログインし、ユーザーを作成します。
CREATE USER jmsuser IDENTIFIED BY a;
GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
2)クラスはマルチコンシューマキューを作成し、キューに2つのサブスクライバを登録します。 (ConnectionDefinition.getOracleConnection()は、通常のjdbc接続をoracleに返します)
import java.sql.Connection;
import oracle.AQ.AQAgent;
import oracle.AQ.AQDriverManager;
import oracle.AQ.AQQueue;
import oracle.AQ.AQQueueProperty;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.AQ.AQSession;
/**
*
* @author alukasiewicz
*/
public class NewClass {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
AQSession aq_sess = AQDriverManager.createAQSession(con);
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
AQAgent subs1, subs2;
qtable_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
qtable_prop.setMultiConsumer(true);
q_table = aq_sess.createQueueTable("jmsuser", "aq_table5", qtable_prop);
queue_prop = new AQQueueProperty();
queue = aq_sess.createQueue(q_table, "aq_queue5", queue_prop);
System.out.println("Successful createQueue");
System.out.println("Successful start queue");
subs1 = new AQAgent("GREEN", "", 0);
subs2 = new AQAgent("BLUE", "", 0);
queue.addSubscriber(subs2, null);
queue.addSubscriber(subs1, null);
queue.start();
}
}
3)クラスはメッセージをキューに公開します。
public class Publisher {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
TopicConnection tc_conn =AQjmsTopicConnectionFactory.createTopicConnection(con);
tc_conn.start();
TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic queueTopic= ((AQjmsSession )jms_sess).getTopic("JMSUSER","AQ_QUEUE5");
AQjmsTopicPublisher publisherAq = (AQjmsTopicPublisher)jms_sess.createPublisher(queueTopic);
BytesMessage messAll = jms_sess.createBytesMessage();
BytesMessage messOnlyForGreen = jms_sess.createBytesMessage();
messAll.writeUTF("Message for all subscribers");
messOnlyForGreen.writeUTF("Message only for green");
publisherAq.publish(messAll);
publisherAq.publish(messOnlyForGreen, new AQjmsAgent[]{new AQjmsAgent("GREEN", null)} );
con.commit();
tc_conn.close();
con.close();
}
}
Oracleでは、これらのメッセージをキューに表示できます。緑に2つ、赤に1つ。
SELECT a.queue, a.msg_state, a.consumer_name FROM jmsuser.aq$aq_table5 a
4)クラスはキューからメッセージを読み取ります。
public class Subscriber {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
tc_conn.start();
Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("jmsuser", "AQ_QUEUE5");
TopicSubscriber subGreen = (TopicSubscriber)((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "GREEN");
TopicSubscriber subRed = (TopicSubscriber)((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "RED");
Message msg = subGreen.receive(10);
System.err.println("Start receiving message for green subscriber");
while(msg != null){
System.err.println(" GREEN recive message "+ ((BytesMessage)msg).readUTF());
msg = subGreen.receive(10); // receive with timeout;
}
System.err.println("End receiving message for green subscriber");
System.err.println(" ");
System.err.println("Start receiving message for red subscriber");
BytesMessage byteMsg = (BytesMessage)msg;
msg = subRed.receive(10);
while(msg != null){
System.err.println(" RED recive message "+ ((BytesMessage)msg).readUTF());
msg = subRed.receive(10); // receive with timeout;
}
System.err.println("End receiving message for red subscriber");
con.commit();
tc_conn.close();
con.close();
}
}
5)Pomの依存関係
<dependencies>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>aqapi</artifactId>
<version>13</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>orai18n</artifactId>
<version>11.2.0.4</version>
</dependency>
</dependencies>