c77_secure_db/c77_secure_db--1.0.0.sql

1422 lines
49 KiB
PL/PgSQL

-- Extension: c77_secure_db
-- Description: Secure database operations with tamper detection and transaction control
-- Version: 1.0.0
-- Requires pgcrypto extension
-- Check if pgcrypto is available
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto'
) THEN
RAISE EXCEPTION 'The c77_secure_db extension requires the pgcrypto extension to be installed first.';
END IF;
END
$$;
-- Create secure_schemas table if it doesn't exist
CREATE TABLE IF NOT EXISTS public.secure_schemas (
schema_name text PRIMARY KEY,
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now()
);
-- Add a comment to document the table's purpose
COMMENT ON TABLE public.secure_schemas IS 'Stores schemas where c77_apply_prevent_triggers should be automatically applied via the c77_auto_apply_prevent_triggers event trigger.';
-- Prevent direct modification trigger function
CREATE OR REPLACE FUNCTION c77_prevent_direct_modification()
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS $BODY$
BEGIN
IF current_setting('myapp.allow_direct_modification', true) = 'true' THEN
IF TG_OP = 'DELETE' THEN
RETURN OLD; -- Allow DELETE to proceed
END IF;
RETURN NEW; -- Allow INSERT or UPDATE to proceed
END IF;
RAISE EXCEPTION 'Direct modifications are not allowed. Use the c77_secure_db_operation function instead.';
END;
$BODY$;
COMMENT ON FUNCTION c77_prevent_direct_modification() IS 'Trigger function to prevent direct table modifications unless authorized via the myapp.allow_direct_modification setting.';
-- Apply prevent triggers function
CREATE OR REPLACE FUNCTION c77_apply_prevent_triggers(
p_schema_name text)
RETURNS void
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_table_name text;
BEGIN
FOR v_table_name IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = p_schema_name
AND table_type = 'BASE TABLE'
LOOP
EXECUTE format('DROP TRIGGER IF EXISTS c77_prevent_direct_insert ON %I.%I', p_schema_name, v_table_name);
EXECUTE format('DROP TRIGGER IF EXISTS c77_prevent_direct_update ON %I.%I', p_schema_name, v_table_name);
EXECUTE format('DROP TRIGGER IF EXISTS c77_prevent_direct_delete ON %I.%I', p_schema_name, v_table_name);
EXECUTE format(
'CREATE TRIGGER c77_prevent_direct_insert BEFORE INSERT ON %I.%I ' ||
'FOR EACH ROW EXECUTE FUNCTION c77_prevent_direct_modification()',
p_schema_name, v_table_name
);
EXECUTE format(
'CREATE TRIGGER c77_prevent_direct_update BEFORE UPDATE ON %I.%I ' ||
'FOR EACH ROW EXECUTE FUNCTION c77_prevent_direct_modification()',
p_schema_name, v_table_name
);
EXECUTE format(
'CREATE TRIGGER c77_prevent_direct_delete BEFORE DELETE ON %I.%I ' ||
'FOR EACH ROW EXECUTE FUNCTION c77_prevent_direct_modification()',
p_schema_name, v_table_name
);
END LOOP;
END;
$BODY$;
COMMENT ON FUNCTION c77_apply_prevent_triggers(text) IS 'Applies prevention triggers to all tables in the specified schema.';
-- Auto apply triggers function
CREATE OR REPLACE FUNCTION c77_auto_apply_prevent_triggers()
RETURNS event_trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS $BODY$
DECLARE
v_obj record;
v_schema_name text;
v_designated_schema text;
BEGIN
-- Get the schema of the table being modified
FOR v_obj IN
SELECT * FROM pg_event_trigger_ddl_commands()
WHERE object_type = 'table'
LOOP
v_schema_name := v_obj.schema_name;
-- Check if the schema is in the secure_schemas table
FOR v_designated_schema IN
SELECT schema_name
FROM public.secure_schemas
LOOP
IF v_schema_name = v_designated_schema THEN
PERFORM c77_apply_prevent_triggers(v_schema_name);
RAISE NOTICE 'Applied c77_apply_prevent_triggers to schema % due to DDL change on table %.%.',
v_schema_name, v_schema_name, v_obj.object_identity;
END IF;
END LOOP;
END LOOP;
END;
$BODY$;
COMMENT ON FUNCTION c77_auto_apply_prevent_triggers() IS 'Event trigger function to automatically apply prevention triggers when tables are created or altered in secure schemas.';
-- Calculate content hash function
CREATE OR REPLACE FUNCTION c77_calculate_content_hash(
p_schema_name text,
p_table_name text,
p_data jsonb)
RETURNS text
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_exclude_hash_columns text[] := ARRAY['id', 'content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_column_comment text;
v_temp_exclude_columns text[];
v_content_hash text;
BEGIN
-- Get exclude_hash_columns from the content_hash column comment
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'content_hash'
) THEN
SELECT col_description(
format('%I.%I', p_schema_name, p_table_name)::regclass::oid,
(
SELECT attnum
FROM pg_attribute
WHERE attrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND attname = 'content_hash'
)
) INTO v_column_comment;
IF v_column_comment IS NOT NULL THEN
BEGIN
IF jsonb_typeof(v_column_comment::jsonb) = 'object' AND
(v_column_comment::jsonb)->>'exclude_hash_columns' IS NOT NULL THEN
v_temp_exclude_columns := ARRAY(
SELECT jsonb_array_elements_text(v_column_comment::jsonb->'exclude_hash_columns')
);
v_exclude_hash_columns := v_exclude_hash_columns || v_temp_exclude_columns;
END IF;
EXCEPTION WHEN OTHERS THEN
-- Ignore invalid comment JSON
NULL;
END;
END IF;
END IF;
-- Calculate the hash using SHA-256
SELECT encode(sha256(convert_to(
string_agg(
CASE WHEN key = ANY(v_exclude_hash_columns) THEN ''
ELSE COALESCE(value::text, '') END,
'' -- Use a text delimiter
),
'UTF8'
)), 'hex')
INTO v_content_hash
FROM jsonb_each(p_data);
RETURN v_content_hash;
END;
$BODY$;
COMMENT ON FUNCTION c77_calculate_content_hash(text, text, jsonb) IS 'Calculates a SHA-256 hash of record data for tamper detection, excluding special columns.';
-- Check freshness function
CREATE OR REPLACE FUNCTION c77_check_freshness(
p_schema_name text,
p_table_name text,
p_data jsonb)
RETURNS jsonb
LANGUAGE 'plpgsql'
COST 100
STABLE PARALLEL SAFE
AS $BODY$
DECLARE
v_stored_hash text;
v_calculated_hash text;
v_id text;
v_hash_version integer;
v_is_fresh boolean;
v_special_columns text[] := ARRAY['content_hash']; -- Start with the minimum required special column
v_data_cleaned jsonb;
v_column text;
v_has_created_at boolean;
v_has_updated_at boolean;
v_has_deleted_at boolean;
v_has_hash_version boolean;
v_query text;
BEGIN
-- Check for the existence of special columns
v_has_created_at := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'created_at'
);
v_has_updated_at := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'updated_at'
);
v_has_deleted_at := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'deleted_at'
);
v_has_hash_version := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'hash_version'
);
-- Build the special columns array dynamically
IF v_has_created_at THEN
v_special_columns := v_special_columns || ARRAY['created_at'];
END IF;
IF v_has_updated_at THEN
v_special_columns := v_special_columns || ARRAY['updated_at'];
END IF;
IF v_has_deleted_at THEN
v_special_columns := v_special_columns || ARRAY['deleted_at'];
END IF;
IF v_has_hash_version THEN
v_special_columns := v_special_columns || ARRAY['hash_version'];
END IF;
-- Extract the primary key (id) from the input data
v_id := p_data->>'id';
IF v_id IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Primary key "id" missing in input data',
'timestamp', now()
);
END IF;
-- Clean the input data by removing special columns
v_data_cleaned := p_data;
FOREACH v_column IN ARRAY v_special_columns
LOOP
v_data_cleaned := v_data_cleaned - v_column;
END LOOP;
-- Calculate the content hash of the input data
v_calculated_hash := c77_calculate_content_hash(p_schema_name, p_table_name, v_data_cleaned);
-- Build the query dynamically
v_query := format(
'SELECT content_hash %s FROM %I.%I WHERE id = $1',
CASE WHEN v_has_hash_version THEN ', hash_version' ELSE '' END,
p_schema_name,
p_table_name
);
IF v_has_deleted_at THEN
v_query := v_query || ' AND deleted_at IS NULL';
END IF;
-- Look up the stored hash and hash_version (if it exists) in the table
IF v_has_hash_version THEN
EXECUTE v_query
INTO v_stored_hash, v_hash_version
USING v_id::integer;
ELSE
EXECUTE v_query
INTO v_stored_hash
USING v_id::integer;
v_hash_version := NULL; -- Set to NULL if hash_version column doesn't exist
END IF;
-- Check if the record exists
IF v_stored_hash IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Record with id ' || v_id || ' not found or has been deleted',
'timestamp', now()
);
END IF;
-- Compare the hashes
v_is_fresh := (v_stored_hash = v_calculated_hash);
-- Return the result
RETURN jsonb_build_object(
'success', true,
'id', v_id,
'fresh', v_is_fresh,
'stored_hash', v_stored_hash,
'calculated_hash', v_calculated_hash,
'hash_version', v_hash_version,
'timestamp', now()
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$BODY$;
COMMENT ON FUNCTION c77_check_freshness(text, text, jsonb) IS 'Verifies if a record has been modified by comparing stored and calculated content hashes.';
-- Check freshness bulk function
CREATE OR REPLACE FUNCTION c77_check_freshness_bulk(
p_schema_name text,
p_table_name text,
p_data jsonb)
RETURNS jsonb
LANGUAGE 'plpgsql'
COST 100
STABLE PARALLEL SAFE
AS $BODY$
DECLARE
v_record jsonb;
v_results jsonb := '[]'::jsonb;
v_result jsonb;
BEGIN
-- Validate that p_data is a JSONB array
IF jsonb_typeof(p_data) != 'array' THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Input data must be a JSONB array',
'timestamp', now()
);
END IF;
-- Loop through each record in the input array
FOR v_record IN
SELECT jsonb_array_elements(p_data)
LOOP
-- Call check_freshness for each record
v_result := c77_check_freshness(p_schema_name, p_table_name, v_record);
-- Append the result to the results array
v_results := v_results || v_result;
END LOOP;
-- Return the results
RETURN jsonb_build_object(
'success', true,
'results', v_results,
'timestamp', now()
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$BODY$;
COMMENT ON FUNCTION c77_check_freshness_bulk(text, text, jsonb) IS 'Verifies if multiple records have been modified by comparing stored and calculated content hashes.';
-- FUNCTION: public.c77_secure_db_operation(jsonb)
CREATE OR REPLACE FUNCTION public.c77_secure_db_operation(
p_json_data jsonb)
RETURNS jsonb
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_schema_name text;
v_table_name text;
v_operation text;
v_primary_key text;
v_data jsonb;
v_data_cleaned jsonb;
v_exclude_hash_columns text[] := ARRAY['id', 'content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_special_columns text[] := ARRAY['content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_columns text[];
v_values text[];
v_update_pairs text[];
v_content_hash text;
v_hash_version integer := 1;
v_result jsonb;
v_row_count int;
v_post_function text;
v_exists boolean;
v_unique_columns text[];
v_unique_constraint_name text;
v_primary_key_columns text[];
v_primary_key_constraint_name text;
v_conflict_target text;
v_conflict_columns text[];
v_has_created_at boolean;
v_has_updated_at boolean;
v_has_deleted_at boolean;
v_has_hash_version boolean;
v_temp_exclude_columns text[];
v_unique_values text[];
v_column text;
v_primary_key_type text;
BEGIN
PERFORM set_config('myapp.allow_direct_modification', 'true', true);
v_schema_name := p_json_data->>'schema_name';
v_table_name := p_json_data->>'table_name';
v_operation := lower(p_json_data->>'operation');
v_primary_key := p_json_data->>'primary_key';
v_data := p_json_data->>'data';
v_post_function := p_json_data->>'post_function';
IF p_json_data->>'exclude_hash_columns' IS NOT NULL THEN
BEGIN
v_temp_exclude_columns := ARRAY(SELECT jsonb_array_elements_text(p_json_data->'exclude_hash_columns'));
v_exclude_hash_columns := v_exclude_hash_columns || v_temp_exclude_columns;
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Invalid exclude_hash_columns in input JSON',
'error_code', SQLSTATE,
'timestamp', now()
);
END;
END IF;
IF v_schema_name IS NULL OR v_table_name IS NULL OR v_operation IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Missing required fields: schema_name, table_name, or operation',
'timestamp', now()
);
END IF;
IF v_primary_key IS NOT NULL THEN
SELECT data_type
INTO v_primary_key_type
FROM information_schema.columns
WHERE table_schema = v_schema_name
AND table_name = v_table_name
AND column_name = v_primary_key;
END IF;
v_has_created_at := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'created_at'
);
v_has_updated_at := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'updated_at'
);
v_has_deleted_at := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'deleted_at'
);
v_has_hash_version := EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = v_schema_name AND table_name = v_table_name AND column_name = 'hash_version'
);
v_data_cleaned := v_data;
FOREACH v_column IN ARRAY v_special_columns
LOOP
v_data_cleaned := v_data_cleaned - v_column;
END LOOP;
SELECT ARRAY_agg(key),
ARRAY_agg(quote_literal(v_data->>key))
INTO v_columns, v_values
FROM jsonb_object_keys(v_data) AS key
WHERE key != ALL(v_special_columns);
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = v_schema_name
AND table_name = v_table_name
AND column_name = 'content_hash'
) AND v_operation IN ('insert', 'update', 'upsert') THEN
v_content_hash := public.c77_calculate_content_hash(v_schema_name, v_table_name, v_data_cleaned);
v_columns := v_columns || ARRAY['content_hash'];
v_values := v_values || ARRAY[quote_literal(v_content_hash)];
IF v_has_hash_version THEN
v_columns := v_columns || ARRAY['hash_version'];
v_values := v_values || ARRAY[quote_literal(v_hash_version)];
END IF;
END IF;
IF v_has_created_at AND v_operation IN ('insert', 'upsert') THEN
v_columns := v_columns || ARRAY['created_at'];
v_values := v_values || ARRAY[quote_literal(now())];
END IF;
IF v_has_updated_at AND v_operation IN ('insert', 'update', 'upsert') THEN
v_columns := v_columns || ARRAY['updated_at'];
v_values := v_values || ARRAY[quote_literal(now())];
END IF;
CASE v_operation
WHEN 'upsert' THEN
-- First, try to find a unique constraint
SELECT c.conname, ARRAY_agg(a.attname::text)
INTO v_unique_constraint_name, v_unique_columns
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', v_schema_name, v_table_name)::regclass
AND c.contype = 'u'
GROUP BY c.conname
LIMIT 1;
IF v_unique_columns IS NOT NULL THEN
v_conflict_columns := v_unique_columns;
v_conflict_target := format('ON CONFLICT (%s)', array_to_string(ARRAY(
SELECT format('%I', unnest) FROM unnest(v_unique_columns)
), ','));
ELSE
-- Fallback to primary key if no unique constraint is found
SELECT c.conname, ARRAY_agg(a.attname::text)
INTO v_primary_key_constraint_name, v_primary_key_columns
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', v_schema_name, v_table_name)::regclass
AND c.contype = 'p'
GROUP BY c.conname
LIMIT 1;
IF v_primary_key_columns IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'No primary key or unique constraint found on table for upsert operation',
'timestamp', now()
);
END IF;
v_conflict_columns := v_primary_key_columns;
v_conflict_target := format('ON CONFLICT (%s)', array_to_string(ARRAY(
SELECT format('%I', unnest) FROM unnest(v_primary_key_columns)
), ','));
END IF;
-- Debug: Log the conflict columns and input columns
RAISE NOTICE 'Conflict columns: %', v_conflict_columns;
RAISE NOTICE 'Input columns: %', v_columns;
-- Validate that all conflict columns are present in the input data
IF NOT (
SELECT EVERY(column_name = ANY(v_columns))
FROM unnest(v_conflict_columns) AS column_name
) THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Input data missing required conflict columns: ' || array_to_string(v_conflict_columns, ', '),
'timestamp', now(),
'debug', jsonb_build_object(
'conflict_columns', v_conflict_columns,
'input_columns', v_columns
)
);
END IF;
v_unique_values := ARRAY(
SELECT v_data->>col
FROM unnest(v_conflict_columns) AS col
);
v_update_pairs := ARRAY(
SELECT format('%I = %s', key, quote_literal(v_data->>key))
FROM jsonb_object_keys(v_data) AS key
WHERE key != ALL(v_conflict_columns) AND key != ALL(v_special_columns)
);
IF v_has_updated_at THEN
v_update_pairs := v_update_pairs || ARRAY[format('updated_at = %L', now())];
END IF;
IF v_content_hash IS NOT NULL THEN
v_update_pairs := v_update_pairs || ARRAY[format('content_hash = %L', v_content_hash)];
END IF;
IF v_has_hash_version THEN
v_update_pairs := v_update_pairs || ARRAY[format('hash_version = %L', v_hash_version)];
END IF;
EXECUTE format(
'INSERT INTO %I.%I (%s) VALUES (%s) ' ||
'%s DO UPDATE SET %s RETURNING *',
v_schema_name,
v_table_name,
array_to_string(v_columns, ','),
array_to_string(v_values, ','),
v_conflict_target,
array_to_string(v_update_pairs, ',')
);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'insert' THEN
EXECUTE format(
'INSERT INTO %I.%I (%s) VALUES (%s) ON CONFLICT DO NOTHING RETURNING *',
v_schema_name,
v_table_name,
array_to_string(v_columns, ','),
array_to_string(v_values, ',')
);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'update' THEN
IF v_primary_key IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Primary key required for update operation',
'timestamp', now()
);
END IF;
v_update_pairs := ARRAY(
SELECT format('%I = %s', key, quote_literal(v_data->>key))
FROM jsonb_object_keys(v_data) AS key
WHERE key != ALL(v_special_columns) AND key != v_primary_key
);
IF v_has_updated_at THEN
v_update_pairs := v_update_pairs || ARRAY[format('updated_at = %L', now())];
END IF;
IF v_content_hash IS NOT NULL THEN
v_update_pairs := v_update_pairs || ARRAY[format('content_hash = %L', v_content_hash)];
END IF;
IF v_has_hash_version THEN
v_update_pairs := v_update_pairs || ARRAY[format('hash_version = %L', v_hash_version)];
END IF;
EXECUTE format(
'UPDATE %I.%I SET %s WHERE %I = ($1)::%s RETURNING *',
v_schema_name,
v_table_name,
array_to_string(v_update_pairs, ','),
v_primary_key,
v_primary_key_type
)
USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
WHEN 'delete' THEN
IF v_primary_key IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Primary key required for delete operation',
'timestamp', now()
);
END IF;
IF v_has_deleted_at THEN
v_update_pairs := ARRAY[format('deleted_at = %L', now())];
IF v_has_updated_at THEN
v_update_pairs := v_update_pairs || ARRAY[format('updated_at = %L', now())];
END IF;
EXECUTE format(
'UPDATE %I.%I SET %s WHERE %I = ($1)::%s AND deleted_at IS NULL RETURNING *',
v_schema_name,
v_table_name,
array_to_string(v_update_pairs, ','),
v_primary_key,
v_primary_key_type
)
USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
IF v_row_count = 0 THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Record with ' || v_primary_key || ' = ' || (v_data->>v_primary_key) || ' not found or already deleted',
'timestamp', now()
);
END IF;
ELSE
EXECUTE format(
'DELETE FROM %I.%I WHERE %I = ($1)::%s',
v_schema_name,
v_table_name,
v_primary_key,
v_primary_key_type
)
USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
IF v_row_count = 0 THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Record with ' || v_primary_key || ' = ' || (v_data->>v_primary_key) || ' not found',
'timestamp', now()
);
END IF;
END IF;
WHEN 'hard_delete' THEN
IF v_primary_key IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Primary key required for hard_delete operation',
'timestamp', now()
);
END IF;
EXECUTE format(
'DELETE FROM %I.%I WHERE %I = ($1)::%s',
v_schema_name,
v_table_name,
v_primary_key,
v_primary_key_type
)
USING (v_data->>v_primary_key);
GET DIAGNOSTICS v_row_count = ROW_COUNT;
IF v_row_count = 0 THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Record with ' || v_primary_key || ' = ' || (v_data->>v_primary_key) || ' not found',
'timestamp', now()
);
END IF;
ELSE
RETURN jsonb_build_object(
'success', false,
'error', 'Invalid operation specified',
'timestamp', now()
);
END CASE;
IF v_row_count > 0 AND v_post_function IS NOT NULL THEN
EXECUTE format(
'SELECT %I(%L::jsonb)',
v_post_function,
v_data::text
);
END IF;
v_result := jsonb_build_object(
'success', true,
'operation', v_operation,
'schema_name', v_schema_name,
'table_name', v_table_name,
'rows_affected', v_row_count,
'timestamp', now()
);
IF v_content_hash IS NOT NULL THEN
v_result := v_result || jsonb_build_object('content_hash', v_content_hash);
END IF;
IF v_post_function IS NOT NULL THEN
v_result := v_result || jsonb_build_object('post_function_executed', true);
END IF;
IF v_operation = 'upsert' AND v_conflict_columns IS NOT NULL THEN
v_result := v_result || jsonb_build_object(
'unique_constraint_used', COALESCE(v_unique_constraint_name, v_primary_key_constraint_name),
'unique_columns', v_conflict_columns,
'unique_values', v_unique_values
);
END IF;
IF cardinality(v_exclude_hash_columns) > 5 THEN
v_result := v_result || jsonb_build_object(
'exclude_hash_columns', v_exclude_hash_columns
);
END IF;
PERFORM set_config('myapp.allow_direct_modification', 'false', true);
RETURN v_result;
EXCEPTION WHEN OTHERS THEN
PERFORM set_config('myapp.allow_direct_modification', 'false', true);
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$BODY$;
-- FUNCTION: public.c77_verify_content_hashes(text, text, boolean, integer)
CREATE OR REPLACE FUNCTION public.c77_verify_content_hashes(
p_schema_name text,
p_table_name text,
p_fix_mismatches boolean DEFAULT false,
p_batch_size integer DEFAULT 1000)
RETURNS jsonb
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_exclude_columns text[] := ARRAY['id', 'content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_special_columns text[] := ARRAY['content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_columns text[];
v_primary_key text;
v_record record;
v_cursor refcursor;
v_data jsonb;
v_data_cleaned jsonb;
v_calculated_hash text;
v_stored_hash text;
v_mismatches jsonb[] := '{}';
v_mismatch jsonb;
v_total_records int := 0;
v_mismatch_count int := 0;
v_column text;
v_query text;
v_has_content_hash boolean;
v_has_hash_version boolean;
v_hash_version int;
v_batch_count int := 0;
v_row_count int;
BEGIN
-- Check if the table exists
IF NOT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = p_schema_name
AND table_name = p_table_name
) THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I does not exist', p_schema_name, p_table_name),
'timestamp', now()
);
END IF;
-- Check if the table has a content_hash column
v_has_content_hash := EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'content_hash'
);
IF NOT v_has_content_hash THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I does not have a content_hash column', p_schema_name, p_table_name),
'timestamp', now()
);
END IF;
-- Check if the table has a hash_version column
v_has_hash_version := EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name = 'hash_version'
);
-- Get the primary key column
SELECT a.attname
INTO v_primary_key
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND c.contype = 'p'
LIMIT 1;
IF v_primary_key IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I does not have a primary key', p_schema_name, p_table_name),
'timestamp', now()
);
END IF;
-- Get all columns except excluded ones
SELECT ARRAY_agg(column_name)
INTO v_columns
FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name != ALL(v_exclude_columns);
IF v_columns IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Table %I.%I has no columns to hash after excluding %s', p_schema_name, p_table_name, v_exclude_columns),
'timestamp', now()
);
END IF;
-- Set myapp.allow_direct_modification to 'true' to allow updates
PERFORM set_config('myapp.allow_direct_modification', 'true', true);
-- Construct the query to fetch all records, explicitly including the primary key
v_query := format(
'SELECT %I, %s, content_hash%s FROM %I.%I ORDER BY %I',
v_primary_key,
array_to_string(ARRAY(
SELECT format('%I', col)
FROM unnest(v_columns) AS col
), ','),
CASE WHEN v_has_hash_version THEN ', hash_version' ELSE '' END,
p_schema_name,
p_table_name,
v_primary_key
);
-- Open a cursor to iterate through the records
OPEN v_cursor FOR EXECUTE v_query;
LOOP
FETCH v_cursor INTO v_record;
EXIT WHEN NOT FOUND;
v_total_records := v_total_records + 1;
v_batch_count := v_batch_count + 1;
-- Convert the record to JSONB
v_data := row_to_json(v_record)::jsonb;
-- Remove special columns
v_data_cleaned := v_data;
FOREACH v_column IN ARRAY v_special_columns
LOOP
v_data_cleaned := v_data_cleaned - v_column;
END LOOP;
-- Remove the primary key
v_data_cleaned := v_data_cleaned - v_primary_key;
-- Recalculate the content_hash
v_calculated_hash := public.c77_calculate_content_hash(p_schema_name, p_table_name, v_data_cleaned);
-- Get the stored content_hash
v_stored_hash := v_data->>'content_hash';
-- Compare the hashes
IF v_calculated_hash != v_stored_hash THEN
v_mismatch_count := v_mismatch_count + 1;
-- Build the mismatch report
v_mismatch := jsonb_build_object(
'primary_key', v_data->>v_primary_key,
'stored_hash', v_stored_hash,
'calculated_hash', v_calculated_hash,
'data', v_data_cleaned
);
-- If the table has a hash_version, include it
IF v_has_hash_version THEN
v_mismatch := v_mismatch || jsonb_build_object('hash_version', v_data->>'hash_version');
END IF;
v_mismatches := v_mismatches || v_mismatch;
-- If p_fix_mismatches is true, update the content_hash
IF p_fix_mismatches THEN
-- Get the hash_version if it exists
IF v_has_hash_version THEN
v_hash_version := (v_data->>'hash_version')::int;
ELSE
v_hash_version := 1;
END IF;
RAISE NOTICE 'Updating record with % = %', v_primary_key, v_data->>v_primary_key;
-- Update the record with the correct content_hash
EXECUTE format(
'UPDATE %I.%I SET content_hash = $1, hash_version = $2 WHERE %I = $3',
p_schema_name,
p_table_name,
v_primary_key
)
USING v_calculated_hash, v_hash_version, (v_data->>v_primary_key)::bigint;
GET DIAGNOSTICS v_row_count = ROW_COUNT;
RAISE NOTICE 'Rows updated: %', v_row_count;
END IF;
END IF;
-- Reset batch counter for tracking purposes (no transaction control)
IF v_batch_count >= p_batch_size THEN
v_batch_count := 0;
RAISE NOTICE 'Processed batch of % records', p_batch_size;
END IF;
END LOOP;
-- Close the cursor
CLOSE v_cursor;
-- Reset myapp.allow_direct_modification to 'false'
PERFORM set_config('myapp.allow_direct_modification', 'false', true);
-- Return the results
RETURN jsonb_build_object(
'success', true,
'total_records', v_total_records,
'mismatch_count', v_mismatch_count,
'mismatches', v_mismatches,
'timestamp', now()
);
EXCEPTION WHEN OTHERS THEN
-- Reset myapp.allow_direct_modification to 'false' even if an error occurs
PERFORM set_config('myapp.allow_direct_modification', 'false', true);
-- Close the cursor if it's still open
-- Note: PostgreSQL may have already closed the cursor on error, so we need to handle this carefully
BEGIN
CLOSE v_cursor;
EXCEPTION WHEN OTHERS THEN
-- Ignore errors when closing the cursor, as it may already be closed
NULL;
END;
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$BODY$;
-- FUNCTION: public.c77_get_operation_template(text, text, text)
CREATE OR REPLACE FUNCTION public.c77_get_operation_template(
p_schema_name text,
p_table_name text,
p_operation text)
RETURNS text
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_operation text := lower(p_operation);
v_exclude_hash_columns text[] := ARRAY['id', 'content_hash', 'created_at', 'updated_at', 'deleted_at', 'hash_version'];
v_columns text[];
v_primary_key_columns text[];
v_unique_columns text[];
v_unique_constraint_name text;
v_data_template jsonb;
v_template jsonb;
v_json_text text;
BEGIN
-- Validate inputs
IF p_schema_name IS NULL OR p_table_name IS NULL OR p_operation IS NULL THEN
RETURN format(
'-- Error: Missing required parameters: schema_name, table_name, or operation (Timestamp: %s)',
now()
);
END IF;
-- Validate schema and table existence
IF NOT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = p_schema_name
AND table_name = p_table_name
) THEN
RETURN format(
'-- Error: Table %I.%I does not exist (Timestamp: %s)',
p_schema_name, p_table_name, now()
);
END IF;
-- Validate operation
IF v_operation NOT IN ('insert', 'update', 'upsert', 'delete', 'hard_delete') THEN
RETURN format(
'-- Error: Invalid operation. Must be one of: insert, update, upsert, delete, hard_delete (Timestamp: %s)',
now()
);
END IF;
-- Get primary key columns (needed for update, delete, hard_delete, and potentially upsert)
SELECT ARRAY_agg(a.attname::text)
INTO v_primary_key_columns
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND c.contype = 'p'
GROUP BY c.conname
LIMIT 1;
-- For delete and hard_delete, we only need the primary key in the data object
IF v_operation IN ('delete', 'hard_delete') THEN
IF v_primary_key_columns IS NULL THEN
RETURN format(
'-- Error: Table %I.%I has no primary key, which is required for %s operation (Timestamp: %s)',
p_schema_name, p_table_name, v_operation, now()
);
END IF;
-- Build the data template with only the primary key
v_data_template := jsonb_build_object(v_primary_key_columns[1], 0);
-- Build the template
v_template := jsonb_build_object(
'schema_name', p_schema_name,
'table_name', p_table_name,
'operation', v_operation,
'data', v_data_template,
'primary_key', v_primary_key_columns[1],
'comment', format('The primary_key field specifies the column to use for identifying the record. For this table, the primary key is: %s', array_to_string(v_primary_key_columns, ', '))
);
ELSE
-- For insert, update, and upsert, include all columns used in content_hash
-- Get columns that are included in content_hash (exclude special columns)
SELECT ARRAY_agg(column_name)
INTO v_columns
FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name != ALL(v_exclude_hash_columns);
IF v_columns IS NULL OR cardinality(v_columns) = 0 THEN
RETURN format(
'-- Error: No columns available for content_hash calculation after excluding special columns (Timestamp: %s)',
now()
);
END IF;
-- Build the data template with empty placeholder values
v_data_template := jsonb_object_agg(
column_name,
CASE
WHEN data_type IN ('character varying', 'text') THEN '""'
WHEN data_type IN ('integer', 'bigint', 'smallint') THEN '0'
WHEN data_type = 'boolean' THEN 'false'
WHEN data_type IN ('timestamp with time zone', 'timestamp without time zone') THEN '"2025-01-01T00:00:00Z"'
WHEN data_type = 'jsonb' THEN '{}'
ELSE 'null'
END
)
FROM information_schema.columns
WHERE table_schema = p_schema_name
AND table_name = p_table_name
AND column_name != ALL(v_exclude_hash_columns);
-- Get unique constraint columns (for upsert)
SELECT c.conname, ARRAY_agg(a.attname::text)
INTO v_unique_constraint_name, v_unique_columns
FROM pg_constraint c
JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
WHERE c.conrelid = format('%I.%I', p_schema_name, p_table_name)::regclass
AND c.contype = 'u'
GROUP BY c.conname
LIMIT 1;
-- Build the base template
v_template := jsonb_build_object(
'schema_name', p_schema_name,
'table_name', p_table_name,
'operation', v_operation,
'data', v_data_template
);
-- Add primary_key field for update operation
IF v_operation = 'update' THEN
IF v_primary_key_columns IS NULL THEN
RETURN format(
'-- Error: Table %I.%I has no primary key, which is required for %s operation (Timestamp: %s)',
p_schema_name, p_table_name, v_operation, now()
);
END IF;
v_template := v_template || jsonb_build_object(
'primary_key', v_primary_key_columns[1], -- Use the first primary key column (assumes single-column PK for simplicity)
'comment', format('The primary_key field specifies the column to use for identifying the record. For this table, the primary key is: %s', array_to_string(v_primary_key_columns, ', '))
);
-- Add the primary key to the data template with a placeholder value
v_template := jsonb_set(
v_template,
'{data}',
(v_template->'data') || jsonb_build_object(v_primary_key_columns[1], 0)
);
END IF;
-- Add comment for upsert operation about conflict columns
IF v_operation = 'upsert' THEN
IF v_unique_columns IS NOT NULL THEN
v_template := v_template || jsonb_build_object(
'comment', format('For upsert, the conflict columns are determined by the unique constraint %I: %s. Ensure these columns are included in the data object.', v_unique_constraint_name, array_to_string(v_unique_columns, ', '))
);
ELSIF v_primary_key_columns IS NOT NULL THEN
v_template := v_template || jsonb_build_object(
'comment', format('For upsert, no unique constraint was found. Falling back to primary key: %s. Ensure this column is included in the data object.', array_to_string(v_primary_key_columns, ', '))
);
-- Add the primary key to the data template with a placeholder value
v_template := jsonb_set(
v_template,
'{data}',
(v_template->'data') || jsonb_build_object(v_primary_key_columns[1], 0)
);
ELSE
RETURN format(
'-- Error: Table %I.%I has no primary key or unique constraint, which is required for upsert operation (Timestamp: %s)',
p_schema_name, p_table_name, now()
);
END IF;
END IF;
END IF;
-- Convert the JSONB template to a pretty-printed text string
v_json_text := jsonb_pretty(v_template);
-- Return the formatted SQL statement
RETURN format(
'SELECT c77_secure_db_operation(' || chr(10) ||
'''%s''::jsonb' || chr(10) ||
')',
v_json_text
);
EXCEPTION WHEN OTHERS THEN
RETURN format(
'-- Error: %s (Error Code: %s, Timestamp: %s)',
SQLERRM, SQLSTATE, now()
);
END;
$BODY$;
-- FUNCTION: public.c77_manage_secure_schemas(text, text)
CREATE OR REPLACE FUNCTION public.c77_manage_secure_schemas(
p_operation text,
p_schema_name text DEFAULT NULL::text)
RETURNS jsonb
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_operation text := lower(p_operation);
v_schema_exists boolean;
v_row_count int;
BEGIN
-- Create the secure_schemas table if it doesn't exist
IF NOT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'secure_schemas'
) THEN
CREATE TABLE public.secure_schemas (
schema_name text PRIMARY KEY,
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now()
);
-- Add a comment to document the table's purpose
COMMENT ON TABLE public.secure_schemas IS 'Stores schemas where c77_apply_prevent_triggers should be automatically applied via the c77_auto_apply_prevent_triggers event trigger.';
-- Insert the 'testme' schema as an initial entry
INSERT INTO public.secure_schemas (schema_name)
VALUES ('testme')
ON CONFLICT (schema_name) DO NOTHING;
END IF;
-- Validate operation
IF v_operation NOT IN ('list', 'add', 'delete') THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Invalid operation. Must be one of: list, add, delete',
'timestamp', now()
);
END IF;
-- Handle the operation
CASE v_operation
WHEN 'list' THEN
RETURN jsonb_build_object(
'success', true,
'schemas', (
SELECT jsonb_agg(
jsonb_build_object(
'schema_name', schema_name,
'created_at', created_at,
'updated_at', updated_at
)
)
FROM public.secure_schemas
),
'timestamp', now()
);
WHEN 'add' THEN
-- Validate schema_name
IF p_schema_name IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Schema name is required for add operation',
'timestamp', now()
);
END IF;
-- Check if the schema exists
SELECT EXISTS (
SELECT 1
FROM information_schema.schemata
WHERE schema_name = p_schema_name
) INTO v_schema_exists;
IF NOT v_schema_exists THEN
RETURN jsonb_build_object(
'success', false,
'error', format('Schema %I does not exist', p_schema_name),
'timestamp', now()
);
END IF;
-- Insert the schema
INSERT INTO public.secure_schemas (schema_name)
VALUES (p_schema_name)
ON CONFLICT (schema_name) DO UPDATE
SET updated_at = now();
GET DIAGNOSTICS v_row_count = ROW_COUNT;
IF v_row_count > 0 THEN
RETURN jsonb_build_object(
'success', true,
'message', format('Schema %I added or updated in secure_schemas', p_schema_name),
'timestamp', now()
);
ELSE
RETURN jsonb_build_object(
'success', false,
'error', format('Failed to add schema %I to secure_schemas', p_schema_name),
'timestamp', now()
);
END IF;
WHEN 'delete' THEN
-- Validate schema_name
IF p_schema_name IS NULL THEN
RETURN jsonb_build_object(
'success', false,
'error', 'Schema name is required for delete operation',
'timestamp', now()
);
END IF;
-- Delete the schema
DELETE FROM public.secure_schemas
WHERE schema_name = p_schema_name;
IF FOUND THEN
RETURN jsonb_build_object(
'success', true,
'message', format('Schema %I removed from secure_schemas', p_schema_name),
'timestamp', now()
);
ELSE
RETURN jsonb_build_object(
'success', false,
'error', format('Schema %I not found in secure_schemas', p_schema_name),
'timestamp', now()
);
END IF;
ELSE
-- This should never be reached due to earlier validation, but included for completeness
RETURN jsonb_build_object(
'success', false,
'error', 'Invalid operation',
'timestamp', now()
);
END CASE;
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'error', SQLERRM,
'error_code', SQLSTATE,
'timestamp', now()
);
END;
$BODY$;
DO $$
BEGIN
-- Check if the event trigger already exists
IF NOT EXISTS (
SELECT 1 FROM pg_event_trigger WHERE evtname = 'c77_event_auto_apply_prevent_triggers'
) THEN
-- Create the event trigger
CREATE EVENT TRIGGER c77_event_auto_apply_prevent_triggers ON DDL_COMMAND_END
WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE')
EXECUTE PROCEDURE c77_auto_apply_prevent_triggers();
END IF;
END $$;