sql >> データベース >  >> RDS >> PostgreSQL

SQLAlchemyとマルチプロセッシングを使用してPythonスクリプトでハングアップ

    TypeErrorだと思います multiprocessingから来ています のget

    スクリプトからすべてのDBコードを削除しました。これを見てください:

    import multiprocessing
    import sqlalchemy.exc
    
    def do(kwargs):
        i = kwargs['i']
        print i
        raise sqlalchemy.exc.ProgrammingError("", {}, None)
        return i
    
    
    pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i':i})
    r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
    
    # Use get or wait?
    # r.get()
    r.wait()
    
    pool.close()
    pool.join()
    print results
    

    r.waitの使用 期待される結果を返しますが、r.getを使用します TypeErrorを発生させます 。 pythonのドキュメント で説明されているように 、r.waitを使用します map_asyncの後 。

    編集 :以前の回答を修正する必要があります。私は今、TypeErrorを信じています SQLAlchemyから来ています。エラーを再現するようにスクリプトを修正しました。

    編集2 :問題はmultiprocessing.poolにあるようです コンストラクターがパラメーターを必要とする例外をワーカーが発生させた場合、うまく機能しません(ここ

    これを強調するためにスクリプトを修正しました。

    import multiprocessing
    
    class BadExc(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            self.a = a
    
    class GoodExc(Exception):
        def __init__(self, a=None):
            '''Optional param in the constructor.'''
            self.a = a
    
    def do(kwargs):
        i = kwargs['i']
        print i
        raise BadExc('a')
        # raise GoodExc('a')
        return i
    
    pool = multiprocessing.Pool(processes=5)
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i':i})
    r = pool.map_async(do, arglist, callback=results.append)
    try:
        # set a timeout in order to be able to catch C-c
        r.get(1e100)
    except KeyboardInterrupt:
        pass
    print results
    

    あなたの場合、あなたのコードがSQLAlchemy例外を発生させるとすると、私が考えることができる唯一の解決策は、doですべての例外をキャッチすることです。 関数を実行し、通常のExceptionを再発生させます 代わりは。このようなもの:

    import multiprocessing
    
    class BadExc(Exception):
        def __init__(self, a):
            '''Non-optional param in the constructor.'''
            self.a = a
    
    def do(kwargs):
        try:
            i = kwargs['i']
            print i
            raise BadExc('a')
            return i
        except Exception as e:
            raise Exception(repr(e))
    
    pool = multiprocessing.Pool(processes=5)
    results = []
    arglist = []
    for i in range(10):
        arglist.append({'i':i})
    r = pool.map_async(do, arglist, callback=results.append)
    try:
        # set a timeout in order to be able to catch C-c
        r.get(1e100)
    except KeyboardInterrupt:
        pass
    print results
    

    編集3 :つまり、Pythonのバグ のようです。 、しかしSQLAlchemyの適切な例外はそれを回避します:したがって、私はSQLAlchemyの問題を提起しました 、も。

    問題の回避策として、編集2の最後の解決策だと思います (コールバックをtry-exceptとre-raiseでラップします)。



    1. 多数または未定義のカテゴリとのクロス集計

    2. MySQLを2列で並べ替え

    3. MYSQLの単一の等しい

    4. SQLで同時イベントの数を計算する