sql >> データベース >  >> RDS >> PostgreSQL

ThreadPoolExecutorを使用すると、DjangoORMが接続をリークします

    私の推測では、 ThreadPoolExecutor DB接続を作成しているのはではありませんが、スレッド化されたジョブは接続を保持しているジョブです。私はすでにこれに対処しなければなりませんでした。

    ThreadPoolExecutorでジョブが実行されるたびにスレッドが手動で閉じられるようにするために、このラッパーを作成することになりました。これは、接続がリークされないようにするのに役立つはずです。これまでのところ、このコードの使用中にリークは発生していません。

    from functools import wraps
    from concurrent.futures import ThreadPoolExecutor
    from django.db import connection
    
    class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
        """
        When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
        this will wrap the function, and make sure that close_django_db_connection() is called 
        inside the thread when it's finished so Django doesn't leak DB connections.
    
        Since map() calls submit(), only submit() needs to be overwritten.
        """
        def close_django_db_connection(self):
            connection.close()
    
        def generate_thread_closing_wrapper(self, fn):
            @wraps(fn)
            def new_func(*args, **kwargs):
                try:
                    return fn(*args, **kwargs)
                finally:
                    self.close_django_db_connection()
            return new_func
    
        def submit(*args, **kwargs):
            """
            I took the args filtering/unpacking logic from 
       
            https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
            
            so I can properly get the function object the same way it was done there.
            """
            if len(args) >= 2:
                self, fn, *args = args
                fn = self.generate_thread_closing_wrapper(fn=fn)
            elif not args:
                raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
            elif 'fn' in kwargs:
                fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
                self, *args = args
        
            return super(self.__class__, self).submit(fn, *args, **kwargs)
    

    次に、これを使用できます:

        with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
            results = list(executor.map(func, args_list))
    

    ...そして接続が閉じることを確信してください。




    1. MySQLを使用してJOINでGROUPBYのSUMを取得する

    2. 1つのエンティティクラスの1つの列に2つの異なるテーブルmanytooneマッピングを作成しようとしています

    3. Entity FrameworkがネストされたSQLクエリを生成するのはなぜですか?

    4. SQL ServerでSAアカウントを無効にする(T-SQLの例)