Revert "Fixes issue where temporary tables are lost in the middle of a long running interactive session. Closes #3543" (#3789)

This reverts commit 047d8b5556.
This commit is contained in:
Binaek Sarkar
2023-09-01 19:03:05 +05:30
committed by GitHub
parent 047d8b5556
commit edfe58f815
5 changed files with 25 additions and 84 deletions

View File

@@ -30,9 +30,6 @@ type DbClient struct {
// connection used to run system/plumbing queries (connection state, server settings)
managementPool *pgxpool.Pool
// a connection hijacked from the pool before the pool is disabled (in interactive prompt)
userConnection *pgx.Conn
// the settings of the server that this client is connected to
serverSettings *db_common.ServerSettings
@@ -125,24 +122,6 @@ func (c *DbClient) closePools() {
c.managementPool.Close()
}
// DisablePool will disable the user pool and use a single connection for all query executions
// This allows us to retain the state of the client when we are in the interactive prompt
//
// Note: this does not disable the management pool.
func (c *DbClient) DisablePool(ctx context.Context) error {
if c.userConnection == nil {
conn, err := c.userPool.Acquire(ctx)
if err != nil {
return err
}
// hijack this connection - so that it's not managed by the pool anymore
c.userConnection = conn.Hijack()
// close the user pool - we don't want any queries to go through the pool
c.userPool.Close()
}
return nil
}
func (c *DbClient) loadServerSettings(ctx context.Context) error {
serverSettings, err := serversettings.Load(ctx, c.managementPool)
if err != nil {
@@ -186,10 +165,7 @@ func (c *DbClient) ServerSettings() *db_common.ServerSettings {
// Close implements Client
// closes the connection to the database and shuts down the backend
func (c *DbClient) Close(ctx context.Context) error {
if c.userConnection != nil {
c.userConnection.Close(ctx)
}
func (c *DbClient) Close(context.Context) error {
log.Printf("[TRACE] DbClient.Close %v", c.userPool)
c.closePools()
// nullify active sessions, since with the closing of the pools

View File

@@ -28,22 +28,13 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common
}
}()
var databaseConnection db_common.Releasable
// if we are using a single user connection, we will use this for the session
// NOTE: we must convert the connection into a `Releasable` interface so we can call `Release` on it when we are done
if c.userConnection != nil {
databaseConnection = db_common.ReleasableFromConn(c.userConnection)
} else {
// we are using a connection pool - retrieve a connection from the pool
var err error
databaseConnection, err = c.userPool.Acquire(ctx)
if err != nil {
sessionResult.Error = err
return sessionResult
}
// get a database connection and query its backend pid
// note - this will retry if the connection is bad
databaseConnection, err := c.userPool.Acquire(ctx)
if err != nil {
sessionResult.Error = err
return sessionResult
}
backendPid := databaseConnection.Conn().PgConn().PID()
c.sessionsMutex.Lock()
@@ -93,7 +84,7 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common
}
// update required session search path if needed
err := c.ensureSessionSearchPath(ctx, session)
err = c.ensureSessionSearchPath(ctx, session)
if err != nil {
sessionResult.Error = err
return sessionResult

View File

@@ -30,10 +30,4 @@ type Client interface {
GetSchemaFromDB(context.Context) (*SchemaMetadata, error)
ServerSettings() *ServerSettings
// DisablePool will disable the user pool and use a single connection for all query executions
// This allows us to retain the state of the client when we are in the interactive prompt
//
// Note: this does not disable the management pool.
DisablePool(context.Context) error
}

View File

@@ -4,27 +4,9 @@ import (
"log"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// Releasable is the bare minimum set of functions that DatabaseSession needs from it's underlying
// connection.
type Releasable interface {
Release()
Conn() *pgx.Conn
}
type releasableConn struct {
conn *pgx.Conn
}
func (r *releasableConn) Release() {}
func (r *releasableConn) Conn() *pgx.Conn { return r.conn }
func ReleasableFromConn(conn *pgx.Conn) Releasable {
return &releasableConn{conn: conn}
}
// DatabaseSession wraps over the raw database connection
// the purpose is to be able
// - to store the current search path of the connection without having to make a database round-trip
@@ -34,7 +16,7 @@ type DatabaseSession struct {
SearchPath []string `json:"-"`
// this gets rewritten, since the database/sql gives back a new instance everytime
Connection Releasable `json:"-"`
Connection *pgxpool.Conn `json:"-"`
// the id of the last scan metadata retrieved
ScanMetadataMaxId int64 `json:"-"`
@@ -60,4 +42,5 @@ func (s *DatabaseSession) Close(waitForCleanup bool) {
s.Connection.Release()
}
s.Connection = nil
}

View File

@@ -6,10 +6,8 @@ import (
"log"
"time"
"github.com/sethvargo/go-retry"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
@@ -105,14 +103,6 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) {
if c.initData.Result.Error != nil {
return
}
// disable the user connection pool
// this will ensure that we are always working off of only one connection
if err := c.initData.Client.DisablePool(ctx); err != nil {
c.initData.Result.Error = err
return
}
statushooks.SetStatus(ctx, "Completing initialization…")
// fetch the schema
// TODO make this async https://github.com/turbot/steampipe/issues/3400
@@ -157,14 +147,21 @@ func (c *InteractiveClient) isInitialised() bool {
}
func (c *InteractiveClient) waitForInitData(ctx context.Context) error {
backOff := retry.WithMaxDuration(40*time.Second, retry.NewConstant(20*time.Millisecond))
return retry.Do(ctx, backOff, func(ctx context.Context) error {
if c.isInitialised() {
// if there was an error in initialisation, return it
return c.initData.Result.Error
var initTimeout = 40 * time.Second
ticker := time.NewTicker(20 * time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if c.isInitialised() {
// if there was an error in initialisation, return it
return c.initData.Result.Error
}
case <-time.After(initTimeout):
return fmt.Errorf("timed out waiting for initialisation to complete")
}
return retry.RetryableError(sperr.New("initialization timeout"))
})
}
}
// return the workspace, or nil if not yet initialised