From edfe58f8151beadb66dab273b08d8d4e705b7c39 Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Fri, 1 Sep 2023 19:03:05 +0530 Subject: [PATCH] Revert "Fixes issue where temporary tables are lost in the middle of a long running interactive session. Closes #3543" (#3789) This reverts commit 047d8b5556f31d273b0c95a4f19c2f7c22ca130e. --- pkg/db/db_client/db_client.go | 26 +----------------- pkg/db/db_client/db_client_session.go | 23 +++++----------- pkg/db/db_common/client.go | 6 ----- pkg/db/db_common/db_session.go | 23 +++------------- pkg/interactive/interactive_client_init.go | 31 ++++++++++------------ 5 files changed, 25 insertions(+), 84 deletions(-) diff --git a/pkg/db/db_client/db_client.go b/pkg/db/db_client/db_client.go index 40b08ec60..6f8b6bd4c 100644 --- a/pkg/db/db_client/db_client.go +++ b/pkg/db/db_client/db_client.go @@ -30,9 +30,6 @@ type DbClient struct { // connection used to run system/plumbing queries (connection state, server settings) managementPool *pgxpool.Pool - // a connection hijacked from the pool before the pool is disabled (in interactive prompt) - userConnection *pgx.Conn - // the settings of the server that this client is connected to serverSettings *db_common.ServerSettings @@ -125,24 +122,6 @@ func (c *DbClient) closePools() { c.managementPool.Close() } -// DisablePool will disable the user pool and use a single connection for all query executions -// This allows us to retain the state of the client when we are in the interactive prompt -// -// Note: this does not disable the management pool. -func (c *DbClient) DisablePool(ctx context.Context) error { - if c.userConnection == nil { - conn, err := c.userPool.Acquire(ctx) - if err != nil { - return err - } - // hijack this connection - so that it's not managed by the pool anymore - c.userConnection = conn.Hijack() - // close the user pool - we don't want any queries to go through the pool - c.userPool.Close() - } - return nil -} - func (c *DbClient) loadServerSettings(ctx context.Context) error { serverSettings, err := serversettings.Load(ctx, c.managementPool) if err != nil { @@ -186,10 +165,7 @@ func (c *DbClient) ServerSettings() *db_common.ServerSettings { // Close implements Client // closes the connection to the database and shuts down the backend -func (c *DbClient) Close(ctx context.Context) error { - if c.userConnection != nil { - c.userConnection.Close(ctx) - } +func (c *DbClient) Close(context.Context) error { log.Printf("[TRACE] DbClient.Close %v", c.userPool) c.closePools() // nullify active sessions, since with the closing of the pools diff --git a/pkg/db/db_client/db_client_session.go b/pkg/db/db_client/db_client_session.go index c0d005f95..6d66424b7 100644 --- a/pkg/db/db_client/db_client_session.go +++ b/pkg/db/db_client/db_client_session.go @@ -28,22 +28,13 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common } }() - var databaseConnection db_common.Releasable - - // if we are using a single user connection, we will use this for the session - // NOTE: we must convert the connection into a `Releasable` interface so we can call `Release` on it when we are done - if c.userConnection != nil { - databaseConnection = db_common.ReleasableFromConn(c.userConnection) - } else { - // we are using a connection pool - retrieve a connection from the pool - var err error - databaseConnection, err = c.userPool.Acquire(ctx) - if err != nil { - sessionResult.Error = err - return sessionResult - } + // get a database connection and query its backend pid + // note - this will retry if the connection is bad + databaseConnection, err := c.userPool.Acquire(ctx) + if err != nil { + sessionResult.Error = err + return sessionResult } - backendPid := databaseConnection.Conn().PgConn().PID() c.sessionsMutex.Lock() @@ -93,7 +84,7 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common } // update required session search path if needed - err := c.ensureSessionSearchPath(ctx, session) + err = c.ensureSessionSearchPath(ctx, session) if err != nil { sessionResult.Error = err return sessionResult diff --git a/pkg/db/db_common/client.go b/pkg/db/db_common/client.go index 67c6e0149..81423ba9b 100644 --- a/pkg/db/db_common/client.go +++ b/pkg/db/db_common/client.go @@ -30,10 +30,4 @@ type Client interface { GetSchemaFromDB(context.Context) (*SchemaMetadata, error) ServerSettings() *ServerSettings - - // DisablePool will disable the user pool and use a single connection for all query executions - // This allows us to retain the state of the client when we are in the interactive prompt - // - // Note: this does not disable the management pool. - DisablePool(context.Context) error } diff --git a/pkg/db/db_common/db_session.go b/pkg/db/db_common/db_session.go index c35cebdce..3875b811b 100644 --- a/pkg/db/db_common/db_session.go +++ b/pkg/db/db_common/db_session.go @@ -4,27 +4,9 @@ import ( "log" "time" - "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) -// Releasable is the bare minimum set of functions that DatabaseSession needs from it's underlying -// connection. -type Releasable interface { - Release() - Conn() *pgx.Conn -} - -type releasableConn struct { - conn *pgx.Conn -} - -func (r *releasableConn) Release() {} -func (r *releasableConn) Conn() *pgx.Conn { return r.conn } - -func ReleasableFromConn(conn *pgx.Conn) Releasable { - return &releasableConn{conn: conn} -} - // DatabaseSession wraps over the raw database connection // the purpose is to be able // - to store the current search path of the connection without having to make a database round-trip @@ -34,7 +16,7 @@ type DatabaseSession struct { SearchPath []string `json:"-"` // this gets rewritten, since the database/sql gives back a new instance everytime - Connection Releasable `json:"-"` + Connection *pgxpool.Conn `json:"-"` // the id of the last scan metadata retrieved ScanMetadataMaxId int64 `json:"-"` @@ -60,4 +42,5 @@ func (s *DatabaseSession) Close(waitForCleanup bool) { s.Connection.Release() } s.Connection = nil + } diff --git a/pkg/interactive/interactive_client_init.go b/pkg/interactive/interactive_client_init.go index 25eb23aec..194244f1a 100644 --- a/pkg/interactive/interactive_client_init.go +++ b/pkg/interactive/interactive_client_init.go @@ -6,10 +6,8 @@ import ( "log" "time" - "github.com/sethvargo/go-retry" "github.com/spf13/viper" "github.com/turbot/go-kit/helpers" - "github.com/turbot/steampipe-plugin-sdk/v5/sperr" "github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/error_helpers" @@ -105,14 +103,6 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) { if c.initData.Result.Error != nil { return } - - // disable the user connection pool - // this will ensure that we are always working off of only one connection - if err := c.initData.Client.DisablePool(ctx); err != nil { - c.initData.Result.Error = err - return - } - statushooks.SetStatus(ctx, "Completing initialization…") // fetch the schema // TODO make this async https://github.com/turbot/steampipe/issues/3400 @@ -157,14 +147,21 @@ func (c *InteractiveClient) isInitialised() bool { } func (c *InteractiveClient) waitForInitData(ctx context.Context) error { - backOff := retry.WithMaxDuration(40*time.Second, retry.NewConstant(20*time.Millisecond)) - return retry.Do(ctx, backOff, func(ctx context.Context) error { - if c.isInitialised() { - // if there was an error in initialisation, return it - return c.initData.Result.Error + var initTimeout = 40 * time.Second + ticker := time.NewTicker(20 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if c.isInitialised() { + // if there was an error in initialisation, return it + return c.initData.Result.Error + } + case <-time.After(initTimeout): + return fmt.Errorf("timed out waiting for initialisation to complete") } - return retry.RetryableError(sperr.New("initialization timeout")) - }) + } } // return the workspace, or nil if not yet initialised