Differentiate between user and system queries using application name. Closes #3600

This commit is contained in:
Binaek Sarkar
2023-08-25 21:20:27 +05:30
committed by GitHub
parent 21d27bd1ea
commit 08b447a261
35 changed files with 462 additions and 307 deletions

View File

@@ -868,7 +868,7 @@ func getConnectionState(ctx context.Context) (steampipeconfig.ConnectionStateMap
}
defer client.Close(ctx)
conn, err := client.AcquireConnection(ctx)
conn, err := client.AcquireManagementConnection(ctx)
if err != nil {
res.Error = err
return nil, res

View File

@@ -17,7 +17,6 @@ import (
"github.com/turbot/steampipe/pkg/cmdconfig"
"github.com/turbot/steampipe/pkg/connection"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/constants/runtime"
"github.com/turbot/steampipe/pkg/filepaths"
"github.com/turbot/steampipe/pkg/pluginmanager_service"
"github.com/turbot/steampipe/pkg/steampipeconfig"
@@ -55,12 +54,6 @@ func runPluginManagerCmd(cmd *cobra.Command, _ []string) {
}
}()
// add a prefix to the PgClientAppName so that out DB connecitons are not treated as
// another Steampipe instance connected to the DB
// (as our lifecycle is managed by the db service,
// so we will be shut down when the service is stopped)
runtime.PgClientAppName = runtime.PgClientAppNamePluginManagerPrefix + runtime.PgClientAppName
configMap := connection.NewConnectionConfigMap(steampipeConfig.Connections)
log.Printf("[TRACE] loaded config map: %s", strings.Join(steampipeConfig.ConnectionNames(), ","))

View File

@@ -0,0 +1,32 @@
## Queries that need to be executed over a client connection:
- Get scan metadata
- read automatically if `--timing` is enabled
- Set search path
- Can be automatically during session startup
- Can be set by the user using meta commands
- Cache commands
- Can be automatically during session startup
- Can be set by the user using meta commands
- Introspection tables
- Written automatically for each database connection
- Read by system if `--tag` or `--where` are used for `check`
## Database Session
A thin wrapper around the raw database connection which caches the `search path` and the `scan metadata id`
### Acquire `session`
1. Get a database connection from the pool
1. if not found in `session cache map`
1. create a `DatabaseSession` for the connection
1. Persist `DatabaseSession` in `session cache map`
1. Set cache parameters (if required)
1. If client `cache` is enabled, enable client `cache` on the connection
1. If client `cache ttl` is set, set the `cache ttl` on the connection
1. Ensure `search path`
1. Load the `search path` of the `steampipe` user - db query
1. Get the resolved `search path` based on the `search_path` and `search_path_prefix` configs (`custom_search_path`)
1. if the `loaded search path` and `resolved search path` differ, set the `resolved search path` on the connection

28
go.sum
View File

@@ -118,6 +118,7 @@ cloud.google.com/go/language v1.6.0/go.mod h1:6dJ8t3B+lUYfStgls25GusK04NLh3eDLQn
cloud.google.com/go/lifesciences v0.5.0/go.mod h1:3oIKy8ycWGPUyZDR/8RNnTOYevhaMLqh5vLUXs9zvT8=
cloud.google.com/go/lifesciences v0.6.0/go.mod h1:ddj6tSX/7BOnhxCSd3ZcETvtNr8NZ6t/iPhY2Tyfu08=
cloud.google.com/go/longrunning v0.4.1 h1:v+yFJOfKC3yZdY6ZUI933pIYdhyhV8S3NpWrXWmg7jM=
cloud.google.com/go/longrunning v0.4.1/go.mod h1:4iWDqhBZ70CvZ6BfETbvam3T8FMvLK+eFj0E6AaRQTo=
cloud.google.com/go/mediatranslation v0.5.0/go.mod h1:jGPUhGTybqsPQn91pNXw0xVHfuJ3leR1wj37oU3y1f4=
cloud.google.com/go/mediatranslation v0.6.0/go.mod h1:hHdBCTYNigsBxshbznuIMFNe5QXEowAuNmmC7h8pu5w=
cloud.google.com/go/memcache v1.4.0/go.mod h1:rTOfiGZtJX1AaFUrOgsMHX5kAzaTQ8azHiuDoTPzNsE=
@@ -238,6 +239,7 @@ github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.10.0-rc.8 h1:YSZVvlIIDD1UxQpJp0h+dnpLUw+TrY0cx8obKsp3bek=
github.com/Microsoft/hcsshim v0.10.0-rc.8/go.mod h1:OEthFdQv/AD2RAdzR6Mm1N1KPCztGKDurW1Z8b8VGMM=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/ProtonMail/go-crypto v0.0.0-20230717121422-5aa5874ade95 h1:KLq8BE0KwCL+mmXnjLWEAOYO+2l2AE4YMmqG1ZpZHBs=
@@ -268,6 +270,7 @@ github.com/aliyun/aliyun-tablestore-go-sdk v4.1.2+incompatible/go.mod h1:LDQHRZy
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=
github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antchfx/xpath v0.0.0-20190129040759-c8489ed3251e/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk=
github.com/antchfx/xquery v0.0.0-20180515051857-ad5b8c7a47b0/go.mod h1:LzD22aAzDP8/dyiCKFp31He4m2GPjl0AFyzDtZzUu9M=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -288,6 +291,7 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go v1.15.78/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
github.com/aws/aws-sdk-go v1.31.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
@@ -347,10 +351,13 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw=
github.com/containerd/containerd v1.7.4 h1:Q5lwCrO44ahHhO65rXthXkfJUG5W78LXwK9gTt8XFfU=
github.com/containerd/containerd v1.7.4/go.mod h1:gq7JDNtCrI1Zlcc572a9tvP1f1Ja8VBxiB9J00apAtU=
github.com/containerd/continuity v0.4.2-0.20230616210509-1e0d26eb2381 h1:a5jOuoZHKBi2oH9JsfNqrrPpHhmrYU0NAte3M/EPudw=
github.com/containerd/continuity v0.4.2-0.20230616210509-1e0d26eb2381/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ=
github.com/coocood/freecache v1.2.3 h1:lcBwpZrwBZRZyLk/8EMyQVXRiFl663cCuMOrjCALeto=
github.com/coocood/freecache v1.2.3/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk=
github.com/coreos/bbolt v1.3.0/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -387,6 +394,7 @@ github.com/eko/gocache/v3 v3.1.2 h1:tBAn5kBScEmRXWHJl0iJgJU7TsMeOjySwHDZ/92riqg=
github.com/eko/gocache/v3 v3.1.2/go.mod h1:92prWCVTLxRkRlZuxDkLkwwUfitZ60zKNi6kn3qiDNU=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819 h1:RIB4cRk+lBqKK3Oy0r2gRX4ui7tuhiZq2SuTtTCi0/0=
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
@@ -410,6 +418,7 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoD
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
@@ -428,11 +437,13 @@ github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwv
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/gliderlabs/ssh v0.3.5 h1:OcaySEmAQJgyYcArR+gGGTHCyE7nvhEMTlYY+Dp8CpY=
github.com/gliderlabs/ssh v0.3.5/go.mod h1:8XB4KraRrX39qHhT6yxPsHedjA08I/uBVwj4xC+/+z4=
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI=
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic=
github.com/go-git/go-billy/v5 v5.4.1 h1:Uwp5tDRkPr+l/TnbHOQzp+tmJfLceOlbVucgpTz8ix4=
github.com/go-git/go-billy/v5 v5.4.1/go.mod h1:vjbugF6Fz7JIflbVpl1hJsGjSHNltrSw45YK/ukIvQg=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20230305113008-0c11038e723f h1:Pz0DHeFij3XFhoBRGUDPzSJ+w2UcK5/0JvF8DRI58r8=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20230305113008-0c11038e723f/go.mod h1:8LHG1a3SRW71ettAD/jW13h8c6AqjVSeL11RAdgaqpo=
github.com/go-git/go-git/v5 v5.8.1 h1:Zo79E4p7TRk0xoRgMq0RShiTHGKcKI4+DI6BfJc/Q+A=
github.com/go-git/go-git/v5 v5.8.1/go.mod h1:FHFuoD6yGz5OSKEBK+aWN9Oah0q54Jxl0abmj6GnqAo=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
@@ -460,6 +471,7 @@ github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nA
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
@@ -484,6 +496,7 @@ github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE=
@@ -554,6 +567,7 @@ github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIG
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
@@ -603,6 +617,7 @@ github.com/gophercloud/gophercloud v0.10.1-0.20200424014253-c3bfe50899e5/go.mod
github.com/gophercloud/utils v0.0.0-20200423144003-7c72efc7435d/go.mod h1:ehWUbLQJPqS0Ep+CxeD559hsm9pthPXadJNKwZkp43w=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -765,6 +780,7 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
@@ -788,6 +804,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -881,6 +898,7 @@ github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx
github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -897,6 +915,7 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
@@ -906,11 +925,13 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/onsi/gomega v1.23.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0-rc4 h1:oOxKUJWnFC4YGHCCMNql1x4YaDfYBTS5Y4x/Cgeo1E0=
@@ -920,9 +941,11 @@ github.com/oras-project/oras-credentials-go v0.3.0/go.mod h1:fFCebDQo0Do+gnM96uV
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww=
github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks=
github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM=
github.com/packer-community/winrmcp v0.0.0-20180921211025-c76d91c1e7db/go.mod h1:f6Izs6JvFTdnRbziASagjZ2vmf55NSIkC/weStxCHqk=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pegasus-kv/thrift v0.13.0 h1:4ESwaNoHImfbHa9RUGJiJZ4hrxorihZHk5aarYwY8d4=
github.com/pegasus-kv/thrift v0.13.0/go.mod h1:Gl9NT/WHG6ABm6NsrbfE8LiJN0sAyneCrvB4qN4NPqQ=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
@@ -979,6 +1002,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
@@ -1008,8 +1032,10 @@ github.com/skeema/knownhosts v1.2.0 h1:h9r9cf0+u7wSE+M183ZtMGgOJKiL96brpaz5ekfJC
github.com/skeema/knownhosts v1.2.0/go.mod h1:g4fPeYpque7P0xefxtGzV81ihjC8sX2IqpAoNkjxbMo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.13.0 h1:Dx1kYM01xsSqKPno3aqLnrwac2LetPvN23diwyr69Qs=
github.com/smartystreets/assertions v1.13.0/go.mod h1:wDmR7qL282YbGsPy6H/yAsesrxfxaaSlJazyFLYVFx8=
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
@@ -1052,6 +1078,7 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/svanharmelen/jsonapi v0.0.0-20180618144545-0c0828c3f16d/go.mod h1:BSTlc8jOjh0niykqEGVXOLXdi9o0r0kR8tCYiMvjFgw=
@@ -1153,6 +1180,7 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=

View File

@@ -33,6 +33,7 @@ type connectionError struct {
}
type refreshConnectionState struct {
// a connection pool to the DB service which uses the server appname
pool *pgxpool.Pool
searchPath []string
connectionUpdates *steampipeconfig.ConnectionUpdates
@@ -638,11 +639,6 @@ func getCloneSchemaQuery(exemplarSchemaName string, connectionState *steampipeco
return fmt.Sprintf("select clone_foreign_schema('%s', '%s', '%s');", exemplarSchemaName, connectionState.ConnectionName, connectionState.Plugin)
}
func getCloneCommentsQuery(exemplarSchemaName string, connectionState *steampipeconfig.ConnectionState) string {
return fmt.Sprintf("select clone_table_comments('%s', '%s');", exemplarSchemaName, connectionState.ConnectionName)
}
func (s *refreshConnectionState) getInitialAndRemainingUpdates() (initialUpdates, remainingUpdates map[string]*steampipeconfig.ConnectionState, dynamicUpdates map[string][]*steampipeconfig.ConnectionState) {
updates := s.connectionUpdates.Update
searchPathConnections := s.connectionUpdates.FinalConnectionState.GetFirstSearchPathConnectionForPlugins(s.searchPath)
@@ -754,12 +750,11 @@ func (s *refreshConnectionState) setIncompleteConnectionStateToError(ctx context
// OnConnectionsChanged is the callback function invoked by the connection watcher when connections are added or removed
func (s *refreshConnectionState) sendPostgreSchemaNotification(ctx context.Context) error {
conn, err := db_local.CreateLocalDbConnection(ctx, &db_local.CreateDbOptions{Username: constants.DatabaseSuperUser})
conn, err := s.pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Close(ctx)
defer conn.Release()
notification := steampipeconfig.NewSchemaUpdateNotification(steampipeconfig.PgNotificationSchemaUpdate)
return db_local.SendPostgresNotification(ctx, conn, notification)
return db_local.SendPostgresNotification(ctx, conn.Conn(), notification)
}

View File

@@ -2,6 +2,7 @@ package connection_sync
import (
"context"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_client"
"github.com/turbot/steampipe/pkg/db/db_common"
@@ -13,7 +14,7 @@ import (
// if any of the connections are in error state, return an error
// this is used to ensure unqualified queries and tables are resolved to the correct connection
func WaitForSearchPathSchemas(ctx context.Context, client db_common.Client, searchPath []string) error {
conn, err := client.AcquireConnection(ctx)
conn, err := client.AcquireManagementConnection(ctx)
if err != nil {
return err
}

View File

@@ -6,3 +6,9 @@ const (
AppName = "steampipe"
FdwName = "steampipe-postgres-fdw"
)
const (
ClientConnectionAppNamePrefix = "steampipe_client"
ServiceConnectionAppNamePrefix = "steampipe_service"
ClientSystemConnectionAppNamePrefix = "steampipe_client_system"
)

View File

@@ -127,6 +127,10 @@ const (
IntrospectionTableReference = "steampipe_reference"
)
const (
RuntimeParamsKeyApplicationName = "application_name"
)
// Invoker is a pseudoEnum for the command/operation which starts the service
type Invoker string

View File

@@ -9,7 +9,16 @@ import (
)
var (
ExecutionID = helpers.GetMD5Hash(fmt.Sprintf("%d", time.Now().Nanosecond()))[:4]
PgClientAppName = fmt.Sprintf("%s_%s", constants.AppName, ExecutionID)
PgClientAppNamePluginManagerPrefix = "pm_"
ExecutionID = helpers.GetMD5Hash(fmt.Sprintf("%d", time.Now().Nanosecond()))[:4]
)
var (
// App name used by connections which issue user-initiated queries
ClientConnectionAppName = fmt.Sprintf("%s_%s", constants.ClientConnectionAppNamePrefix, ExecutionID)
// App name used for queries which support user-initiated queries (load schema, load connection state etc.)
ClientSystemConnectionAppName = fmt.Sprintf("%s_%s", constants.ClientSystemConnectionAppNamePrefix, ExecutionID)
// App name used for service related queries (plugin manager, refresh connection)
ServiceConnectionAppName = fmt.Sprintf("%s_%s", constants.ServiceConnectionAppNamePrefix, ExecutionID)
)

View File

@@ -23,7 +23,12 @@ import (
// DbClient wraps over `sql.DB` and gives an interface to the database
type DbClient struct {
connectionString string
pool *pgxpool.Pool
// connection userPool for user initiated queries
userPool *pgxpool.Pool
// connection used to run system/plumbing queries (connection state, server settings)
managementPool *pgxpool.Pool
// the settings of the server that this client is connected to
serverSettings *db_common.ServerSettings
@@ -37,7 +42,10 @@ type DbClient struct {
// map of database sessions, keyed to the backend_pid in postgres
// used to update session search path where necessary
// TODO: there's no code which cleans up this map when connections get dropped by pgx
// https://github.com/turbot/steampipe/issues/3737
sessions map[uint32]*db_common.DatabaseSession
// allows locked access to the 'sessions' map
sessionsMutex *sync.Mutex
@@ -109,13 +117,13 @@ func NewDbClient(ctx context.Context, connectionString string, onConnectionCallb
return client, nil
}
func (c *DbClient) closePools() {
c.userPool.Close()
c.managementPool.Close()
}
func (c *DbClient) loadServerSettings(ctx context.Context) error {
conn, _, err := c.GetDatabaseConnectionWithRetries(ctx)
if err != nil {
return err
}
defer conn.Release()
serverSettings, err := serversettings.Load(ctx, conn.Conn())
serverSettings, err := serversettings.Load(ctx, c.managementPool)
if err != nil {
if _, _, notFound := IsRelationNotFoundError(err); notFound {
// when connecting to pre-0.21.0 services, the server_settings table will not be available.
@@ -158,12 +166,11 @@ func (c *DbClient) ServerSettings() *db_common.ServerSettings {
// Close implements Client
// closes the connection to the database and shuts down the backend
func (c *DbClient) Close(context.Context) error {
log.Printf("[TRACE] DbClient.Close %v", c.pool)
if c.pool != nil {
// clear the sessions map - so that we can't reuse it
c.sessions = nil
c.pool.Close()
}
log.Printf("[TRACE] DbClient.Close %v", c.userPool)
c.closePools()
// nullify active sessions, since with the closing of the pools
// none of the sessions will be valid anymore
c.sessions = nil
return nil
}
@@ -188,20 +195,20 @@ func (c *DbClient) RefreshSessions(ctx context.Context) (res *db_common.AcquireS
// NOTE: it optimises the schema extraction by extracting schema information for
// connections backed by distinct plugins and then fanning back out.
func (c *DbClient) GetSchemaFromDB(ctx context.Context) (*db_common.SchemaMetadata, error) {
conn, _, err := c.GetDatabaseConnectionWithRetries(ctx)
mgmtConn, err := c.managementPool.Acquire(ctx)
if err != nil {
return nil, err
}
defer conn.Release()
defer mgmtConn.Release()
// for optimisation purposes, try to load connection state and build a map of schemas to load
// (if we are connected to a remote server running an older CLI,
// this load may fail, in which case bypass the optimisation)
connectionStateMap, err := steampipeconfig.LoadConnectionState(ctx, conn.Conn(), steampipeconfig.WithWaitUntilLoading())
connectionStateMap, err := steampipeconfig.LoadConnectionState(ctx, mgmtConn.Conn(), steampipeconfig.WithWaitUntilLoading())
// NOTE: if we failed to load conenction state, this may be because we are connected to an older version of the CLI
// use legacy (v0.19.x) schema loading code
if err != nil {
return c.GetSchemaFromDBLegacy(ctx, conn)
return c.GetSchemaFromDBLegacy(ctx, mgmtConn)
}
// build a ConnectionSchemaMap object to identify the schemas to load
@@ -216,14 +223,8 @@ func (c *DbClient) GetSchemaFromDB(ctx context.Context) (*db_common.SchemaMetada
// build a query to retrieve these schemas
query := c.buildSchemasQuery(schemas...)
//execute
tablesResult, err := conn.Query(ctx, query)
if err != nil {
return nil, err
}
// build schema metadata from query result
metadata, err := db_common.BuildSchemaMetadata(tablesResult)
metadata, err := db_common.LoadSchemaMetadata(ctx, mgmtConn.Conn(), query)
if err != nil {
return nil, err
}
@@ -249,14 +250,8 @@ func (c *DbClient) GetSchemaFromDBLegacy(ctx context.Context, conn *pgxpool.Conn
// build a query to retrieve these schemas
query := c.buildSchemasQueryLegacy()
//execute
tablesResult, err := conn.Query(ctx, query)
if err != nil {
return nil, err
}
// build schema metadata from query result
return db_common.BuildSchemaMetadata(tablesResult)
return db_common.LoadSchemaMetadata(ctx, conn.Conn(), query)
}
// refreshDbClient terminates the current connection and opens up a new connection to the service.
@@ -265,7 +260,7 @@ func (c *DbClient) refreshDbClient(ctx context.Context) error {
defer utils.LogTime("db_client.refreshDbClient end")
// close the connection pool and recreate
c.pool.Close()
c.closePools()
if err := c.establishConnectionPool(ctx); err != nil {
return err
}

View File

@@ -37,6 +37,15 @@ func (c *DbClient) establishConnectionPool(ctx context.Context) error {
"localhost",
}
// when connected to a service which is running a plugin compiled with SDK pre-v5, the plugin
// will not have the ability to turn off caching (feature introduced in SDKv5)
//
// the 'isLocalService' is used to set the client end cache to 'false' if caching is turned off in the local service
//
// this is a temporary workaround to make sure
// that we can turn off caching for plugins compiled with SDK pre-V5
// worst case scenario is that we don't switch off the cache for pre-V5 plugins
// refer to: https://github.com/turbot/steampipe/blob/f7f983a552a07e50e526fcadf2ccbfdb7b247cc0/pkg/db/db_client/db_client_session.go#L66
if helpers.StringSliceContains(locals, config.ConnConfig.Host) {
c.isLocalService = true
}
@@ -55,7 +64,7 @@ func (c *DbClient) establishConnectionPool(ctx context.Context) error {
// set an app name so that we can track database connections from this Steampipe execution
// this is used to determine whether the database can safely be closed
config.ConnConfig.Config.RuntimeParams = map[string]string{
"application_name": runtime.PgClientAppName,
constants.RuntimeParamsKeyApplicationName: runtime.ClientConnectionAppName,
}
// this returns connection pool
@@ -73,6 +82,34 @@ func (c *DbClient) establishConnectionPool(ctx context.Context) error {
if err != nil {
return err
}
c.pool = dbPool
c.userPool = dbPool
return c.establishManagementConnectionPool(ctx, config)
}
// establishSystemConnectionPool creates a connection pool to use to execute
// system-initiated queries (loading of connection state etc.)
// unlike establishConnectionPool, which is run first to create the user-query pool
// this doesn't wait for the pool to completely start, as establishConnectionPool will have established and verified a connection with the service
func (c *DbClient) establishManagementConnectionPool(ctx context.Context, config *pgxpool.Config) error {
utils.LogTime("db_client.establishSystemConnectionPool start")
defer utils.LogTime("db_client.establishSystemConnectionPool end")
// create a copy of the config which is used to setup the user connection pool
// we need to modify the config - don't want the original to get modified
copiedConfig := config.Copy()
copiedConfig.ConnConfig.Config.RuntimeParams = map[string]string{
constants.RuntimeParamsKeyApplicationName: runtime.ClientSystemConnectionAppName,
}
// remove the afterConnect hook - we don't need the session data in management connections
copiedConfig.AfterConnect = nil
// this returns connection pool
dbPool, err := pgxpool.NewWithConfig(context.Background(), copiedConfig)
if err != nil {
return err
}
c.managementPool = dbPool
return nil
}

View File

@@ -3,6 +3,7 @@ package db_client
import (
"context"
"database/sql"
"errors"
"fmt"
"net/netip"
"strings"
@@ -199,53 +200,48 @@ func (c *DbClient) getQueryTiming(ctx context.Context, startTime time.Time, sess
resultChannel <- timingResult
}()
res, err := c.ExecuteSyncInSession(ctx, session, fmt.Sprintf("select id, rows_fetched, cache_hit, hydrate_calls from %s.%s where id > %d", constants.InternalSchema, constants.ForeignTableScanMetadata, session.ScanMetadataMaxId))
// if we failed to read scan metadata (either because the query failed or the plugin does not support it)
// just return
if err != nil || len(res.Rows) == 0 {
var scanRows []ScanMetadataRow
err := db_common.ExecuteSystemClientCall(ctx, session.Connection.Conn(), func(ctx context.Context, tx pgx.Tx) error {
query := fmt.Sprintf("select id, rows_fetched, cache_hit, hydrate_calls from %s.%s where id > %d", constants.InternalSchema, constants.ForeignTableScanMetadata, session.ScanMetadataMaxId)
rows, err := tx.Query(ctx, query)
if err != nil {
return err
}
scanRows, err = pgx.CollectRows(rows, pgx.RowToStructByName[ScanMetadataRow])
return err
})
// if we failed to read scan metadata (either because the query failed or the plugin does not support it) just return
// we don't return the error, since we don't want to error out in this case
if err != nil || len(scanRows) == 0 {
return
}
// so we have scan metadata - create the metadata struct
timingResult.Metadata = &queryresult.TimingMetadata{}
var id int64
for _, r := range res.Rows {
rw := r.(*queryresult.RowResult)
id = rw.Data[0].(int64)
rowsFetched := rw.Data[1].(int64)
cacheHit := rw.Data[2].(bool)
hydrateCalls := rw.Data[3].(int64)
timingResult.Metadata.HydrateCalls += hydrateCalls
if cacheHit {
timingResult.Metadata.CachedRowsFetched += rowsFetched
timingResult.Metadata = &queryresult.TimingMetadata{}
for _, r := range scanRows {
timingResult.Metadata.HydrateCalls += r.hydrateCalls
if r.cacheHit {
timingResult.Metadata.CachedRowsFetched += r.rowsFetched
} else {
timingResult.Metadata.RowsFetched += rowsFetched
timingResult.Metadata.RowsFetched += r.rowsFetched
}
id = r.id
}
// update the max id for this session
session.ScanMetadataMaxId = id
}
func (c *DbClient) updateScanMetadataMaxId(ctx context.Context, session *db_common.DatabaseSession) error {
res, err := c.ExecuteSyncInSession(ctx, session, fmt.Sprintf("select max(id) from %s.%s", constants.InternalSchema, constants.ForeignTableScanMetadata))
if err != nil {
return err
}
for _, r := range res.Rows {
rw := r.(*queryresult.RowResult)
id, ok := rw.Data[0].(int64)
if ok {
// update the max id for this session
session.ScanMetadataMaxId = id
return db_common.ExecuteSystemClientCall(ctx, session.Connection.Conn(), func(ctx context.Context, tx pgx.Tx) error {
row := tx.QueryRow(ctx, fmt.Sprintf("select max(id) from %s.%s", constants.InternalSchema, constants.ForeignTableScanMetadata))
err := row.Scan(&session.ScanMetadataMaxId)
if errors.Is(err, pgx.ErrNoRows) {
return nil
}
//nolint:golint,staticcheck // we only need to only read the first row
break
}
return nil
return err
})
}
// run query in a goroutine, so we can check for cancellation

View File

@@ -52,12 +52,18 @@ func (c *DbClient) startQueryWithRetries(ctx context.Context, session *db_common
// just return it
return queryError
}
// get a connection from the system pool to query the connection state table
sysConn, err := c.managementPool.Acquire(ctx)
if err != nil {
return retry.RetryableError(err)
}
defer sysConn.Release()
// so this _was_ a "relation not found" error
// load the connection state and connection config to see if the missing schema is in there at all
// if there was a schema not found with an unqualified query, we keep trying until
// the first search path schema for each plugin has loaded
connectionStateMap, stateErr := steampipeconfig.LoadConnectionState(ctx, conn, steampipeconfig.WithWaitUntilLoading())
connectionStateMap, stateErr := steampipeconfig.LoadConnectionState(ctx, sysConn.Conn(), steampipeconfig.WithWaitUntilLoading())
if stateErr != nil {
log.Println("[TRACE] could not load connection state map:", stateErr)
// just return the query error
@@ -87,7 +93,7 @@ func (c *DbClient) startQueryWithRetries(ctx context.Context, session *db_common
}
// otherwise we need to wait for the first schema of everything plugin to load
if _, err := steampipeconfig.LoadConnectionState(ctx, conn, steampipeconfig.WithWaitForSearchPath(searchPath)); err != nil {
if _, err := steampipeconfig.LoadConnectionState(ctx, sysConn.Conn(), steampipeconfig.WithWaitForSearchPath(searchPath)); err != nil {
return err
}

View File

@@ -54,7 +54,7 @@ func (c *DbClient) SetRequiredSessionSearchPath(ctx context.Context) error {
}
func (c *DbClient) LoadUserSearchPath(ctx context.Context) error {
conn, _, err := c.GetDatabaseConnectionWithRetries(ctx)
conn, err := c.managementPool.Acquire(ctx)
if err != nil {
return err
}
@@ -91,10 +91,12 @@ func (c *DbClient) ensureSessionSearchPath(ctx context.Context, session *db_comm
log.Printf("[TRACE] ensureSessionSearchPath")
// update the stored value of user search path
// this might have changed if a connection has been added/removed
if err := c.loadUserSearchPath(ctx, session.Connection.Conn()); err != nil {
return err
}
// get the required search path which is either a custom search path (if present) or the user search path
requiredSearchPath := c.GetRequiredSessionSearchPath()
// now determine whether the session search path is the same as the required search path
@@ -107,7 +109,11 @@ func (c *DbClient) ensureSessionSearchPath(ctx context.Context, session *db_comm
// so we need to set the search path
log.Printf("[TRACE] session search path will be updated to %s", strings.Join(c.customSearchPath, ","))
_, err := session.Connection.Exec(ctx, fmt.Sprintf("set search_path to %s", strings.Join(db_common.PgEscapeSearchPath(requiredSearchPath), ",")))
err := db_common.ExecuteSystemClientCall(ctx, session.Connection.Conn(), func(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx, fmt.Sprintf("set search_path to %s", strings.Join(db_common.PgEscapeSearchPath(requiredSearchPath), ",")))
return err
})
if err == nil {
// update the session search path property
session.SearchPath = requiredSearchPath

View File

@@ -3,7 +3,6 @@ package db_client
import (
"context"
"fmt"
"log"
"time"
"github.com/jackc/pgx/v5/pgxpool"
@@ -12,15 +11,8 @@ import (
"github.com/turbot/steampipe/pkg/db/db_common"
)
func (c *DbClient) AcquireConnection(ctx context.Context) (*pgxpool.Conn, error) {
// get a database connection and query its backend pid
// note - this will retry if the connection is bad
conn, _, err := c.GetDatabaseConnectionWithRetries(ctx)
if err != nil {
return nil, err
}
return conn, nil
func (c *DbClient) AcquireManagementConnection(ctx context.Context) (*pgxpool.Conn, error) {
return c.managementPool.Acquire(ctx)
}
func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common.AcquireSessionResult) {
@@ -28,8 +20,6 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common
defer func() {
if sessionResult != nil && sessionResult.Session != nil {
sessionResult.Session.UpdateUsage()
// fail safe - if there is no database connection, ensure we return an error
// NOTE: this should not be necessary but an occasional crash is occurring with a nil connection
if sessionResult.Session.Connection == nil && sessionResult.Error == nil {
@@ -40,11 +30,12 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common
// get a database connection and query its backend pid
// note - this will retry if the connection is bad
databaseConnection, backendPid, err := c.GetDatabaseConnectionWithRetries(ctx)
databaseConnection, err := c.userPool.Acquire(ctx)
if err != nil {
sessionResult.Error = err
return sessionResult
}
backendPid := databaseConnection.Conn().PgConn().PID()
c.sessionsMutex.Lock()
session, found := c.sessions[backendPid]
@@ -102,19 +93,3 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common
sessionResult.Error = ctx.Err()
return sessionResult
}
func (c *DbClient) GetDatabaseConnectionWithRetries(ctx context.Context) (*pgxpool.Conn, uint32, error) {
// get a database connection from the pool
databaseConnection, err := c.pool.Acquire(ctx)
if err != nil {
if databaseConnection != nil {
databaseConnection.Release()
}
log.Printf("[TRACE] GetDatabaseConnectionWithRetries failed: %s", err.Error())
return nil, 0, err
}
backendPid := databaseConnection.Conn().PgConn().PID()
return databaseConnection, backendPid, nil
}

View File

@@ -0,0 +1,8 @@
package db_client
type ScanMetadataRow struct {
id int64 `db:"id"`
rowsFetched int64 `db:"rows_fetched"`
cacheHit bool `db:"cache_hit"`
hydrateCalls int64 `db:"hydrate_calls"`
}

View File

@@ -0,0 +1,19 @@
package db_common
import (
"strings"
"github.com/turbot/steampipe/pkg/constants"
)
func IsClientAppName(appName string) bool {
return strings.HasPrefix(appName, constants.ClientConnectionAppNamePrefix) && !strings.HasPrefix(appName, constants.ClientSystemConnectionAppNamePrefix)
}
func IsClientSystemAppName(appName string) bool {
return strings.HasPrefix(appName, constants.ClientSystemConnectionAppNamePrefix)
}
func IsServiceAppName(appName string) bool {
return strings.HasPrefix(appName, constants.ServiceConnectionAppNamePrefix)
}

View File

@@ -32,21 +32,25 @@ func SetCacheEnabled(ctx context.Context, enabled bool, connection *pgx.Conn) er
}
func executeCacheSetFunction(ctx context.Context, settingValue string, connection *pgx.Conn) error {
_, err := connection.Exec(ctx, fmt.Sprintf(
"select %s.%s('%s')",
constants.InternalSchema,
constants.FunctionCacheSet,
settingValue,
))
return err
return ExecuteSystemClientCall(ctx, connection, func(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx, fmt.Sprintf(
"select %s.%s('%s')",
constants.InternalSchema,
constants.FunctionCacheSet,
settingValue,
))
return err
})
}
func executeCacheTtlSetFunction(ctx context.Context, seconds string, connection *pgx.Conn) error {
_, err := connection.Exec(ctx, fmt.Sprintf(
"select %s.%s('%s')",
constants.InternalSchema,
constants.FunctionCacheSetTtl,
seconds,
))
return err
return ExecuteSystemClientCall(ctx, connection, func(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx, fmt.Sprintf(
"select %s.%s('%s')",
constants.InternalSchema,
constants.FunctionCacheSetTtl,
seconds,
))
return err
})
}

View File

@@ -8,15 +8,15 @@ import (
)
type Client interface {
Close(ctx context.Context) error
LoadUserSearchPath(ctx context.Context) error
Close(context.Context) error
LoadUserSearchPath(context.Context) error
SetRequiredSessionSearchPath(context.Context) error
GetRequiredSessionSearchPath() []string
GetCustomSearchPath() []string
// acquire a database connection - must be closed
AcquireConnection(ctx context.Context) (*pgxpool.Conn, error)
// acquire a management database connection - must be closed
AcquireManagementConnection(context.Context) (*pgxpool.Conn, error)
// acquire a query execution session (which search pathand cache options set) - must be closed
AcquireSession(context.Context) *AcquireSessionResult

View File

@@ -7,13 +7,13 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)
// DatabaseSession wraps over the raw database/sql.Conn and also allows for retaining useful instrumentation
// DatabaseSession wraps over the raw database connection
// the purpose is to be able
// - to store the current search path of the connection without having to make a database round-trip
// - To store the last scan_metadata id used on this connection
type DatabaseSession struct {
BackendPid uint32 `json:"backend_pid"`
UsedCount int `json:"used"`
LastUsed time.Time `json:"last_used"`
SearchPath []string `json:"-"`
Initialized bool `json:"-"`
BackendPid uint32 `json:"backend_pid"`
SearchPath []string `json:"-"`
// this gets rewritten, since the database/sql gives back a new instance everytime
Connection *pgxpool.Conn `json:"-"`
@@ -28,12 +28,6 @@ func NewDBSession(backendPid uint32) *DatabaseSession {
}
}
// UpdateUsage updates the UsedCount of the DatabaseSession and also the lastUsed time
func (s *DatabaseSession) UpdateUsage() {
s.UsedCount++
s.LastUsed = time.Now()
}
func (s *DatabaseSession) Close(waitForCleanup bool) {
if s.Connection != nil {
if waitForCleanup {

View File

@@ -45,28 +45,38 @@ func LoadForeignSchemaNames(ctx context.Context, conn *pgx.Conn) ([]string, erro
return foreignSchemaNames, nil
}
func BuildSchemaMetadata(rows pgx.Rows) (_ *SchemaMetadata, err error) {
utils.LogTime("db.buildSchemaMetadata start")
defer func() {
utils.LogTime("db.buildSchemaMetadata end")
// ensure rows are closed
rows.Close()
}()
records, err := getSchemaRecordsFromRows(rows)
func LoadSchemaMetadata(ctx context.Context, conn *pgx.Conn, query string) (*SchemaMetadata, error) {
var schemaRecords []schemaRecord
rows, err := conn.Query(ctx, query)
if err != nil {
return nil, err
}
SchemaMetadata := NewSchemaMetadata()
defer rows.Close()
schemaRecords, err = getSchemaRecordsFromRows(rows)
if err != nil {
return nil, err
}
// build schema metadata from query result
return buildSchemaMetadata(schemaRecords)
}
func buildSchemaMetadata(records []schemaRecord) (_ *SchemaMetadata, err error) {
utils.LogTime("db.buildSchemaMetadata start")
defer func() {
utils.LogTime("db.buildSchemaMetadata end")
}()
schemaMetadata := NewSchemaMetadata()
utils.LogTime("db.buildSchemaMetadata.iteration start")
for _, record := range records {
if _, schemaFound := SchemaMetadata.Schemas[record.TableSchema]; !schemaFound {
SchemaMetadata.Schemas[record.TableSchema] = map[string]TableSchema{}
if _, schemaFound := schemaMetadata.Schemas[record.TableSchema]; !schemaFound {
schemaMetadata.Schemas[record.TableSchema] = map[string]TableSchema{}
}
if _, tblFound := SchemaMetadata.Schemas[record.TableSchema][record.TableName]; !tblFound {
SchemaMetadata.Schemas[record.TableSchema][record.TableName] = TableSchema{
if _, tblFound := schemaMetadata.Schemas[record.TableSchema][record.TableName]; !tblFound {
schemaMetadata.Schemas[record.TableSchema][record.TableName] = TableSchema{
Schema: record.TableSchema,
Name: record.TableName,
FullName: fmt.Sprintf("%s.%s", record.TableSchema, record.TableName),
@@ -75,7 +85,7 @@ func BuildSchemaMetadata(rows pgx.Rows) (_ *SchemaMetadata, err error) {
}
}
SchemaMetadata.Schemas[record.TableSchema][record.TableName].Columns[record.ColumnName] = ColumnSchema{
schemaMetadata.Schemas[record.TableSchema][record.TableName].Columns[record.ColumnName] = ColumnSchema{
Name: record.ColumnName,
NotNull: typeHelpers.StringToBool(record.IsNullable),
Type: record.DataType,
@@ -84,12 +94,12 @@ func BuildSchemaMetadata(rows pgx.Rows) (_ *SchemaMetadata, err error) {
}
if strings.HasPrefix(record.TableSchema, "pg_temp") {
SchemaMetadata.TemporarySchemaName = record.TableSchema
schemaMetadata.TemporarySchemaName = record.TableSchema
}
}
utils.LogTime("db.buildSchemaMetadata.iteration end")
return SchemaMetadata, err
return schemaMetadata, err
}
func getSchemaRecordsFromRows(rows pgx.Rows) ([]schemaRecord, error) {

View File

@@ -2,6 +2,7 @@ package db_common
import (
"context"
"errors"
"strings"
"github.com/jackc/pgx/v5"
@@ -52,23 +53,17 @@ func GetUserSearchPath(ctx context.Context, conn *pgx.Conn) ([]string, error) {
LEFT JOIN pg_database d ON d.oid = rs.setdatabase
WHERE r.rolname = 'steampipe'`
rows, err := conn.Query(ctx, query)
if err != nil {
rows := conn.QueryRow(ctx, query)
var configStrings []string
if err := rows.Scan(&configStrings); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return []string{}, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
var configStrings []string
if err := rows.Scan(&configStrings); err != nil {
return nil, err
}
if len(configStrings) > 0 {
return BuildSearchPathResult(configStrings[0])
}
if len(configStrings) > 0 {
return BuildSearchPathResult(configStrings[0])
}
// should not get here
return nil, nil
}

View File

@@ -0,0 +1,47 @@
package db_common
import (
"context"
"fmt"
"log"
"github.com/jackc/pgx/v5"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/constants/runtime"
)
// SystemClientExecutor is the executor function that is called within a transaction
// make sure that by the time the executor finishes execution, the connection is freed
// otherwise we will get a `conn is busy` error
type SystemClientExecutor func(context.Context, pgx.Tx) error
// ExecuteSystemClientCall creates a transaction and sets the application_name to the
// one used by the system client, executes the callback and sets the application name back to the client app name
func ExecuteSystemClientCall(ctx context.Context, conn *pgx.Conn, executor SystemClientExecutor) error {
if !IsClientAppName(conn.Config().RuntimeParams[constants.RuntimeParamsKeyApplicationName]) {
// this should NEVER happen
return sperr.New("ExecuteSystemClientCall called with appname other than client: %s", conn.Config().RuntimeParams[constants.RuntimeParamsKeyApplicationName])
}
return pgx.BeginFunc(ctx, conn, func(tx pgx.Tx) (e error) {
// if the appName is the ClientAppName, we need to set it to ClientSystemAppName
// and then revert when done
_, err := tx.Exec(ctx, fmt.Sprintf("SET application_name TO '%s'", runtime.ClientSystemConnectionAppName))
if err != nil {
return sperr.WrapWithRootMessage(err, "could not set application name on connection")
}
defer func() {
// set back the original application name
_, e = tx.Exec(ctx, fmt.Sprintf("SET application_name TO '%s'", conn.Config().RuntimeParams[constants.RuntimeParamsKeyApplicationName]))
if e != nil {
log.Println("[TRACE] could not reset application_name", e)
}
}()
if err := executor(ctx, tx); err != nil {
return sperr.WrapWithMessage(err, "scoped execution failed with management client")
}
return nil
})
}

View File

@@ -92,7 +92,7 @@ func CreateLocalDbConnection(ctx context.Context, opts *CreateDbOptions) (*pgx.C
// set an app name so that we can track database connections from this Steampipe execution
// this is used to determine whether the database can safely be closed
connConfig.Config.RuntimeParams = map[string]string{
"application_name": runtime.PgClientAppName,
constants.RuntimeParamsKeyApplicationName: runtime.ServiceConnectionAppName,
}
err = db_common.AddRootCertToConfig(&connConfig.Config, getRootCertLocation())
if err != nil {
@@ -136,10 +136,8 @@ func CreateConnectionPool(ctx context.Context, opts *CreateDbOptions, maxConnect
connConfig.MaxConnLifetime = connMaxLifetime
connConfig.MaxConnIdleTime = connMaxIdleTime
// set an app name so that we can track database connections from this Steampipe execution
// this is used to determine whether the database can safely be closed
connConfig.ConnConfig.Config.RuntimeParams = map[string]string{
"application_name": runtime.PgClientAppName,
constants.RuntimeParamsKeyApplicationName: runtime.ServiceConnectionAppName,
}
// this returns connection pool
@@ -172,7 +170,7 @@ func createMaintenanceClient(ctx context.Context, port int) (*pgx.Conn, error) {
utils.LogTime("db_local.createMaintenanceClient start")
defer utils.LogTime("db_local.createMaintenanceClient end")
connStr := fmt.Sprintf("host=127.0.0.1 port=%d user=%s dbname=postgres sslmode=disable", port, constants.DatabaseSuperUser)
connStr := fmt.Sprintf("host=127.0.0.1 port=%d user=%s dbname=postgres sslmode=disable application_name=%s", port, constants.DatabaseSuperUser, runtime.ServiceConnectionAppName)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(viper.GetInt(constants.ArgDatabaseStartTimeout))*time.Second)
defer cancel()

View File

@@ -16,60 +16,33 @@ func executeSqlAsRoot(ctx context.Context, statements ...string) ([]pgconn.Comma
}
return ExecuteSqlInTransaction(ctx, rootClient, statements...)
}
func executeSqlWithArgsAsRoot(ctx context.Context, queries ...db_common.QueryWithArgs) ([]pgconn.CommandTag, error) {
rootClient, err := CreateLocalDbConnection(ctx, &CreateDbOptions{Username: constants.DatabaseSuperUser})
if err != nil {
return nil, err
}
return ExecuteSqlWithArgsInTransaction(ctx, rootClient, queries...)
}
func ExecuteSqlInTransaction(ctx context.Context, conn *pgx.Conn, statements ...string) (results []pgconn.CommandTag, err error) {
tx, err := conn.Begin(ctx)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
err = pgx.BeginFunc(ctx, conn, func(tx pgx.Tx) error {
for _, statement := range statements {
result, err := tx.Exec(ctx, statement)
if err != nil {
return err
}
results = append(results, result)
}
}()
for _, statement := range statements {
result, err := tx.Exec(ctx, statement)
if err != nil {
return nil, err
}
results = append(results, result)
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}
return results, nil
return nil
})
return results, err
}
func ExecuteSqlWithArgsInTransaction(ctx context.Context, conn *pgx.Conn, queries ...db_common.QueryWithArgs) (results []pgconn.CommandTag, err error) {
tx, err := conn.Begin(ctx)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
tx.Rollback(ctx)
err = pgx.BeginFunc(ctx, conn, func(tx pgx.Tx) error {
for _, q := range queries {
result, err := tx.Exec(ctx, q.Query, q.Args...)
if err != nil {
// set the results to nil - so that we don't return stuff in an error return
results = nil
return err
}
results = append(results, result)
}
}()
for _, q := range queries {
result, err := tx.Exec(ctx, q.Query, q.Args...)
if err != nil {
return nil, err
}
results = append(results, result)
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}
return results, nil
return nil
})
return results, err
}

View File

@@ -318,15 +318,7 @@ func runInstall(ctx context.Context, oldDbName *string) error {
// resolve the name of the database that is to be installed
databaseName := resolveDatabaseName(oldDbName)
// validate db name
firstCharacter := databaseName[0:1]
var ascii int
for _, r := range databaseName {
ascii = int(r)
break
}
if firstCharacter == "_" || (ascii >= 'a' && ascii <= 'z') {
log.Printf("[TRACE] valid database name: %s", databaseName)
} else {
if !isValidDatabaseName(databaseName) {
return fmt.Errorf("Invalid database name '%s' - must start with either a lowercase character or an underscore", databaseName)
}
@@ -389,6 +381,10 @@ func startServiceForInstall(port int) (*psutils.Process, error) {
return psutils.NewProcess(int32(postgresCmd.Process.Pid))
}
func isValidDatabaseName(databaseName string) bool {
return databaseName[0] == '_' || (databaseName[0] >= 'a' && databaseName[0] <= 'z')
}
func getNextFreePort() (int, error) {
utils.LogTime("db_local.install.getNextFreePort start")
defer utils.LogTime("db_local.install.getNextFreePort end")

View File

@@ -0,0 +1,19 @@
package db_local
import "testing"
func TestIsValidDatabaseName(t *testing.T) {
tests := map[string]bool{
"valid_name": true,
"_valid_name": true,
"InvalidName": false,
"123Invalid": false,
}
for dbName, expectedResult := range tests {
if actualResult := isValidDatabaseName(dbName); actualResult != expectedResult {
t.Logf("Expected %t for %s, but for %t", expectedResult, dbName, actualResult)
t.Fail()
}
}
}

View File

@@ -22,22 +22,22 @@ import (
// TagColumn is the tag used to specify the column name and type in the introspection tables
const TagColumn = "column"
func CreateIntrospectionTables(ctx context.Context, workspaceResources *modconfig.ResourceMaps, conn *pgx.Conn) error {
func CreateIntrospectionTables(ctx context.Context, workspaceResources *modconfig.ResourceMaps, tx pgx.Tx) error {
// get the sql for columns which every table has
commonColumnSql := getColumnDefinitions(modconfig.ResourceMetadata{})
// convert to lowercase to avoid case sensitivity
switch strings.ToLower(viper.GetString(constants.ArgIntrospection)) {
case constants.IntrospectionInfo:
return populateAllIntrospectionTables(ctx, workspaceResources, conn, commonColumnSql)
return populateAllIntrospectionTables(ctx, workspaceResources, tx, commonColumnSql)
case constants.IntrospectionControl:
return populateControlIntrospectionTables(ctx, workspaceResources, conn, commonColumnSql)
return populateControlIntrospectionTables(ctx, workspaceResources, tx, commonColumnSql)
default:
return nil
}
}
func populateAllIntrospectionTables(ctx context.Context, workspaceResources *modconfig.ResourceMaps, conn *pgx.Conn, commonColumnSql []string) error {
func populateAllIntrospectionTables(ctx context.Context, workspaceResources *modconfig.ResourceMaps, tx pgx.Tx, commonColumnSql []string) error {
utils.LogTime("db.CreateIntrospectionTables start")
defer utils.LogTime("db.CreateIntrospectionTables end")
@@ -48,7 +48,7 @@ func populateAllIntrospectionTables(ctx context.Context, workspaceResources *mod
insertSql := getTableInsertSql(workspaceResources)
sql := []string{createSql, insertSql}
_, err := conn.Exec(ctx, strings.Join(sql, "\n"))
_, err := tx.Exec(ctx, strings.Join(sql, "\n"))
if err != nil {
return fmt.Errorf("failed to create introspection tables: %v", err)
}
@@ -56,6 +56,25 @@ func populateAllIntrospectionTables(ctx context.Context, workspaceResources *mod
return ctx.Err()
}
func populateControlIntrospectionTables(ctx context.Context, workspaceResources *modconfig.ResourceMaps, tx pgx.Tx, commonColumnSql []string) error {
utils.LogTime("db.CreateIntrospectionTables start")
defer utils.LogTime("db.CreateIntrospectionTables end")
// get the create sql for control and benchmark tables
createSql := getCreateControlTablesSql(commonColumnSql)
// now get sql to populate the control and benchmark tables
insertSql := getControlTableInsertSql(workspaceResources)
sql := []string{createSql, insertSql}
_, err := tx.Exec(ctx, strings.Join(sql, "\n"))
if err != nil {
return fmt.Errorf("failed to create introspection tables: %v", err)
}
// return context error - this enables calling code to respond to cancellation
return ctx.Err()
}
func getCreateTablesSql(commonColumnSql []string) string {
var createSql []string
createSql = append(createSql, getTableCreateSqlForResource(&modconfig.Control{}, constants.IntrospectionTableControl, commonColumnSql))
@@ -179,25 +198,6 @@ func getTableCreateSqlForResource(s interface{}, tableName string, commonColumnS
return tableSql
}
func populateControlIntrospectionTables(ctx context.Context, workspaceResources *modconfig.ResourceMaps, conn *pgx.Conn, commonColumnSql []string) error {
utils.LogTime("db.CreateIntrospectionTables start")
defer utils.LogTime("db.CreateIntrospectionTables end")
// get the create sql for control and benchmark tables
createSql := getCreateControlTablesSql(commonColumnSql)
// now get sql to populate the control and benchmark tables
insertSql := getControlTableInsertSql(workspaceResources)
sql := []string{createSql, insertSql}
_, err := conn.Exec(ctx, strings.Join(sql, "\n"))
if err != nil {
return fmt.Errorf("failed to create introspection tables: %v", err)
}
// return context error - this enables calling code to respond to cancellation
return ctx.Err()
}
func getCreateControlTablesSql(commonColumnSql []string) string {
var createSql []string
createSql = append(createSql, getTableCreateSqlForResource(&modconfig.Control{}, constants.IntrospectionTableControl, commonColumnSql))

View File

@@ -5,11 +5,10 @@ import (
"fmt"
"log"
"os"
"strings"
psutils "github.com/shirou/gopsutil/process"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/constants/runtime"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/filepaths"
"github.com/turbot/steampipe/pkg/pluginmanager"
@@ -124,7 +123,7 @@ GROUP BY application_name
counts := &ClientCount{}
rows, err := rootClient.Query(ctx, query, "client backend", runtime.PgClientAppName)
rows, err := rootClient.Query(ctx, query, "client backend", constants.ServiceConnectionAppNamePrefix)
if err != nil {
return nil, err
}
@@ -139,10 +138,13 @@ GROUP BY application_name
}
counts.TotalClients += count
if strings.HasPrefix(appName, constants.AppName) {
if db_common.IsClientAppName(appName) {
counts.SteampipeClients += count
}
if strings.HasPrefix(appName, runtime.PgClientAppNamePluginManagerPrefix) {
// plugin manager uses the service prefix
if db_common.IsServiceAppName(appName) {
counts.PluginManagerClients += count
}
}

View File

@@ -16,15 +16,14 @@ import (
"github.com/alecthomas/chroma/lexers"
"github.com/alecthomas/chroma/styles"
"github.com/c-bata/go-prompt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/pkg/cmdconfig"
"github.com/turbot/steampipe/pkg/connection_sync"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/display"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/interactive/metaquery"
@@ -563,7 +562,7 @@ func (c *InteractiveClient) getConnectionState(ctx context.Context) (steampipeco
statushooks.SetStatus(ctx, "Loading connection state…")
conn, err := c.client().AcquireConnection(ctx)
conn, err := c.client().AcquireManagementConnection(ctx)
if err != nil {
return nil, err
}
@@ -723,7 +722,7 @@ func (c *InteractiveClient) listenToPgNotifications(ctx context.Context) error {
}
log.Printf("[TRACE] Wait for notification")
notification, err := conn.WaitForNotification(ctx)
notification, err := conn.Conn().WaitForNotification(ctx)
if err != nil && !error_helpers.IsContextCancelledError(err) {
log.Printf("[INFO] Error waiting for notification: %s", err)
// TODO what to do about connection closed error
@@ -735,14 +734,14 @@ func (c *InteractiveClient) listenToPgNotifications(ctx context.Context) error {
}
log.Printf("[TRACE] Handled notification")
}
conn.Close(ctx)
conn.Release()
log.Printf("[TRACE] InteractiveClient listenToPgNotifications DONE")
return nil
}
func (c *InteractiveClient) getNotificationConnection(ctx context.Context) (*pgx.Conn, error) {
conn, err := db_local.CreateLocalDbConnection(ctx, &db_local.CreateDbOptions{Username: constants.DatabaseUser})
func (c *InteractiveClient) getNotificationConnection(ctx context.Context) (*pgxpool.Conn, error) {
conn, err := c.client().AcquireManagementConnection(ctx)
if err != nil {
return nil, err
}
@@ -751,7 +750,7 @@ func (c *InteractiveClient) getNotificationConnection(ctx context.Context) (*pgx
_, err = conn.Exec(ctx, listenSql)
if err != nil {
log.Printf("[INFO] Error listening to schema channel: %s", err)
conn.Close(ctx)
conn.Release()
return nil, err
}
return conn, nil

View File

@@ -18,7 +18,7 @@ import (
func (c *InteractiveClient) initialiseSuggestions(ctx context.Context) error {
log.Printf("[TRACE] initialiseSuggestions")
conn, err := c.client().AcquireConnection(ctx)
conn, err := c.client().AcquireManagementConnection(ctx)
if err != nil {
return err
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"crypto/md5"
"fmt"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"log"
"os"
"os/exec"
@@ -22,6 +21,7 @@ import (
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
sdkshared "github.com/turbot/steampipe-plugin-sdk/v5/grpc/shared"
sdkplugin "github.com/turbot/steampipe-plugin-sdk/v5/plugin"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/connection"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_local"

View File

@@ -5,12 +5,18 @@ import (
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
)
func Load(ctx context.Context, conn *pgx.Conn) (_ *db_common.ServerSettings, e error) {
func Load(ctx context.Context, pool *pgxpool.Pool) (serverSettings *db_common.ServerSettings, e error) {
conn, err := pool.Acquire(ctx)
if err != nil {
return nil, err
}
defer conn.Release()
defer func() {
// this function uses reflection to extract and convert values
// we need to be able to recover from panics while using reflection
@@ -18,12 +24,12 @@ func Load(ctx context.Context, conn *pgx.Conn) (_ *db_common.ServerSettings, e e
e = sperr.ToError(r, sperr.WithMessage("error loading server settings"))
}
}()
rows, err := conn.Query(ctx, fmt.Sprintf("SELECT * FROM %s.%s", constants.InternalSchema, constants.ServerSettingsTable))
if err != nil {
return nil, err
}
defer rows.Close()
return pgx.CollectOneRow(rows, pgx.RowToAddrOfStructByName[db_common.ServerSettings])
serverSettings, e = pgx.CollectOneRow(rows, pgx.RowToAddrOfStructByName[db_common.ServerSettings])
return
}

View File

@@ -107,9 +107,14 @@ func loadConnectionState(ctx context.Context, conn *pgx.Conn, opts ...loadConnec
log.Println("[TRACE] with config", config)
query := buildLoadConnectionStateQuery(config)
log.Println("[TRACE] running query", query)
rows, err := conn.Query(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
connectionStateList, err := pgx.CollectRows(rows, pgx.RowToStructByName[ConnectionState])
if err != nil {
// columns were added after the 0.20.0 release (connections for now)
// we need to handle the case where we are connected to an old version of
@@ -123,15 +128,9 @@ func loadConnectionState(ctx context.Context, conn *pgx.Conn, opts ...loadConnec
}
return nil, err
}
defer rows.Close()
var res = make(ConnectionStateMap)
connectionStateList, err := pgx.CollectRows(rows, pgx.RowToStructByName[ConnectionState])
if err != nil {
return nil, err
}
for _, c := range connectionStateList {
// copy into loop var
connectionState := c

View File

@@ -3,7 +3,9 @@ package workspace
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
@@ -19,18 +21,19 @@ func EnsureSessionData(ctx context.Context, source *modconfig.ResourceMaps, conn
return errors.New("nil conn passed to EnsureSessionData")
}
// check for introspection tables
// if the steampipe_mod table is missing, assume we have no session data - go ahead and create it
row := conn.QueryRow(ctx, "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema LIKE 'pg_temp%' AND table_name='steampipe_mod' ")
var count int
if err := row.Scan(&count); err != nil {
return err
}
if count == 0 {
if err := db_local.CreateIntrospectionTables(ctx, source, conn); err != nil {
return db_common.ExecuteSystemClientCall(ctx, conn, func(ctx context.Context, tx pgx.Tx) error {
// check for introspection tables
// if the steampipe_mod table is missing, assume we have no session data - go ahead and create it
row := tx.QueryRow(ctx, "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema LIKE 'pg_temp%' AND table_name='steampipe_mod' ")
var count int
if err := row.Scan(&count); err != nil {
return err
}
}
return nil
if count == 0 {
if err := db_local.CreateIntrospectionTables(ctx, source, tx); err != nil {
return err
}
}
return nil
})
}