From d5e80d0771f212e4c2182748d2bedfbfdfbe5fae Mon Sep 17 00:00:00 2001 From: Marcos Eliziario Santos Date: Thu, 21 Oct 2021 04:08:32 -0300 Subject: [PATCH] OAuth backend Refactor (#7209) --- .../io/airbyte/oauth/BaseOAuthConfig.java | 4 +- .../java/io/airbyte/oauth/BaseOAuthFlow.java | 119 +++++++++++++++--- .../oauth/OAuthImplementationFactory.java | 3 +- .../airbyte/oauth/flows/AsanaOAuthFlow.java | 14 +-- .../flows/FacebookMarketingOAuthFlow.java | 4 +- .../flows/google/GoogleAdsOAuthFlow.java | 8 -- .../google/GoogleAnalyticsOAuthFlow.java | 8 -- .../oauth/flows/google/GoogleOAuthFlow.java | 17 +-- .../google/GoogleSearchConsoleOAuthFlow.java | 9 +- ...bookMarketingOAuthFlowIntegrationTest.java | 117 +++-------------- .../flows/OAuthFlowIntegrationTest.java} | 83 ++++-------- .../airbyte/server/handlers/OAuthHandler.java | 19 +-- 12 files changed, 165 insertions(+), 240 deletions(-) rename airbyte-oauth/src/test-integration/java/{io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java => io/airbyte/oauth/flows/OAuthFlowIntegrationTest.java} (50%) diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthConfig.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthConfig.java index 1c51d18fbc8..1d75829d49b 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthConfig.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthConfig.java @@ -60,7 +60,7 @@ public abstract class BaseOAuthConfig implements OAuthFlowImplementation { * Throws an exception if the client ID cannot be extracted. Subclasses should override this to * parse the config differently. * - * @return + * @return The configured Client ID used for this oauth flow */ protected String getClientIdUnsafe(final JsonNode oauthConfig) { if (oauthConfig.get("client_id") != null) { @@ -74,7 +74,7 @@ public abstract class BaseOAuthConfig implements OAuthFlowImplementation { * Throws an exception if the client secret cannot be extracted. Subclasses should override this to * parse the config differently. * - * @return + * @return The configured client secret for this OAuthFlow */ protected String getClientSecretUnsafe(final JsonNode oauthConfig) { if (oauthConfig.get("client_secret") != null) { diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java index 39386c83d43..ddf19647d96 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java @@ -6,38 +6,80 @@ package io.airbyte.oauth; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import io.airbyte.commons.json.Jsons; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import java.io.IOException; +import java.lang.reflect.Type; import java.net.URI; +import java.net.URISyntaxException; import java.net.URLEncoder; import java.net.http.HttpClient; import java.net.http.HttpClient.Version; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.http.client.utils.URIBuilder; /* * Class implementing generic oAuth 2.0 flow. */ public abstract class BaseOAuthFlow extends BaseOAuthConfig { - private final HttpClient httpClient; + /** + * Simple enum of content type strings and their respective encoding functions used for POSTing the + * access token request + */ + public enum TOKEN_REQUEST_CONTENT_TYPE { + + URL_ENCODED("application/x-www-form-urlencoded", BaseOAuthFlow::toUrlEncodedString), + JSON("application/json", BaseOAuthFlow::toJson); + + String contentType; + Function, String> converter; + + TOKEN_REQUEST_CONTENT_TYPE(String contentType, Function, String> converter) { + this.contentType = contentType; + this.converter = converter; + } + + } + + protected final HttpClient httpClient; + private final TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType; private final Supplier stateSupplier; public BaseOAuthFlow(final ConfigRepository configRepository) { this(configRepository, HttpClient.newBuilder().version(Version.HTTP_1_1).build(), BaseOAuthFlow::generateRandomState); } - public BaseOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient, final Supplier stateSupplier) { + public BaseOAuthFlow(ConfigRepository configRepository, TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType) { + this(configRepository, + HttpClient.newBuilder().version(Version.HTTP_1_1).build(), + BaseOAuthFlow::generateRandomState, + tokenReqContentType); + } + + public BaseOAuthFlow(ConfigRepository configRepository, HttpClient httpClient, Supplier stateSupplier) { + this(configRepository, httpClient, stateSupplier, TOKEN_REQUEST_CONTENT_TYPE.URL_ENCODED); + } + + public BaseOAuthFlow(ConfigRepository configRepository, + HttpClient httpClient, + Supplier stateSupplier, + TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType) { super(configRepository); this.httpClient = httpClient; this.stateSupplier = stateSupplier; + this.tokenReqContentType = tokenReqContentType; } @Override @@ -54,6 +96,31 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig { return formatConsentUrl(destinationDefinitionId, getClientIdUnsafe(oAuthParamConfig), redirectUrl); } + protected String formatConsentUrl(String clientId, + String redirectUrl, + String host, + String path, + String scope, + String responseType) + throws IOException { + final URIBuilder builder = new URIBuilder() + .setScheme("https") + .setHost(host) + .setPath(path) + // required + .addParameter("client_id", clientId) + .addParameter("redirect_uri", redirectUrl) + .addParameter("state", getState()) + // optional + .addParameter("response_type", responseType) + .addParameter("scope", scope); + try { + return builder.build().toString(); + } catch (URISyntaxException e) { + throw new IOException("Failed to format Consent URL for OAuth flow", e); + } + } + /** * Depending on the OAuth flow implementation, the URL to grant user's consent may differ, * especially in the query parameters to be provided. This function should generate such consent URL @@ -84,7 +151,8 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig { getClientIdUnsafe(oAuthParamConfig), getClientSecretUnsafe(oAuthParamConfig), extractCodeParameter(queryParams), - redirectUrl); + redirectUrl, + oAuthParamConfig); } @Override @@ -98,20 +166,26 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig { getClientIdUnsafe(oAuthParamConfig), getClientSecretUnsafe(oAuthParamConfig), extractCodeParameter(queryParams), - redirectUrl); + redirectUrl, oAuthParamConfig); } - private Map completeOAuthFlow(final String clientId, final String clientSecret, final String authCode, final String redirectUrl) + private Map completeOAuthFlow(final String clientId, + final String clientSecret, + final String authCode, + final String redirectUrl, + JsonNode oAuthParamConfig) throws IOException { + var accessTokenUrl = getAccessTokenUrl(oAuthParamConfig); final HttpRequest request = HttpRequest.newBuilder() - .POST(HttpRequest.BodyPublishers.ofString(toUrlEncodedString(getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl)))) - .uri(URI.create(getAccessTokenUrl())) - .header("Content-Type", "application/x-www-form-urlencoded") + .POST(HttpRequest.BodyPublishers + .ofString(tokenReqContentType.converter.apply(getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl)))) + .uri(URI.create(accessTokenUrl)) + .header("Content-Type", tokenReqContentType.contentType) .build(); // TODO: Handle error response to report better messages try { - final HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());; - return extractRefreshToken(Jsons.deserialize(response.body())); + final HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + return extractRefreshToken(Jsons.deserialize(response.body()), accessTokenUrl); } catch (final InterruptedException e) { throw new IOException("Failed to complete OAuth flow", e); } @@ -146,13 +220,20 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig { /** * Returns the URL where to retrieve the access token from. */ - protected abstract String getAccessTokenUrl(); + protected abstract String getAccessTokenUrl(JsonNode oAuthParamConfig); - /** - * Once the auth code is exchange for a refresh token, the oauth flow implementation can extract and - * returns the values of fields to be used in the connector's configurations. - */ - protected abstract Map extractRefreshToken(JsonNode data) throws IOException; + protected Map extractRefreshToken(final JsonNode data, String accessTokenUrl) throws IOException { + final Map result = new HashMap<>(); + if (data.has("refresh_token")) { + result.put("refresh_token", data.get("refresh_token").asText()); + } else if (data.has("access_token")) { + result.put("access_token", data.get("access_token").asText()); + } else { + throw new IOException(String.format("Missing 'refresh_token' in query params from %s", accessTokenUrl)); + } + return Map.of("credentials", result); + + } private static String urlEncode(final String s) { try { @@ -173,4 +254,10 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig { return result.toString(); } + protected static String toJson(final Map body) { + final Gson gson = new Gson(); + Type gsonType = new TypeToken>() {}.getType(); + return gson.toJson(body, gsonType); + } + } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index 45648c0a73d..6f75b7d6be8 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -14,6 +14,7 @@ import io.airbyte.oauth.flows.google.GoogleAdsOAuthFlow; import io.airbyte.oauth.flows.google.GoogleAnalyticsOAuthFlow; import io.airbyte.oauth.flows.google.GoogleSearchConsoleOAuthFlow; import java.util.Map; +import java.util.UUID; public class OAuthImplementationFactory { @@ -31,7 +32,7 @@ public class OAuthImplementationFactory { .build(); } - public OAuthFlowImplementation create(final String imageName) { + public OAuthFlowImplementation create(final String imageName, final UUID workspaceId) { if (OAUTH_FLOW_MAPPING.containsKey(imageName)) { return OAUTH_FLOW_MAPPING.get(imageName); } else { diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java index e3d53435aa4..3a967c694cc 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java @@ -7,7 +7,6 @@ package io.airbyte.oauth.flows; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.oauth.BaseOAuthFlow; import java.io.IOException; @@ -50,7 +49,7 @@ public class AsanaOAuthFlow extends BaseOAuthFlow { } @Override - protected String getAccessTokenUrl() { + protected String getAccessTokenUrl(JsonNode oAuthParamConfig) { return ACCESS_TOKEN_URL; } @@ -62,15 +61,4 @@ public class AsanaOAuthFlow extends BaseOAuthFlow { .build(); } - @Override - protected Map extractRefreshToken(JsonNode data) throws IOException { - System.out.println(Jsons.serialize(data)); - if (data.has("refresh_token")) { - final String refreshToken = data.get("refresh_token").asText(); - return Map.of("credentials", Map.of("refresh_token", refreshToken)); - } else { - throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL)); - } - } - } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java index 9c459206a7c..a710f01d1b1 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java @@ -54,12 +54,12 @@ public class FacebookMarketingOAuthFlow extends BaseOAuthFlow { } @Override - protected String getAccessTokenUrl() { + protected String getAccessTokenUrl(JsonNode oAuthParamConfig) { return ACCESS_TOKEN_URL; } @Override - protected Map extractRefreshToken(final JsonNode data) throws IOException { + protected Map extractRefreshToken(final JsonNode data, String accessTokenUrl) throws IOException { // Facebook does not have refresh token but calls it "long lived access token" instead: // see https://developers.facebook.com/docs/facebook-login/access-tokens/refreshing if (data.has("access_token")) { diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAdsOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAdsOAuthFlow.java index 1ff20896367..eedbaf5036a 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAdsOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAdsOAuthFlow.java @@ -8,9 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.airbyte.config.persistence.ConfigRepository; -import java.io.IOException; import java.net.http.HttpClient; -import java.util.Map; import java.util.function.Supplier; public class GoogleAdsOAuthFlow extends GoogleOAuthFlow { @@ -46,10 +44,4 @@ public class GoogleAdsOAuthFlow extends GoogleOAuthFlow { return super.getClientSecretUnsafe(config.get("credentials")); } - @Override - protected Map extractRefreshToken(final JsonNode data) throws IOException { - // the config object containing refresh token is nested inside the "credentials" object - return Map.of("credentials", super.extractRefreshToken(data)); - } - } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java index 0a9851fb542..8e26336783a 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleAnalyticsOAuthFlow.java @@ -8,9 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.airbyte.config.persistence.ConfigRepository; -import java.io.IOException; import java.net.http.HttpClient; -import java.util.Map; import java.util.function.Supplier; public class GoogleAnalyticsOAuthFlow extends GoogleOAuthFlow { @@ -45,10 +43,4 @@ public class GoogleAnalyticsOAuthFlow extends GoogleOAuthFlow { return super.getClientSecretUnsafe(config.get("credentials")); } - @Override - protected Map extractRefreshToken(final JsonNode data) throws IOException { - // the config object containing refresh token is nested inside the "credentials" object - return Map.of("credentials", super.extractRefreshToken(data)); - } - } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java index 1fc029332fe..497a633b8a7 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java @@ -12,7 +12,6 @@ import io.airbyte.oauth.BaseOAuthFlow; import java.io.IOException; import java.net.URISyntaxException; import java.net.http.HttpClient; -import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.function.Supplier; @@ -64,7 +63,7 @@ public abstract class GoogleOAuthFlow extends BaseOAuthFlow { protected abstract String getScope(); @Override - protected String getAccessTokenUrl() { + protected String getAccessTokenUrl(JsonNode oAuthParamConfig) { return ACCESS_TOKEN_URL; } @@ -82,18 +81,4 @@ public abstract class GoogleOAuthFlow extends BaseOAuthFlow { .build(); } - @Override - protected Map extractRefreshToken(final JsonNode data) throws IOException { - final Map result = new HashMap<>(); - if (data.has("access_token")) { - result.put("access_token", data.get("access_token").asText()); - } - if (data.has("refresh_token")) { - result.put("refresh_token", data.get("refresh_token").asText()); - } else { - throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL)); - } - return result; - } - } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleSearchConsoleOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleSearchConsoleOAuthFlow.java index 1aaad03c01b..77973683446 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleSearchConsoleOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleSearchConsoleOAuthFlow.java @@ -10,6 +10,7 @@ import com.google.common.base.Preconditions; import io.airbyte.config.persistence.ConfigRepository; import java.io.IOException; import java.net.http.HttpClient; +import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -47,9 +48,13 @@ public class GoogleSearchConsoleOAuthFlow extends GoogleOAuthFlow { } @Override - protected Map extractRefreshToken(final JsonNode data) throws IOException { + protected Map extractRefreshToken(final JsonNode data, String accessTokenUrl) throws IOException { // the config object containing refresh token is nested inside the "authorization" object - return Map.of("authorization", super.extractRefreshToken(data)); + final Map result = new HashMap<>(); + if (data.has("refresh_token")) { + result.put("refresh_token", data.get("refresh_token").asText()); + } + return Map.of("authorization", result); } } diff --git a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/FacebookMarketingOAuthFlowIntegrationTest.java b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/FacebookMarketingOAuthFlowIntegrationTest.java index 3cda4ec1358..ef87291fb24 100644 --- a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/FacebookMarketingOAuthFlowIntegrationTest.java +++ b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/FacebookMarketingOAuthFlowIntegrationTest.java @@ -5,64 +5,42 @@ package io.airbyte.oauth.flows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; import io.airbyte.commons.json.Jsons; import io.airbyte.config.SourceOAuthParameter; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.oauth.OAuthFlowImplementation; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class FacebookMarketingOAuthFlowIntegrationTest { +public class FacebookMarketingOAuthFlowIntegrationTest extends OAuthFlowIntegrationTest { - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookMarketingOAuthFlowIntegrationTest.class); - private static final String REDIRECT_URL = "http://localhost/code"; - private static final Path CREDENTIALS_PATH = Path.of("secrets/facebook_marketing.json"); + protected static final Path CREDENTIALS_PATH = Path.of("secrets/facebook_marketing.json"); - private ConfigRepository configRepository; - private FacebookMarketingOAuthFlow facebookMarketingOAuthFlow; - private HttpServer server; - private ServerHandler serverHandler; + @Override + protected Path get_credentials_path() { + return CREDENTIALS_PATH; + } + + @Override + protected OAuthFlowImplementation getFlowObject(ConfigRepository configRepository) { + return new FacebookMarketingOAuthFlow(configRepository); + } @BeforeEach public void setup() throws IOException { - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a oauth credentials file."); - } - configRepository = mock(ConfigRepository.class); - facebookMarketingOAuthFlow = new FacebookMarketingOAuthFlow(configRepository); - - server = HttpServer.create(new InetSocketAddress(80), 0); - server.setExecutor(null); // creates a default executor - server.start(); - serverHandler = new ServerHandler("code"); - server.createContext("/code", serverHandler); - } - - @AfterEach - void tearDown() { - server.stop(1); + super.setup(); } @Test @@ -80,7 +58,7 @@ public class FacebookMarketingOAuthFlowIntegrationTest { .put("client_id", credentialsJson.get("client_id").asText()) .put("client_secret", credentialsJson.get("client_secret").asText()) .build())))); - final String url = facebookMarketingOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); + final String url = flow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); LOGGER.info("Waiting for user consent at: {}", url); // TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing // access... @@ -89,76 +67,11 @@ public class FacebookMarketingOAuthFlowIntegrationTest { limit -= 1; } assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time"); - final Map params = facebookMarketingOAuthFlow.completeSourceOAuth(workspaceId, definitionId, + final Map params = flow.completeSourceOAuth(workspaceId, definitionId, Map.of("code", serverHandler.getParamValue()), REDIRECT_URL); LOGGER.info("Response from completing OAuth Flow is: {}", params.toString()); assertTrue(params.containsKey("access_token")); assertTrue(params.get("access_token").toString().length() > 0); } - static class ServerHandler implements HttpHandler { - - final private String expectedParam; - private String paramValue; - private boolean succeeded; - - public ServerHandler(final String expectedParam) { - this.expectedParam = expectedParam; - this.paramValue = ""; - this.succeeded = false; - } - - public boolean isSucceeded() { - return succeeded; - } - - public String getParamValue() { - return paramValue; - } - - @Override - public void handle(final HttpExchange t) { - final String query = t.getRequestURI().getQuery(); - LOGGER.info("Received query: '{}'", query); - final Map data; - try { - data = deserialize(query); - final String response; - if (data != null && data.containsKey(expectedParam)) { - paramValue = data.get(expectedParam); - response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...", - expectedParam, paramValue); - LOGGER.info(response); - t.sendResponseHeaders(200, response.length()); - succeeded = true; - } else { - response = String.format("Unable to parse query params from redirected url: %s", query); - t.sendResponseHeaders(500, response.length()); - } - final OutputStream os = t.getResponseBody(); - os.write(response.getBytes()); - os.close(); - } catch (final RuntimeException | IOException e) { - LOGGER.error("Failed to parse from body {}", query, e); - } - } - - private static Map deserialize(final String query) { - if (query == null) { - return null; - } - final Map result = new HashMap<>(); - for (final String param : query.split("&")) { - final String[] entry = param.split("="); - if (entry.length > 1) { - result.put(entry[0], entry[1]); - } else { - result.put(entry[0], ""); - } - } - return result; - } - - } - } diff --git a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java b/airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/OAuthFlowIntegrationTest.java similarity index 50% rename from airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java rename to airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/OAuthFlowIntegrationTest.java index af76c876299..c9ac5fb199f 100644 --- a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java +++ b/airbyte-oauth/src/test-integration/java/io/airbyte/oauth/flows/OAuthFlowIntegrationTest.java @@ -4,60 +4,59 @@ package io.airbyte.oauth.flows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; -import io.airbyte.commons.json.Jsons; -import io.airbyte.config.SourceOAuthParameter; -import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.oauth.OAuthFlowImplementation; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AsanaOAuthFlowIntegrationTest { +public abstract class OAuthFlowIntegrationTest { - private static final Logger LOGGER = LoggerFactory.getLogger(AsanaOAuthFlowIntegrationTest.class); - private static final String REDIRECT_URL = "http://localhost:8000/code"; - private static final Path CREDENTIALS_PATH = Path.of("secrets/asana.json"); + /** + * Convenience base class for OAuthFlow tests. Those tests right now are meant to be run manually, + * due to the consent flow in the browser + */ + protected static final Logger LOGGER = LoggerFactory.getLogger(OAuthFlowIntegrationTest.class); + protected static final String REDIRECT_URL = "http://localhost/code"; - private ConfigRepository configRepository; - private AsanaOAuthFlow asanaOAuthFlow; - private HttpServer server; - private ServerHandler serverHandler; + protected ConfigRepository configRepository; + protected OAuthFlowImplementation flow; + protected HttpServer server; + protected ServerHandler serverHandler; + + protected abstract Path get_credentials_path(); + + protected abstract OAuthFlowImplementation getFlowObject(ConfigRepository configRepository); @BeforeEach public void setup() throws IOException { - if (!Files.exists(CREDENTIALS_PATH)) { + if (!Files.exists(get_credentials_path())) { throw new IllegalStateException( "Must provide path to a oauth credentials file."); } configRepository = mock(ConfigRepository.class); - asanaOAuthFlow = new AsanaOAuthFlow(configRepository); - server = HttpServer.create(new InetSocketAddress(8000), 0); + flow = this.getFlowObject(configRepository); + + server = HttpServer.create(new InetSocketAddress(80), 0); server.setExecutor(null); // creates a default executor server.start(); serverHandler = new ServerHandler("code"); server.createContext("/code", serverHandler); + } @AfterEach @@ -65,44 +64,9 @@ public class AsanaOAuthFlowIntegrationTest { server.stop(1); } - @Test - public void testFullAsanaOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException { - int limit = 20; - final UUID workspaceId = UUID.randomUUID(); - final UUID definitionId = UUID.randomUUID(); - final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); - final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString); - final String clientId = credentialsJson.get("client_id").asText(); - when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() - .withOauthParameterId(UUID.randomUUID()) - .withSourceDefinitionId(definitionId) - .withWorkspaceId(workspaceId) - .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() - .put("client_id", clientId) - .put("client_secret", credentialsJson.get("client_secret").asText()) - .build())))); - final String url = asanaOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); - LOGGER.info("Waiting for user consent at: {}", url); - // TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing - // access... - while (!serverHandler.isSucceeded() && limit > 0) { - Thread.sleep(1000); - limit -= 1; - } - assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time"); - final Map params = asanaOAuthFlow.completeSourceOAuth(workspaceId, definitionId, - Map.of("code", serverHandler.getParamValue()), REDIRECT_URL); - LOGGER.info("Response from completing OAuth Flow is: {}", params.toString()); - assertTrue(params.containsKey("credentials")); - final Map creds = (Map) params.get("credentials"); - assertTrue(creds.containsKey("refresh_token")); - assertTrue(creds.get("refresh_token").toString().length() > 0); - } - static class ServerHandler implements HttpHandler { final private String expectedParam; - private Map responseQuery; private String paramValue; private boolean succeeded; @@ -120,10 +84,6 @@ public class AsanaOAuthFlowIntegrationTest { return paramValue; } - public Map getResponseQuery() { - return responseQuery; - } - @Override public void handle(HttpExchange t) { final String query = t.getRequestURI().getQuery(); @@ -136,7 +96,6 @@ public class AsanaOAuthFlowIntegrationTest { paramValue = data.get(expectedParam); response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...", expectedParam, paramValue); - responseQuery = data; LOGGER.info(response); t.sendResponseHeaders(200, response.length()); succeeded = true; diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java index 5c5222e65ab..7fa5565bf77 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java @@ -46,7 +46,8 @@ public class OAuthHandler { public OAuthConsentRead getSourceOAuthConsent(final SourceOauthConsentRequest sourceDefinitionIdRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { - final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(sourceDefinitionIdRequestBody.getSourceDefinitionId()); + final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(sourceDefinitionIdRequestBody.getSourceDefinitionId(), + sourceDefinitionIdRequestBody.getWorkspaceId()); final ImmutableMap metadata = generateSourceMetadata(sourceDefinitionIdRequestBody.getSourceDefinitionId()); final OAuthConsentRead result = new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getSourceConsentUrl( sourceDefinitionIdRequestBody.getWorkspaceId(), @@ -63,7 +64,8 @@ public class OAuthHandler { public OAuthConsentRead getDestinationOAuthConsent(final DestinationOauthConsentRequest destinationDefinitionIdRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final OAuthFlowImplementation oAuthFlowImplementation = - getDestinationOAuthFlowImplementation(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); + getDestinationOAuthFlowImplementation(destinationDefinitionIdRequestBody.getDestinationDefinitionId(), + destinationDefinitionIdRequestBody.getWorkspaceId()); final ImmutableMap metadata = generateDestinationMetadata(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); final OAuthConsentRead result = new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getDestinationConsentUrl( destinationDefinitionIdRequestBody.getWorkspaceId(), @@ -79,7 +81,8 @@ public class OAuthHandler { public Map completeSourceOAuth(final CompleteSourceOauthRequest oauthSourceRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { - final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(oauthSourceRequestBody.getSourceDefinitionId()); + final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(oauthSourceRequestBody.getSourceDefinitionId(), + oauthSourceRequestBody.getWorkspaceId()); final ImmutableMap metadata = generateSourceMetadata(oauthSourceRequestBody.getSourceDefinitionId()); final Map result = oAuthFlowImplementation.completeSourceOAuth( oauthSourceRequestBody.getWorkspaceId(), @@ -97,7 +100,7 @@ public class OAuthHandler { public Map completeDestinationOAuth(final CompleteDestinationOAuthRequest oauthDestinationRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final OAuthFlowImplementation oAuthFlowImplementation = - getDestinationOAuthFlowImplementation(oauthDestinationRequestBody.getDestinationDefinitionId()); + getDestinationOAuthFlowImplementation(oauthDestinationRequestBody.getDestinationDefinitionId(), oauthDestinationRequestBody.getWorkspaceId()); final ImmutableMap metadata = generateDestinationMetadata(oauthDestinationRequestBody.getDestinationDefinitionId()); final Map result = oAuthFlowImplementation.completeDestinationOAuth( oauthDestinationRequestBody.getWorkspaceId(), @@ -132,18 +135,18 @@ public class OAuthHandler { configRepository.writeDestinationOAuthParam(param); } - private OAuthFlowImplementation getSourceOAuthFlowImplementation(final UUID sourceDefinitionId) + private OAuthFlowImplementation getSourceOAuthFlowImplementation(final UUID sourceDefinitionId, final UUID workspaceId) throws JsonValidationException, ConfigNotFoundException, IOException { final StandardSourceDefinition standardSourceDefinition = configRepository .getStandardSourceDefinition(sourceDefinitionId); - return oAuthImplementationFactory.create(standardSourceDefinition.getDockerRepository()); + return oAuthImplementationFactory.create(standardSourceDefinition.getDockerRepository(), workspaceId); } - private OAuthFlowImplementation getDestinationOAuthFlowImplementation(final UUID destinationDefinitionId) + private OAuthFlowImplementation getDestinationOAuthFlowImplementation(final UUID destinationDefinitionId, final UUID workspaceId) throws JsonValidationException, ConfigNotFoundException, IOException { final StandardDestinationDefinition standardDestinationDefinition = configRepository .getStandardDestinationDefinition(destinationDefinitionId); - return oAuthImplementationFactory.create(standardDestinationDefinition.getDockerRepository()); + return oAuthImplementationFactory.create(standardDestinationDefinition.getDockerRepository(), workspaceId); } private ImmutableMap generateSourceMetadata(final UUID sourceDefinitionId)