sql >> データベース >  >> RDS >> PostgreSQL

SQLAlchemy、シリアル化可能なトランザクションの分離と慣用的なPythonの方法での再試行

    それで、約2週間突っ込んで、既成の解決策が得られなかった後、私は自分自身を思いついた。

    これがConflictResolverです managed_transactionを提供するクラス 関数デコレータ。デコレータを使用して、関数を再試行可能としてマークできます。つまり関数の実行中にデータベースの競合エラーが発生した場合、関数が再度実行され、競合エラーの原因となったdbトランザクションが終了することを期待しています。

    ソースコードは次のとおりです: https ://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

    それをカバーするユニットテストはここにあります: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

    Python3.4以降のみ。

    """Serialized SQL transaction conflict resolution as a function decorator."""
    
    import warnings
    import logging
    from collections import Counter
    
    from sqlalchemy.orm.exc import ConcurrentModificationError
    from sqlalchemy.exc import OperationalError
    
    
    UNSUPPORTED_DATABASE = "Seems like we might know how to support serializable transactions for this database. We don't know or it is untested. Thus, the reliability of the service may suffer. See transaction documentation for the details."
    
    #: Tuples of (Exception class, test function). Behavior copied from _retryable_errors definitions copied from zope.sqlalchemy
    DATABASE_COFLICT_ERRORS = []
    
    try:
        import psycopg2.extensions
    except ImportError:
        pass
    else:
        DATABASE_COFLICT_ERRORS.append((psycopg2.extensions.TransactionRollbackError, None))
    
    # ORA-08177: can't serialize access for this transaction
    try:
        import cx_Oracle
    except ImportError:
        pass
    else:
        DATABASE_COFLICT_ERRORS.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177))
    
    if not DATABASE_COFLICT_ERRORS:
        # TODO: Do this when cryptoassets app engine is configured
        warnings.warn(UNSUPPORTED_DATABASE, UserWarning, stacklevel=2)
    
    #: XXX: We need to confirm is this the right way for MySQL, SQLIte?
    DATABASE_COFLICT_ERRORS.append((ConcurrentModificationError, None))
    
    
    logger = logging.getLogger(__name__)
    
    
    class CannotResolveDatabaseConflict(Exception):
        """The managed_transaction decorator has given up trying to resolve the conflict.
    
        We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in error free manner. Thus, we need to tell our service user that unfortunately this time you cannot do your thing.
        """
    
    
    class ConflictResolver:
    
        def __init__(self, session_factory, retries):
            """
    
            :param session_factory: `callback()` which will give us a new SQLAlchemy session object for each transaction and retry
    
            :param retries: The number of attempst we try to re-run the transaction in the case of transaction conflict.
            """
            self.retries = retries
    
            self.session_factory = session_factory
    
            # Simple beancounting diagnostics how well we are doing
            self.stats = Counter(success=0, retries=0, errors=0, unresolved=0)
    
        @classmethod
        def is_retryable_exception(self, e):
            """Does the exception look like a database conflict error?
    
            Check for database driver specific cases.
    
            :param e: Python Exception instance
            """
    
            if not isinstance(e, OperationalError):
                # Not an SQLAlchemy exception
                return False
    
            # The exception SQLAlchemy wrapped
            orig = e.orig
    
            for err, func in DATABASE_COFLICT_ERRORS:
                # EXception type matches, now compare its values
                if isinstance(orig, err):
                    if func:
                        return func(e)
                    else:
                        return True
    
            return False
    
        def managed_transaction(self, func):
            """SQL Seralized transaction isolation-level conflict resolution.
    
            When SQL transaction isolation level is its highest level (Serializable), the SQL database itself cannot alone resolve conflicting concurrenct transactions. Thus, the SQL driver raises an exception to signal this condition.
    
            ``managed_transaction`` decorator will retry to run everyhing inside the function
    
            Usage::
    
                # Create new session for SQLAlchemy engine
                def create_session():
                    Session = sessionmaker()
                    Session.configure(bind=engine)
                    return Session()
    
                conflict_resolver = ConflictResolver(create_session, retries=3)
    
                # Create a decorated function which can try to re-run itself in the case of conflict
                @conflict_resolver.managed_transaction
                def myfunc(session):
    
                    # Both threads modify the same wallet simultaneously
                    w = session.query(BitcoinWallet).get(1)
                    w.balance += 1
    
                # Execute the conflict sensitive code inside a managed transaction
                myfunc()
    
            The rules:
    
            - You must not swallow all exceptions within ``managed_transactions``. Example how to handle exceptions::
    
                # Create a decorated function which can try to re-run itself in the case of conflict
                @conflict_resolver.managed_transaction
                def myfunc(session):
    
                    try:
                        my_code()
                    except Exception as e:
                        if ConflictResolver.is_retryable_exception(e):
                            # This must be passed to the function decorator, so it can attempt retry
                            raise
                        # Otherwise the exception is all yours
    
            - Use read-only database sessions if you know you do not need to modify the database and you need weaker transaction guarantees e.g. for displaying the total balance.
    
            - Never do external actions, like sending emails, inside ``managed_transaction``. If the database transaction is replayed, the code is run twice and you end up sending the same email twice.
    
            - Managed transaction section should be as small and fast as possible
    
            - Avoid long-running transactions by splitting up big transaction to smaller worker batches
    
            This implementation heavily draws inspiration from the following sources
    
            - http://stackoverflow.com/q/27351433/315168
    
            - https://gist.github.com/khayrov/6291557
            """
    
            def decorated_func():
    
                # Read attemps from app configuration
                attempts = self.retries
    
                while attempts >= 0:
    
                    session = self.session_factory()
                    try:
                        result = func(session)
                        session.commit()
                        self.stats["success"] += 1
                        return result
    
                    except Exception as e:
                        if self.is_retryable_exception(e):
                            session.close()
                            self.stats["retries"] += 1
                            attempts -= 1
                            if attempts < 0:
                                self.stats["unresolved"] += 1
                                raise CannotResolveDatabaseConflict("Could not replay the transaction {} even after {} attempts".format(func, self.retries)) from e
                            continue
                        else:
                            session.rollback()
                            self.stats["errors"] += 1
                            # All other exceptions should fall through
                            raise
    
            return decorated_func
    


    1. データベース行を作成するmysqldump

    2. javaString[]をpostgresプリペアドステートメントに設定中にエラーが発生しました

    3. mysqlとgolangで動的クエリを構築する

    4. java.sql.SQLException:接続はすでに閉じられています