diff --git a/.github/workflows/release_cli_and_assets.yml b/.github/workflows/release_cli_and_assets.yml index 2af560e94..195ec49f1 100644 --- a/.github/workflows/release_cli_and_assets.yml +++ b/.github/workflows/release_cli_and_assets.yml @@ -321,6 +321,7 @@ jobs: - "migration" - "brew" - "service_and_plugin" + - "blank_aggregators" - "search_path" - "chaos_and_query" - "dynamic_schema" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3ea55d2bc..f4b2c3555 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -93,6 +93,7 @@ jobs: - "migration" - "brew" - "service_and_plugin" + - "blank_aggregators" - "service" - "search_path" - "chaos_and_query" diff --git a/pkg/steampipeconfig/connection_state.go b/pkg/steampipeconfig/connection_state.go index 9b63ddbb8..954baad7a 100644 --- a/pkg/steampipeconfig/connection_state.go +++ b/pkg/steampipeconfig/connection_state.go @@ -1,6 +1,8 @@ package steampipeconfig import ( + "sort" + "strings" "time" typehelpers "github.com/turbot/go-kit/types" @@ -35,16 +37,19 @@ type ConnectionState struct { PluginModTime time.Time `json:"plugin_mod_time" db:"plugin_mod_time"` // the update time of the connection ConnectionModTime time.Time `json:"connection_mod_time" db:"connection_mod_time"` + // the names of child connections (for aggregators) + ConnectionNames []string `json:"-" db:"-"` } func NewConnectionState(remoteSchema string, connection *modconfig.Connection, creationTime time.Time) *ConnectionState { return &ConnectionState{ - Plugin: remoteSchema, - ConnectionName: connection.Name, - PluginModTime: creationTime, - State: constants.ConnectionStateReady, - Type: &connection.Type, - ImportSchema: connection.ImportSchema, + Plugin: remoteSchema, + ConnectionName: connection.Name, + PluginModTime: creationTime, + State: constants.ConnectionStateReady, + Type: &connection.Type, + ImportSchema: connection.ImportSchema, + ConnectionNames: connection.ConnectionNames, } } @@ -63,6 +68,14 @@ func (d *ConnectionState) Equals(other *ConnectionState) bool { return false } + names := d.ConnectionNames + sort.Strings(names) + otherNames := other.ConnectionNames + sort.Strings(otherNames) + if strings.Join(names, ",") != strings.Join(otherNames, "'") { + return false + } + // allow for sub ms rounding errors when converting from PG if d.PluginModTime.Sub(other.PluginModTime).Abs() > 1*time.Millisecond { return false diff --git a/pkg/steampipeconfig/connection_updates.go b/pkg/steampipeconfig/connection_updates.go index ef31e66ba..a2a360b7a 100644 --- a/pkg/steampipeconfig/connection_updates.go +++ b/pkg/steampipeconfig/connection_updates.go @@ -42,7 +42,7 @@ func NewConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpdateCo return updates, res } // validate the updates - // this will validate all plugins and connection names and remove any updates whichi use invalid connections + // this will validate all plugins and connection names and remove any updates which use invalid connections updates.validate() return updates, res } diff --git a/pkg/steampipeconfig/connection_updates_validate.go b/pkg/steampipeconfig/connection_updates_validate.go index f2b3bb54d..d863e790c 100644 --- a/pkg/steampipeconfig/connection_updates_validate.go +++ b/pkg/steampipeconfig/connection_updates_validate.go @@ -64,7 +64,7 @@ func (u *ConnectionUpdates) validateUpdates() { func (u *ConnectionUpdates) validateAggregator(connectionState *ConnectionState) bool { connectionName := connectionState.ConnectionName if connectionState.GetType() == modconfig.ConnectionTypeAggregator { - // get the conneciton object + // get the connection object connection := GlobalConfig.Connections[connectionName] // get the first child connection for _, childConnection := range connection.Connections { @@ -74,7 +74,8 @@ func (u *ConnectionUpdates) validateAggregator(connectionState *ConnectionState) } } } - return false + // treat empty aggregator as validated - we will create a schema for it but not allow querying + return true } func validateConnectionName(connectionName string, p *ConnectionPlugin) *ValidationFailure { diff --git a/tests/acceptance/test_data/source_files/blank_aggregator.spc b/tests/acceptance/test_data/source_files/blank_aggregator.spc new file mode 100644 index 000000000..ac37bac13 --- /dev/null +++ b/tests/acceptance/test_data/source_files/blank_aggregator.spc @@ -0,0 +1,5 @@ +connection "all_chaos" { + type = "aggregator" + plugin = "chaos" + connections = ["*"] +} diff --git a/tests/acceptance/test_files/blank_aggregators.bats b/tests/acceptance/test_files/blank_aggregators.bats new file mode 100644 index 000000000..8fd8b2fb5 --- /dev/null +++ b/tests/acceptance/test_files/blank_aggregators.bats @@ -0,0 +1,35 @@ +load "$LIB_BATS_ASSERT/load.bash" +load "$LIB_BATS_SUPPORT/load.bash" + +# function setup() { +# rm -f $STEAMPIPE_INSTALL_DIR/config/chaos.spc +# steampipe service "select 1" +# } + +@test "blank aggregator connection should throw a warning but not fail to run steampipe" { + skip + cp $SRC_DATA_DIR/blank_aggregator.spc $STEAMPIPE_INSTALL_DIR/config/chaos.spc + run steampipe query "select * from all_chaos.chaos_all_numeric_column" + echo $output + assert_output --partial "aggregator 'all_chaos' with pattern '*' matches no connections" +} + +@test "blank aggregator connection should return empty results and not error" { + skip + cp $SRC_DATA_DIR/blank_aggregator.spc $STEAMPIPE_INSTALL_DIR/config/chaos.spc + run steampipe query "select * from all_chaos.chaos_all_numeric_column" + echo $output + assert_equal "$output" "null" +} + +@test "blank aggregator connection schema not created issue" { + skip + # for blank aggregator connections, schema was not getting created while service was running + # https://github.com/turbot/steampipe/issues/3488 + run steampipe service start + cp $SRC_DATA_DIR/blank_aggregator.spc $STEAMPIPE_INSTALL_DIR/config/chaos.spc + run steampipe query "select * from all_chaos.chaos_all_numeric_column" + echo $output + steampipe service stop + assert_equal "$output" "null" +}