-- c77_mvc--1.0.sql
-- Check if c77_dbh extension is installed
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'c77_dbh') THEN
RAISE NOTICE 'The c77_dbh extension is not installed. This extension is required for c77_mvc to function properly.';
RAISE NOTICE 'Please install c77_dbh first by following the instructions at https://git.jctr3.com/trogers1884/c77_dbh';
RAISE NOTICE 'Example: CREATE EXTENSION c77_dbh; (after compiling and installing from source if necessary)';
RAISE EXCEPTION 'Installation aborted due to missing c77_dbh extension.';
END IF;
END;
$$;
-- If we reach here, c77_dbh is installed, so proceed with the installation
-- Create the table
CREATE TABLE IF NOT EXISTS public.c77_mvc_table_fitness_audit (
run_id BIGSERIAL,
run_timestamp timestamp without time zone DEFAULT CURRENT_TIMESTAMP,
source_schema text COLLATE pg_catalog."default",
source_table text COLLATE pg_catalog."default",
analysis_result jsonb,
notes text[] COLLATE pg_catalog."default",
CONSTRAINT table_fitness_audit_pkey PRIMARY KEY (run_id)
) TABLESPACE pg_default;
CREATE INDEX IF NOT EXISTS idx_table_fitness_audit_table
ON public.c77_mvc_table_fitness_audit USING btree
(source_schema COLLATE pg_catalog."default" ASC NULLS LAST, source_table COLLATE pg_catalog."default" ASC NULLS LAST)
TABLESPACE pg_default;
CREATE INDEX IF NOT EXISTS idx_table_fitness_audit_timestamp
ON public.c77_mvc_table_fitness_audit USING btree
(run_timestamp ASC NULLS LAST)
TABLESPACE pg_default;
-- Define the functions in dependency order
CREATE OR REPLACE FUNCTION public.c77_mvc_analyze_column_combinations(temp_table_name text, column_stats jsonb, sample_size bigint, total_rows bigint, exclude_key_columns text[]) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
column_combinations jsonb := '{}';
combination_result record;
distinct_count bigint;
uniqueness_ratio numeric;
duplicate_count bigint;
synthetic_uniqueness numeric;
adjusted_sample_size bigint;
sampling_percentage numeric;
BEGIN
-- Adjust sample size to not exceed total rows; bail out early on an empty table to avoid division by zero
IF total_rows = 0 THEN
RETURN column_combinations;
END IF;
adjusted_sample_size := LEAST(sample_size, total_rows);
sampling_percentage := (adjusted_sample_size::float / total_rows * 100);
-- Analyze column combinations
FOR combination_result IN (
SELECT c1.key AS col1, c2.key AS col2,
((c1.value->>'fitness_score')::float + (c2.value->>'fitness_score')::float) / 2 AS avg_fitness
FROM jsonb_each(column_stats) c1,
jsonb_each(column_stats) c2
WHERE c1.key < c2.key
AND (c1.value->>'fitness_score')::float >= 70
AND (c2.value->>'fitness_score')::float >= 70
AND NOT (c1.key = ANY(exclude_key_columns))
AND NOT (c2.key = ANY(exclude_key_columns))
ORDER BY avg_fitness DESC
LIMIT 5
)
LOOP
-- Test uniqueness of the combination
EXECUTE format('SELECT COUNT(DISTINCT (%I, %I)) FROM (SELECT %I, %I FROM %I TABLESAMPLE SYSTEM (%s) LIMIT %s) t',
combination_result.col1, combination_result.col2,
combination_result.col1, combination_result.col2,
temp_table_name,
sampling_percentage::text, adjusted_sample_size)
INTO distinct_count;
uniqueness_ratio := distinct_count::float / adjusted_sample_size;
-- Simulate synthetic key uniqueness
EXECUTE format('SELECT COUNT(*) FROM (
SELECT ROW_NUMBER() OVER (PARTITION BY %I, %I ORDER BY random()) AS rn
FROM %I TABLESAMPLE SYSTEM (%s) LIMIT %s
) t WHERE rn > 1',
combination_result.col1, combination_result.col2,
temp_table_name,
sampling_percentage::text, adjusted_sample_size)
INTO duplicate_count;
synthetic_uniqueness := 1 - (duplicate_count::float / adjusted_sample_size);
-- Store combination stats
column_combinations := column_combinations || jsonb_build_object(
format('%s,%s', combination_result.col1, combination_result.col2),
jsonb_build_object(
'uniqueness_ratio', uniqueness_ratio,
'synthetic_uniqueness', synthetic_uniqueness,
'discrimination_power', uniqueness_ratio,
'avg_fitness_score', combination_result.avg_fitness
)
);
END LOOP;
RETURN column_combinations;
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'error', format('Failed to analyze column combinations: %s', SQLERRM)
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_assemble_matv_result(target_schema text, target_mv_name text, partition_columns text[], order_by_columns text[], exclude_hash_columns text[], where_clause text, custom_sql text, notes text[]) RETURNS json
LANGUAGE plpgsql
AS $$
DECLARE
vtw_name text := replace(target_mv_name, 'matc_', 'vtw_');
vm_name text := replace(target_mv_name, 'matc_', 'vm_');
BEGIN
notes := array_append(notes, format('Process completed at %s', clock_timestamp()));
RETURN json_build_object(
'message', format('Created view %I.%I, materialized view %I.%I, and view %I.%I for reading.',
target_schema, vtw_name, target_schema, target_mv_name, target_schema, vm_name),
'view_name', format('%I.%I', target_schema, vtw_name),
'matview_name', format('%I.%I', target_schema, target_mv_name),
'vm_view_name', format('%I.%I', target_schema, vm_name),
'partition_columns', partition_columns,
'order_by_columns', order_by_columns,
'exclude_hash_columns', exclude_hash_columns,
'where_clause', where_clause,
'custom_sql', custom_sql,
'notes', notes
);
EXCEPTION WHEN OTHERS THEN
RETURN json_build_object(
'error', format('Failed to assemble result: %s', SQLERRM),
'notes', notes
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_assemble_result(source_schema text, source_table text, column_stats jsonb, column_combinations jsonb, order_by_candidates jsonb, data_quality_index numeric, notes text[], temp_table_name text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
result_json jsonb;
run_id bigint; -- matches the BIGSERIAL run_id in c77_mvc_table_fitness_audit
updated_notes text[] := notes; -- Create a local copy of notes
BEGIN
-- Build the result JSON
updated_notes := array_append(updated_notes, format('Analysis completed at %s', clock_timestamp()));
result_json := jsonb_build_object(
'message', format('Analysis of %I.%I completed', source_schema, source_table),
'column_stats', column_stats,
'recommended_partition_combinations', column_combinations,
'order_by_candidates', order_by_candidates,
'data_quality_index', ROUND(data_quality_index, 2),
'notes', updated_notes
);
-- Store results in audit table
INSERT INTO public.c77_mvc_table_fitness_audit (
source_schema,
source_table,
analysis_result,
notes
)
VALUES (
source_schema,
source_table,
result_json,
updated_notes
)
RETURNING c77_mvc_table_fitness_audit.run_id INTO run_id;
-- Add run_id to the result
result_json := result_json || jsonb_build_object('run_id', run_id);
-- Clean up temporary table
EXECUTE format('DROP TABLE IF EXISTS %I', temp_table_name);
updated_notes := array_append(updated_notes, format('Dropped temporary table %s', temp_table_name));
-- Update result_json with the final notes
result_json := result_json || jsonb_build_object('notes', updated_notes);
RETURN result_json;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Error assembling result: %', SQLERRM;
RETURN jsonb_build_object(
'error', format('Failed to assemble result: %s', SQLERRM),
'notes', updated_notes
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_calculate_dqi(column_stats jsonb) RETURNS numeric
LANGUAGE plpgsql
AS $$
DECLARE
dqi_components jsonb := '{}';
col_name text;
null_ratio numeric;
encoding_issue_ratio numeric;
uniqueness_ratio numeric;
component_score numeric;
BEGIN
-- Calculate DQI components for each column
FOR col_name IN
SELECT key
FROM jsonb_object_keys(column_stats) AS key
LOOP
null_ratio := (column_stats->col_name->>'null_ratio')::numeric;
encoding_issue_ratio := (column_stats->col_name->>'encoding_issue_ratio')::numeric;
uniqueness_ratio := (column_stats->col_name->>'uniqueness_ratio')::numeric;
component_score := (1 - null_ratio) * 0.4 + (1 - encoding_issue_ratio) * 0.4 + uniqueness_ratio * 0.2;
dqi_components := dqi_components || jsonb_build_object(col_name, component_score);
END LOOP;
-- Calculate average DQI across all columns (scaled to 0-100)
RETURN (SELECT AVG(value::numeric) * 100
FROM jsonb_each_text(dqi_components));
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Error calculating DQI: %', SQLERRM;
RETURN 0;
END;
$$;
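-- Example usage (illustrative stats): a column with no nulls, no encoding issues, and 80% uniqueness
-- scores (1 - 0) * 0.4 + (1 - 0) * 0.4 + 0.8 * 0.2 = 0.96 per column, i.e. a DQI of 96 on the 0-100 scale:
-- SELECT public.c77_mvc_calculate_dqi(
--     '{"id": {"null_ratio": 0, "encoding_issue_ratio": 0, "uniqueness_ratio": 0.8}}'::jsonb
-- );  -- returns 96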
CREATE OR REPLACE FUNCTION public.c77_mvc_calculate_matv_sample_size(full_matview_name text, params jsonb) RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
total_matview_records bigint;
sample_size bigint;
BEGIN
-- Get total records
EXECUTE format('SELECT COUNT(*) FROM %s', full_matview_name)
INTO total_matview_records;
-- Calculate sample size using c77_mvc_calculate_sample_size
sample_size := public.c77_mvc_calculate_sample_size(total_matview_records);
RETURN sample_size;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Error calculating sample size: %', SQLERRM;
RETURN 100;
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_calculate_sample_size(total_rows bigint, confidence_level numeric DEFAULT 0.99, margin_of_error numeric DEFAULT 0.03) RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
z_score numeric;
n0 numeric;
p numeric := 0.5; -- Conservative estimate for maximum variability
sample_size bigint;
BEGIN
-- Map confidence level to Z-score
z_score := CASE
WHEN confidence_level = 0.90 THEN 1.645
WHEN confidence_level = 0.95 THEN 1.96
WHEN confidence_level = 0.99 THEN 2.576
ELSE 2.576 -- Default to 99%
END;
-- Initial sample size (infinite population)
n0 := (z_score * z_score * p * (1 - p)) / (margin_of_error * margin_of_error);
-- Adjust for finite population
sample_size := CEIL(n0 * total_rows / (n0 + total_rows));
sample_size := GREATEST(sample_size, 1000); -- Minimum sample size for small tables
sample_size := LEAST(sample_size, total_rows); -- Cap at total rows
RETURN sample_size;
END;
$$;
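-- Example usage (illustrative values): with the 99%/±3% defaults, n0 = 2.576^2 * 0.25 / 0.03^2 ≈ 1843,
-- so a 1,000,000-row table needs a sample of about 1,840 rows; small tables are clamped to their row count:
-- SELECT public.c77_mvc_calculate_sample_size(1000000);          -- ≈ 1840
-- SELECT public.c77_mvc_calculate_sample_size(500, 0.95, 0.05);  -- 500 (capped at total rows)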
CREATE OR REPLACE FUNCTION public.c77_mvc_collect_matv_stats(full_matview_name text, full_vtw_name text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
total_matview_records bigint;
clean_records bigint;
encoding_issues bigint;
total_vtw_records bigint;
last_matview_update timestamp with time zone;
last_vtw_update timestamp with time zone;
size_mb numeric;
BEGIN
-- Collect stats
EXECUTE format('SELECT COUNT(*), COUNT(*) FILTER (WHERE encoding_status = ''CLEAN''), COUNT(*) FILTER (WHERE encoding_status IS DISTINCT FROM ''CLEAN'') FROM %s', full_matview_name)
INTO total_matview_records, clean_records, encoding_issues;
EXECUTE format('SELECT COUNT(*) FROM %s', full_vtw_name)
INTO total_vtw_records;
EXECUTE format('SELECT MAX(rowlastupdated) FROM %s', full_matview_name)
INTO last_matview_update;
EXECUTE format('SELECT MAX(rowlastupdated) FROM %s', full_vtw_name)
INTO last_vtw_update;
-- Compute the size in MB directly from bytes; parsing pg_size_pretty() output would yield a
-- number in whatever unit it happened to choose (bytes, kB, MB, GB)
EXECUTE format('SELECT ROUND(pg_total_relation_size(%L)::numeric / (1024 * 1024), 2)', full_matview_name)
INTO size_mb;
RETURN jsonb_build_object(
'total_matview_records', total_matview_records,
'clean_records', clean_records,
'encoding_issues', encoding_issues,
'total_vtw_records', total_vtw_records,
'last_matview_update', last_matview_update,
'last_vtw_update', last_vtw_update,
'size_mb', size_mb
);
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Error fetching stats: %', SQLERRM;
RETURN jsonb_build_object(
'total_matview_records', 0,
'clean_records', 0,
'encoding_issues', 0,
'total_vtw_records', 0,
'last_matview_update', NULL,
'last_vtw_update', NULL,
'size_mb', 0
);
END;
$$;
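-- Example usage (illustrative names; both relations are expected to expose the encoding_status
-- and rowlastupdated columns this function queries):
-- SELECT public.c77_mvc_collect_matv_stats('staging.matc_orders', 'staging.vtw_orders');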
CREATE OR REPLACE FUNCTION public.c77_mvc_create_indexes(target_schema text, target_mv_name text, partition_columns text[]) RETURNS text[]
LANGUAGE plpgsql
AS $$
DECLARE
notes text[] := '{}';
encoding_index_name text;
content_hash_index_name text;
unique_index_name text;
vtw_name text := replace(target_mv_name, 'matc_', 'vtw_');
vm_name text := replace(target_mv_name, 'matc_', 'vm_');
BEGIN
-- Index on encoding_status
encoding_index_name := 'idx_' || target_mv_name || '_encoding_status';
EXECUTE format('CREATE INDEX %I ON %I.%I (encoding_status)',
encoding_index_name, target_schema, target_mv_name);
notes := array_append(notes, 'Created index on encoding_status');
-- Index on content_hash
content_hash_index_name := 'idx_' || target_mv_name || '_content_hash';
EXECUTE format('CREATE INDEX %I ON %I.%I (content_hash)',
content_hash_index_name, target_schema, target_mv_name);
notes := array_append(notes, 'Created index on content_hash');
-- Unique index on synthetic_key and partition columns
SELECT string_agg(quote_ident(unnest), ', ')
INTO unique_index_name
FROM unnest(partition_columns);
unique_index_name := format(
'CREATE UNIQUE INDEX %I ON %I.%I (synthetic_key, %s)',
'idx_' || target_mv_name || '_synthetic_key', target_schema, target_mv_name,
COALESCE(unique_index_name, '1')
);
BEGIN
EXECUTE unique_index_name;
notes := array_append(notes, 'Successfully created unique index on synthetic_key and partition columns');
EXCEPTION WHEN OTHERS THEN
EXECUTE format('DROP MATERIALIZED VIEW %I.%I', target_schema, target_mv_name);
EXECUTE format('DROP VIEW IF EXISTS %I.%I', target_schema, vtw_name);
EXECUTE format('DROP VIEW IF EXISTS %I.%I', target_schema, vm_name);
notes := array_append(notes, format('Failed to create unique index: %s', SQLERRM));
SELECT string_agg(quote_ident(unnest), ', ')
INTO unique_index_name
FROM unnest(partition_columns);
unique_index_name := format(
'CREATE INDEX %I ON %I.%I (synthetic_key, %s)',
'idx_' || target_mv_name || '_synthetic_key_fallback', target_schema, target_mv_name,
COALESCE(unique_index_name, '1')
);
EXECUTE unique_index_name;
notes := array_append(notes, 'Created non-unique fallback index due to unique index failure');
END;
RETURN notes;
EXCEPTION WHEN OTHERS THEN
RETURN array_append(notes, format('Error creating indexes: %s', SQLERRM));
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_create_optimized_matv(source_schema text, source_table text, target_schema text, target_matview text, partition_columns text[], order_by_columns text[], exclude_columns_from_hash text[] DEFAULT ARRAY[]::text[], filter_latest_only boolean DEFAULT false) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
source_full_name text := quote_ident(source_schema) || '.' || quote_ident(source_table);
vtw_name text := replace(target_matview, 'matc_', 'vtw_');
vm_name text := replace(target_matview, 'matc_', 'vm_');
vprob_name text := replace(target_matview, 'matc_', 'vprob_');
vtw_full_name text := quote_ident(target_schema) || '.' || quote_ident(vtw_name);
vm_full_name text := quote_ident(target_schema) || '.' || quote_ident(vm_name);
vprob_full_name text := quote_ident(target_schema) || '.' || quote_ident(vprob_name);
matview_full_name text := quote_ident(target_schema) || '.' || quote_ident(target_matview);
columns_list text;
vm_columns_list text;
hash_columns_list text;
encoding_check_list text;
partition_clause text := '';
order_by_clause text := '';
create_vtw_sql text;
create_matview_sql text;
create_vm_sql text;
create_vprob_sql text;
create_index_sql text;
notes text[] := '{}';
column_record record;
BEGIN
-- Step 1: Get the list of columns with regexp_replace for non-partition character-based columns (for vtw_)
columns_list := '';
vm_columns_list := '';
FOR column_record IN (
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND column_name NOT IN ('content_hash', 'synthetic_key')
ORDER BY column_name
) LOOP
-- For vtw_: Apply regexp_replace to character-based non-partition columns
IF column_record.column_name = ANY(partition_columns) THEN
columns_list := columns_list || quote_ident(column_record.column_name) || ', ';
ELSIF column_record.data_type IN ('character', 'character varying', 'varchar', 'char', 'text') THEN
columns_list := columns_list || format('regexp_replace(t.%I, ''[^\x00-\x7F]''::text, ''PROBLEM''::text, ''g''::text) AS %I, ',
column_record.column_name, column_record.column_name);
ELSE
columns_list := columns_list || quote_ident(column_record.column_name) || ', ';
END IF;
-- For vm_ and vprob_: Just the column names, no regexp_replace or t. prefix
vm_columns_list := vm_columns_list || quote_ident(column_record.column_name) || ', ';
END LOOP;
columns_list := rtrim(columns_list, ', ');
vm_columns_list := rtrim(vm_columns_list, ', ');
-- Step 2: Validate partition_columns
IF array_length(partition_columns, 1) IS NULL OR array_length(partition_columns, 1) = 0 THEN
RAISE EXCEPTION 'partition_columns cannot be empty. At least one column is required for partitioning to ensure proper deduplication.';
END IF;
-- Step 3: Validate order_by_columns
IF array_length(order_by_columns, 1) IS NULL OR array_length(order_by_columns, 1) = 0 THEN
RAISE EXCEPTION 'order_by_columns cannot be empty. At least one column is required to ensure deterministic ordering for synthetic_key generation.';
END IF;
-- Step 4: Get the list of columns for the content_hash
IF exclude_columns_from_hash IS NULL OR array_length(exclude_columns_from_hash, 1) IS NULL OR array_length(exclude_columns_from_hash, 1) = 0 THEN
-- If exclude_columns_from_hash is empty, include all columns
SELECT string_agg('t.' || quote_ident(column_name), ', ')
INTO hash_columns_list
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table;
notes := array_append(notes, 'exclude_columns_from_hash is empty; including all columns from the source table in content_hash calculation');
ELSE
-- Otherwise, exclude the specified columns
SELECT string_agg('t.' || quote_ident(column_name), ', ')
INTO hash_columns_list
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND column_name NOT IN (
SELECT unnest(exclude_columns_from_hash)
);
-- If excluding the specified columns results in no columns, include all columns as a fallback
IF hash_columns_list IS NULL THEN
SELECT string_agg('t.' || quote_ident(column_name), ', ')
INTO hash_columns_list
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table;
notes := array_append(notes, 'exclude_columns_from_hash excluded all columns; including all columns from the source table in content_hash calculation as a fallback');
END IF;
END IF;
-- Step 5: Get the list of columns for encoding_status check
SELECT string_agg(format('t.%I::text ~ ''[^\x00-\x7F]''::text', column_name), ' OR ')
INTO encoding_check_list
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table;
-- Step 6: Build partition and order-by clauses for synthetic_key
IF array_length(partition_columns, 1) > 0 THEN
partition_clause := 'PARTITION BY ' || array_to_string(partition_columns, ', ');
END IF;
IF array_length(order_by_columns, 1) > 0 THEN
order_by_clause := 'ORDER BY ' || array_to_string(order_by_columns, ', ');
END IF;
-- Step 7: Create the vtw_ view with content_hash, synthetic_key, and encoding_status
IF filter_latest_only THEN
create_vtw_sql := format('
CREATE OR REPLACE VIEW %s AS
SELECT *
FROM (
SELECT md5(CAST(ROW(%s) AS text)) AS content_hash,
%s,
(row_number() OVER (%s %s))::bigint AS synthetic_key,
CASE
WHEN %s THEN ''ENCODING_ISSUE''::text
ELSE ''CLEAN''::text
END AS encoding_status
FROM %s t
) sub
WHERE synthetic_key = 1',
vtw_full_name,
hash_columns_list,
columns_list,
partition_clause,
order_by_clause,
encoding_check_list,
source_full_name
);
ELSE
create_vtw_sql := format('
CREATE OR REPLACE VIEW %s AS
SELECT md5(CAST(ROW(%s) AS text)) AS content_hash,
%s,
(row_number() OVER (%s %s))::bigint AS synthetic_key,
CASE
WHEN %s THEN ''ENCODING_ISSUE''::text
ELSE ''CLEAN''::text
END AS encoding_status
FROM %s t',
vtw_full_name,
hash_columns_list,
columns_list,
partition_clause,
order_by_clause,
encoding_check_list,
source_full_name
);
END IF;
EXECUTE create_vtw_sql;
notes := array_append(notes, format('Created view %s', vtw_full_name));
-- Step 8: Create the matc_ materialized view as a direct copy of vtw_
create_matview_sql := format('
CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS
SELECT *
FROM %s',
matview_full_name,
vtw_full_name
);
EXECUTE create_matview_sql;
notes := array_append(notes, format('Created materialized view %s', matview_full_name));
-- Step 9: Add indexes on matc_
-- Index on encoding_status
create_index_sql := format('
CREATE INDEX IF NOT EXISTS %I ON %s (encoding_status)',
target_matview || '_encoding_status_idx',
matview_full_name
);
EXECUTE create_index_sql;
notes := array_append(notes, format('Created index %s on encoding_status', target_matview || '_encoding_status_idx'));
-- Index on content_hash
create_index_sql := format('
CREATE INDEX IF NOT EXISTS %I ON %s (content_hash)',
target_matview || '_content_hash_idx',
matview_full_name
);
EXECUTE create_index_sql;
notes := array_append(notes, format('Created index %s on content_hash', target_matview || '_content_hash_idx'));
-- Unique index on (synthetic_key, partition_columns)
IF array_length(partition_columns, 1) > 0 THEN
create_index_sql := format('
CREATE UNIQUE INDEX IF NOT EXISTS %I ON %s (synthetic_key, %s)',
target_matview || '_unique_key_idx',
matview_full_name,
array_to_string(partition_columns, ', ')
);
BEGIN
EXECUTE create_index_sql;
notes := array_append(notes, format('Created unique index %s on (synthetic_key, %s)', target_matview || '_unique_key_idx', array_to_string(partition_columns, ', ')));
EXCEPTION WHEN unique_violation THEN
notes := array_append(notes, format('Unexpected failure to create unique index %s on (synthetic_key, %s) due to duplicate values in %s. This should not happen due to synthetic_key generation. Check the synthetic_key logic in %s and look for duplicates using: SELECT synthetic_key, %s, count(*) FROM %s GROUP BY synthetic_key, %s HAVING count(*) > 1;',
target_matview || '_unique_key_idx',
array_to_string(partition_columns, ', '),
matview_full_name,
vtw_full_name,
array_to_string(partition_columns, ', '),
matview_full_name,
array_to_string(partition_columns, ', ')));
END;
END IF;
-- Step 10: Create the vm_ view, excluding content_hash, synthetic_key, and encoding_status, with WHERE encoding_status = 'CLEAN'
create_vm_sql := format('
CREATE OR REPLACE VIEW %s AS
SELECT %s
FROM %s
WHERE encoding_status = ''CLEAN''',
vm_full_name,
vm_columns_list,
matview_full_name
);
EXECUTE create_vm_sql;
notes := array_append(notes, format('Created view %s', vm_full_name));
-- Step 11: Create the vprob_ view, excluding content_hash, synthetic_key, and encoding_status, with WHERE encoding_status != 'CLEAN'
create_vprob_sql := format('
CREATE OR REPLACE VIEW %s AS
SELECT %s
FROM %s
WHERE encoding_status != ''CLEAN''',
vprob_full_name,
vm_columns_list,
matview_full_name
);
EXECUTE create_vprob_sql;
notes := array_append(notes, format('Created view %s', vprob_full_name));
RETURN jsonb_build_object('notes', notes);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object('error', SQLERRM, 'notes', notes);
END;
$$;
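-- Example usage (illustrative schema, table, and column names). This builds staging.vtw_orders
-- (cleansing view), staging.matc_orders (materialized copy with indexes), staging.vm_orders
-- (clean rows only), and staging.vprob_orders (rows with encoding issues):
-- SELECT public.c77_mvc_create_optimized_matv(
--     'staging', 'orders',           -- source
--     'staging', 'matc_orders',      -- target (the matc_ prefix drives the derived view names)
--     ARRAY['order_id'],             -- partition_columns: deduplication key
--     ARRAY['updated_at'],           -- order_by_columns: latest row gets synthetic_key = 1
--     ARRAY['load_batch_id'],        -- exclude_columns_from_hash
--     true                           -- filter_latest_only: keep only synthetic_key = 1 rows
-- );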
CREATE OR REPLACE FUNCTION public.c77_mvc_create_temp_table(source_schema text, source_table text) RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
temp_table_name text := 'temp_' || source_table || '_' || to_char(current_timestamp, 'YYYYMMDDHH24MISS');
column_defs text;
BEGIN
-- Step 1: Generate column definitions from the source table, ordered by ordinal_position so the
-- temp table's column order matches the SELECT * insert from the source
SELECT string_agg(
format('%I %s', column_name, data_type),
', ' ORDER BY ordinal_position
) INTO column_defs
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND column_name IS NOT NULL
AND TRIM(column_name) != '';
-- Step 2: Create temp table with column definitions
EXECUTE format('CREATE TEMP TABLE %I (%s)', temp_table_name, column_defs);
-- Step 3: Insert data from source table
EXECUTE format('INSERT INTO %I SELECT * FROM %I.%I', temp_table_name, source_schema, source_table);
RETURN temp_table_name;
END;
$$;
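-- Example usage (illustrative names). Returns the generated name, e.g. 'temp_orders_20250327070055':
-- SELECT public.c77_mvc_create_temp_table('staging', 'orders');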
CREATE OR REPLACE FUNCTION public.c77_mvc_generate_column_lists(source_schema text, source_table text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
columns_list text;
non_char_columns text;
case_conditions text;
BEGIN
-- Character-type columns (cleansed)
SELECT string_agg(
'regexp_replace(' || quote_ident(column_name) || ', ''[^\x00-\x7F]'', ''PROBLEM'', ''g'') as ' || quote_ident(column_name),
', '
)
INTO columns_list
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND data_type IN ('character varying', 'character', 'text', 'varchar', 'char');
-- Non-character-type columns
SELECT string_agg(quote_ident(column_name), ', ')
INTO non_char_columns
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND data_type NOT IN ('character varying', 'character', 'text', 'varchar', 'char');
-- CASE conditions for encoding status
SELECT string_agg(quote_ident(column_name) || ' ~ ''[^\x00-\x7F]''', ' OR ')
INTO case_conditions
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND data_type IN ('character varying', 'character', 'text', 'varchar', 'char');
RETURN jsonb_build_object(
'columns_list', COALESCE(columns_list, ''),
'non_char_columns', COALESCE(non_char_columns, ''),
'case_conditions', COALESCE(case_conditions, 'FALSE')
);
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Error generating column lists: %', SQLERRM;
RETURN jsonb_build_object(
'error', format('Failed to generate column lists: %s', SQLERRM)
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_generate_synthetic_key_and_hash(partition_columns text[], order_by_columns text[], exclude_hash_columns text[], all_columns text[]) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
partition_sql text;
order_by_clause text;
synthetic_key_sql text;
content_hash_sql text;
hashable_columns text[];
datetime_format CONSTANT text := 'YYYY-MM-DD HH24:MI:SS'; -- Hardcoded
BEGIN
-- Partition clause
SELECT string_agg(quote_ident(unnest), ', ')
INTO partition_sql
FROM unnest(partition_columns);
partition_sql := format('PARTITION BY %s', COALESCE(partition_sql, '1'));
-- Order-by clause
SELECT string_agg(
format('TO_TIMESTAMP(SUBSTRING(NULLIF(%I, ''''), 1, 19), %L) DESC NULLS LAST', unnest, datetime_format),
', '
)
INTO order_by_clause
FROM unnest(order_by_columns);
synthetic_key_sql := format(
'ROW_NUMBER() OVER (%s ORDER BY %s) AS synthetic_key',
partition_sql, COALESCE(order_by_clause, '1')
);
-- Content hash
hashable_columns := array(
SELECT unnest(all_columns)
EXCEPT
SELECT unnest(exclude_hash_columns)
);
SELECT string_agg(quote_ident(unnest), ', ')
INTO content_hash_sql
FROM unnest(hashable_columns);
content_hash_sql := format('md5(CAST(row_to_json(ROW(%s)) AS text)) AS content_hash', COALESCE(content_hash_sql, '1'));
RETURN jsonb_build_object(
'synthetic_key_sql', synthetic_key_sql,
'content_hash_sql', content_hash_sql
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'error', format('Failed to generate synthetic key and hash: %s', SQLERRM)
);
END;
$$;
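-- Example usage (illustrative column names). Returns the two SQL fragments as jsonb: a synthetic_key
-- window expression partitioned by customer_id and ordered by updated_at (text parsed with the
-- hardcoded YYYY-MM-DD HH24:MI:SS format), plus a content_hash over every column except load_batch_id:
-- SELECT public.c77_mvc_generate_synthetic_key_and_hash(
--     ARRAY['customer_id'],
--     ARRAY['updated_at'],
--     ARRAY['load_batch_id'],
--     ARRAY['customer_id', 'updated_at', 'load_batch_id', 'amount']
-- );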
CREATE OR REPLACE FUNCTION public.c77_mvc_identify_order_by_candidates(temp_table_name text, column_stats jsonb) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
order_by_candidates jsonb := '{}';
col_name text;
column_type text;
null_ratio numeric;
notes text[] := '{}';
BEGIN
-- Loop through columns to identify order-by candidates
FOR col_name, column_type IN
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema LIKE 'pg_temp%'
AND table_name = temp_table_name
AND column_name IS NOT NULL
AND TRIM(column_name) != ''
LOOP
-- Get null ratio from column_stats
null_ratio := (column_stats->col_name->>'null_ratio')::numeric;
-- Skip columns with high null ratio
IF null_ratio > 0.5 THEN
notes := array_append(notes, format('Skipped %I as order-by candidate due to high null ratio: %s', col_name, null_ratio));
CONTINUE;
END IF;
-- Check for timestamp or text columns
IF column_type IN ('timestamp', 'timestamp with time zone', 'timestamp without time zone', 'text') THEN
IF column_type = 'text' THEN
BEGIN
EXECUTE format('SELECT TO_TIMESTAMP(SUBSTRING(NULLIF(%I, ''''), 1, 19), %L) FROM %I LIMIT 1',
col_name, 'YYYY-MM-DD HH24:MI:SS', temp_table_name);
order_by_candidates := order_by_candidates || jsonb_build_object(
col_name, jsonb_build_object(
'fitness_score', (1 - null_ratio) * 100,
'note', 'Text column parseable as timestamp'
)
);
EXCEPTION WHEN OTHERS THEN
notes := array_append(notes, format('%I is text but not parseable as timestamp: %s', col_name, SQLERRM));
END;
ELSE
order_by_candidates := order_by_candidates || jsonb_build_object(
col_name, jsonb_build_object(
'fitness_score', (1 - null_ratio) * 100,
'note', 'Native timestamp column'
)
);
END IF;
END IF;
END LOOP;
RETURN order_by_candidates;
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'error', format('Failed to identify order-by candidates: %s', SQLERRM),
'notes', notes
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_perform_matv_action(full_matview_name text, schema_name text, matview_name text, action text, mismatched_records bigint, total_matview_records bigint, time_diff interval, mismatch_threshold numeric, time_threshold interval, encoding_issues bigint) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
action_performed boolean := false;
action_result text;
has_unique_index boolean;
index_rec record;
constraint_rec record;
BEGIN
-- Check if the materialized view has a unique index
SELECT EXISTS (
SELECT 1
FROM pg_index i
JOIN pg_class c ON c.oid = i.indrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = schema_name
AND c.relname = matview_name
AND i.indisunique = true
) INTO has_unique_index;
IF action = 'refresh' AND (
(mismatched_records::NUMERIC / NULLIF(total_matview_records, 0)::NUMERIC) * 100 > mismatch_threshold
OR time_diff >= time_threshold
) THEN
IF has_unique_index THEN
EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %s', full_matview_name);
action_result := 'Refreshed successfully (concurrently)';
ELSE
EXECUTE format('REFRESH MATERIALIZED VIEW %s', full_matview_name);
action_result := 'Refreshed successfully (non-concurrently: no unique index found)';
RAISE NOTICE 'No unique index found for %, using non-concurrent refresh', full_matview_name;
END IF;
action_performed := true;
ELSIF action = 'repair' AND encoding_issues > 0 THEN
-- Drop existing indexes
FOR index_rec IN (
SELECT indexname
FROM pg_indexes
WHERE schemaname = schema_name AND tablename = matview_name
AND indexname NOT LIKE '%_pkey'
) LOOP
EXECUTE format('DROP INDEX IF EXISTS %I.%I', schema_name, index_rec.indexname);
END LOOP;
-- Drop primary key or unique constraints
FOR constraint_rec IN (
SELECT conname
FROM pg_constraint
WHERE conrelid = (SELECT oid FROM pg_class WHERE relname = matview_name AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = schema_name))
AND contype IN ('p', 'u')
) LOOP
EXECUTE format('ALTER TABLE %I.%I DROP CONSTRAINT %I', schema_name, matview_name, constraint_rec.conname);
END LOOP;
-- Recreate standard indexes
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = schema_name AND table_name = matview_name AND column_name = 'content_hash') THEN
EXECUTE format('CREATE INDEX IF NOT EXISTS %I ON %I.%I (content_hash)', 'idx_' || matview_name || '_content_hash', schema_name, matview_name);
END IF;
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = schema_name AND table_name = matview_name AND column_name = 'synthetic_key') THEN
EXECUTE format('CREATE UNIQUE INDEX IF NOT EXISTS %I ON %I.%I (synthetic_key)', 'idx_' || matview_name || '_synthetic_key', schema_name, matview_name);
END IF;
-- Analyze the table
EXECUTE format('ANALYZE %I.%I', schema_name, matview_name);
action_result := 'Repaired successfully: indexes and keys rebuilt';
action_performed := true;
ELSIF action = 'reindex' THEN
EXECUTE format('REINDEX TABLE %s', full_matview_name);
action_result := 'Reindexed successfully';
action_performed := true;
ELSE
action_result := 'Action skipped: threshold not met or invalid action';
END IF;
RETURN jsonb_build_object(
'action_performed', action_performed,
'action_result', action_result
);
EXCEPTION WHEN OTHERS THEN
action_result := format('Action failed: %s', SQLERRM);
RAISE NOTICE 'Action exception: %', action_result;
RETURN jsonb_build_object(
'action_performed', false,
'action_result', action_result
);
END;
$$;
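-- Example usage (illustrative values). Requests a refresh; it proceeds because 120 of 100,000
-- sampled records (0.12%) exceeds the 0.05% mismatch threshold:
-- SELECT public.c77_mvc_perform_matv_action(
--     'staging.matc_orders', 'staging', 'matc_orders', 'refresh',
--     120, 100000, interval '2 hours', 0.05, interval '1 day', 0
-- );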
CREATE OR REPLACE FUNCTION public.c77_mvc_set_validation_params(validation_type text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
params jsonb;
mismatch_threshold numeric;
time_threshold interval;
BEGIN
-- Set validation parameters
params := CASE validation_type
WHEN 'quick' THEN '{"sample_percent": 0.1, "confidence": 0.95, "margin": 0.03}'::jsonb
WHEN 'daily' THEN '{"sample_percent": 1.0, "confidence": 0.99, "margin": 0.01}'::jsonb
WHEN 'full' THEN '{"sample_percent": 100.0, "confidence": 0.99, "margin": 0.005}'::jsonb
ELSE '{"sample_percent": 0.1, "confidence": 0.95, "margin": 0.03}'::jsonb
END;
-- Set dynamic thresholds
mismatch_threshold := CASE validation_type
WHEN 'quick' THEN 0.1 -- 0.1% mismatch for quick
WHEN 'daily' THEN 0.05 -- 0.05% mismatch for daily
WHEN 'full' THEN 0.01 -- 0.01% mismatch for full
ELSE 0.1
END;
time_threshold := CASE validation_type
WHEN 'quick' THEN '3 days'::interval -- 3 days for quick
WHEN 'daily' THEN '1 day'::interval -- 1 day for daily
WHEN 'full' THEN '12 hours'::interval -- 12 hours for full
ELSE '3 days'::interval
END;
RETURN jsonb_build_object(
'params', params,
'mismatch_threshold', mismatch_threshold,
'time_threshold', time_threshold
);
END;
$$;
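-- Example usage: 'daily' yields a 1% sample at 99% confidence, a 0.05% mismatch threshold, and a
-- 1-day staleness threshold; unrecognized types fall back to the 'quick' settings:
-- SELECT public.c77_mvc_set_validation_params('daily');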
CREATE OR REPLACE FUNCTION public.c77_mvc_validate_matv_inputs(schema_name text, matview_name text, vtw_name text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
full_matview_name text;
full_vtw_name text;
notes text[] := '{}';
vtw_table_name text;
BEGIN
-- Construct full names
full_matview_name := quote_ident(schema_name) || '.' || quote_ident(matview_name);
vtw_table_name := COALESCE(vtw_name, replace(matview_name, 'matc_', 'vtw_'));
full_vtw_name := quote_ident(schema_name) || '.' || quote_ident(vtw_table_name);
-- Validate materialized view existence
IF NOT EXISTS (
SELECT 1
FROM pg_matviews
WHERE schemaname = schema_name
AND matviewname = matview_name
) THEN
RETURN jsonb_build_object(
'error', format('Materialized view %I.%I does not exist', schema_name, matview_name),
'notes', notes
);
END IF;
-- Validate source view existence
IF NOT EXISTS (
SELECT 1
FROM pg_tables
WHERE schemaname = schema_name
AND tablename = vtw_table_name
) AND NOT EXISTS (
SELECT 1
FROM pg_views
WHERE schemaname = schema_name
AND viewname = vtw_table_name
) THEN
RETURN jsonb_build_object(
'error', format('Source view %I.%I does not exist', schema_name, vtw_table_name),
'notes', notes
);
END IF;
RETURN jsonb_build_object(
'full_matview_name', full_matview_name,
'full_vtw_name', full_vtw_name,
'notes', notes
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'error', format('Error validating inputs: %s', SQLERRM),
'notes', notes
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_validate_order_by_columns(source_schema text, source_table text, order_by_columns text[]) RETURNS text[]
LANGUAGE plpgsql
AS $$
DECLARE
notes text[] := '{}';
col_name text;
datetime_format CONSTANT text := 'YYYY-MM-DD HH24:MI:SS'; -- Hardcoded
BEGIN
FOREACH col_name IN ARRAY order_by_columns LOOP
IF NOT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = source_schema
AND table_name = source_table
AND column_name = col_name
) THEN
notes := array_append(notes, format('Warning: %I not found in %I.%I', col_name, source_schema, source_table));
ELSE
BEGIN
EXECUTE format('SELECT TO_TIMESTAMP(SUBSTRING(NULLIF(%I, ''''), 1, 19), %L) FROM %I.%I LIMIT 1',
col_name, datetime_format, source_schema, source_table);
EXCEPTION WHEN OTHERS THEN
notes := array_append(notes, format('Warning: %I contains unparseable timestamp data: %s', col_name, SQLERRM));
END;
END IF;
END LOOP;
RETURN notes;
EXCEPTION WHEN OTHERS THEN
RETURN array_append(notes, format('Error validating order-by columns: %s', SQLERRM));
END;
$$;
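-- Example usage (illustrative names). Returns warnings as a text[] rather than raising, so callers
-- can fold them into their own notes:
-- SELECT public.c77_mvc_validate_order_by_columns('staging', 'orders', ARRAY['updated_at']);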
CREATE OR REPLACE FUNCTION public.c77_mvc_analyze_column_stats(temp_table_name text, col_name text, column_type text, sample_size bigint, total_rows bigint, exclude_key_columns text[]) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
null_count bigint;
distinct_count bigint;
encoding_issue_count bigint;
uniqueness_ratio numeric;
null_ratio numeric;
encoding_issue_ratio numeric;
fitness_score numeric;
adjusted_sample_size bigint;
sampling_percentage numeric;
BEGIN
-- Adjust sample size to not exceed total rows; bail out early on an empty table to avoid division by zero
IF total_rows = 0 THEN
RETURN jsonb_build_object('data_type', column_type, 'error', 'Table is empty; no rows to sample');
END IF;
adjusted_sample_size := LEAST(sample_size, total_rows);
sampling_percentage := (adjusted_sample_size::float / total_rows * 100);
-- Null count
EXECUTE format('SELECT COUNT(*) FROM (SELECT %I FROM %I TABLESAMPLE SYSTEM (%s) LIMIT %s) t WHERE %I IS NULL',
col_name, temp_table_name, sampling_percentage::text, adjusted_sample_size, col_name)
INTO null_count;
null_ratio := null_count::float / adjusted_sample_size;
-- Distinct count (skip for excluded columns)
IF NOT (col_name = ANY(exclude_key_columns)) THEN
EXECUTE format('SELECT COUNT(DISTINCT %I) FROM (SELECT %I FROM %I TABLESAMPLE SYSTEM (%s) LIMIT %s) t',
col_name, col_name, temp_table_name, sampling_percentage::text, adjusted_sample_size)
INTO distinct_count;
uniqueness_ratio := distinct_count::float / adjusted_sample_size;
ELSE
uniqueness_ratio := 0;
END IF;
-- Encoding issues (for text-like columns)
IF column_type IN ('character varying', 'character', 'text', 'varchar', 'char') THEN
EXECUTE format('SELECT COUNT(*) FROM (SELECT %I FROM %I TABLESAMPLE SYSTEM (%s) LIMIT %s) t WHERE %I ~ ''[^\x00-\x7F]''',
col_name, temp_table_name, sampling_percentage::text, adjusted_sample_size, col_name)
INTO encoding_issue_count;
encoding_issue_ratio := encoding_issue_count::float / adjusted_sample_size;
ELSE
encoding_issue_ratio := 0;
END IF;
-- Fitness score for key fitness (if not excluded)
IF NOT (col_name = ANY(exclude_key_columns)) THEN
fitness_score := (uniqueness_ratio * 40) +
((1 - null_ratio) * 30) +
((1 - encoding_issue_ratio) * 20) +
(CASE
WHEN column_type IN ('character varying', 'character', 'text', 'varchar', 'char', 'integer', 'bigint') THEN 10
WHEN column_type IN ('timestamp', 'timestamp with time zone', 'timestamp without time zone') THEN 8
ELSE 5
END);
ELSE
fitness_score := 0;
END IF;
-- Return stats as JSONB
RETURN jsonb_build_object(
'data_type', column_type,
'uniqueness_ratio', uniqueness_ratio,
'distinct_count', distinct_count,
'null_ratio', null_ratio,
'null_count', null_count,
'encoding_issue_ratio', encoding_issue_ratio,
'encoding_issue_count', encoding_issue_count,
'fitness_score', fitness_score,
'excluded_from_key_fitness', (col_name = ANY(exclude_key_columns))
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'error', format('Failed to analyze column %I: %s', col_name, SQLERRM),
'data_type', column_type
);
END;
$$;
CREATE OR REPLACE FUNCTION public.c77_mvc_estimate_matv_refresh_time(full_matview_name text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
v_refresh_count bigint; -- Renamed to avoid conflict
v_refresh_total interval; -- Renamed for consistency
estimated_refresh_time interval;
BEGIN
-- Estimate refresh time
SELECT s.refresh_count, s.refresh_mv_time_total
INTO v_refresh_count, v_refresh_total
FROM public.c77_dbh_matv_stats s
WHERE s.mv_name = full_matview_name
LIMIT 1;
IF COALESCE(v_refresh_count, 0) > 0 THEN
estimated_refresh_time := v_refresh_total / v_refresh_count::numeric;
ELSE
estimated_refresh_time := '00:00:00'::interval;
END IF;
-- Return raw values for debugging
RETURN jsonb_build_object(
'estimated_refresh_time', estimated_refresh_time,
'refresh_count', v_refresh_count,
'refresh_total', v_refresh_total
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'estimated_refresh_time', '00:00:00'::interval,
'refresh_count', NULL,
'refresh_total', NULL,
'error', SQLERRM
);
END;
$$;
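-- Example usage (illustrative name). Requires refresh history in public.c77_dbh_matv_stats; with no
-- recorded refreshes the estimate falls back to '00:00:00':
-- SELECT public.c77_mvc_estimate_matv_refresh_time('staging.matc_orders');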
CREATE OR REPLACE FUNCTION public.c77_mvc_check_matv_mismatches(target_schema text, matview_name text, validation_type text DEFAULT 'quick'::text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
vtw_name text := 'vtw_' || substring(matview_name from 6); -- Replace 'matc_' with 'vtw_'
matview_full_name text := quote_ident(target_schema) || '.' || quote_ident(matview_name);
vtw_full_name text := quote_ident(target_schema) || '.' || quote_ident(vtw_name);
mismatch_count bigint;
mismatch_sql text;
content_hash_exists boolean := true;
total_matview_records bigint;
params jsonb;
sample_size bigint;
notes text[] := '{}';
BEGIN
-- Define validation parameters
params := CASE validation_type
WHEN 'quick' THEN '{"sample_percent": 0.1, "confidence": 0.95, "margin": 0.03}'::jsonb
WHEN 'daily' THEN '{"sample_percent": 1.0, "confidence": 0.99, "margin": 0.01}'::jsonb
WHEN 'full' THEN '{"sample_percent": 100.0, "confidence": 0.99, "margin": 0.005}'::jsonb
ELSE '{"sample_percent": 0.1, "confidence": 0.95, "margin": 0.03}'::jsonb
END;
-- Calculate sample size
EXECUTE format('SELECT COUNT(*) FROM %s', matview_full_name) INTO total_matview_records;
sample_size := GREATEST(100, CEIL((jsonb_extract_path_text(params, 'sample_percent')::NUMERIC / 100) * total_matview_records));
notes := array_append(notes, format('Total matview records: %s, Sample size: %s', total_matview_records, sample_size));
-- Attempt to query content_hash to check if it exists in both relations
BEGIN
EXECUTE format('SELECT 1 FROM %s WHERE content_hash IS NOT NULL LIMIT 1', vtw_full_name);
EXECUTE format('SELECT 1 FROM %s WHERE content_hash IS NOT NULL LIMIT 1', matview_full_name);
EXCEPTION WHEN undefined_column THEN
content_hash_exists := false;
END;
-- If content_hash is not found in either, return early
IF NOT content_hash_exists THEN
RAISE NOTICE 'content_hash column not found in either %.% or %.%, skipping mismatch check',
target_schema, matview_name, target_schema, vtw_name;
RETURN jsonb_build_object(
'mismatched_records', 0,
'mismatch_percent', 0.0,
'notes', notes
);
END IF;
-- Construct the mismatch check query
IF jsonb_extract_path_text(params, 'sample_percent')::NUMERIC < 100.0 THEN
-- Use sampling for quick and daily
mismatch_sql := format('
WITH matview_sample AS (
SELECT content_hash
FROM %s
ORDER BY random()
LIMIT %s
),
vtw_sample AS (
SELECT content_hash
FROM %s
ORDER BY random()
LIMIT %s
)
SELECT COUNT(*)
FROM (
SELECT content_hash FROM matview_sample
EXCEPT
SELECT content_hash FROM vtw_sample
) mismatches',
matview_full_name,
sample_size,
vtw_full_name,
sample_size
);
ELSE
-- Full comparison for 'full' validation
mismatch_sql := format('
SELECT COUNT(*)
FROM (
SELECT content_hash FROM %s
EXCEPT
SELECT content_hash FROM %s
) mismatches',
vtw_full_name,
matview_full_name
);
END IF;
EXECUTE mismatch_sql INTO mismatch_count;
notes := array_append(notes, format('Mismatch count: %s', mismatch_count));
RETURN jsonb_build_object(
'mismatched_records', mismatch_count,
'mismatch_percent', (mismatch_count::float / GREATEST(sample_size, 1)) * 100,
'notes', notes
);
EXCEPTION WHEN OTHERS THEN
notes := array_append(notes, format('Error in public.c77_mvc_check_matv_mismatches: %s', SQLERRM));
RETURN jsonb_build_object(
'mismatched_records', -1,
'mismatch_percent', -1.0,
'notes', notes
);
END;
$$;
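-- Example usage (illustrative names). Compares content_hash values between staging.matc_orders and
-- its staging.vtw_orders source, sampling according to the validation type:
-- SELECT public.c77_mvc_check_matv_mismatches('staging', 'matc_orders', 'daily');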
CREATE OR REPLACE FUNCTION public.c77_mvc_analyze_table_fitness(source_schema text, source_table text, exclude_key_columns text[] DEFAULT ARRAY[]::text[]) RETURNS json
LANGUAGE plpgsql
AS $$
DECLARE
total_rows bigint;
sample_size bigint;
temp_table_name text;
notes text[] := '{}';
result_json jsonb;
column_stats jsonb := '{}';
order_by_candidates jsonb := '{}';
column_combinations jsonb := '{}';
data_quality_index numeric;
col_name text;
column_type text;
table_exists boolean;
confidence_level CONSTANT numeric := 0.99; -- Hardcoded 99% confidence
margin_of_error CONSTANT numeric := 0.03; -- Hardcoded 3% margin of error
BEGIN
notes := array_append(notes, format('Analysis started at %s', clock_timestamp()));
-- Step 1: Validate schema and table existence
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = source_schema
AND table_name = source_table
) INTO table_exists;
IF NOT table_exists THEN
result_json := jsonb_build_object(
'error', format('Table %I.%I does not exist', source_schema, source_table),
'notes', notes
);
RETURN result_json::json;
END IF;
-- Step 2: Get total rows
BEGIN
EXECUTE format('SELECT COUNT(*) FROM %I.%I', source_schema, source_table) INTO total_rows;
notes := array_append(notes, format('Total rows in %I.%I: %s', source_schema, source_table, total_rows));
EXCEPTION WHEN OTHERS THEN
result_json := jsonb_build_object(
'error', format('Failed to count rows in %I.%I: %s', source_schema, source_table, SQLERRM),
'notes', notes
);
RETURN result_json::json;
END;
-- Step 3: Calculate sample size with hardcoded confidence level and margin of error
sample_size := public.c77_mvc_calculate_sample_size(total_rows, confidence_level, margin_of_error);
notes := array_append(notes, format('Sample size calculated: %s for %s rows (Confidence: %s%%, Margin of Error: ±%s%%)',
sample_size, total_rows, confidence_level * 100, margin_of_error * 100));
-- Step 4: Create temp table
BEGIN
temp_table_name := public.c77_mvc_create_temp_table(source_schema, source_table);
notes := array_append(notes, format('Created temporary table %s for analysis', temp_table_name));
EXCEPTION WHEN OTHERS THEN
result_json := jsonb_build_object(
'error', format('Failed to create temp table for %I.%I: %s', source_schema, source_table, SQLERRM),
'notes', notes
);
RETURN result_json::json;
END;
-- Step 5: Analyze individual columns
FOR col_name, column_type IN
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema LIKE 'pg_temp%'
AND table_name = temp_table_name
AND column_name IS NOT NULL
AND TRIM(column_name) != ''
LOOP
column_stats := column_stats || jsonb_build_object(
col_name, public.c77_mvc_analyze_column_stats(temp_table_name, col_name, column_type, sample_size, total_rows, exclude_key_columns)
);
END LOOP;
notes := array_append(notes, 'Completed analysis of individual columns');
-- Step 6: Identify order-by candidates
order_by_candidates := public.c77_mvc_identify_order_by_candidates(temp_table_name, column_stats);
notes := array_append(notes, 'Completed identification of order-by candidates');
-- Step 7: Analyze column combinations
column_combinations := public.c77_mvc_analyze_column_combinations(temp_table_name, column_stats, sample_size, total_rows, exclude_key_columns);
notes := array_append(notes, 'Completed analysis of column combinations');
-- Step 8: Calculate Data Quality Index (DQI)
data_quality_index := public.c77_mvc_calculate_dqi(column_stats);
notes := array_append(notes, format('Data Quality Index (DQI): %s', ROUND(data_quality_index, 2)));
-- Step 9: Assemble final result and clean up
result_json := public.c77_mvc_assemble_result(
source_schema, source_table, column_stats, column_combinations,
order_by_candidates, data_quality_index, notes, temp_table_name
);
RETURN result_json::json;
EXCEPTION WHEN OTHERS THEN
result_json := jsonb_build_object(
'error', format('Unexpected error in public.c77_mvc_analyze_table_fitness: %s', SQLERRM),
'notes', notes
);
RETURN result_json::json;
END;
$$;
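-- Example usage (illustrative names). Profiles staging.orders, excluding the surrogate id column from
-- key-fitness scoring; the result is also persisted to public.c77_mvc_table_fitness_audit:
-- SELECT public.c77_mvc_analyze_table_fitness('staging', 'orders', ARRAY['id']);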
CREATE OR REPLACE FUNCTION public.c77_mvc_manage_matv_health(target_schema text, matview_name text, validation_type text DEFAULT 'quick'::text, action text DEFAULT NULL::text) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
vtw_name text := 'vtw_' || substring(matview_name from 6); -- Replace 'matc_' with 'vtw_'
matview_full_name text := quote_ident(target_schema) || '.' || quote_ident(matview_name);
vtw_full_name text := quote_ident(target_schema) || '.' || quote_ident(vtw_name);
total_vtw_records bigint;
total_matview_records bigint;
encoding_issues bigint;
mismatch_result jsonb;
mismatch_count bigint;
mismatch_percent float;
status text;
character_encoding_status text;
last_refresh timestamp with time zone;
staleness_threshold interval;
mismatch_threshold float;
is_stale_by_time boolean := false;
notes text[] := '{}';
start_time timestamp with time zone := now();
end_time timestamp with time zone;
elapsed_time interval;
refresh_estimate jsonb;
estimated_refresh_time interval;
stats_found boolean := false;
BEGIN
-- Log the start time
notes := array_append(notes, format('Process started at %s', start_time));
-- Step 1: Check if the materialized view exists
BEGIN
EXECUTE format('SELECT COUNT(*) FROM %s', matview_full_name) INTO total_matview_records;
EXCEPTION WHEN undefined_table THEN
status := 'Non Existent';
end_time := now();
elapsed_time := end_time - start_time;
notes := array_append(notes, format('Elapsed time: %s', elapsed_time));
RETURN jsonb_build_object(
'total_vtw_records', 0,
'total_matview_records', 0,
'encoding_issues', 0,
'mismatched_records', 0,
'mismatch_percent', 0.0,
'status', status,
'character_encoding_status', 'CLEAN',
'estimated_refresh_time', interval '0 seconds',
'notes', notes
);
END;
-- Step 2: Check if the materialized view has data
IF total_matview_records = 0 THEN
status := 'Uninitialized';
end_time := now();
elapsed_time := end_time - start_time;
notes := array_append(notes, format('Elapsed time: %s', elapsed_time));
RETURN jsonb_build_object(
'total_vtw_records', 0,
'total_matview_records', 0,
'encoding_issues', 0,
'mismatched_records', 0,
'mismatch_percent', 0.0,
'status', status,
'character_encoding_status', 'CLEAN',
'estimated_refresh_time', interval '0 seconds',
'notes', notes
);
END IF;
-- Step 3: Get the total records in the source view
EXECUTE format('SELECT COUNT(*) FROM %s', vtw_full_name) INTO total_vtw_records;
-- Step 4: Check for encoding issues
BEGIN
EXECUTE format('SELECT COUNT(*) FROM %s WHERE encoding_status = ''ENCODING_ISSUE''', matview_full_name)
INTO encoding_issues;
EXCEPTION WHEN undefined_column THEN
encoding_issues := 0; -- If encoding_status column doesn't exist, assume no issues
END;
-- Set character_encoding_status
IF encoding_issues > 0 THEN
character_encoding_status := 'DEGRADED';
ELSE
character_encoding_status := 'CLEAN';
END IF;
-- Step 5: Check for time-based staleness
SELECT refresh_mv_last
INTO last_refresh
FROM public.c77_dbh_matv_stats
WHERE mv_name = target_schema || '.' || matview_name;
IF FOUND THEN
stats_found := true;
ELSE
notes := array_append(notes, format('Warning: No refresh stats found for materialized view %s in c77_dbh_matv_stats', matview_name));
END IF;
staleness_threshold := CASE validation_type
WHEN 'quick' THEN '3 days'::interval
WHEN 'daily' THEN '1 day'::interval
WHEN 'full' THEN '12 hours'::interval
ELSE '3 days'::interval
END;
notes := array_append(notes, format('Last refresh: %s, Time since last refresh: %s, Staleness threshold: %s', last_refresh, now() - last_refresh, staleness_threshold));
IF last_refresh IS NULL OR (now() - last_refresh) > staleness_threshold THEN
is_stale_by_time := true;
END IF;
notes := array_append(notes, format('Is stale by time: %s', is_stale_by_time));
-- Step 6: Set mismatch threshold based on validation_type
mismatch_threshold := CASE validation_type
WHEN 'quick' THEN 1.0 -- 1.0%
WHEN 'daily' THEN 0.5 -- 0.5%
WHEN 'full' THEN 0.2 -- 0.2%
ELSE 1.0
END;
-- Step 7: Check for mismatches
mismatch_result := public.c77_mvc_check_matv_mismatches(target_schema, matview_name, validation_type);
mismatch_count := (mismatch_result->>'mismatched_records')::bigint;
mismatch_percent := (mismatch_result->>'mismatch_percent')::float;
-- Append mismatch notes
notes := array_cat(notes, ARRAY(SELECT jsonb_array_elements_text(mismatch_result->'notes')));
-- Log mismatch details
notes := array_append(notes, format('Mismatch percent: %s, Mismatch threshold: %s', mismatch_percent, mismatch_threshold));
-- Step 8: Determine refresh status
IF is_stale_by_time OR mismatch_percent > mismatch_threshold THEN
status := 'Stale';
ELSE
status := 'Healthy';
END IF;
-- Step 9: Get estimated refresh time using the correct function
SELECT public.c77_mvc_estimate_matv_refresh_time(matview_full_name)
INTO refresh_estimate;
estimated_refresh_time := (refresh_estimate->>'estimated_refresh_time')::interval;
notes := array_append(notes, format('Refresh estimate details: %s', refresh_estimate));
-- Step 10: Perform action if specified and status is Stale
IF action IS NOT NULL AND status = 'Stale' THEN
IF action = 'refresh' THEN
-- Refresh the materialized view (using WITH DATA for PostgreSQL 12 compatibility)
EXECUTE format('REFRESH MATERIALIZED VIEW %s WITH DATA', matview_full_name);
notes := array_append(notes, 'Performed REFRESH on materialized view');
-- No need to update refresh time; handled by system triggers
ELSIF action = 'repair' THEN
-- Drop and recreate indexes
-- Schema-qualify the drops so they do not depend on the search_path
EXECUTE format('DROP INDEX IF EXISTS %I.%I', target_schema, matview_name || '_encoding_status_idx');
EXECUTE format('DROP INDEX IF EXISTS %I.%I', target_schema, matview_name || '_content_hash_idx');
EXECUTE format('DROP INDEX IF EXISTS %I.%I', target_schema, matview_name || '_unique_key_idx');
EXECUTE format('CREATE INDEX %I ON %s (encoding_status)', matview_name || '_encoding_status_idx', matview_full_name);
EXECUTE format('CREATE INDEX %I ON %s (content_hash)', matview_name || '_content_hash_idx', matview_full_name);
-- NOTE: the extra unique-index columns below (companyid, orgname_id) are deployment-specific;
-- adjust them to the partition columns used when the materialized view was created
EXECUTE format('CREATE UNIQUE INDEX %I ON %s (synthetic_key, companyid, orgname_id)', matview_name || '_unique_key_idx', matview_full_name);
notes := array_append(notes, 'Performed REPAIR (dropped and recreated indexes) on materialized view');
ELSIF action = 'reindex' THEN
-- Reindex the materialized view
EXECUTE format('REINDEX TABLE %s', matview_full_name);
notes := array_append(notes, 'Performed REINDEX on materialized view');
ELSE
notes := array_append(notes, format('Invalid action: %s, no action performed', action));
END IF;
-- Step 11: Re-evaluate after action
EXECUTE format('SELECT COUNT(*) FROM %s', matview_full_name) INTO total_matview_records;
EXECUTE format('SELECT COUNT(*) FROM %s WHERE encoding_status = ''ENCODING_ISSUE''', matview_full_name)
INTO encoding_issues;
mismatch_result := public.c77_mvc_check_matv_mismatches(target_schema, matview_name, validation_type);
mismatch_count := (mismatch_result->>'mismatched_records')::bigint;
mismatch_percent := (mismatch_result->>'mismatch_percent')::float;
-- Append mismatch notes
notes := array_cat(notes, ARRAY(SELECT jsonb_array_elements_text(mismatch_result->'notes')));
-- Update character_encoding_status
IF encoding_issues > 0 THEN
character_encoding_status := 'DEGRADED';
ELSE
character_encoding_status := 'CLEAN';
END IF;
-- Update status (time-based staleness should be resolved if action was 'refresh')
SELECT refresh_mv_last
INTO last_refresh
FROM public.c77_dbh_matv_stats
WHERE mv_name = target_schema || '.' || matview_name;
IF NOT FOUND THEN
notes := array_append(notes, format('Warning: No refresh stats found for materialized view %s in c77_dbh_matv_stats after action', matview_name));
END IF;
notes := array_append(notes, format('After action - Last refresh: %s, Time since last refresh: %s, Staleness threshold: %s', last_refresh, now() - last_refresh, staleness_threshold));
IF last_refresh IS NULL OR (now() - last_refresh) > staleness_threshold THEN
is_stale_by_time := true;
ELSE
is_stale_by_time := false;
END IF;
notes := array_append(notes, format('After action - Is stale by time: %s', is_stale_by_time));
-- Log mismatch details after action
notes := array_append(notes, format('After action - Mismatch percent: %s, Mismatch threshold: %s', mismatch_percent, mismatch_threshold));
IF is_stale_by_time OR mismatch_percent > mismatch_threshold THEN
status := 'Stale';
ELSE
status := 'Healthy';
END IF;
END IF;
-- Step 12: Calculate elapsed time and return the results
end_time := now();
elapsed_time := end_time - start_time;
notes := array_append(notes, format('Elapsed time: %s', elapsed_time));
RETURN jsonb_build_object(
'total_vtw_records', total_vtw_records,
'total_matview_records', total_matview_records,
'encoding_issues', encoding_issues,
'mismatched_records', mismatch_count,
'mismatch_percent', mismatch_percent,
'status', status,
'character_encoding_status', character_encoding_status,
'estimated_refresh_time', estimated_refresh_time,
'notes', notes
);
EXCEPTION WHEN OTHERS THEN
end_time := now();
elapsed_time := end_time - start_time;
notes := array_append(notes, format('Elapsed time: %s', elapsed_time));
notes := array_append(notes, format('Unexpected error in c77_mvc_manage_matv_health: %s', SQLERRM));
RETURN jsonb_build_object(
'error', SQLERRM,
'notes', notes
);
END;
$$;
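-- Example usage (illustrative names): a read-only health check, then a run that refreshes when stale:
-- SELECT public.c77_mvc_manage_matv_health('staging', 'matc_orders', 'quick');
-- SELECT public.c77_mvc_manage_matv_health('staging', 'matc_orders', 'daily', 'refresh');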
CREATE OR REPLACE FUNCTION public.c77_mvc_assemble_matv_health_result(full_matview_name text, full_vtw_name text, stats jsonb, mismatched_records bigint, validation_type text, sample_size bigint, mismatch_threshold numeric, action_result text, exec_time timestamp with time zone) RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
total_matview_records bigint := (stats->>'total_matview_records')::bigint;
clean_records bigint := (stats->>'clean_records')::bigint;
encoding_issues bigint := (stats->>'encoding_issues')::bigint;
clean_percent text;
estimated_refresh_time interval;
BEGIN
-- Calculate clean percent
clean_percent := CASE WHEN total_matview_records > 0
THEN to_char(ROUND((clean_records::NUMERIC / total_matview_records::NUMERIC) * 100, 2), 'FM9999999999999999.99') || '%'
ELSE 'N/A'
END;
-- Estimate refresh time (the helper returns jsonb; extract the interval field)
estimated_refresh_time := (public.c77_mvc_estimate_matv_refresh_time(full_matview_name)->>'estimated_refresh_time')::interval;
-- Assemble result
RETURN jsonb_build_object(
'matview', full_matview_name,
'vtw_source', full_vtw_name,
'total_matview_records', total_matview_records::text,
'total_vtw_records', (stats->>'total_vtw_records')::text,
'mismatched_records', mismatched_records::text,
'mismatch_percent', CASE WHEN total_matview_records > 0
THEN to_char(ROUND((mismatched_records::NUMERIC / total_matview_records::NUMERIC) * 100, 2), 'FM9999999999999999.99') || '%'
ELSE 'N/A'
END,
'clean_records', clean_records::text,
'encoding_issues', encoding_issues::text,
'clean_record%', clean_percent,
'last_matview_update', COALESCE((stats->>'last_matview_update')::text, 'N/A'),
'last_vtw_update', COALESCE((stats->>'last_vtw_update')::text, 'N/A'),
'size_mb', (stats->>'size_mb')::text,
'estimated_refresh_time', to_char(estimated_refresh_time, 'HH24:MI:SS.MS'),
'validation_type', validation_type,
'sample_size', sample_size::text,
'status', CASE
WHEN total_matview_records = 0 THEN 'Uninitialized'
WHEN (mismatched_records::NUMERIC / NULLIF(total_matview_records, 0)::NUMERIC) * 100 > mismatch_threshold THEN 'Stale'
WHEN encoding_issues > 0 THEN 'Degraded'
ELSE 'Healthy'
END,
'execution_time', to_char(clock_timestamp() - exec_time, 'HH24:MI:SS.MS')
) || CASE WHEN action_result IS NOT NULL THEN jsonb_build_object('action_result', action_result) ELSE '{}'::jsonb END;
END;
$$;