OAuth backend Refactor (#7209)
This commit is contained in:
committed by
GitHub
parent
e87604eccb
commit
d5e80d0771
@@ -60,7 +60,7 @@ public abstract class BaseOAuthConfig implements OAuthFlowImplementation {
|
||||
* Throws an exception if the client ID cannot be extracted. Subclasses should override this to
|
||||
* parse the config differently.
|
||||
*
|
||||
* @return
|
||||
* @return The configured Client ID used for this oauth flow
|
||||
*/
|
||||
protected String getClientIdUnsafe(final JsonNode oauthConfig) {
|
||||
if (oauthConfig.get("client_id") != null) {
|
||||
@@ -74,7 +74,7 @@ public abstract class BaseOAuthConfig implements OAuthFlowImplementation {
|
||||
* Throws an exception if the client secret cannot be extracted. Subclasses should override this to
|
||||
* parse the config differently.
|
||||
*
|
||||
* @return
|
||||
* @return The configured client secret for this OAuthFlow
|
||||
*/
|
||||
protected String getClientSecretUnsafe(final JsonNode oauthConfig) {
|
||||
if (oauthConfig.get("client_secret") != null) {
|
||||
|
||||
@@ -6,38 +6,80 @@ package io.airbyte.oauth;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.config.persistence.ConfigNotFoundException;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLEncoder;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpClient.Version;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
|
||||
/*
|
||||
* Class implementing generic oAuth 2.0 flow.
|
||||
*/
|
||||
public abstract class BaseOAuthFlow extends BaseOAuthConfig {
|
||||
|
||||
private final HttpClient httpClient;
|
||||
/**
|
||||
* Simple enum of content type strings and their respective encoding functions used for POSTing the
|
||||
* access token request
|
||||
*/
|
||||
public enum TOKEN_REQUEST_CONTENT_TYPE {
|
||||
|
||||
URL_ENCODED("application/x-www-form-urlencoded", BaseOAuthFlow::toUrlEncodedString),
|
||||
JSON("application/json", BaseOAuthFlow::toJson);
|
||||
|
||||
String contentType;
|
||||
Function<Map<String, String>, String> converter;
|
||||
|
||||
TOKEN_REQUEST_CONTENT_TYPE(String contentType, Function<Map<String, String>, String> converter) {
|
||||
this.contentType = contentType;
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected final HttpClient httpClient;
|
||||
private final TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType;
|
||||
private final Supplier<String> stateSupplier;
|
||||
|
||||
public BaseOAuthFlow(final ConfigRepository configRepository) {
|
||||
this(configRepository, HttpClient.newBuilder().version(Version.HTTP_1_1).build(), BaseOAuthFlow::generateRandomState);
|
||||
}
|
||||
|
||||
public BaseOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient, final Supplier<String> stateSupplier) {
|
||||
public BaseOAuthFlow(ConfigRepository configRepository, TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType) {
|
||||
this(configRepository,
|
||||
HttpClient.newBuilder().version(Version.HTTP_1_1).build(),
|
||||
BaseOAuthFlow::generateRandomState,
|
||||
tokenReqContentType);
|
||||
}
|
||||
|
||||
public BaseOAuthFlow(ConfigRepository configRepository, HttpClient httpClient, Supplier<String> stateSupplier) {
|
||||
this(configRepository, httpClient, stateSupplier, TOKEN_REQUEST_CONTENT_TYPE.URL_ENCODED);
|
||||
}
|
||||
|
||||
public BaseOAuthFlow(ConfigRepository configRepository,
|
||||
HttpClient httpClient,
|
||||
Supplier<String> stateSupplier,
|
||||
TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType) {
|
||||
super(configRepository);
|
||||
this.httpClient = httpClient;
|
||||
this.stateSupplier = stateSupplier;
|
||||
this.tokenReqContentType = tokenReqContentType;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -54,6 +96,31 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig {
|
||||
return formatConsentUrl(destinationDefinitionId, getClientIdUnsafe(oAuthParamConfig), redirectUrl);
|
||||
}
|
||||
|
||||
protected String formatConsentUrl(String clientId,
|
||||
String redirectUrl,
|
||||
String host,
|
||||
String path,
|
||||
String scope,
|
||||
String responseType)
|
||||
throws IOException {
|
||||
final URIBuilder builder = new URIBuilder()
|
||||
.setScheme("https")
|
||||
.setHost(host)
|
||||
.setPath(path)
|
||||
// required
|
||||
.addParameter("client_id", clientId)
|
||||
.addParameter("redirect_uri", redirectUrl)
|
||||
.addParameter("state", getState())
|
||||
// optional
|
||||
.addParameter("response_type", responseType)
|
||||
.addParameter("scope", scope);
|
||||
try {
|
||||
return builder.build().toString();
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException("Failed to format Consent URL for OAuth flow", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Depending on the OAuth flow implementation, the URL to grant user's consent may differ,
|
||||
* especially in the query parameters to be provided. This function should generate such consent URL
|
||||
@@ -84,7 +151,8 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig {
|
||||
getClientIdUnsafe(oAuthParamConfig),
|
||||
getClientSecretUnsafe(oAuthParamConfig),
|
||||
extractCodeParameter(queryParams),
|
||||
redirectUrl);
|
||||
redirectUrl,
|
||||
oAuthParamConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -98,20 +166,26 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig {
|
||||
getClientIdUnsafe(oAuthParamConfig),
|
||||
getClientSecretUnsafe(oAuthParamConfig),
|
||||
extractCodeParameter(queryParams),
|
||||
redirectUrl);
|
||||
redirectUrl, oAuthParamConfig);
|
||||
}
|
||||
|
||||
private Map<String, Object> completeOAuthFlow(final String clientId, final String clientSecret, final String authCode, final String redirectUrl)
|
||||
private Map<String, Object> completeOAuthFlow(final String clientId,
|
||||
final String clientSecret,
|
||||
final String authCode,
|
||||
final String redirectUrl,
|
||||
JsonNode oAuthParamConfig)
|
||||
throws IOException {
|
||||
var accessTokenUrl = getAccessTokenUrl(oAuthParamConfig);
|
||||
final HttpRequest request = HttpRequest.newBuilder()
|
||||
.POST(HttpRequest.BodyPublishers.ofString(toUrlEncodedString(getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))))
|
||||
.uri(URI.create(getAccessTokenUrl()))
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.POST(HttpRequest.BodyPublishers
|
||||
.ofString(tokenReqContentType.converter.apply(getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))))
|
||||
.uri(URI.create(accessTokenUrl))
|
||||
.header("Content-Type", tokenReqContentType.contentType)
|
||||
.build();
|
||||
// TODO: Handle error response to report better messages
|
||||
try {
|
||||
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());;
|
||||
return extractRefreshToken(Jsons.deserialize(response.body()));
|
||||
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
return extractRefreshToken(Jsons.deserialize(response.body()), accessTokenUrl);
|
||||
} catch (final InterruptedException e) {
|
||||
throw new IOException("Failed to complete OAuth flow", e);
|
||||
}
|
||||
@@ -146,13 +220,20 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig {
|
||||
/**
|
||||
* Returns the URL where to retrieve the access token from.
|
||||
*/
|
||||
protected abstract String getAccessTokenUrl();
|
||||
protected abstract String getAccessTokenUrl(JsonNode oAuthParamConfig);
|
||||
|
||||
/**
|
||||
* Once the auth code is exchange for a refresh token, the oauth flow implementation can extract and
|
||||
* returns the values of fields to be used in the connector's configurations.
|
||||
*/
|
||||
protected abstract Map<String, Object> extractRefreshToken(JsonNode data) throws IOException;
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data, String accessTokenUrl) throws IOException {
|
||||
final Map<String, Object> result = new HashMap<>();
|
||||
if (data.has("refresh_token")) {
|
||||
result.put("refresh_token", data.get("refresh_token").asText());
|
||||
} else if (data.has("access_token")) {
|
||||
result.put("access_token", data.get("access_token").asText());
|
||||
} else {
|
||||
throw new IOException(String.format("Missing 'refresh_token' in query params from %s", accessTokenUrl));
|
||||
}
|
||||
return Map.of("credentials", result);
|
||||
|
||||
}
|
||||
|
||||
private static String urlEncode(final String s) {
|
||||
try {
|
||||
@@ -173,4 +254,10 @@ public abstract class BaseOAuthFlow extends BaseOAuthConfig {
|
||||
return result.toString();
|
||||
}
|
||||
|
||||
protected static String toJson(final Map<String, String> body) {
|
||||
final Gson gson = new Gson();
|
||||
Type gsonType = new TypeToken<Map<String, String>>() {}.getType();
|
||||
return gson.toJson(body, gsonType);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import io.airbyte.oauth.flows.google.GoogleAdsOAuthFlow;
|
||||
import io.airbyte.oauth.flows.google.GoogleAnalyticsOAuthFlow;
|
||||
import io.airbyte.oauth.flows.google.GoogleSearchConsoleOAuthFlow;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
public class OAuthImplementationFactory {
|
||||
|
||||
@@ -31,7 +32,7 @@ public class OAuthImplementationFactory {
|
||||
.build();
|
||||
}
|
||||
|
||||
public OAuthFlowImplementation create(final String imageName) {
|
||||
public OAuthFlowImplementation create(final String imageName, final UUID workspaceId) {
|
||||
if (OAUTH_FLOW_MAPPING.containsKey(imageName)) {
|
||||
return OAUTH_FLOW_MAPPING.get(imageName);
|
||||
} else {
|
||||
|
||||
@@ -7,7 +7,6 @@ package io.airbyte.oauth.flows;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import io.airbyte.oauth.BaseOAuthFlow;
|
||||
import java.io.IOException;
|
||||
@@ -50,7 +49,7 @@ public class AsanaOAuthFlow extends BaseOAuthFlow {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAccessTokenUrl() {
|
||||
protected String getAccessTokenUrl(JsonNode oAuthParamConfig) {
|
||||
return ACCESS_TOKEN_URL;
|
||||
}
|
||||
|
||||
@@ -62,15 +61,4 @@ public class AsanaOAuthFlow extends BaseOAuthFlow {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> extractRefreshToken(JsonNode data) throws IOException {
|
||||
System.out.println(Jsons.serialize(data));
|
||||
if (data.has("refresh_token")) {
|
||||
final String refreshToken = data.get("refresh_token").asText();
|
||||
return Map.of("credentials", Map.of("refresh_token", refreshToken));
|
||||
} else {
|
||||
throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -54,12 +54,12 @@ public class FacebookMarketingOAuthFlow extends BaseOAuthFlow {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAccessTokenUrl() {
|
||||
protected String getAccessTokenUrl(JsonNode oAuthParamConfig) {
|
||||
return ACCESS_TOKEN_URL;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data) throws IOException {
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data, String accessTokenUrl) throws IOException {
|
||||
// Facebook does not have refresh token but calls it "long lived access token" instead:
|
||||
// see https://developers.facebook.com/docs/facebook-login/access-tokens/refreshing
|
||||
if (data.has("access_token")) {
|
||||
|
||||
@@ -8,9 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import java.io.IOException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class GoogleAdsOAuthFlow extends GoogleOAuthFlow {
|
||||
@@ -46,10 +44,4 @@ public class GoogleAdsOAuthFlow extends GoogleOAuthFlow {
|
||||
return super.getClientSecretUnsafe(config.get("credentials"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data) throws IOException {
|
||||
// the config object containing refresh token is nested inside the "credentials" object
|
||||
return Map.of("credentials", super.extractRefreshToken(data));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,9 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import java.io.IOException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class GoogleAnalyticsOAuthFlow extends GoogleOAuthFlow {
|
||||
@@ -45,10 +43,4 @@ public class GoogleAnalyticsOAuthFlow extends GoogleOAuthFlow {
|
||||
return super.getClientSecretUnsafe(config.get("credentials"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data) throws IOException {
|
||||
// the config object containing refresh token is nested inside the "credentials" object
|
||||
return Map.of("credentials", super.extractRefreshToken(data));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import io.airbyte.oauth.BaseOAuthFlow;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Supplier;
|
||||
@@ -64,7 +63,7 @@ public abstract class GoogleOAuthFlow extends BaseOAuthFlow {
|
||||
protected abstract String getScope();
|
||||
|
||||
@Override
|
||||
protected String getAccessTokenUrl() {
|
||||
protected String getAccessTokenUrl(JsonNode oAuthParamConfig) {
|
||||
return ACCESS_TOKEN_URL;
|
||||
}
|
||||
|
||||
@@ -82,18 +81,4 @@ public abstract class GoogleOAuthFlow extends BaseOAuthFlow {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data) throws IOException {
|
||||
final Map<String, Object> result = new HashMap<>();
|
||||
if (data.has("access_token")) {
|
||||
result.put("access_token", data.get("access_token").asText());
|
||||
}
|
||||
if (data.has("refresh_token")) {
|
||||
result.put("refresh_token", data.get("refresh_token").asText());
|
||||
} else {
|
||||
throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import com.google.common.base.Preconditions;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import java.io.IOException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@@ -47,9 +48,13 @@ public class GoogleSearchConsoleOAuthFlow extends GoogleOAuthFlow {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data) throws IOException {
|
||||
protected Map<String, Object> extractRefreshToken(final JsonNode data, String accessTokenUrl) throws IOException {
|
||||
// the config object containing refresh token is nested inside the "authorization" object
|
||||
return Map.of("authorization", super.extractRefreshToken(data));
|
||||
final Map<String, Object> result = new HashMap<>();
|
||||
if (data.has("refresh_token")) {
|
||||
result.put("refresh_token", data.get("refresh_token").asText());
|
||||
}
|
||||
return Map.of("authorization", result);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -5,64 +5,42 @@
|
||||
package io.airbyte.oauth.flows;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.config.SourceOAuthParameter;
|
||||
import io.airbyte.config.persistence.ConfigNotFoundException;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import io.airbyte.oauth.OAuthFlowImplementation;
|
||||
import io.airbyte.validation.json.JsonValidationException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class FacebookMarketingOAuthFlowIntegrationTest {
|
||||
public class FacebookMarketingOAuthFlowIntegrationTest extends OAuthFlowIntegrationTest {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FacebookMarketingOAuthFlowIntegrationTest.class);
|
||||
private static final String REDIRECT_URL = "http://localhost/code";
|
||||
private static final Path CREDENTIALS_PATH = Path.of("secrets/facebook_marketing.json");
|
||||
protected static final Path CREDENTIALS_PATH = Path.of("secrets/facebook_marketing.json");
|
||||
|
||||
private ConfigRepository configRepository;
|
||||
private FacebookMarketingOAuthFlow facebookMarketingOAuthFlow;
|
||||
private HttpServer server;
|
||||
private ServerHandler serverHandler;
|
||||
@Override
|
||||
protected Path get_credentials_path() {
|
||||
return CREDENTIALS_PATH;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OAuthFlowImplementation getFlowObject(ConfigRepository configRepository) {
|
||||
return new FacebookMarketingOAuthFlow(configRepository);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws IOException {
|
||||
if (!Files.exists(CREDENTIALS_PATH)) {
|
||||
throw new IllegalStateException(
|
||||
"Must provide path to a oauth credentials file.");
|
||||
}
|
||||
configRepository = mock(ConfigRepository.class);
|
||||
facebookMarketingOAuthFlow = new FacebookMarketingOAuthFlow(configRepository);
|
||||
|
||||
server = HttpServer.create(new InetSocketAddress(80), 0);
|
||||
server.setExecutor(null); // creates a default executor
|
||||
server.start();
|
||||
serverHandler = new ServerHandler("code");
|
||||
server.createContext("/code", serverHandler);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
server.stop(1);
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -80,7 +58,7 @@ public class FacebookMarketingOAuthFlowIntegrationTest {
|
||||
.put("client_id", credentialsJson.get("client_id").asText())
|
||||
.put("client_secret", credentialsJson.get("client_secret").asText())
|
||||
.build()))));
|
||||
final String url = facebookMarketingOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
|
||||
final String url = flow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
|
||||
LOGGER.info("Waiting for user consent at: {}", url);
|
||||
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
|
||||
// access...
|
||||
@@ -89,76 +67,11 @@ public class FacebookMarketingOAuthFlowIntegrationTest {
|
||||
limit -= 1;
|
||||
}
|
||||
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
|
||||
final Map<String, Object> params = facebookMarketingOAuthFlow.completeSourceOAuth(workspaceId, definitionId,
|
||||
final Map<String, Object> params = flow.completeSourceOAuth(workspaceId, definitionId,
|
||||
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);
|
||||
LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
|
||||
assertTrue(params.containsKey("access_token"));
|
||||
assertTrue(params.get("access_token").toString().length() > 0);
|
||||
}
|
||||
|
||||
static class ServerHandler implements HttpHandler {
|
||||
|
||||
final private String expectedParam;
|
||||
private String paramValue;
|
||||
private boolean succeeded;
|
||||
|
||||
public ServerHandler(final String expectedParam) {
|
||||
this.expectedParam = expectedParam;
|
||||
this.paramValue = "";
|
||||
this.succeeded = false;
|
||||
}
|
||||
|
||||
public boolean isSucceeded() {
|
||||
return succeeded;
|
||||
}
|
||||
|
||||
public String getParamValue() {
|
||||
return paramValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange t) {
|
||||
final String query = t.getRequestURI().getQuery();
|
||||
LOGGER.info("Received query: '{}'", query);
|
||||
final Map<String, String> data;
|
||||
try {
|
||||
data = deserialize(query);
|
||||
final String response;
|
||||
if (data != null && data.containsKey(expectedParam)) {
|
||||
paramValue = data.get(expectedParam);
|
||||
response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...",
|
||||
expectedParam, paramValue);
|
||||
LOGGER.info(response);
|
||||
t.sendResponseHeaders(200, response.length());
|
||||
succeeded = true;
|
||||
} else {
|
||||
response = String.format("Unable to parse query params from redirected url: %s", query);
|
||||
t.sendResponseHeaders(500, response.length());
|
||||
}
|
||||
final OutputStream os = t.getResponseBody();
|
||||
os.write(response.getBytes());
|
||||
os.close();
|
||||
} catch (final RuntimeException | IOException e) {
|
||||
LOGGER.error("Failed to parse from body {}", query, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, String> deserialize(final String query) {
|
||||
if (query == null) {
|
||||
return null;
|
||||
}
|
||||
final Map<String, String> result = new HashMap<>();
|
||||
for (final String param : query.split("&")) {
|
||||
final String[] entry = param.split("=");
|
||||
if (entry.length > 1) {
|
||||
result.put(entry[0], entry[1]);
|
||||
} else {
|
||||
result.put(entry[0], "");
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,60 +4,59 @@
|
||||
|
||||
package io.airbyte.oauth.flows;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.config.SourceOAuthParameter;
|
||||
import io.airbyte.config.persistence.ConfigNotFoundException;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
import io.airbyte.validation.json.JsonValidationException;
|
||||
import io.airbyte.oauth.OAuthFlowImplementation;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AsanaOAuthFlowIntegrationTest {
|
||||
public abstract class OAuthFlowIntegrationTest {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AsanaOAuthFlowIntegrationTest.class);
|
||||
private static final String REDIRECT_URL = "http://localhost:8000/code";
|
||||
private static final Path CREDENTIALS_PATH = Path.of("secrets/asana.json");
|
||||
/**
|
||||
* Convenience base class for OAuthFlow tests. Those tests right now are meant to be run manually,
|
||||
* due to the consent flow in the browser
|
||||
*/
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(OAuthFlowIntegrationTest.class);
|
||||
protected static final String REDIRECT_URL = "http://localhost/code";
|
||||
|
||||
private ConfigRepository configRepository;
|
||||
private AsanaOAuthFlow asanaOAuthFlow;
|
||||
private HttpServer server;
|
||||
private ServerHandler serverHandler;
|
||||
protected ConfigRepository configRepository;
|
||||
protected OAuthFlowImplementation flow;
|
||||
protected HttpServer server;
|
||||
protected ServerHandler serverHandler;
|
||||
|
||||
protected abstract Path get_credentials_path();
|
||||
|
||||
protected abstract OAuthFlowImplementation getFlowObject(ConfigRepository configRepository);
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws IOException {
|
||||
if (!Files.exists(CREDENTIALS_PATH)) {
|
||||
if (!Files.exists(get_credentials_path())) {
|
||||
throw new IllegalStateException(
|
||||
"Must provide path to a oauth credentials file.");
|
||||
}
|
||||
configRepository = mock(ConfigRepository.class);
|
||||
asanaOAuthFlow = new AsanaOAuthFlow(configRepository);
|
||||
|
||||
server = HttpServer.create(new InetSocketAddress(8000), 0);
|
||||
flow = this.getFlowObject(configRepository);
|
||||
|
||||
server = HttpServer.create(new InetSocketAddress(80), 0);
|
||||
server.setExecutor(null); // creates a default executor
|
||||
server.start();
|
||||
serverHandler = new ServerHandler("code");
|
||||
server.createContext("/code", serverHandler);
|
||||
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
@@ -65,44 +64,9 @@ public class AsanaOAuthFlowIntegrationTest {
|
||||
server.stop(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFullAsanaOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException {
|
||||
int limit = 20;
|
||||
final UUID workspaceId = UUID.randomUUID();
|
||||
final UUID definitionId = UUID.randomUUID();
|
||||
final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH));
|
||||
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString);
|
||||
final String clientId = credentialsJson.get("client_id").asText();
|
||||
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
|
||||
.withOauthParameterId(UUID.randomUUID())
|
||||
.withSourceDefinitionId(definitionId)
|
||||
.withWorkspaceId(workspaceId)
|
||||
.withConfiguration(Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put("client_id", clientId)
|
||||
.put("client_secret", credentialsJson.get("client_secret").asText())
|
||||
.build()))));
|
||||
final String url = asanaOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
|
||||
LOGGER.info("Waiting for user consent at: {}", url);
|
||||
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
|
||||
// access...
|
||||
while (!serverHandler.isSucceeded() && limit > 0) {
|
||||
Thread.sleep(1000);
|
||||
limit -= 1;
|
||||
}
|
||||
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
|
||||
final Map<String, Object> params = asanaOAuthFlow.completeSourceOAuth(workspaceId, definitionId,
|
||||
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);
|
||||
LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
|
||||
assertTrue(params.containsKey("credentials"));
|
||||
final Map creds = (Map) params.get("credentials");
|
||||
assertTrue(creds.containsKey("refresh_token"));
|
||||
assertTrue(creds.get("refresh_token").toString().length() > 0);
|
||||
}
|
||||
|
||||
static class ServerHandler implements HttpHandler {
|
||||
|
||||
final private String expectedParam;
|
||||
private Map responseQuery;
|
||||
private String paramValue;
|
||||
private boolean succeeded;
|
||||
|
||||
@@ -120,10 +84,6 @@ public class AsanaOAuthFlowIntegrationTest {
|
||||
return paramValue;
|
||||
}
|
||||
|
||||
public Map getResponseQuery() {
|
||||
return responseQuery;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(HttpExchange t) {
|
||||
final String query = t.getRequestURI().getQuery();
|
||||
@@ -136,7 +96,6 @@ public class AsanaOAuthFlowIntegrationTest {
|
||||
paramValue = data.get(expectedParam);
|
||||
response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...",
|
||||
expectedParam, paramValue);
|
||||
responseQuery = data;
|
||||
LOGGER.info(response);
|
||||
t.sendResponseHeaders(200, response.length());
|
||||
succeeded = true;
|
||||
@@ -46,7 +46,8 @@ public class OAuthHandler {
|
||||
|
||||
public OAuthConsentRead getSourceOAuthConsent(final SourceOauthConsentRequest sourceDefinitionIdRequestBody)
|
||||
throws JsonValidationException, ConfigNotFoundException, IOException {
|
||||
final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(sourceDefinitionIdRequestBody.getSourceDefinitionId());
|
||||
final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(sourceDefinitionIdRequestBody.getSourceDefinitionId(),
|
||||
sourceDefinitionIdRequestBody.getWorkspaceId());
|
||||
final ImmutableMap<String, Object> metadata = generateSourceMetadata(sourceDefinitionIdRequestBody.getSourceDefinitionId());
|
||||
final OAuthConsentRead result = new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getSourceConsentUrl(
|
||||
sourceDefinitionIdRequestBody.getWorkspaceId(),
|
||||
@@ -63,7 +64,8 @@ public class OAuthHandler {
|
||||
public OAuthConsentRead getDestinationOAuthConsent(final DestinationOauthConsentRequest destinationDefinitionIdRequestBody)
|
||||
throws JsonValidationException, ConfigNotFoundException, IOException {
|
||||
final OAuthFlowImplementation oAuthFlowImplementation =
|
||||
getDestinationOAuthFlowImplementation(destinationDefinitionIdRequestBody.getDestinationDefinitionId());
|
||||
getDestinationOAuthFlowImplementation(destinationDefinitionIdRequestBody.getDestinationDefinitionId(),
|
||||
destinationDefinitionIdRequestBody.getWorkspaceId());
|
||||
final ImmutableMap<String, Object> metadata = generateDestinationMetadata(destinationDefinitionIdRequestBody.getDestinationDefinitionId());
|
||||
final OAuthConsentRead result = new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getDestinationConsentUrl(
|
||||
destinationDefinitionIdRequestBody.getWorkspaceId(),
|
||||
@@ -79,7 +81,8 @@ public class OAuthHandler {
|
||||
|
||||
public Map<String, Object> completeSourceOAuth(final CompleteSourceOauthRequest oauthSourceRequestBody)
|
||||
throws JsonValidationException, ConfigNotFoundException, IOException {
|
||||
final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(oauthSourceRequestBody.getSourceDefinitionId());
|
||||
final OAuthFlowImplementation oAuthFlowImplementation = getSourceOAuthFlowImplementation(oauthSourceRequestBody.getSourceDefinitionId(),
|
||||
oauthSourceRequestBody.getWorkspaceId());
|
||||
final ImmutableMap<String, Object> metadata = generateSourceMetadata(oauthSourceRequestBody.getSourceDefinitionId());
|
||||
final Map<String, Object> result = oAuthFlowImplementation.completeSourceOAuth(
|
||||
oauthSourceRequestBody.getWorkspaceId(),
|
||||
@@ -97,7 +100,7 @@ public class OAuthHandler {
|
||||
public Map<String, Object> completeDestinationOAuth(final CompleteDestinationOAuthRequest oauthDestinationRequestBody)
|
||||
throws JsonValidationException, ConfigNotFoundException, IOException {
|
||||
final OAuthFlowImplementation oAuthFlowImplementation =
|
||||
getDestinationOAuthFlowImplementation(oauthDestinationRequestBody.getDestinationDefinitionId());
|
||||
getDestinationOAuthFlowImplementation(oauthDestinationRequestBody.getDestinationDefinitionId(), oauthDestinationRequestBody.getWorkspaceId());
|
||||
final ImmutableMap<String, Object> metadata = generateDestinationMetadata(oauthDestinationRequestBody.getDestinationDefinitionId());
|
||||
final Map<String, Object> result = oAuthFlowImplementation.completeDestinationOAuth(
|
||||
oauthDestinationRequestBody.getWorkspaceId(),
|
||||
@@ -132,18 +135,18 @@ public class OAuthHandler {
|
||||
configRepository.writeDestinationOAuthParam(param);
|
||||
}
|
||||
|
||||
private OAuthFlowImplementation getSourceOAuthFlowImplementation(final UUID sourceDefinitionId)
|
||||
private OAuthFlowImplementation getSourceOAuthFlowImplementation(final UUID sourceDefinitionId, final UUID workspaceId)
|
||||
throws JsonValidationException, ConfigNotFoundException, IOException {
|
||||
final StandardSourceDefinition standardSourceDefinition = configRepository
|
||||
.getStandardSourceDefinition(sourceDefinitionId);
|
||||
return oAuthImplementationFactory.create(standardSourceDefinition.getDockerRepository());
|
||||
return oAuthImplementationFactory.create(standardSourceDefinition.getDockerRepository(), workspaceId);
|
||||
}
|
||||
|
||||
private OAuthFlowImplementation getDestinationOAuthFlowImplementation(final UUID destinationDefinitionId)
|
||||
private OAuthFlowImplementation getDestinationOAuthFlowImplementation(final UUID destinationDefinitionId, final UUID workspaceId)
|
||||
throws JsonValidationException, ConfigNotFoundException, IOException {
|
||||
final StandardDestinationDefinition standardDestinationDefinition = configRepository
|
||||
.getStandardDestinationDefinition(destinationDefinitionId);
|
||||
return oAuthImplementationFactory.create(standardDestinationDefinition.getDockerRepository());
|
||||
return oAuthImplementationFactory.create(standardDestinationDefinition.getDockerRepository(), workspaceId);
|
||||
}
|
||||
|
||||
private ImmutableMap<String, Object> generateSourceMetadata(final UUID sourceDefinitionId)
|
||||
|
||||
Reference in New Issue
Block a user