PostgreSQL Store
import "github.com/zestor-dev/zestor/store/postgres"
The postgres package provides a PostgreSQL-backed implementation of store.Store using jackc/pgx/v5 with connection pooling. Writes go to a KV table and an outbox; a trigger sends NOTIFY so that a background listener can drain the outbox and fan out Watch events. As a result, watchers on different application instances can observe changes to the same kinds.
Features
- Persistent storage with ACID transactions
- Cross-process watch via LISTEN/NOTIFY and an outbox (ordered by outbox id)
- Namespace / tenant id (Namespace option maps to ns_id)
- Version and updated_at maintained on the KV row
- No-op Set: skipped when serialized bytes are unchanged (no outbox row)
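The no-op Set rule can be illustrated with a small in-memory model. This is only a sketch, not the package's code: memStore, row, and set are hypothetical names, JSON stands in for the configured codec, and it assumes that a skipped write also leaves the version untouched.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// row models one KV row: the encoded value plus a version counter.
type row struct {
	value   []byte
	version int64
}

// memStore is a hypothetical in-memory stand-in for the KV table and outbox.
type memStore struct {
	rows   map[string]row
	outbox []string // one entry per recorded change, in append order
}

func newMemStore() *memStore {
	return &memStore{rows: make(map[string]row)}
}

// set encodes v and writes it only when the encoded bytes differ from the
// stored ones. It reports whether a change was recorded.
func (m *memStore) set(key string, v any) (bool, error) {
	encoded, err := json.Marshal(v)
	if err != nil {
		return false, err
	}
	old, exists := m.rows[key]
	if exists && bytes.Equal(old.value, encoded) {
		return false, nil // no-op: no version bump, no outbox row (assumed)
	}
	etype := "create"
	if exists {
		etype = "update"
	}
	m.rows[key] = row{value: encoded, version: old.version + 1}
	m.outbox = append(m.outbox, etype+":"+key)
	return true, nil
}

func main() {
	s := newMemStore()
	for _, name := range []string{"Alice", "Alice", "Alicia"} {
		changed, err := s.set("alice", map[string]string{"name": name})
		if err != nil {
			panic(err)
		}
		fmt.Println(name, "changed:", changed)
	}
	fmt.Println("version:", s.rows["alice"].version, "outbox rows:", len(s.outbox))
}
```

Comparing encoded bytes rather than decoded values means the check depends on the codec producing stable output for equal inputs.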
Quick start
package main

import (
	"log"
	"time"

	"github.com/zestor-dev/zestor/codec"
	"github.com/zestor-dev/zestor/store"
	"github.com/zestor-dev/zestor/store/postgres"
)

type User struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

func main() {
	// New connects to PostgreSQL and creates the schema if it is missing.
	s, err := postgres.New[User](postgres.Options{
		ConnString: "postgresql://user:pass@localhost:5432/mydb?sslmode=disable",
		Codec:      &codec.JSON{},
		Timeout:    10 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	// Watch the "users" kind, replaying existing entries first.
	ch, cancel, err := s.Watch("users", store.WithInitialReplay[User]())
	if err != nil {
		log.Fatal(err)
	}
	defer cancel()

	go func() {
		for ev := range ch {
			log.Printf("%s %s\n", ev.EventType, ev.Name)
		}
	}()

	if _, err := s.Set("users", "alice", User{Name: "Alice", Email: "alice@example.com"}); err != nil {
		log.Fatal(err)
	}

	// Delivery is asynchronous; wait briefly so the event is logged before exit.
	time.Sleep(time.Second)
}
Configuration
Options
type Options struct {
	ConnString string        // PostgreSQL URL (required), e.g. postgres://...
	Codec      codec.Codec   // Required
	Namespace  int64         // Optional; default 0; isolates rows per tenant
	Timeout    time.Duration // Per-operation timeout; default 10s
}
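As a rough illustration of the required fields and defaults listed above, the following sketch validates a local copy of the options. The options struct, defaultTimeout, and withDefaults are hypothetical and exist only in this example; how New actually validates its input is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is a local mirror of the documented fields. Codec is typed as any
// only so the sketch compiles without the zestor packages.
type options struct {
	ConnString string
	Codec      any
	Namespace  int64
	Timeout    time.Duration
}

const defaultTimeout = 10 * time.Second

// withDefaults is a hypothetical helper: it rejects missing required fields
// and fills in the documented 10s timeout when none is set.
func withDefaults(o options) (options, error) {
	if o.ConnString == "" {
		return o, errors.New("ConnString is required")
	}
	if o.Codec == nil {
		return o, errors.New("Codec is required")
	}
	if o.Timeout <= 0 {
		o.Timeout = defaultTimeout
	}
	return o, nil
}

func main() {
	o, err := withDefaults(options{
		ConnString: "postgresql://user:pass@localhost:5432/mydb",
		Codec:      struct{}{}, // placeholder for a real codec
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("namespace:", o.Namespace, "timeout:", o.Timeout)
}
```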
Connection strings
Use any DSN accepted by pgx, for example:
- postgresql://user:password@host:5432/dbname?sslmode=require
- postgresql:///dbname?host=/var/run/postgresql (Unix socket)
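To see how the two URL forms above differ, the sketch below inspects them with the standard net/url package. It is only a pre-flight sanity check with a hypothetical describe helper; it does not reproduce pgx's own DSN parsing, which accepts more forms than this.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// describe parses a URL-style DSN and summarizes where it points.
func describe(dsn string) (string, error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return "", err
	}
	if u.Scheme != "postgres" && u.Scheme != "postgresql" {
		return "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	db := strings.TrimPrefix(u.Path, "/")
	q := u.Query()
	if u.Host == "" {
		// No authority part: the host query parameter names a socket directory.
		return fmt.Sprintf("db=%s socket=%s", db, q.Get("host")), nil
	}
	return fmt.Sprintf("db=%s host=%s port=%s sslmode=%s",
		db, u.Hostname(), u.Port(), q.Get("sslmode")), nil
}

func main() {
	for _, dsn := range []string{
		"postgresql://user:password@host:5432/dbname?sslmode=require",
		"postgresql:///dbname?host=/var/run/postgresql",
	} {
		summary, err := describe(dsn)
		if err != nil {
			panic(err)
		}
		fmt.Println(summary)
	}
}
```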
Database schema
The store creates (if missing):
- zestor_kv: (ns_id, kind, key) primary key, value BYTEA, version, updated_at
- zestor_outbox: append-only rows with etype in create / update / delete
- Trigger on zestor_outbox → pg_notify('zestor_events', payload)
Exact DDL is applied inside New; you normally do not manage these tables by hand except for outbox retention (delete aged rows in production).
Watch
Watch supports the same options as other implementations:
- store.WithInitialReplay[T]()
- store.WithEventTypes[T](...)
- store.WithBufferSize[T](n)
Events are delivered after the transaction commits and the listener drains the outbox. Latency is usually low but not synchronous with the Set return in the same goroutine.
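The drain step can be pictured as a cursor over outbox ids. The sketch below is an in-memory model under stated assumptions, not the package's listener: outboxRow, drainer, watch, and drain are hypothetical names, rows are assumed to arrive sorted by id (as an ORDER BY id query would return them), and a full subscriber buffer simply drops the event here.

```go
package main

import "fmt"

// outboxRow is a simplified outbox entry; the field names are illustrative.
type outboxRow struct {
	ID    int64
	Kind  string
	Key   string
	EType string // "create", "update" or "delete"
}

// drainer remembers the highest outbox id it has processed and fans new rows
// out to per-kind subscriber channels.
type drainer struct {
	lastID int64
	subs   map[string][]chan outboxRow
}

func newDrainer() *drainer {
	return &drainer{subs: make(map[string][]chan outboxRow)}
}

// watch registers a buffered subscriber for one kind.
func (d *drainer) watch(kind string, buf int) <-chan outboxRow {
	ch := make(chan outboxRow, buf)
	d.subs[kind] = append(d.subs[kind], ch)
	return ch
}

// drain processes rows with an id above the cursor, in the order given, and
// returns how many rows were new.
func (d *drainer) drain(rows []outboxRow) int {
	processed := 0
	for _, r := range rows {
		if r.ID <= d.lastID {
			continue // already handled after an earlier notification
		}
		for _, ch := range d.subs[r.Kind] {
			select {
			case ch <- r:
			default: // subscriber buffer full; this sketch drops the event
			}
		}
		d.lastID = r.ID
		processed++
	}
	return processed
}

func main() {
	d := newDrainer()
	users := d.watch("users", 8)

	// First notification: one row is visible.
	d.drain([]outboxRow{{ID: 1, Kind: "users", Key: "alice", EType: "create"}})
	// Second notification: the result overlaps, so id 1 is skipped.
	d.drain([]outboxRow{
		{ID: 1, Kind: "users", Key: "alice", EType: "create"},
		{ID: 2, Kind: "users", Key: "alice", EType: "update"},
	})

	for len(users) > 0 {
		ev := <-users
		fmt.Println(ev.ID, ev.EType, ev.Key)
	}
}
```

Tracking the last processed id makes repeated notifications harmless, since a row that was already fanned out is skipped on the next drain.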
Codecs
Same as SQLite: pass any codec.Codec. See Codecs for the available implementations (JSON, Protobuf, YAML, etc.).
Limitations
| Topic | Notes |
|---|---|
| Outbox growth | Plan retention / compaction |
| SetFn | Requires existing key (ErrKeyNotFound otherwise), like sqlite / gomap |
| Validation / compare hooks | Not supported (same as sqlite) |
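The SetFn limitation can be shown with a small in-memory model. The kv type, setFn signature, and errKeyNotFound below are hypothetical stand-ins chosen for this sketch; the real method's signature may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errKeyNotFound stands in for the store's ErrKeyNotFound.
var errKeyNotFound = errors.New("key not found")

// kv is a hypothetical in-memory store for a single kind.
type kv[T any] struct {
	data map[string]T
}

// setFn applies fn to the current value. It never creates a missing key.
func (s *kv[T]) setFn(key string, fn func(T) (T, error)) error {
	cur, ok := s.data[key]
	if !ok {
		return errKeyNotFound
	}
	next, err := fn(cur)
	if err != nil {
		return err
	}
	s.data[key] = next
	return nil
}

func main() {
	s := &kv[int]{data: map[string]int{"hits": 1}}
	inc := func(n int) (int, error) { return n + 1, nil }

	if err := s.setFn("hits", inc); err != nil {
		panic(err)
	}
	fmt.Println("hits:", s.data["hits"])

	// A missing key is reported rather than created; call Set first instead.
	err := s.setFn("missing", inc)
	fmt.Println("missing key error:", errors.Is(err, errKeyNotFound))
}
```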
Testing
export POSTGRES_CONN='postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable'
go test ./...
If PostgreSQL is not reachable, the tests are skipped.
See also
- SQLite store — embedded file, in-process watch only
- Implementations overview