Fix ssh tunneling for normalization (#6396)
* switch to custom file for ssh config in normalization * bump version * get local port properly * added unit test for write_ssh_config * format
This commit is contained in:
@@ -45,5 +45,5 @@ WORKDIR /airbyte
|
||||
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
|
||||
ENTRYPOINT ["/airbyte/entrypoint.sh"]
|
||||
|
||||
LABEL io.airbyte.version=0.1.45
|
||||
LABEL io.airbyte.version=0.1.46
|
||||
LABEL io.airbyte.name=airbyte/normalization
|
||||
|
||||
@@ -101,7 +101,7 @@ function main() {
|
||||
run)
|
||||
configuredbt
|
||||
. /airbyte/sshtunneling.sh
|
||||
openssh $CONFIG_FILE "${PROJECT_DIR}/localsshport.json"
|
||||
openssh "${PROJECT_DIR}/ssh.json"
|
||||
trap 'closessh' EXIT
|
||||
# Run dbt to compile and execute the generated normalization models
|
||||
dbt run --profiles-dir "${PROJECT_DIR}" --project-dir "${PROJECT_DIR}"
|
||||
|
||||
@@ -55,7 +55,7 @@ class TransformConfig:
|
||||
transformed_config = self.transform(integration_type, original_config)
|
||||
self.write_yaml_config(inputs["output_path"], transformed_config, "profiles.yml")
|
||||
if self.is_ssh_tunnelling(original_config):
|
||||
self.write_ssh_port(inputs["output_path"], self.pick_a_port())
|
||||
self.write_ssh_config(inputs["output_path"], original_config, transformed_config)
|
||||
|
||||
@staticmethod
|
||||
def parse(args):
|
||||
@@ -282,17 +282,21 @@ class TransformConfig:
|
||||
fh.write(yaml.dump(config))
|
||||
|
||||
@staticmethod
|
||||
def write_ssh_port(output_path: str, port: int):
|
||||
def write_ssh_config(output_path: str, original_config: Dict[str, Any], transformed_config: Dict[str, Any]):
|
||||
"""
|
||||
This function writes a small json file with content like {"port":xyz}
|
||||
This is being used only when ssh tunneling.
|
||||
We do this because we need to decide on and save this port number into our dbt config
|
||||
and then use that same port in sshtunneling.sh when opening the tunnel.
|
||||
This function writes a json file with config specific to ssh.
|
||||
We do this because we need these details to open the ssh tunnel for dbt.
|
||||
"""
|
||||
ssh_dict = {
|
||||
"db_host": original_config["host"],
|
||||
"db_port": original_config["port"],
|
||||
"tunnel_map": original_config["tunnel_method"],
|
||||
"local_port": transformed_config["normalize"]["outputs"]["prod"]["port"],
|
||||
}
|
||||
if not os.path.exists(output_path):
|
||||
os.makedirs(output_path)
|
||||
with open(os.path.join(output_path, "localsshport.json"), "w") as fh:
|
||||
json.dump({"port": port}, fh)
|
||||
with open(os.path.join(output_path, "ssh.json"), "w") as fh:
|
||||
json.dump(ssh_dict, fh)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
|
||||
@@ -23,8 +23,10 @@
|
||||
#
|
||||
|
||||
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import pytest
|
||||
@@ -339,3 +341,42 @@ class TestTransformConfig:
|
||||
assert {"integration_type": DestinationType.postgres, "config": "config.json", "output_path": "out.yml"} == t.parse(
|
||||
["--integration-type", "postgres", "--config", "config.json", "--out", "out.yml"]
|
||||
)
|
||||
|
||||
def test_write_ssh_config(self):
|
||||
original_config_input = {
|
||||
"type": "postgres",
|
||||
"dbname": "my_db",
|
||||
"host": "airbyte.io",
|
||||
"pass": "password123",
|
||||
"port": 5432,
|
||||
"schema": "public",
|
||||
"threads": 32,
|
||||
"user": "a user",
|
||||
"tunnel_method": {
|
||||
"tunnel_host": "1.2.3.4",
|
||||
"tunnel_method": "SSH_PASSWORD_AUTH",
|
||||
"tunnel_port": 22,
|
||||
"tunnel_user": "user",
|
||||
"tunnel_user_password": "pass",
|
||||
},
|
||||
}
|
||||
transformed_config_input = self.get_base_config()
|
||||
transformed_config_input["normalize"]["outputs"]["prod"] = {
|
||||
"port": 7890,
|
||||
}
|
||||
expected = {
|
||||
"db_host": "airbyte.io",
|
||||
"db_port": 5432,
|
||||
"tunnel_map": {
|
||||
"tunnel_host": "1.2.3.4",
|
||||
"tunnel_method": "SSH_PASSWORD_AUTH",
|
||||
"tunnel_port": 22,
|
||||
"tunnel_user": "user",
|
||||
"tunnel_user_password": "pass",
|
||||
},
|
||||
"local_port": 7890,
|
||||
}
|
||||
tmp_path = tempfile.TemporaryDirectory().name
|
||||
TransformConfig.write_ssh_config(tmp_path, original_config_input, transformed_config_input)
|
||||
with open(os.path.join(tmp_path, "ssh.json"), "r") as f:
|
||||
assert json.load(f) == expected
|
||||
|
||||
@@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
|
||||
|
||||
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.45";
|
||||
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.46";
|
||||
|
||||
private final DestinationType destinationType;
|
||||
private final ProcessFactory processFactory;
|
||||
|
||||
@@ -35,7 +35,7 @@ if [[ -f "${CWD}/bq_keyfile.json" ]]; then
|
||||
fi
|
||||
|
||||
. $CWD/sshtunneling.sh
|
||||
openssh $CWD/destination_config.json $CWD/localsshport.json
|
||||
openssh $CWD/ssh.json
|
||||
trap 'closessh' EXIT
|
||||
|
||||
# Add mandatory flags profiles-dir and project-dir when calling dbt when necessary
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# This function opens an ssh tunnel if required using values provided in config.
|
||||
# Requires two arguments,
|
||||
# path to config file ($1)
|
||||
# path to file containing local port to use ($2)
|
||||
# Requires one argument,
|
||||
# path to ssh config file ($1)
|
||||
function openssh() {
|
||||
# check if jq is missing, and if so try to install it..
|
||||
# this is janky but for custom dbt transform we can't be sure jq is installed as using user docker image
|
||||
@@ -14,12 +13,12 @@ function openssh() {
|
||||
fi
|
||||
# tunnel_db_host and tunnel_db_port currently rely on the destination's spec using "host" and "port" as keys for these values
|
||||
# if adding ssh support for a new destination where this is not the case, extra logic will be needed to capture these dynamically
|
||||
tunnel_db_host=$(cat $1 | jq -r '.host')
|
||||
tunnel_db_port=$(cat $1 | jq -r '.port')
|
||||
tunnel_method=$(cat $1 | jq -r '.tunnel_method.tunnel_method' | tr '[:lower:]' '[:upper:]')
|
||||
tunnel_username=$(cat $1 | jq -r '.tunnel_method.tunnel_user')
|
||||
tunnel_host=$(cat $1 | jq -r '.tunnel_method.tunnel_host')
|
||||
tunnel_local_port=$(cat $2 | jq -r '.port')
|
||||
tunnel_db_host=$(cat $1 | jq -r '.db_host')
|
||||
tunnel_db_port=$(cat $1 | jq -r '.db_port')
|
||||
tunnel_method=$(cat $1 | jq -r '.tunnel_map.tunnel_method' | tr '[:lower:]' '[:upper:]')
|
||||
tunnel_username=$(cat $1 | jq -r '.tunnel_map.tunnel_user')
|
||||
tunnel_host=$(cat $1 | jq -r '.tunnel_map.tunnel_host')
|
||||
tunnel_local_port=$(cat $1 | jq -r '.local_port')
|
||||
# set a path for a control socket, allowing us to close this specific ssh connection when desired
|
||||
tmpcontrolsocket="/tmp/sshsocket${tunnel_db_remote_port}-${RANDOM}"
|
||||
if [[ ${tunnel_method} = "SSH_KEY_AUTH" ]] ; then
|
||||
@@ -27,7 +26,7 @@ function openssh() {
|
||||
# create a temporary file to hold ssh key and trap to delete on EXIT
|
||||
trap 'rm -f "$tmpkeyfile"' EXIT
|
||||
tmpkeyfile=$(mktemp /tmp/xyzfile.XXXXXXXXXXX) || exit 1
|
||||
echo "$(cat $1 | jq -r '.tunnel_method.ssh_key')" > $tmpkeyfile
|
||||
echo "$(cat $1 | jq -r '.tunnel_map.ssh_key')" > $tmpkeyfile
|
||||
# -f=background -N=no remote command -M=master mode StrictHostKeyChecking=no auto-adds host
|
||||
echo "Running: ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -i {key file} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}"
|
||||
ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -i $tmpkeyfile -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} &&
|
||||
@@ -44,7 +43,7 @@ function openssh() {
|
||||
{ dnf install epel-release -y && dnf install sshpass -y; } || exit 1
|
||||
fi
|
||||
# put ssh password in env var for use in sshpass. Better than directly passing with -p
|
||||
export SSHPASS=$(cat $1 | jq -r '.tunnel_method.tunnel_user_password')
|
||||
export SSHPASS=$(cat $1 | jq -r '.tunnel_map.tunnel_user_password')
|
||||
echo "Running: sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}"
|
||||
sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} &&
|
||||
sshopen="yes" &&
|
||||
|
||||
Reference in New Issue
Block a user