sql >> データベース >  >> RDS >> PostgreSQL

Google Cloud Dataflowで外部データベースサーバーへのSSHトンネルを設定するにはどうすればよいですか?

    問題が解決しました !私はこれに丸2日を費やしたとは信じられません...私は完全に間違った方向を見ていました。

    問題は、一部のDataflowまたはGCPネットワーク構成ではなく、私が知る限りでは...

    本当です。

    もちろん、問題は私のコードにありました。問題だけが分散環境でのみ明らかになりました。ワーカーではなく、メインのパイプラインプロセッサからトンネルを開くのを間違えました。そのため、SSHトンネルはアップしていましたが、ワーカーとターゲットサーバーの間ではなく、メインパイプラインとターゲットの間だけでした!

    これを修正するには、リクエストするDoFnを変更して、クエリの実行をトンネルでラップする必要がありました:

    class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
    """Wraps SQLSourceDoFn in a ssh tunnel"""
    
    def __init__(self, *args, **kwargs):
        self.dbport = kwargs["port"]
        self.dbhost = kwargs["host"]
        self.args = args
        self.kwargs = kwargs
        super().__init__(*args, **kwargs)
    
    def process(self, query, *args, **kwargs):
        # Remote side of the SSH Tunnel
        remote_address = (self.dbhost, self.dbport)
        ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
        with open_tunnel(
            ssh_tunnel,
            ssh_username=self.kwargs["ssh_user"],
            ssh_password=self.kwargs["ssh_password"],
            remote_bind_address=remote_address,
            set_keepalive=10.0
        ) as tunnel:
            forwarded_port = tunnel.local_bind_port
            self.kwargs["port"] = forwarded_port
            source = sql.SQLSource(*self.args, **self.kwargs)
            sql.SQLSouceInput._build_value(source, source.runtime_params)
            logging.info("Processing - {}".format(query))
            for records, schema in source.client.read(query):
                for row in records:
                    yield source.client.row_as_dict(row, schema)
    

    ご覧のとおり、pysql_beamライブラリの一部をオーバーライドする必要がありました。

    最後に、各ワーカーはリクエストごとに独自のトンネルを開きます。この動作を最適化することはおそらく可能ですが、私のニーズには十分です。




    1. SQL Server 2019でネイティブにコンパイルされたストアドプロシージャを実行するときの「内部接続の致命的なエラー」(既知のバグ)

    2. MS SQLServer2017標準でのフェイルオーバーの実装

    3. MySQLデータベースはWindowsサーバーで自動的にバックアップされます

    4. 接続できませんでした:「初期通信パケットの読み取り」でMySQLサーバーへの接続が失われました、システムエラー:0