問題が解決しました !私はこれに丸2日を費やしたとは信じられません...私は完全に間違った方向を見ていました。
問題は、一部のDataflowまたはGCPネットワーク構成ではなく、私が知る限りでは...
本当です。
もちろん、問題は私のコードにありました。問題だけが分散環境でのみ明らかになりました。ワーカーではなく、メインのパイプラインプロセッサからトンネルを開くのを間違えました。そのため、SSHトンネルはアップしていましたが、ワーカーとターゲットサーバーの間ではなく、メインパイプラインとターゲットの間だけでした!
これを修正するには、リクエストするDoFnを変更して、クエリの実行をトンネルでラップする必要がありました:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
ご覧のとおり、pysql_beamライブラリの一部をオーバーライドする必要がありました。
最後に、各ワーカーはリクエストごとに独自のトンネルを開きます。この動作を最適化することはおそらく可能ですが、私のニーズには十分です。