c77_secure_db/c77_secure_db--1.0.sql

2100 lines
81 KiB
PL/PgSQL

-- c77_secure_db--1.0.sql: Complete rebuild with security-first design
-- Requires PostgreSQL 14 or later and pgcrypto extension
\echo 'Loading c77_secure_db extension v1.0...'
-- Validate dependencies
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto') THEN
RAISE EXCEPTION 'The c77_secure_db extension requires the pgcrypto extension to be installed first.';
END IF;
END $$;
-- =============================================================================
-- SECURITY INFRASTRUCTURE
-- =============================================================================
-- Authorization tokens table - secure, short-lived tokens
CREATE TABLE c77_secure_db_auth_tokens (
token UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id TEXT NOT NULL DEFAULT pg_backend_pid()::text,
created_at TIMESTAMPTZ DEFAULT now(),
expires_at TIMESTAMPTZ DEFAULT (now() + interval '5 seconds'),
operation_type TEXT,
used BOOLEAN DEFAULT false
);
-- Performance index for token lookups
CREATE INDEX idx_c77_secure_db_auth_tokens_session_expires
ON c77_secure_db_auth_tokens(session_id, expires_at) WHERE NOT used;
-- Secure schemas registry
CREATE TABLE c77_secure_db_secure_schemas (
schema_name TEXT PRIMARY KEY,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
settings JSONB DEFAULT '{}'::jsonb
);
-- Operation audit log for compliance and monitoring
CREATE TABLE c77_secure_db_operation_audit (
id BIGSERIAL PRIMARY KEY,
operation_id UUID DEFAULT gen_random_uuid(),
schema_name TEXT NOT NULL,
table_name TEXT NOT NULL,
operation_type TEXT NOT NULL,
user_name TEXT DEFAULT current_user,
session_id TEXT DEFAULT pg_backend_pid()::text,
client_addr INET DEFAULT inet_client_addr(),
application_name TEXT DEFAULT current_setting('application_name', true),
data_hash TEXT,
rbac_user_id TEXT,
rbac_feature TEXT,
success BOOLEAN NOT NULL,
error_message TEXT,
execution_time_ms INTEGER,
created_at TIMESTAMPTZ DEFAULT now()
);
-- Audit table indexes
CREATE INDEX idx_c77_secure_db_audit_created_at ON c77_secure_db_operation_audit(created_at);
CREATE INDEX idx_c77_secure_db_audit_schema_table ON c77_secure_db_operation_audit(schema_name, table_name);
CREATE INDEX idx_c77_secure_db_audit_user ON c77_secure_db_operation_audit(user_name);
CREATE INDEX idx_c77_secure_db_audit_rbac_user ON c77_secure_db_operation_audit(rbac_user_id) WHERE rbac_user_id IS NOT NULL;
COMMENT ON TABLE c77_secure_db_auth_tokens IS 'Short-lived authorization tokens for secure operations';
COMMENT ON TABLE c77_secure_db_secure_schemas IS 'Registry of schemas under c77_secure_db protection';
COMMENT ON TABLE c77_secure_db_operation_audit IS 'Audit log of all secure database operations';
-- =============================================================================
-- SECURITY FUNCTIONS
-- =============================================================================
-- Create authorization token (SECURITY DEFINER - only authorized functions can call)
CREATE OR REPLACE FUNCTION c77_secure_db_create_auth_token(
p_operation_type TEXT DEFAULT 'generic'
) RETURNS UUID
SECURITY DEFINER
LANGUAGE plpgsql AS $$
DECLARE
v_token UUID;
BEGIN
-- Clean expired tokens first (maintenance)
DELETE FROM c77_secure_db_auth_tokens
WHERE expires_at < now() OR used = true;
-- Generate new token
INSERT INTO c77_secure_db_auth_tokens (session_id, operation_type)
VALUES (pg_backend_pid()::text, p_operation_type)
RETURNING token INTO v_token;
RETURN v_token;
END;
$$;
-- Validate authorization token (SECURITY DEFINER - only triggers can call)
CREATE OR REPLACE FUNCTION c77_secure_db_validate_auth_token(
p_token UUID
) RETURNS BOOLEAN
SECURITY DEFINER
LANGUAGE plpgsql AS $$
DECLARE
v_valid BOOLEAN := false;
BEGIN
-- Mark token as used and check validity atomically
UPDATE c77_secure_db_auth_tokens
SET used = true
WHERE token = p_token
AND session_id = pg_backend_pid()::text
AND expires_at > now()
AND used = false;
GET DIAGNOSTICS v_valid = FOUND;
-- Clean up used token immediately
DELETE FROM c77_secure_db_auth_tokens WHERE token = p_token;
RETURN v_valid;
END;
$$;
-- Secure trigger function - prevents all unauthorized modifications
CREATE OR REPLACE FUNCTION c77_secure_db_prevent_direct_modification()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
v_token UUID;
v_valid BOOLEAN := false;
BEGIN
-- Try to get and validate authorization token
BEGIN
v_token := current_setting('c77_secure_db.auth_token')::UUID;
v_valid := c77_secure_db_validate_auth_token(v_token);
EXCEPTION WHEN OTHERS THEN
v_valid := false;
END;
-- Only allow operation if token is valid
IF v_valid THEN
CASE TG_OP
WHEN 'DELETE' THEN RETURN OLD;
WHEN 'INSERT' THEN RETURN NEW;
WHEN 'UPDATE' THEN RETURN NEW;
END CASE;
END IF;
-- Block unauthorized access with helpful error
RAISE EXCEPTION 'Direct modifications not allowed on secure table %.%. Use c77_secure_db_operation() function.',
TG_TABLE_SCHEMA, TG_TABLE_NAME
USING HINT = 'All table modifications must go through the secure operation API',
ERRCODE = 'insufficient_privilege';
END;
$$;
-- =============================================================================
-- CONTENT HASH AND INTEGRITY FUNCTIONS
-- =============================================================================
-- Calculate content hash for tamper detection
CREATE OR REPLACE FUNCTION c77_secure_db_calculate_content_hash(
p_schema_name TEXT,
p_table_name TEXT,
p_data JSONB
) RETURNS TEXT
LANGUAGE plpgsql STABLE AS $$
DECLARE
v_exclude_columns TEXT[] := ARRAY['id', 'content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_column_comment TEXT;
v_temp_exclude_columns TEXT[];
v_sorted_keys TEXT[];
v_hash_input TEXT := '';
v_key TEXT;
BEGIN
-- Get custom exclude columns from content_hash column comment if exists
BEGIN
SELECT col_description(
format('%I.%I', p_schema_name, p_table_name)::regclass::oid,
(SELECT attnum FROM pg_attribute
WHERE attrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND attname = 'content_hash')
) INTO v_column_comment;
IF v_column_comment IS NOT NULL AND v_column_comment::jsonb ? 'exclude_hash_columns' THEN
v_temp_exclude_columns := ARRAY(
SELECT jsonb_array_elements_text(v_column_comment::jsonb->'exclude_hash_columns')
);
v_exclude_columns := v_exclude_columns || v_temp_exclude_columns;
END IF;
EXCEPTION WHEN OTHERS THEN
-- Ignore invalid JSON in comments
NULL;
END;
-- Get sorted keys for consistent hashing
SELECT array_agg(key ORDER BY key) INTO v_sorted_keys
FROM jsonb_object_keys(p_data) AS key
WHERE key != ALL(v_exclude_columns);
-- Build hash input string efficiently
FOREACH v_key IN ARRAY v_sorted_keys LOOP
v_hash_input := v_hash_input || v_key || ':' || COALESCE(p_data->>v_key, '') || '|';
END LOOP;
-- Return SHA-256 hash
RETURN encode(sha256(convert_to(v_hash_input, 'UTF8')), 'hex');
END;
$$;
-- Check if record data is fresh (not tampered with)
CREATE OR REPLACE FUNCTION c77_secure_db_check_freshness(
p_schema_name TEXT,
p_table_name TEXT,
p_data JSONB
) RETURNS JSONB
LANGUAGE plpgsql STABLE AS $$
DECLARE
v_stored_hash TEXT;
v_calculated_hash TEXT;
v_id TEXT;
v_hash_version INTEGER;
v_query TEXT;
v_data_cleaned JSONB;
v_special_columns TEXT[] := ARRAY['content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_column TEXT;
BEGIN
-- Validate inputs
IF p_schema_name IS NULL OR p_table_name IS NULL OR p_data IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Missing required parameters',
'timestamp', now()
);
END IF;
-- Extract primary key
v_id := p_data->>'id';
IF v_id IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Primary key "id" missing in input data',
'timestamp', now()
);
END IF;
-- Clean input data by removing special columns
v_data_cleaned := p_data;
FOREACH v_column IN ARRAY v_special_columns LOOP
v_data_cleaned := v_data_cleaned - v_column;
END LOOP;
-- Calculate hash of input data
v_calculated_hash := c77_secure_db_calculate_content_hash(p_schema_name, p_table_name, v_data_cleaned);
-- Build query to get stored hash
v_query := format(
'SELECT content_hash, COALESCE(hash_version, 1) FROM %I.%I WHERE id = $1',
p_schema_name, p_table_name
);
-- Add deleted_at filter if column exists
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = p_schema_name AND table_name = p_table_name AND column_name = 'deleted_at'
) THEN
v_query := v_query || ' AND deleted_at IS NULL';
END IF;
-- Execute query
EXECUTE v_query INTO v_stored_hash, v_hash_version USING v_id::BIGINT;
-- Check if record exists
IF v_stored_hash IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Record with id ' || v_id || ' not found or has been deleted',
'timestamp', now()
);
END IF;
-- Return comparison result
RETURN jsonb_build_object(
'success', true,
'id', v_id,
'fresh', (v_stored_hash = v_calculated_hash),
'stored_hash', v_stored_hash,
'calculated_hash', v_calculated_hash,
'hash_version', v_hash_version,
'timestamp', now()
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$$;
-- =============================================================================
-- SCHEMA MANAGEMENT
-- =============================================================================
-- Manage secure schemas registry
CREATE OR REPLACE FUNCTION c77_secure_db_manage_secure_schemas(
p_operation TEXT,
p_schema_name TEXT DEFAULT NULL
) RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_operation TEXT := lower(p_operation);
v_schema_exists BOOLEAN;
v_row_count INTEGER;
BEGIN
-- Validate operation
IF v_operation NOT IN ('list', 'add', 'remove') THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Invalid operation. Must be one of: list, add, remove',
'timestamp', now()
);
END IF;
CASE v_operation
WHEN 'list' THEN
RETURN jsonb_build_object(
'success', true,
'schemas', (
SELECT COALESCE(jsonb_agg(
jsonb_build_object(
'schema_name', schema_name,
'created_at', created_at,
'updated_at', updated_at,
'settings', settings
)
), '[]'::jsonb)
FROM c77_secure_db_secure_schemas
),
'timestamp', now()
);
WHEN 'add' THEN
-- Validate schema name
IF p_schema_name IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Schema name is required for add operation',
'timestamp', now()
);
END IF;
-- Check if schema exists in database
SELECT EXISTS (
SELECT 1 FROM information_schema.schemata
WHERE schema_name = p_schema_name
) INTO v_schema_exists;
IF NOT v_schema_exists THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Schema %I does not exist in database', p_schema_name),
'timestamp', now()
);
END IF;
-- Add to secure schemas
INSERT INTO c77_secure_db_secure_schemas (schema_name)
VALUES (p_schema_name)
ON CONFLICT (schema_name) DO UPDATE SET updated_at = now();
-- Apply triggers to existing tables
PERFORM c77_secure_db_apply_triggers(p_schema_name);
RETURN jsonb_build_object(
'success', true,
'message', format('Schema %I added to secure registry and triggers applied', p_schema_name),
'timestamp', now()
);
WHEN 'remove' THEN
IF p_schema_name IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Schema name is required for remove operation',
'timestamp', now()
);
END IF;
DELETE FROM c77_secure_db_secure_schemas WHERE schema_name = p_schema_name;
GET DIAGNOSTICS v_row_count = ROW_COUNT;
IF v_row_count > 0 THEN
RETURN jsonb_build_object(
'success', true,
'message', format('Schema %I removed from secure registry', p_schema_name),
'timestamp', now()
);
ELSE
RETURN jsonb_build_object(
'success', false,
'error', format('Schema %I not found in secure registry', p_schema_name),
'timestamp', now()
);
END IF;
END CASE;
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$$;
-- Apply security triggers to all tables in a schema
CREATE OR REPLACE FUNCTION c77_secure_db_apply_triggers(p_schema_name TEXT)
RETURNS VOID
LANGUAGE plpgsql AS $$
DECLARE
v_table_name TEXT;
v_trigger_count INTEGER := 0;
BEGIN
FOR v_table_name IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = p_schema_name
AND table_type = 'BASE TABLE'
LOOP
-- Drop existing triggers first
EXECUTE format('DROP TRIGGER IF EXISTS c77_secure_db_prevent_insert ON %I.%I', p_schema_name, v_table_name);
EXECUTE format('DROP TRIGGER IF EXISTS c77_secure_db_prevent_update ON %I.%I', p_schema_name, v_table_name);
EXECUTE format('DROP TRIGGER IF EXISTS c77_secure_db_prevent_delete ON %I.%I', p_schema_name, v_table_name);
-- Create new triggers
EXECUTE format(
'CREATE TRIGGER c77_secure_db_prevent_insert BEFORE INSERT ON %I.%I ' ||
'FOR EACH ROW EXECUTE FUNCTION c77_secure_db_prevent_direct_modification()',
p_schema_name, v_table_name
);
EXECUTE format(
'CREATE TRIGGER c77_secure_db_prevent_update BEFORE UPDATE ON %I.%I ' ||
'FOR EACH ROW EXECUTE FUNCTION c77_secure_db_prevent_direct_modification()',
p_schema_name, v_table_name
);
EXECUTE format(
'CREATE TRIGGER c77_secure_db_prevent_delete BEFORE DELETE ON %I.%I ' ||
'FOR EACH ROW EXECUTE FUNCTION c77_secure_db_prevent_direct_modification()',
p_schema_name, v_table_name
);
v_trigger_count := v_trigger_count + 1;
END LOOP;
RAISE NOTICE 'Applied c77_secure_db triggers to % tables in schema %', v_trigger_count, p_schema_name;
END;
$$;
-- Auto-apply triggers when tables are created (event trigger)
CREATE OR REPLACE FUNCTION c77_secure_db_auto_apply_triggers()
RETURNS EVENT_TRIGGER
LANGUAGE plpgsql AS $$
DECLARE
v_obj RECORD;
v_schema_name TEXT;
BEGIN
FOR v_obj IN SELECT * FROM pg_event_trigger_ddl_commands() WHERE object_type = 'table' LOOP
v_schema_name := v_obj.schema_name;
-- Check if schema is in secure registry
IF EXISTS (SELECT 1 FROM c77_secure_db_secure_schemas WHERE schema_name = v_schema_name) THEN
PERFORM c77_secure_db_apply_triggers(v_schema_name);
RAISE NOTICE 'Auto-applied c77_secure_db triggers to new table %.%', v_schema_name, v_obj.object_identity;
END IF;
END LOOP;
END;
$$;
-- Create the event trigger
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_event_trigger WHERE evtname = 'c77_secure_db_event_auto_apply_triggers') THEN
CREATE EVENT TRIGGER c77_secure_db_event_auto_apply_triggers
ON DDL_COMMAND_END
WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE')
EXECUTE FUNCTION c77_secure_db_auto_apply_triggers();
END IF;
END $$;
-- =============================================================================
-- ACCESS CONTROL AND ROLES
-- =============================================================================
-- Create security roles
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'c77_secure_db_admin') THEN
CREATE ROLE c77_secure_db_admin;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'c77_secure_db_user') THEN
CREATE ROLE c77_secure_db_user;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'c77_secure_db_readonly') THEN
CREATE ROLE c77_secure_db_readonly;
END IF;
END $$;
-- Set up proper permissions
-- Revoke dangerous permissions from PUBLIC
REVOKE ALL ON FUNCTION c77_secure_db_create_auth_token(TEXT) FROM PUBLIC;
REVOKE ALL ON FUNCTION c77_secure_db_validate_auth_token(UUID) FROM PUBLIC;
REVOKE ALL ON TABLE c77_secure_db_auth_tokens FROM PUBLIC;
REVOKE ALL ON TABLE c77_secure_db_secure_schemas FROM PUBLIC;
REVOKE ALL ON TABLE c77_secure_db_operation_audit FROM PUBLIC;
-- Grant appropriate permissions
-- Readonly role
GRANT EXECUTE ON FUNCTION c77_secure_db_check_freshness(TEXT, TEXT, JSONB) TO c77_secure_db_readonly;
GRANT EXECUTE ON FUNCTION c77_secure_db_calculate_content_hash(TEXT, TEXT, JSONB) TO c77_secure_db_readonly;
GRANT SELECT ON c77_secure_db_operation_audit TO c77_secure_db_readonly;
-- User role
GRANT c77_secure_db_readonly TO c77_secure_db_user;
GRANT EXECUTE ON FUNCTION c77_secure_db_create_auth_token(TEXT) TO c77_secure_db_user;
-- Admin role
GRANT c77_secure_db_user TO c77_secure_db_admin;
GRANT EXECUTE ON FUNCTION c77_secure_db_manage_secure_schemas(TEXT, TEXT) TO c77_secure_db_admin;
GRANT EXECUTE ON FUNCTION c77_secure_db_apply_triggers(TEXT) TO c77_secure_db_admin;
GRANT ALL ON c77_secure_db_secure_schemas TO c77_secure_db_admin;
GRANT SELECT, INSERT ON c77_secure_db_operation_audit TO c77_secure_db_admin;
-- Comments for documentation
COMMENT ON FUNCTION c77_secure_db_create_auth_token(TEXT) IS 'Creates short-lived authorization token for secure operations';
COMMENT ON FUNCTION c77_secure_db_validate_auth_token(UUID) IS 'Validates and consumes authorization token';
COMMENT ON FUNCTION c77_secure_db_prevent_direct_modification() IS 'Trigger function preventing unauthorized table modifications';
COMMENT ON FUNCTION c77_secure_db_calculate_content_hash(TEXT, TEXT, JSONB) IS 'Calculates SHA-256 hash for tamper detection';
COMMENT ON FUNCTION c77_secure_db_check_freshness(TEXT, TEXT, JSONB) IS 'Verifies record integrity by comparing content hashes';
COMMENT ON FUNCTION c77_secure_db_manage_secure_schemas(TEXT, TEXT) IS 'Manages registry of schemas under secure protection';
COMMENT ON FUNCTION c77_secure_db_apply_triggers(TEXT) IS 'Applies security triggers to all tables in a schema';
-- Add this to the main extension SQL file
-- =============================================================================
-- MAIN SECURE OPERATION FUNCTION
-- =============================================================================
-- Main secure operation function with optional RBAC integration
CREATE OR REPLACE FUNCTION c77_secure_db_operation(
p_json_data JSONB,
p_check_rbac BOOLEAN DEFAULT false,
p_required_feature TEXT DEFAULT NULL,
p_scope_type TEXT DEFAULT NULL,
p_scope_id TEXT DEFAULT NULL
) RETURNS JSONB
LANGUAGE plpgsql SECURITY DEFINER AS $$
DECLARE
v_start_time TIMESTAMPTZ := clock_timestamp();
v_auth_token UUID;
v_rbac_available BOOLEAN := false;
v_external_id TEXT;
v_operation_id UUID := gen_random_uuid();
-- Operation parameters
v_schema_name TEXT;
v_table_name TEXT;
v_operation TEXT;
v_primary_key TEXT := 'id';
v_data JSONB;
-- Processing variables
v_data_cleaned JSONB;
v_content_hash TEXT;
v_hash_version INTEGER := 1;
v_columns TEXT[];
v_values TEXT[];
v_update_pairs TEXT[];
v_row_count INTEGER;
v_result JSONB;
-- Schema introspection
v_has_content_hash BOOLEAN;
v_has_created_at BOOLEAN;
v_has_updated_at BOOLEAN;
v_has_deleted_at BOOLEAN;
v_has_hash_version BOOLEAN;
v_primary_key_type TEXT;
-- Error handling
v_execution_time_ms INTEGER;
v_error_message TEXT;
BEGIN
-- STEP 1: Validate input parameters
IF p_json_data IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Input JSON data is required',
'timestamp', now()
);
END IF;
-- Extract parameters
v_schema_name := p_json_data->>'schema_name';
v_table_name := p_json_data->>'table_name';
v_operation := lower(p_json_data->>'operation');
v_primary_key := COALESCE(p_json_data->>'primary_key', 'id');
v_data := p_json_data->'data';
-- Validate required parameters
IF v_schema_name IS NULL OR v_table_name IS NULL OR v_operation IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Missing required fields: schema_name, table_name, or operation',
'timestamp', now()
);
END IF;
IF v_operation NOT IN ('insert', 'update', 'upsert', 'delete', 'soft_delete') THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Invalid operation. Must be one of: insert, update, upsert, delete, soft_delete',
'timestamp', now()
);
END IF;
-- STEP 2: Check RBAC if requested and available
IF p_check_rbac AND p_required_feature IS NOT NULL THEN
SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'c77_rbac') INTO v_rbac_available;
IF v_rbac_available THEN
v_external_id := current_setting('c77_rbac.external_id', true);
IF v_external_id IS NULL OR v_external_id = '' THEN
RETURN jsonb_build_object(
'success', false,
'error', 'RBAC enabled but no user context set',
'hint', 'Set c77_rbac.external_id session variable',
'timestamp', now()
);
END IF;
-- Perform RBAC check using c77_rbac extension
IF NOT c77_rbac_can_access(
p_required_feature,
v_external_id,
COALESCE(p_scope_type, 'global'),
COALESCE(p_scope_id, 'all')
) THEN
-- Log unauthorized attempt
INSERT INTO c77_secure_db_operation_audit (
operation_id, schema_name, table_name, operation_type,
rbac_user_id, rbac_feature, success, error_message
) VALUES (
v_operation_id, v_schema_name, v_table_name, v_operation,
v_external_id, p_required_feature, false, 'Insufficient permissions'
);
RETURN jsonb_build_object(
'success', false,
'error', 'Insufficient permissions',
'required_feature', p_required_feature,
'user_id', v_external_id,
'timestamp', now()
);
END IF;
END IF;
END IF;
-- STEP 3: Table introspection
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'content_hash'
) INTO v_has_content_hash;
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'created_at'
) INTO v_has_created_at;
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'updated_at'
) INTO v_has_updated_at;
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'deleted_at'
) INTO v_has_deleted_at;
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'hash_version'
) INTO v_has_hash_version;
-- Get primary key data type
SELECT data_type INTO v_primary_key_type
FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = v_primary_key;
-- STEP 4: Create authorization token and set context
v_auth_token := c77_secure_db_create_auth_token(v_operation);
PERFORM set_config('c77_secure_db.auth_token', v_auth_token::text, true);
-- STEP 5: Prepare data for operation
-- Clean data by removing system columns
v_data_cleaned := v_data;
IF v_data_cleaned ? 'content_hash' THEN v_data_cleaned := v_data_cleaned - 'content_hash'; END IF;
IF v_data_cleaned ? 'created_at' THEN v_data_cleaned := v_data_cleaned - 'created_at'; END IF;
IF v_data_cleaned ? 'updated_at' THEN v_data_cleaned := v_data_cleaned - 'updated_at'; END IF;
IF v_data_cleaned ? 'deleted_at' THEN v_data_cleaned := v_data_cleaned - 'deleted_at'; END IF;
IF v_data_cleaned ? 'hash_version' THEN v_data_cleaned := v_data_cleaned - 'hash_version'; END IF;
-- Calculate content hash if table supports it
IF v_has_content_hash AND v_operation IN ('insert', 'update', 'upsert') THEN
v_content_hash := c77_secure_db_calculate_content_hash(v_schema_name, v_table_name, v_data_cleaned);
END IF;
-- Prepare columns and values for SQL generation
SELECT array_agg(key), array_agg(quote_literal(v_data->>key))
INTO v_columns, v_values
FROM jsonb_object_keys(v_data) AS key
WHERE key NOT IN ('content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version');
-- Add system columns to insert/update
IF v_operation IN ('insert', 'update', 'upsert') THEN
IF v_has_content_hash AND v_content_hash IS NOT NULL THEN
v_columns := v_columns || ARRAY['content_hash'];
v_values := v_values || ARRAY[quote_literal(v_content_hash)];
END IF;
IF v_has_hash_version THEN
v_columns := v_columns || ARRAY['hash_version'];
v_values := v_values || ARRAY[quote_literal(v_hash_version)];
END IF;
IF v_has_created_at AND v_operation IN ('insert', 'upsert') THEN
v_columns := v_columns || ARRAY['created_at'];
v_values := v_values || ARRAY[quote_literal(now())];
END IF;
IF v_has_updated_at THEN
v_columns := v_columns || ARRAY['updated_at'];
v_values := v_values || ARRAY[quote_literal(now())];
END IF;
END IF;
-- STEP 6: Execute the operation
CASE v_operation
WHEN 'insert' THEN
EXECUTE format(
'INSERT INTO %I.%I (%s) VALUES (%s) RETURNING *',
v_schema_name, v_table_name,
array_to_string(v_columns, ','),
array_to_string(v_values, ',')
);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'update' THEN
IF NOT (v_data ? v_primary_key) THEN
RAISE EXCEPTION 'Primary key "%" required for update operation', v_primary_key;
END IF;
-- Build update SET clause
v_update_pairs := ARRAY(
SELECT format('%I = %s', key, quote_literal(v_data->>key))
FROM jsonb_object_keys(v_data) AS key
WHERE key NOT IN ('content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version', v_primary_key)
);
-- Add system columns to update
IF v_has_updated_at THEN
v_update_pairs := v_update_pairs || ARRAY[format('updated_at = %L', now())];
END IF;
IF v_has_content_hash AND v_content_hash IS NOT NULL THEN
v_update_pairs := v_update_pairs || ARRAY[format('content_hash = %L', v_content_hash)];
END IF;
IF v_has_hash_version THEN
v_update_pairs := v_update_pairs || ARRAY[format('hash_version = %L', v_hash_version)];
END IF;
EXECUTE format(
'UPDATE %I.%I SET %s WHERE %I = ($1)::%s RETURNING *',
v_schema_name, v_table_name,
array_to_string(v_update_pairs, ','),
v_primary_key, COALESCE(v_primary_key_type, 'bigint')
) USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'upsert' THEN
-- Simple upsert using primary key conflict
EXECUTE format(
'INSERT INTO %I.%I (%s) VALUES (%s) ON CONFLICT (%I) DO UPDATE SET %s RETURNING *',
v_schema_name, v_table_name,
array_to_string(v_columns, ','),
array_to_string(v_values, ','),
v_primary_key,
array_to_string(ARRAY(
SELECT format('%I = EXCLUDED.%I', col, col)
FROM unnest(v_columns) AS col
WHERE col != v_primary_key
), ',')
);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'delete' THEN
IF NOT (v_data ? v_primary_key) THEN
RAISE EXCEPTION 'Primary key "%" required for delete operation', v_primary_key;
END IF;
EXECUTE format(
'DELETE FROM %I.%I WHERE %I = ($1)::%s',
v_schema_name, v_table_name,
v_primary_key, COALESCE(v_primary_key_type, 'bigint')
) USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'soft_delete' THEN
IF NOT (v_data ? v_primary_key) THEN
RAISE EXCEPTION 'Primary key "%" required for soft_delete operation', v_primary_key;
END IF;
IF NOT v_has_deleted_at THEN
RAISE EXCEPTION 'Table %.% does not have deleted_at column for soft delete', v_schema_name, v_table_name;
END IF;
v_update_pairs := ARRAY[format('deleted_at = %L', now())];
IF v_has_updated_at THEN
v_update_pairs := v_update_pairs || ARRAY[format('updated_at = %L', now())];
END IF;
EXECUTE format(
'UPDATE %I.%I SET %s WHERE %I = ($1)::%s AND deleted_at IS NULL RETURNING *',
v_schema_name, v_table_name,
array_to_string(v_update_pairs, ','),
v_primary_key, COALESCE(v_primary_key_type, 'bigint')
) USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
END CASE;
-- STEP 7: Calculate execution time and prepare result
v_execution_time_ms := EXTRACT(epoch FROM (clock_timestamp() - v_start_time)) * 1000;
-- Build success result
v_result := jsonb_build_object(
'success', true,
'operation', v_operation,
'schema_name', v_schema_name,
'table_name', v_table_name,
'rows_affected', v_row_count,
'execution_time_ms', v_execution_time_ms,
'operation_id', v_operation_id,
'timestamp', now()
);
-- Add optional fields
IF v_content_hash IS NOT NULL THEN
v_result := v_result || jsonb_build_object('content_hash', v_content_hash);
END IF;
IF p_check_rbac AND v_rbac_available THEN
v_result := v_result || jsonb_build_object(
'rbac_check_performed', true,
'rbac_user_id', v_external_id,
'required_feature', p_required_feature
);
END IF;
-- STEP 8: Log successful operation
INSERT INTO c77_secure_db_operation_audit (
operation_id, schema_name, table_name, operation_type,
data_hash, rbac_user_id, rbac_feature, success, execution_time_ms
) VALUES (
v_operation_id, v_schema_name, v_table_name, v_operation,
v_content_hash, v_external_id, p_required_feature, true, v_execution_time_ms
);
-- STEP 9: Clean up and return
PERFORM set_config('c77_secure_db.auth_token', '', true);
RETURN v_result;
EXCEPTION WHEN OTHERS THEN
-- Error handling and cleanup
v_error_message := SQLERRM;
v_execution_time_ms := EXTRACT(epoch FROM (clock_timestamp() - v_start_time)) * 1000;
-- Always clean up authorization token
PERFORM set_config('c77_secure_db.auth_token', '', true);
-- Log failed operation
BEGIN
INSERT INTO c77_secure_db_operation_audit (
operation_id, schema_name, table_name, operation_type,
rbac_user_id, rbac_feature, success, error_message, execution_time_ms
) VALUES (
v_operation_id, COALESCE(v_schema_name, 'unknown'), COALESCE(v_table_name, 'unknown'),
COALESCE(v_operation, 'unknown'), v_external_id, p_required_feature,
false, v_error_message, v_execution_time_ms
);
EXCEPTION WHEN OTHERS THEN
-- Ignore audit logging errors to avoid masking the original error
NULL;
END;
RETURN jsonb_build_object(
'success', false,
'error', v_error_message,
'error_code', SQLSTATE,
'operation_id', v_operation_id,
'execution_time_ms', v_execution_time_ms,
'timestamp', now()
);
END;
$$;
-- Backward compatibility function (original signature)
CREATE OR REPLACE FUNCTION c77_secure_db_operation(p_json_data JSONB)
RETURNS JSONB LANGUAGE plpgsql AS $$
BEGIN
RETURN c77_secure_db_operation(p_json_data, false, NULL, NULL, NULL);
END;
$$;
-- Grant permissions for the main operation function
GRANT EXECUTE ON FUNCTION c77_secure_db_operation(JSONB, BOOLEAN, TEXT, TEXT, TEXT) TO c77_secure_db_user;
GRANT EXECUTE ON FUNCTION c77_secure_db_operation(JSONB) TO c77_secure_db_user;
COMMENT ON FUNCTION c77_secure_db_operation(JSONB, BOOLEAN, TEXT, TEXT, TEXT) IS 'Main secure operation function with optional RBAC integration';
COMMENT ON FUNCTION c77_secure_db_operation(JSONB) IS 'Backward compatible secure operation function';
-- Add these to the main extension SQL file
-- =============================================================================
-- VERIFICATION AND UTILITY FUNCTIONS
-- =============================================================================
-- Verify content hashes for all records in a table
CREATE OR REPLACE FUNCTION c77_secure_db_verify_content_hashes(
p_schema_name TEXT,
p_table_name TEXT,
p_fix_mismatches BOOLEAN DEFAULT false,
p_batch_size INTEGER DEFAULT 1000
) RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_cursor REFCURSOR;
v_record RECORD;
v_data JSONB;
v_data_cleaned JSONB;
v_calculated_hash TEXT;
v_stored_hash TEXT;
v_mismatches JSONB[] := '{}';
v_total_records INTEGER := 0;
v_mismatch_count INTEGER := 0;
v_fixed_count INTEGER := 0;
v_batch_count INTEGER := 0;
v_auth_token UUID;
v_special_columns TEXT[] := ARRAY['content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_column TEXT;
v_query TEXT;
v_primary_key TEXT;
BEGIN
-- Validate table exists and has content_hash column
IF NOT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = p_schema_name AND table_name = p_table_name
) THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I does not exist', p_schema_name, p_table_name),
'timestamp', now()
);
END IF;
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = p_schema_name AND table_name = p_table_name AND column_name = 'content_hash'
) THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I does not have a content_hash column', p_schema_name, p_table_name),
'timestamp', now()
);
END IF;
-- Get primary key
SELECT a.attname INTO v_primary_key
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND c.contype = 'p'
LIMIT 1;
IF v_primary_key IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I does not have a primary key', p_schema_name, p_table_name),
'timestamp', now()
);
END IF;
-- Create authorization token if we need to fix mismatches
IF p_fix_mismatches THEN
v_auth_token := c77_secure_db_create_auth_token('hash_verification');
PERFORM set_config('c77_secure_db.auth_token', v_auth_token::text, true);
END IF;
-- Build query to fetch all records
v_query := format(
'SELECT * FROM %I.%I ORDER BY %I',
p_schema_name, p_table_name, v_primary_key
);
-- Open cursor
OPEN v_cursor FOR EXECUTE v_query;
LOOP
FETCH v_cursor INTO v_record;
EXIT WHEN NOT FOUND;
v_total_records := v_total_records + 1;
v_batch_count := v_batch_count + 1;
-- Convert record to JSONB and clean it
v_data := row_to_json(v_record)::jsonb;
v_data_cleaned := v_data;
-- Remove special columns
FOREACH v_column IN ARRAY v_special_columns LOOP
v_data_cleaned := v_data_cleaned - v_column;
END LOOP;
-- Remove primary key from hash calculation
v_data_cleaned := v_data_cleaned - v_primary_key;
-- Calculate hash
v_calculated_hash := c77_secure_db_calculate_content_hash(p_schema_name, p_table_name, v_data_cleaned);
v_stored_hash := v_data->>'content_hash';
-- Check for mismatch
IF v_calculated_hash != v_stored_hash THEN
v_mismatch_count := v_mismatch_count + 1;
-- Record mismatch details
v_mismatches := v_mismatches || jsonb_build_object(
'primary_key_value', v_data->>v_primary_key,
'stored_hash', v_stored_hash,
'calculated_hash', v_calculated_hash,
'hash_version', COALESCE(v_data->>'hash_version', '1')
);
-- Fix if requested
IF p_fix_mismatches THEN
EXECUTE format(
'UPDATE %I.%I SET content_hash = $1, hash_version = COALESCE(hash_version, 1) + 1, updated_at = now() WHERE %I = $2',
p_schema_name, p_table_name, v_primary_key
) USING v_calculated_hash, (v_data->>v_primary_key)::BIGINT;
v_fixed_count := v_fixed_count + 1;
END IF;
END IF;
-- Progress notification
IF v_batch_count >= p_batch_size THEN
v_batch_count := 0;
RAISE NOTICE 'Processed % records, found % mismatches so far', v_total_records, v_mismatch_count;
END IF;
END LOOP;
CLOSE v_cursor;
-- Clean up authorization token
IF p_fix_mismatches THEN
PERFORM set_config('c77_secure_db.auth_token', '', true);
END IF;
RETURN jsonb_build_object(
'success', true,
'total_records', v_total_records,
'mismatch_count', v_mismatch_count,
'fixed_count', v_fixed_count,
'mismatches', v_mismatches,
'batch_size', p_batch_size,
'timestamp', now()
);
EXCEPTION WHEN OTHERS THEN
-- Clean up on error
IF p_fix_mismatches THEN
PERFORM set_config('c77_secure_db.auth_token', '', true);
END IF;
BEGIN CLOSE v_cursor; EXCEPTION WHEN OTHERS THEN NULL; END;
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'records_processed', v_total_records,
'timestamp', now()
);
END;
$$;
-- Bulk freshness checking
CREATE OR REPLACE FUNCTION c77_secure_db_check_freshness_bulk(
p_schema_name TEXT,
p_table_name TEXT,
p_data JSONB
) RETURNS JSONB
LANGUAGE plpgsql STABLE AS $$
DECLARE
v_record JSONB;
v_results JSONB[] := '{}';
v_result JSONB;
v_total_count INTEGER := 0;
v_fresh_count INTEGER := 0;
v_stale_count INTEGER := 0;
v_error_count INTEGER := 0;
BEGIN
-- Validate input is array
IF jsonb_typeof(p_data) != 'array' THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Input data must be a JSONB array',
'timestamp', now()
);
END IF;
-- Process each record
FOR v_record IN SELECT jsonb_array_elements(p_data) LOOP
v_total_count := v_total_count + 1;
-- Check freshness of this record
v_result := c77_secure_db_check_freshness(p_schema_name, p_table_name, v_record);
v_results := v_results || v_result;
-- Count results
IF (v_result->>'success')::BOOLEAN THEN
IF (v_result->>'fresh')::BOOLEAN THEN
v_fresh_count := v_fresh_count + 1;
ELSE
v_stale_count := v_stale_count + 1;
END IF;
ELSE
v_error_count := v_error_count + 1;
END IF;
END LOOP;
RETURN jsonb_build_object(
'success', true,
'total_records', v_total_count,
'fresh_records', v_fresh_count,
'stale_records', v_stale_count,
'error_records', v_error_count,
'results', v_results,
'timestamp', now()
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'records_processed', v_total_count,
'timestamp', now()
);
END;
$$;
-- Generate operation templates for easier usage
CREATE OR REPLACE FUNCTION c77_secure_db_get_operation_template(
p_schema_name TEXT,
p_table_name TEXT,
p_operation TEXT
) RETURNS TEXT
LANGUAGE plpgsql AS $$
DECLARE
v_operation TEXT := lower(p_operation);
v_exclude_columns TEXT[] := ARRAY['id', 'content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_columns TEXT[];
v_primary_key TEXT;
v_data_template JSONB;
v_template JSONB;
BEGIN
-- Validate inputs
IF NOT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = p_schema_name AND table_name = p_table_name
) THEN
RETURN format('-- Error: Table %I.%I does not exist', p_schema_name, p_table_name);
END IF;
IF v_operation NOT IN ('insert', 'update', 'upsert', 'delete', 'soft_delete') THEN
RETURN '-- Error: Invalid operation. Must be one of: insert, update, upsert, delete, soft_delete';
END IF;
-- Get primary key
SELECT a.attname INTO v_primary_key
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND c.contype = 'p'
LIMIT 1;
-- Get columns for data template
SELECT array_agg(column_name) INTO v_columns
FROM information_schema.columns
WHERE table_schema = p_schema_name AND table_name = p_table_name
AND column_name != ALL(v_exclude_columns);
-- Build data template
v_data_template := jsonb_object_agg(
column_name,
CASE
WHEN data_type IN ('character varying', 'text') THEN '""'
WHEN data_type IN ('integer', 'bigint', 'smallint') THEN '0'
WHEN data_type = 'boolean' THEN 'false'
WHEN data_type IN ('timestamp with time zone', 'timestamp without time zone') THEN '"2025-01-01T00:00:00Z"'
WHEN data_type = 'jsonb' THEN '{}'
ELSE 'null'
END
)
FROM information_schema.columns
WHERE table_schema = p_schema_name AND table_name = p_table_name
AND column_name != ALL(v_exclude_columns);
-- Add primary key for operations that need it
IF v_operation IN ('update', 'upsert', 'delete', 'soft_delete') AND v_primary_key IS NOT NULL THEN
v_data_template := v_data_template || jsonb_build_object(v_primary_key, 0);
END IF;
-- Build template
v_template := jsonb_build_object(
'schema_name', p_schema_name,
'table_name', p_table_name,
'operation', v_operation,
'data', v_data_template
);
-- Add primary key field for operations that need it
IF v_operation IN ('update', 'delete', 'soft_delete') THEN
v_template := v_template || jsonb_build_object('primary_key', COALESCE(v_primary_key, 'id'));
END IF;
-- Return formatted SQL
RETURN format(
E'-- %s operation template for %I.%I\nSELECT c77_secure_db_operation(\n''%s''::jsonb\n);',
upper(v_operation), p_schema_name, p_table_name, jsonb_pretty(v_template)
);
EXCEPTION WHEN OTHERS THEN
RETURN format('-- Error generating template: %s', SQLERRM);
END;
$$;
-- Maintenance function to clean up expired tokens
CREATE OR REPLACE FUNCTION c77_secure_db_cleanup_expired_tokens()
RETURNS INTEGER
LANGUAGE plpgsql AS $$
DECLARE
v_deleted_count INTEGER;
BEGIN
DELETE FROM c77_secure_db_auth_tokens
WHERE expires_at < (now() - interval '1 minute') OR used = true;
GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
RETURN v_deleted_count;
END;
$$;
-- System health check function
CREATE OR REPLACE FUNCTION c77_secure_db_health_check()
RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_rbac_available BOOLEAN;
v_secure_schemas_count INTEGER;
v_active_tokens INTEGER;
v_recent_operations INTEGER;
v_recent_errors INTEGER;
BEGIN
-- Check c77_rbac availability
SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'c77_rbac') INTO v_rbac_available;
-- Count secure schemas
SELECT count(*) INTO v_secure_schemas_count FROM c77_secure_db_secure_schemas;
-- Count active tokens
SELECT count(*) INTO v_active_tokens FROM c77_secure_db_auth_tokens WHERE expires_at > now();
-- Count recent operations (last hour)
SELECT count(*) INTO v_recent_operations
FROM c77_secure_db_operation_audit
WHERE created_at > now() - interval '1 hour';
-- Count recent errors (last hour)
SELECT count(*) INTO v_recent_errors
FROM c77_secure_db_operation_audit
WHERE created_at > now() - interval '1 hour' AND success = false;
RETURN jsonb_build_object(
'success', true,
'extension_version', '2.0',
'rbac_available', v_rbac_available,
'secure_schemas_count', v_secure_schemas_count,
'active_tokens', v_active_tokens,
'recent_operations_1h', v_recent_operations,
'recent_errors_1h', v_recent_errors,
'error_rate_1h', CASE
WHEN v_recent_operations > 0 THEN round((v_recent_errors::numeric / v_recent_operations * 100), 2)
ELSE 0
END,
'timestamp', now()
);
END;
$$;
-- Grant permissions for utility functions
GRANT EXECUTE ON FUNCTION c77_secure_db_verify_content_hashes(TEXT, TEXT, BOOLEAN, INTEGER) TO c77_secure_db_admin;
GRANT EXECUTE ON FUNCTION c77_secure_db_check_freshness_bulk(TEXT, TEXT, JSONB) TO c77_secure_db_readonly;
GRANT EXECUTE ON FUNCTION c77_secure_db_get_operation_template(TEXT, TEXT, TEXT) TO c77_secure_db_user;
GRANT EXECUTE ON FUNCTION c77_secure_db_cleanup_expired_tokens() TO c77_secure_db_admin;
GRANT EXECUTE ON FUNCTION c77_secure_db_health_check() TO c77_secure_db_readonly;
-- Add comments
COMMENT ON FUNCTION c77_secure_db_verify_content_hashes(TEXT, TEXT, BOOLEAN, INTEGER) IS 'Verifies content hashes for all records in a table with optional fixing';
COMMENT ON FUNCTION c77_secure_db_check_freshness_bulk(TEXT, TEXT, JSONB) IS 'Bulk freshness checking for multiple records';
COMMENT ON FUNCTION c77_secure_db_get_operation_template(TEXT, TEXT, TEXT) IS 'Generates SQL templates for secure operations';
COMMENT ON FUNCTION c77_secure_db_cleanup_expired_tokens() IS 'Maintenance function to clean up expired authorization tokens';
COMMENT ON FUNCTION c77_secure_db_health_check() IS 'System health check and status report';
-- =============================================================================
-- TESTING AND VALIDATION FRAMEWORK
-- =============================================================================
-- Comprehensive security test suite
CREATE OR REPLACE FUNCTION c77_secure_db_test_security()
RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_test_results JSONB := '{}';
v_tests_passed INTEGER := 0;
v_tests_failed INTEGER := 0;
v_test_schema TEXT := 'c77_test_' || extract(epoch from now())::bigint;
v_operation_result JSONB;
BEGIN
-- Create test environment
EXECUTE format('CREATE SCHEMA %I', v_test_schema);
-- Create test table
EXECUTE format('
CREATE TABLE %I.test_secure_table (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
content_hash TEXT,
hash_version INTEGER DEFAULT 1,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMPTZ
)', v_test_schema);
-- Register schema as secure
PERFORM c77_secure_db_manage_secure_schemas('add', v_test_schema);
-- TEST 1: Verify direct INSERT is blocked
BEGIN
EXECUTE format('INSERT INTO %I.test_secure_table (name) VALUES (''bypass_test'')', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_insert_blocked', jsonb_build_object(
'status', 'FAILED',
'message', 'Direct INSERT was allowed - CRITICAL SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_insert_blocked', jsonb_build_object(
'status', 'PASSED',
'message', 'Direct INSERT correctly blocked'
)
);
END;
-- TEST 2: Verify direct UPDATE is blocked
BEGIN
EXECUTE format('UPDATE %I.test_secure_table SET name = ''hacked'' WHERE id = 1', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_update_blocked', jsonb_build_object(
'status', 'FAILED',
'message', 'Direct UPDATE was allowed - CRITICAL SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_update_blocked', jsonb_build_object(
'status', 'PASSED',
'message', 'Direct UPDATE correctly blocked'
)
);
END;
-- TEST 3: Verify direct DELETE is blocked
BEGIN
EXECUTE format('DELETE FROM %I.test_secure_table WHERE id = 1', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_delete_blocked', jsonb_build_object(
'status', 'FAILED',
'message', 'Direct DELETE was allowed - CRITICAL SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_delete_blocked', jsonb_build_object(
'status', 'PASSED',
'message', 'Direct DELETE correctly blocked'
)
);
END;
-- TEST 4: Verify legitimate secure operation works
BEGIN
SELECT c77_secure_db_operation(jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'test_secure_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'legitimate_test', 'description', 'This should work')
)) INTO v_operation_result;
IF (v_operation_result->>'success')::boolean THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'legitimate_operation', jsonb_build_object(
'status', 'PASSED',
'message', 'Secure operation succeeded',
'operation_id', v_operation_result->>'operation_id'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'legitimate_operation', jsonb_build_object(
'status', 'FAILED',
'message', 'Secure operation failed: ' || (v_operation_result->>'error')
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'legitimate_operation', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- TEST 5: Test token expiration (simulate expired token)
-- Comprehensive security test suite
CREATE OR REPLACE FUNCTION c77_secure_db_test_security()
RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_test_results JSONB := '{}';
v_tests_passed INTEGER := 0;
v_tests_failed INTEGER := 0;
v_test_schema TEXT := 'c77_test_' || extract(epoch from now())::bigint;
v_operation_result JSONB;
BEGIN
-- Create test environment
EXECUTE format('CREATE SCHEMA %I', v_test_schema);
-- Create test table
EXECUTE format('
CREATE TABLE %I.test_secure_table (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
content_hash TEXT,
hash_version INTEGER DEFAULT 1,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMPTZ
)', v_test_schema);
-- Register schema as secure
PERFORM c77_secure_db_manage_secure_schemas('add', v_test_schema);
-- TEST 1: Verify direct INSERT is blocked
BEGIN
EXECUTE format('INSERT INTO %I.test_secure_table (name) VALUES (''bypass_test'')', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_insert_blocked', jsonb_build_object(
'status', 'FAILED',
'message', 'Direct INSERT was allowed - CRITICAL SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_insert_blocked', jsonb_build_object(
'status', 'PASSED',
'message', 'Direct INSERT correctly blocked'
)
);
END;
-- TEST 2: Verify direct UPDATE is blocked
BEGIN
EXECUTE format('UPDATE %I.test_secure_table SET name = ''hacked'' WHERE id = 1', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_update_blocked', jsonb_build_object(
'status', 'FAILED',
'message', 'Direct UPDATE was allowed - CRITICAL SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_update_blocked', jsonb_build_object(
'status', 'PASSED',
'message', 'Direct UPDATE correctly blocked'
)
);
END;
-- TEST 3: Verify direct DELETE is blocked
BEGIN
EXECUTE format('DELETE FROM %I.test_secure_table WHERE id = 1', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_delete_blocked', jsonb_build_object(
'status', 'FAILED',
'message', 'Direct DELETE was allowed - CRITICAL SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'direct_delete_blocked', jsonb_build_object(
'status', 'PASSED',
'message', 'Direct DELETE correctly blocked'
)
);
END;
-- TEST 4: Verify legitimate secure operation works
BEGIN
SELECT c77_secure_db_operation(jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'test_secure_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'legitimate_test', 'description', 'This should work')
)) INTO v_operation_result;
IF (v_operation_result->>'success')::boolean THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'legitimate_operation', jsonb_build_object(
'status', 'PASSED',
'message', 'Secure operation succeeded',
'operation_id', v_operation_result->>'operation_id'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'legitimate_operation', jsonb_build_object(
'status', 'FAILED',
'message', 'Secure operation failed: ' || (v_operation_result->>'error')
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'legitimate_operation', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- TEST 5: Test token expiration (simulate expired token)
BEGIN
-- This test verifies that expired tokens don't work
PERFORM set_config('c77_secure_db.auth_token', gen_random_uuid()::text, true);
EXECUTE format('INSERT INTO %I.test_secure_table (name) VALUES (''token_test'')', v_test_schema);
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'token_expiration', jsonb_build_object(
'status', 'FAILED',
'message', 'Invalid token was accepted - SECURITY FLAW!'
)
);
EXCEPTION WHEN insufficient_privilege THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'token_expiration', jsonb_build_object(
'status', 'PASSED',
'message', 'Invalid token correctly rejected'
)
);
WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'token_expiration', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- Clean up token after TEST 5
PERFORM set_config('c77_secure_db.auth_token', '', true);
-- TEST 6: Test hash calculation and verification
BEGIN
-- Insert a record and verify its hash
SELECT c77_secure_db_operation(jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'test_secure_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'hash_test', 'description', 'Test hash calculation')
)) INTO v_operation_result;
IF (v_operation_result->>'success')::boolean AND (v_operation_result->>'content_hash') IS NOT NULL THEN
-- Now verify the hash
DECLARE
v_freshness_result JSONB;
v_record_data JSONB;
BEGIN
-- Get the inserted record data (simulated)
v_record_data := jsonb_build_object(
'id', 1, -- Assuming first record
'name', 'hash_test',
'description', 'Test hash calculation'
);
v_freshness_result := c77_secure_db_check_freshness(v_test_schema, 'test_secure_table', v_record_data);
IF (v_freshness_result->>'success')::boolean AND (v_freshness_result->>'fresh')::boolean THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'PASSED',
'message', 'Hash calculation and verification working correctly'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'FAILED',
'message', 'Hash verification failed: ' || (v_freshness_result->>'error')
)
);
END IF;
END;
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'FAILED',
'message', 'Hash was not calculated during insert'
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- Cleanup test environment
BEGIN
PERFORM c77_secure_db_manage_secure_schemas('remove', v_test_schema);
EXECUTE format('DROP SCHEMA %I CASCADE', v_test_schema);
EXCEPTION WHEN OTHERS THEN
-- Log cleanup failure but don't fail the test
v_test_results := v_test_results || jsonb_build_object(
'cleanup_warning', 'Failed to cleanup test schema: ' || SQLERRM
);
END;
RETURN jsonb_build_object(
'test_suite', 'c77_secure_db_security',
'version', '2.0',
'summary', jsonb_build_object(
'tests_passed', v_tests_passed,
'tests_failed', v_tests_failed,
'total_tests', v_tests_passed + v_tests_failed,
'success_rate', CASE
WHEN (v_tests_passed + v_tests_failed) > 0 THEN
round((v_tests_passed::numeric / (v_tests_passed + v_tests_failed) * 100), 2)
ELSE 0
END
),
'overall_status', CASE
WHEN v_tests_failed = 0 THEN 'ALL_TESTS_PASSED'
ELSE 'SECURITY_ISSUES_DETECTED'
END,
'test_results', v_test_results,
'test_schema_used', v_test_schema,
'timestamp', now()
);
END;
$$;
-- TEST 6: Test hash calculation and verification
BEGIN
-- Insert a record and verify its hash
SELECT c77_secure_db_operation(jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'test_secure_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'hash_test', 'description', 'Test hash calculation')
)) INTO v_operation_result;
IF (v_operation_result->>'success')::boolean AND (v_operation_result->>'content_hash') IS NOT NULL THEN
-- Now verify the hash
DECLARE
v_freshness_result JSONB;
v_record_data JSONB;
BEGIN
-- Get the inserted record data (simulated)
v_record_data := jsonb_build_object(
'id', 1, -- Assuming first record
'name', 'hash_test',
'description', 'Test hash calculation'
);
v_freshness_result := c77_secure_db_check_freshness(v_test_schema, 'test_secure_table', v_record_data);
IF (v_freshness_result->>'success')::boolean AND (v_freshness_result->>'fresh')::boolean THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'PASSED',
'message', 'Hash calculation and verification working correctly'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'FAILED',
'message', 'Hash verification failed: ' || (v_freshness_result->>'error')
)
);
END IF;
END;
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'FAILED',
'message', 'Hash was not calculated during insert'
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'hash_verification', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- Cleanup test environment
BEGIN
PERFORM c77_secure_db_manage_secure_schemas('remove', v_test_schema);
EXECUTE format('DROP SCHEMA %I CASCADE', v_test_schema);
EXCEPTION WHEN OTHERS THEN
-- Log cleanup failure but don't fail the test
v_test_results := v_test_results || jsonb_build_object(
'cleanup_warning', 'Failed to cleanup test schema: ' || SQLERRM
);
END;
RETURN jsonb_build_object(
'test_suite', 'c77_secure_db_security',
'version', '2.0',
'summary', jsonb_build_object(
'tests_passed', v_tests_passed,
'tests_failed', v_tests_failed,
'total_tests', v_tests_passed + v_tests_failed,
'success_rate', CASE
WHEN (v_tests_passed + v_tests_failed) > 0 THEN
round((v_tests_passed::numeric / (v_tests_passed + v_tests_failed) * 100), 2)
ELSE 0
END
),
'overall_status', CASE
WHEN v_tests_failed = 0 THEN 'ALL_TESTS_PASSED'
ELSE 'SECURITY_ISSUES_DETECTED'
END,
'test_results', v_test_results,
'test_schema_used', v_test_schema,
'timestamp', now()
);
END;
$$;
-- Test RBAC integration
CREATE OR REPLACE FUNCTION c77_secure_db_test_rbac_integration()
RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_rbac_available BOOLEAN;
v_test_results JSONB := '{}';
v_tests_passed INTEGER := 0;
v_tests_failed INTEGER := 0;
v_test_schema TEXT := 'c77_rbac_test_' || extract(epoch from now())::bigint;
v_operation_result JSONB;
BEGIN
-- Check if c77_rbac is available
SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'c77_rbac') INTO v_rbac_available;
IF NOT v_rbac_available THEN
RETURN jsonb_build_object(
'test_suite', 'c77_secure_db_rbac_integration',
'rbac_available', false,
'message', 'c77_rbac extension not available - skipping RBAC tests',
'overall_status', 'SKIPPED',
'timestamp', now()
);
END IF;
-- Create test environment
EXECUTE format('CREATE SCHEMA %I', v_test_schema);
EXECUTE format('
CREATE TABLE %I.rbac_test_table (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
content_hash TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
)', v_test_schema);
PERFORM c77_secure_db_manage_secure_schemas('add', v_test_schema);
-- Set up RBAC test data
PERFORM c77_rbac_assign_subject('test_user', 'secure_operator', 'global', 'all');
PERFORM c77_rbac_grant_feature('secure_operator', 'secure_db_insert');
PERFORM c77_rbac_grant_feature('secure_operator', 'secure_db_update');
-- TEST 1: Operation with valid RBAC permissions should succeed
BEGIN
PERFORM set_config('c77_rbac.external_id', 'test_user', true);
SELECT c77_secure_db_operation(
jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'rbac_test_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'rbac_test_record')
),
true, -- check_rbac
'secure_db_insert'
) INTO v_operation_result;
IF (v_operation_result->>'success')::boolean THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'valid_rbac_permission', jsonb_build_object(
'status', 'PASSED',
'message', 'Operation with valid RBAC permission succeeded'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'valid_rbac_permission', jsonb_build_object(
'status', 'FAILED',
'message', 'Operation failed despite valid RBAC permission: ' || (v_operation_result->>'error')
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'valid_rbac_permission', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- TEST 2: Operation without required RBAC permission should fail
BEGIN
PERFORM set_config('c77_rbac.external_id', 'test_user', true);
SELECT c77_secure_db_operation(
jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'rbac_test_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'unauthorized_record')
),
true, -- check_rbac
'secure_db_delete' -- User doesn't have this permission
) INTO v_operation_result;
IF NOT (v_operation_result->>'success')::boolean AND (v_operation_result->>'error') LIKE '%Insufficient permissions%' THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'invalid_rbac_permission', jsonb_build_object(
'status', 'PASSED',
'message', 'Operation correctly blocked due to insufficient RBAC permissions'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'invalid_rbac_permission', jsonb_build_object(
'status', 'FAILED',
'message', 'Operation should have been blocked by RBAC but was not'
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'invalid_rbac_permission', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- TEST 3: Operation without user context should fail
BEGIN
PERFORM set_config('c77_rbac.external_id', '', true);
SELECT c77_secure_db_operation(
jsonb_build_object(
'schema_name', v_test_schema,
'table_name', 'rbac_test_table',
'operation', 'insert',
'data', jsonb_build_object('name', 'no_context_record')
),
true, -- check_rbac
'secure_db_insert'
) INTO v_operation_result;
IF NOT (v_operation_result->>'success')::boolean AND (v_operation_result->>'error') LIKE '%no user context%' THEN
v_tests_passed := v_tests_passed + 1;
v_test_results := v_test_results || jsonb_build_object(
'no_user_context', jsonb_build_object(
'status', 'PASSED',
'message', 'Operation correctly blocked due to missing user context'
)
);
ELSE
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'no_user_context', jsonb_build_object(
'status', 'FAILED',
'message', 'Operation should have been blocked due to missing user context'
)
);
END IF;
EXCEPTION WHEN OTHERS THEN
v_tests_failed := v_tests_failed + 1;
v_test_results := v_test_results || jsonb_build_object(
'no_user_context', jsonb_build_object(
'status', 'ERROR',
'message', SQLERRM
)
);
END;
-- Cleanup RBAC test data
BEGIN
PERFORM c77_rbac_revoke_subject_role('test_user', 'secure_operator', 'global', 'all');
PERFORM c77_rbac_revoke_feature('secure_operator', 'secure_db_insert');
PERFORM c77_rbac_revoke_feature('secure_operator', 'secure_db_update');
EXCEPTION WHEN OTHERS THEN
-- Ignore cleanup errors
NULL;
END;
-- Cleanup test environment
BEGIN
PERFORM c77_secure_db_manage_secure_schemas('remove', v_test_schema);
EXECUTE format('DROP SCHEMA %I CASCADE', v_test_schema);
EXCEPTION WHEN OTHERS THEN
v_test_results := v_test_results || jsonb_build_object(
'cleanup_warning', 'Failed to cleanup test schema: ' || SQLERRM
);
END;
RETURN jsonb_build_object(
'test_suite', 'c77_secure_db_rbac_integration',
'rbac_available', true,
'summary', jsonb_build_object(
'tests_passed', v_tests_passed,
'tests_failed', v_tests_failed,
'total_tests', v_tests_passed + v_tests_failed,
'success_rate', CASE
WHEN (v_tests_passed + v_tests_failed) > 0 THEN
round((v_tests_passed::numeric / (v_tests_passed + v_tests_failed) * 100), 2)
ELSE 0
END
),
'overall_status', CASE
WHEN v_tests_failed = 0 THEN 'ALL_TESTS_PASSED'
ELSE 'RBAC_INTEGRATION_ISSUES'
END,
'test_results', v_test_results,
'timestamp', now()
);
END;
$$;
-- Master test runner - runs all tests
CREATE OR REPLACE FUNCTION c77_secure_db_run_all_tests()
RETURNS JSONB
LANGUAGE plpgsql AS $$
DECLARE
v_security_results JSONB;
v_rbac_results JSONB;
v_health_results JSONB;
v_overall_status TEXT := 'ALL_TESTS_PASSED';
BEGIN
-- Run security tests
v_security_results := c77_secure_db_test_security();
-- Run RBAC integration tests
v_rbac_results := c77_secure_db_test_rbac_integration();
-- Run health check
v_health_results := c77_secure_db_health_check();
-- Determine overall status
IF (v_security_results->>'overall_status') != 'ALL_TESTS_PASSED' THEN
v_overall_status := 'SECURITY_ISSUES_DETECTED';
ELSIF (v_rbac_results->>'overall_status') NOT IN ('ALL_TESTS_PASSED', 'SKIPPED') THEN
v_overall_status := 'RBAC_INTEGRATION_ISSUES';
END IF;
RETURN jsonb_build_object(
'test_suite', 'c77_secure_db_complete',
'version', '2.0',
'overall_status', v_overall_status,
'security_tests', v_security_results,
'rbac_integration_tests', v_rbac_results,
'health_check', v_health_results,
'recommendation', CASE v_overall_status
WHEN 'ALL_TESTS_PASSED' THEN 'Extension is ready for production use'
WHEN 'SECURITY_ISSUES_DETECTED' THEN 'CRITICAL: Fix security issues before any production use'
WHEN 'RBAC_INTEGRATION_ISSUES' THEN 'RBAC integration needs attention - can use without RBAC'
ELSE 'Review test results and address issues'
END,
'timestamp', now()
);
END;
$$;
-- Grant permissions for test functions
GRANT EXECUTE ON FUNCTION c77_secure_db_test_security() TO c77_secure_db_admin;
GRANT EXECUTE ON FUNCTION c77_secure_db_test_rbac_integration() TO c77_secure_db_admin;
GRANT EXECUTE ON FUNCTION c77_secure_db_run_all_tests() TO c77_secure_db_admin;
COMMENT ON FUNCTION c77_secure_db_test_security() IS 'Comprehensive security test suite for the extension';
COMMENT ON FUNCTION c77_secure_db_test_rbac_integration() IS 'Tests integration with c77_rbac extension';
COMMENT ON FUNCTION c77_secure_db_run_all_tests() IS 'Master test runner that executes all test suites';
\echo 'c77_secure_db extension v2.0 loaded successfully!'