🎉 Source redshift: implement privileges check (#9744)
This commit is contained in:
@@ -641,7 +641,7 @@
|
||||
- name: Redshift
|
||||
sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
|
||||
dockerRepository: airbyte/source-redshift
|
||||
dockerImageTag: 0.3.8
|
||||
dockerImageTag: 0.3.9
|
||||
documentationUrl: https://docs.airbyte.io/integrations/sources/redshift
|
||||
icon: redshift.svg
|
||||
sourceType: database
|
||||
|
||||
@@ -6631,9 +6631,9 @@
|
||||
supportsNormalization: false
|
||||
supportsDBT: false
|
||||
supported_destination_sync_modes: []
|
||||
- dockerImage: "airbyte/source-redshift:0.3.8"
|
||||
- dockerImage: "airbyte/source-redshift:0.3.9"
|
||||
spec:
|
||||
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
|
||||
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
|
||||
connectionSpecification:
|
||||
$schema: "http://json-schema.org/draft-07/schema#"
|
||||
title: "Redshift Source Spec"
|
||||
|
||||
@@ -16,5 +16,5 @@ ENV APPLICATION source-redshift
|
||||
|
||||
COPY --from=build /airbyte /airbyte
|
||||
|
||||
LABEL io.airbyte.version=0.3.8
|
||||
LABEL io.airbyte.version=0.3.9
|
||||
LABEL io.airbyte.name=airbyte/source-redshift
|
||||
|
||||
@@ -12,10 +12,14 @@ import io.airbyte.db.jdbc.JdbcUtils;
|
||||
import io.airbyte.integrations.base.IntegrationRunner;
|
||||
import io.airbyte.integrations.base.Source;
|
||||
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
|
||||
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
|
||||
import io.airbyte.integrations.source.relationaldb.TableInfo;
|
||||
import io.airbyte.protocol.models.CommonField;
|
||||
import java.sql.JDBCType;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.slf4j.Logger;
|
||||
@@ -93,6 +97,27 @@ public class RedshiftSource extends AbstractJdbcSource<JDBCType> implements Sour
|
||||
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException {
|
||||
return new HashSet<>(database.bufferedResultSetQuery(
|
||||
connection -> {
|
||||
connection.setAutoCommit(true);
|
||||
final PreparedStatement ps = connection.prepareStatement(
|
||||
"SELECT schemaname, tablename "
|
||||
+ "FROM pg_tables "
|
||||
+ "WHERE has_table_privilege(schemaname||'.'||tablename, 'select') = true AND schemaname = ?;");
|
||||
ps.setString(1, schema);
|
||||
return ps.executeQuery();
|
||||
},
|
||||
resultSet -> {
|
||||
final JsonNode json = sourceOperations.rowToJson(resultSet);
|
||||
return JdbcPrivilegeDto.builder()
|
||||
.schemaName(json.get("schemaname").asText())
|
||||
.tableName(json.get("tablename").asText())
|
||||
.build();
|
||||
}));
|
||||
}
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final Source source = new RedshiftSource();
|
||||
LOGGER.info("starting source: {}", RedshiftSource.class);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/redshift",
|
||||
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
|
||||
"connectionSpecification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Redshift Source Spec",
|
||||
|
||||
@@ -29,7 +29,6 @@ class RedshiftJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
@BeforeEach
|
||||
public void setup() throws Exception {
|
||||
config = getStaticConfig();
|
||||
|
||||
super.setup();
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,8 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
// This test case expects an active redshift cluster that is useable from outside of vpc
|
||||
protected ObjectNode config;
|
||||
protected JdbcDatabase database;
|
||||
protected String testUserName;
|
||||
protected String testUserPassword;
|
||||
protected String schemaName;
|
||||
protected String schemaToIgnore;
|
||||
protected String streamName;
|
||||
@@ -53,22 +55,30 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
config = getStaticConfig();
|
||||
|
||||
database = createDatabase(config);
|
||||
|
||||
testUserName = "foo";
|
||||
testUserPassword = "BarBarBar1&";
|
||||
createTestUser(database, config, testUserName, testUserPassword);
|
||||
schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();
|
||||
schemaToIgnore = schemaName + "shouldIgnore";
|
||||
streamName = "customer";
|
||||
|
||||
// limit the connection to one schema only
|
||||
config = config.set("schemas", Jsons.jsonNode(List.of(schemaName)));
|
||||
|
||||
// use test user user
|
||||
config = config.set("username", Jsons.jsonNode(testUserName));
|
||||
config = config.set("password", Jsons.jsonNode(testUserPassword));
|
||||
|
||||
// create a test data
|
||||
createTestData(database, schemaName);
|
||||
createTestData(database, schemaName, streamName, testUserName, true);
|
||||
createTestData(database, schemaName, "not_readable", testUserName, false);
|
||||
|
||||
// create a schema with data that will not be used for testing, but would be used to check schema
|
||||
// filtering. This one should not be visible in results
|
||||
createTestData(database, schemaToIgnore);
|
||||
createTestData(database, schemaToIgnore, streamName, testUserName, true);
|
||||
}
|
||||
|
||||
protected static JdbcDatabase createDatabase(final JsonNode config) {
|
||||
protected JdbcDatabase createDatabase(final JsonNode config) {
|
||||
return Databases.createJdbcDatabase(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
@@ -79,15 +89,30 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
RedshiftSource.DRIVER_CLASS);
|
||||
}
|
||||
|
||||
protected void createTestData(final JdbcDatabase database, final String schemaName)
|
||||
protected void createTestUser(final JdbcDatabase database, final JsonNode config, final String testUserName, final String testUserPassword)
|
||||
throws SQLException {
|
||||
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
|
||||
final String createTestUserQuery = String.format("CREATE USER %s PASSWORD '%s'", testUserName, testUserPassword);
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(createTestUserQuery);
|
||||
});
|
||||
final String grantSelectOnPgTablesQuery = String.format("GRANT SELECT ON TABLE pg_tables TO %s ", testUserName);
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(grantSelectOnPgTablesQuery);
|
||||
});
|
||||
}
|
||||
|
||||
protected void createTestData(final JdbcDatabase database,
|
||||
final String schemaName,
|
||||
final String tableName,
|
||||
final String testUserName,
|
||||
final Boolean isReadableByTestUser)
|
||||
throws SQLException {
|
||||
final String createSchemaQuery = String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName);
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(createSchemaQuery);
|
||||
});
|
||||
|
||||
streamName = "customer";
|
||||
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, streamName);
|
||||
final String fqTableName = JdbcUtils.getFullyQualifiedTableName(schemaName, tableName);
|
||||
final String createTestTable =
|
||||
String.format(
|
||||
"CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n",
|
||||
@@ -101,6 +126,22 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(insertTestData);
|
||||
});
|
||||
|
||||
if (!isReadableByTestUser) {
|
||||
final String revokeSelect = String.format("REVOKE SELECT ON TABLE %s FROM %s;\n", fqTableName, testUserName);
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(revokeSelect);
|
||||
});
|
||||
} else {
|
||||
final String grantUsageQuery = String.format("GRANT USAGE ON SCHEMA %s TO %s;\n", schemaName, testUserName);
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(grantUsageQuery);
|
||||
});
|
||||
final String grantSelectQuery = String.format("GRANT SELECT ON TABLE %s TO %s;\n", fqTableName, testUserName);
|
||||
database.execute(connection -> {
|
||||
connection.createStatement().execute(grantSelectQuery);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -109,6 +150,10 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaName)));
|
||||
database.execute(connection -> connection.createStatement()
|
||||
.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaToIgnore)));
|
||||
database.execute(connection -> connection.createStatement()
|
||||
.execute(String.format("REVOKE SELECT ON table pg_tables FROM %s", testUserName)));
|
||||
database.execute(connection -> connection.createStatement()
|
||||
.execute(String.format("DROP USER IF EXISTS %s", testUserName)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -11,7 +11,8 @@ import io.airbyte.integrations.source.redshift.RedshiftSource;
|
||||
|
||||
public class RedshiftSslSourceAcceptanceTest extends RedshiftSourceAcceptanceTest {
|
||||
|
||||
protected static JdbcDatabase createDatabase(final JsonNode config) {
|
||||
@Override
|
||||
protected JdbcDatabase createDatabase(final JsonNode config) {
|
||||
return Databases.createJdbcDatabase(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
|
||||
@@ -54,7 +54,8 @@ All Redshift connections are encrypted using SSL
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
| :------ | :-------- | :----- | :------ |
|
||||
| 0.3 .8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
|
||||
| 0.3.9 | 2022-02-21 | [9744](https://github.com/airbytehq/airbyte/pull/9744) | List only the tables on which the user has SELECT permissions.
|
||||
| 0.3.8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
|
||||
| 0.3.7 | 2022-01-26 | [9721](https://github.com/airbytehq/airbyte/pull/9721) | Added schema selection |
|
||||
| 0.3.6 | 2022-01-20 | [8617](https://github.com/airbytehq/airbyte/pull/8617) | Update connector fields title/description |
|
||||
| 0.3.5 | 2021-12-24 | [8958](https://github.com/airbytehq/airbyte/pull/8958) | Add support for JdbcType.ARRAY |
|
||||
|
||||
Reference in New Issue
Block a user