sql >> データベース >  >> RDS >> Oracle

Javaでのコミット時にOracleAQテーブルにエンキューし、JMSクライアントで使用する方法

    私はこれを達成することができました-OracleAPIの多くの部分を推測し、さまざまなブログからヒントを収集する必要がありました。ここに興味がある人にとっては、私がそれを機能させる方法です-1。 OracleDb2でOracleオブジェクトを作成しました。このOracleオブジェクトを使用して、payload3としてオブジェクトタイプのキューテーブルを作成しました。これで、オブジェクトdata4を含むSTRUCTペイロードを使用してAQMessageタイプをキューに入れることができます。また、ADTペイロードタイプを理解しているJMSコンシューマーでデキューすることができます( http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types

    コードを使用した手順は次のとおりです-Oracle

    オブジェクトの構造体インスタンスを使用して、JavaでAQMessage型をキューに入れることができるようになりました

    public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
        // First create the message properties:
        AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();
        aqMessageProperties.setCorrelation(correlationId);
        aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME);
    
        // Specify an agent as the sender:
        AQAgent aqAgent = AQFactory.createAQAgent();
        aqAgent.setName(SENDER_NAME);
        aqAgent.setAddress(QUEUE_NAME);
        aqMessageProperties.setSender(aqAgent);
    
        // Create the payload
        StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
        Map<String, Object> payloadMap = new HashMap<String, Object>();
        payloadMap.put("ID", correlationId);
        payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
        STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);
    
        // Create the actual AQMessage instance:
        AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);
        aqMessage.setPayload(struct);
    
        AQEnqueueOptions opt = new AQEnqueueOptions();
        opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
        opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);
    
        // execute the actual enqueue operation:
        conn.enqueue(QUEUE_NAME, opt, aqMessage);
    }
    

    ブロブフィールドには特別な処理が必要でした

    public class OracleAQBLOBUtil {
    
        public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
            BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
            OutputStream outputStream = blob.setBinaryStream(1L);
            InputStream inputStream = new ByteArrayInputStream(payload);
            try {
                byte[] buffer = new byte[blob.getBufferSize()];
                int bytesRead = 0;
                while ((bytesRead = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, bytesRead);
                }
                return blob;
            }
            finally {
                outputStream.close();
                inputStream.close();
            }
        }
    
        public byte[] saveOutputStream(BLOB blob) throws Exception {
            InputStream inputStream = blob.getBinaryStream();
            int counter;
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            while ((counter = inputStream.read()) > -1) {
                byteArrayOutputStream.write(counter);
            }
            byteArrayOutputStream.close();
            return byteArrayOutputStream.toByteArray();
        }
    
    }
    

    コンシューマーの場合、コンシューマーがペイロードタイプ(カスタムオブジェクト)を理解できるようにするORADataFactoryのインスタンスを提供する必要があります。

    AQjmsSession queueSession = (AQjmsSession) session;
    Queue queue = (Queue) ctx.lookup(queueName);
    MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());
    

    OracleAQObjORADataFactoryのコードはどこにありますか

    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.SQLException;
    
    import oracle.jdbc.OracleTypes;
    import oracle.jpub.runtime.MutableStruct;
    import oracle.sql.BLOB;
    import oracle.sql.Datum;
    import oracle.sql.ORAData;
    import oracle.sql.ORADataFactory;
    import oracle.sql.STRUCT;
    
    public class OracleAQObjORADataFactory  implements ORAData, ORADataFactory {
    
        public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
        public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
    
        protected MutableStruct _struct;
    
        protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
        protected static ORADataFactory[] _factory = new ORADataFactory[2];
        protected static final OracleAQObjORADataFactory  _AqEventObjFactory = new OracleAQObjORADataFactory ();
    
        public static ORADataFactory getORADataFactory() {
            return _AqEventObjFactory;
        }
    
        /* constructors */
        protected void _init_struct(boolean init) {
            if (init)
                _struct = new MutableStruct(new Object[2], _sqlType, _factory);
        }
    
        public OracleAQObjORADataFactory () {
            _init_struct(true);
        }
    
        public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {
            _init_struct(true);
            setId(id);
            setPayload(payload);
        }
    
        /* ORAData interface */
        public Datum toDatum(Connection c) throws SQLException {
            return _struct.toDatum(c, EVENT_OBJECT);
        }
    
        /* ORADataFactory interface */
        public ORAData create(Datum d, int sqlType) throws SQLException {
            return create(null, d, sqlType);
        }
    
        protected ORAData create(OracleAQObjORADataFactory  o, Datum d, int sqlType) throws SQLException {
            if (d == null)
                return null;
            if (o == null)
                o = new OracleAQObjORADataFactory ();
            o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
            return o;
        }
    
        public String getId() throws SQLException {
            return (String) _struct.getAttribute(0);
        }
    
        public void setId(String id) throws SQLException {
            _struct.setAttribute(0, id);
        }
    
        public byte[] getPayload() throws SQLException {
            BLOB blob = (BLOB) _struct.getAttribute(1);
            InputStream inputStream = blob.getBinaryStream();
            return getBytes(inputStream);
        }
    
        public byte[] getBytes(InputStream body) {
            int c;
            try {
                ByteArrayOutputStream f = new ByteArrayOutputStream();
                while ((c = body.read()) > -1) {
                    f.write(c);
                }
                f.close();
                byte[] result = f.toByteArray();
                return result;
            }
            catch (Exception e) {
                System.err.println("Exception: " + e.getMessage());
                e.printStackTrace();
                return null;
            }
        }
    
        public void setPayload(byte[] payload) throws SQLException {
            _struct.setAttribute(1, payload);
        }
    
    }
    

    プロジェクトでCamelまたはSpringを使用している可能性があります。その場合は-1です。 Camel 2.10.2以降を使用している場合は、カスタムメッセージリスターコンテナ(CAMEL-5676)2を使用してJMSコンシューマーを作成できます。以前のバージョンを使用している場合は、エンドポイントの方法を使用できない可能性があります(私はそれを理解できませんでした)が、JMSリクエストリスナーを使用することはできます

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
        xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                            http://www.springframework.org/schema/jms
                            http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
    
        <!-- this is just an example, you can also use a datasource as the ctor arg -->
        <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
            <constructor-arg index="0">
                <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
            </constructor-arg>
            <constructor-arg index="1" type="java.util.Properties">
                <value></value>
            </constructor-arg>
        </bean>
    
        <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
            <property name="targetConnectionFactory">
                <ref bean="connectionFactoryOracleAQQueue" />
            </property>
            <property name="username">
                <value>system</value>
            </property>
            <property name="password">
                <value>oracle</value>
            </property>
        </bean>
    
        <!-- Definitions for JMS Listener classes that we have created -->
        <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />
    
        <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
            <property name="connectionFactory" ref="oracleQueueCredentials" />
            <property name="oracleQueueName" value="BOZ_SINGLE_QUEUE" />
        </bean>
    
        <!-- The Spring DefaultMessageListenerContainer configuration. This bean is automatically loaded when the JMS application context is started -->
        <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
            <property name="connectionFactory" ref="oracleQueueCredentials" />
            <property name="destination" ref="aqEventQueue" />
            <property name="messageListener" ref="aqMessageListener" />
            <property name="sessionTransacted" value="false" />
        </bean>
    
    </beans>
    

    カスタムメッセージリスナーコンテナ

    public class AQMessageListenerContainer extends DefaultMessageListenerContainer {
    
        @Override
        protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
            return ((AQjmsSession) session).createConsumer(destination, getMessageSelector(),
                    OracleAQObjORADataFactory.getORADataFactory(), null, isPubSubNoLocal());
        }
    }
    

    およびリクエストリスナーのonMessageメソッド

    public void onMessage(Message msg) {
        try {
            AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
            OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();
    
            System.out.println("Datetime: " + obj.getId());
            System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
        }
        catch (Exception jmsException) {
            if (logger.isErrorEnabled()) {
                logger.error(jmsException.getLocalizedMessage());
            }
        }
    }
    



    1. MAMPの一般ログを有効にして、すべてのMySQLクエリをファイルに記録します

    2. 相互依存関係のあるSQLスクリプトの実行

    3. cx_Oracle:ストアドプロシージャへの引数としてPL /SQLRECORDタイプを使用する

    4. Mysqlトランザクション:コミットとロールバック