In the psql console, copy and paste the following to create the tables:
-- this is run on the coordinator

-- Raw request log, range-partitioned on ingest_time so that old raw data can
-- be removed one partition at a time.
CREATE TABLE http_request (
  site_id INT,
  ingest_time TIMESTAMPTZ DEFAULT now(),
  url TEXT,
  request_country TEXT,
  ip_address TEXT,
  status_code INT,
  response_time_msec INT
) PARTITION BY RANGE (ingest_time);

-- Configure pg_partman to create daily partitions
SELECT partman.create_parent('public.http_request', 'ingest_time', 'native', 'daily');
-- Per the pg_partman docs, this lets maintenance keep pre-creating time
-- partitions even when no new data has arrived. Restrict it to this table:
-- without the WHERE clause the setting would be flipped for every table
-- pg_partman manages in this database.
UPDATE partman.part_config
   SET infinite_time_partitions = true
 WHERE parent_table = 'public.http_request';
-- Create the daily partitions now
SELECT partman.run_maintenance(p_analyze := false);
-- Schedule automatic creation of partitions on a daily basis
SELECT cron.schedule('@daily', $$SELECT partman.run_maintenance(p_analyze := false)$$);

-- Per-minute aggregates of http_request, one row per (site, minute, country).
CREATE TABLE http_request_1min (
  site_id INT,
  ingest_time TIMESTAMPTZ, -- which minute this row represents
  request_country TEXT,
  error_count INT,
  success_count INT,
  request_count INT,
  sum_response_time_msec INT,
  CHECK (request_count = error_count + success_count),
  CHECK (ingest_time = date_trunc('minute', ingest_time)),
  PRIMARY KEY (site_id, ingest_time, request_country)
);

-- Rollup watermark: the minute up to which raw data has been rolled up.
-- Expected to hold a single row.
CREATE TABLE latest_rollup (
  minute timestamptz PRIMARY KEY,
  CHECK (minute = date_trunc('minute', minute))
);
In the Cloud Shell editor, copy and paste the following (use Ctrl+V to paste in the editor) to create the http_request load generator:
-- Load generator: loop forever inserting one random http_request row,
-- committing it, then sleeping a random 0-0.25 seconds before the next row.
-- Stop it by cancelling the statement.
DO $$
BEGIN
  LOOP
    INSERT INTO http_request (
      site_id, ingest_time, url, request_country,
      ip_address, status_code, response_time_msec
    ) VALUES (
      trunc(random()*32), clock_timestamp(),
      concat('http://example.com/', md5(random()::text)),
      -- random() is in [0, 1), so 1 + trunc(random()*n) is always a valid
      -- 1-based index in 1..n. ceil(random()*n) yields index 0 (and so NULL)
      -- whenever random() returns exactly 0.
      ('{China,India,USA,Indonesia}'::text[])[1 + trunc(random()*4)],
      concat(
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2)
      )::inet,
      ('{200,404}'::int[])[1 + trunc(random()*2)],
      5 + trunc(random()*150)
    );
    -- commit each row individually so it becomes visible immediately
    COMMIT;
    PERFORM pg_sleep(random() * 0.25);
  END LOOP;
END $$;
In the Psql console copy and paste the following to see average response time for sites
-- Per-site, per-minute request statistics computed directly from the raw
-- table, for minutes later than now() - 5 minutes.
SELECT
  site_id,
  date_trunc('minute', ingest_time) AS minute,
  COUNT(1) AS request_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) AS success_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) AS error_count,
  -- bigint / bigint: the average is truncated to whole milliseconds
  SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
-- Selects the same rows as
--   date_trunc('minute', ingest_time) > now() - '5 minutes'::interval
-- (the first minute bucket strictly after that instant starts at
-- date_trunc of it plus one minute). Filtering the bare column instead of an
-- expression over it lets the planner prune http_request partitions.
WHERE ingest_time >= date_trunc('minute', now() - interval '5 minutes') + interval '1 minute'
GROUP BY site_id, minute
ORDER BY minute ASC
LIMIT 15;
In the Psql console copy and paste the following to create the rollup_http_request function
-- Seed the rollup watermark with a time long ago, so that the first rollup
-- picks up all raw data already present in http_request.
INSERT INTO latest_rollup (minute) VALUES ('1901-10-10');

-- Roll raw http_request rows up into per-minute rows in http_request_1min.
-- latest_rollup.minute is the last minute already rolled up. Each run rolls up
-- every *complete* minute after it, up to and including the minute before
-- now(). The in-progress current minute is left for a later run: rolling it up
-- early and then advancing the watermark past it would lose the rows that
-- arrive during the rest of that minute.
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
  -- last complete minute; becomes the new watermark
  curr_rollup_time timestamptz := date_trunc('minute', now() - interval '1 minute');
  -- last minute rolled up by a previous run (read from the single-row
  -- latest_rollup table)
  last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_country, request_count,
    success_count, error_count, sum_response_time_msec
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    request_country,
    COUNT(1) AS request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) AS success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) AS error_count,
    SUM(response_time_msec) AS sum_response_time_msec
  FROM http_request
  -- Roll up only the minutes in (last_rollup_time, curr_rollup_time]. Both
  -- bounds are minute-aligned, so in raw timestamps this is the half-open
  -- range [last_rollup_time + 1 minute, curr_rollup_time + 1 minute).
  -- Comparing the bare column lets the planner prune partitions.
  WHERE ingest_time >= last_rollup_time + interval '1 minute'
    AND ingest_time <  curr_rollup_time + interval '1 minute'
  GROUP BY site_id, date_trunc('minute', ingest_time), request_country
  -- if a row for this (site, minute, country) already exists, add to it
  ON CONFLICT (site_id, ingest_time, request_country)
  DO UPDATE
  SET request_count = http_request_1min.request_count + excluded.request_count,
      success_count = http_request_1min.success_count + excluded.success_count,
      error_count = http_request_1min.error_count + excluded.error_count,
      sum_response_time_msec = http_request_1min.sum_response_time_msec + excluded.sum_response_time_msec;

  -- Advance the watermark so the next run only reads data newer than
  -- curr_rollup_time. Never move it backwards: that would make the next run
  -- re-add minutes that were already rolled up.
  UPDATE latest_rollup
     SET minute = curr_rollup_time
   WHERE minute < curr_rollup_time;
END;
$$ LANGUAGE plpgsql;
In the Psql console copy and paste the following to run the query on the 1 minute aggregated table
-- Recent per-minute statistics, read from the pre-aggregated rollup table
-- instead of the raw request log.
SELECT
  site_id,
  ingest_time AS minute,
  request_count,
  success_count,
  error_count,
  sum_response_time_msec / request_count AS average_response_time_msec
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes'
LIMIT 15;
For example, if raw data only needs to be retained for 3 days, you can DROP the partitions that are older than 3 days:
-- Placeholder: replace YYYY_MM_DD with the date of the daily partition to
-- drop. Check the actual partition names first (e.g. with \d+ http_request).
DROP TABLE http_request_pYYYY_MM_DD;
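To also track distinct client IP addresses per minute, http_request_1min needs a `distinct_ip_addresses` column of type `hll` (for example `ALTER TABLE http_request_1min ADD COLUMN distinct_ip_addresses hll;`). Then, in the psql console, copy and paste the following to add that column to the rollup function's query: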
-- Roll raw http_request rows up into per-minute rows in http_request_1min,
-- now also accumulating an HLL sketch of distinct client IP addresses.
-- Writes http_request_1min.distinct_ip_addresses, which must already exist.
-- latest_rollup.minute is the last minute already rolled up. Each run rolls up
-- every *complete* minute after it, up to and including the minute before
-- now(). The in-progress current minute is left for a later run: rolling it up
-- early and then advancing the watermark past it would lose the rows that
-- arrive during the rest of that minute.
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
  -- last complete minute; becomes the new watermark
  curr_rollup_time timestamptz := date_trunc('minute', now() - interval '1 minute');
  -- last minute rolled up by a previous run (read from the single-row
  -- latest_rollup table)
  last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_country, request_count,
    success_count, error_count, sum_response_time_msec,
    distinct_ip_addresses
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    request_country,
    COUNT(1) AS request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) AS success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) AS error_count,
    SUM(response_time_msec) AS sum_response_time_msec,
    hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses
  FROM http_request
  -- Roll up only the minutes in (last_rollup_time, curr_rollup_time]. Both
  -- bounds are minute-aligned, so in raw timestamps this is the half-open
  -- range [last_rollup_time + 1 minute, curr_rollup_time + 1 minute).
  -- Comparing the bare column lets the planner prune partitions.
  WHERE ingest_time >= last_rollup_time + interval '1 minute'
    AND ingest_time <  curr_rollup_time + interval '1 minute'
  GROUP BY site_id, date_trunc('minute', ingest_time), request_country
  -- if a row for this (site, minute, country) already exists, merge into it:
  -- counters are summed and the HLL sketches are unioned
  ON CONFLICT (site_id, ingest_time, request_country)
  DO UPDATE
  SET request_count = http_request_1min.request_count + excluded.request_count,
      success_count = http_request_1min.success_count + excluded.success_count,
      error_count = http_request_1min.error_count + excluded.error_count,
      sum_response_time_msec = http_request_1min.sum_response_time_msec + excluded.sum_response_time_msec,
      distinct_ip_addresses = hll_union(http_request_1min.distinct_ip_addresses, excluded.distinct_ip_addresses);

  -- Advance the watermark so the next run only reads data newer than
  -- curr_rollup_time. Never move it backwards: that would make the next run
  -- re-add minutes that were already rolled up.
  UPDATE latest_rollup
     SET minute = curr_rollup_time
   WHERE minute < curr_rollup_time;
END;
$$ LANGUAGE plpgsql;
In the Psql console copy and paste the following to compute distinct IP counts over time
-- Recent per-minute statistics plus the estimated number of distinct client
-- IP addresses, taken from each row's HLL sketch.
SELECT
  site_id,
  ingest_time AS minute,
  request_count,
  success_count,
  error_count,
  sum_response_time_msec / request_count AS average_response_time_msec,
  hll_cardinality(distinct_ip_addresses)::bigint AS distinct_ip_address_count
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes'
LIMIT 15;
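TopN rollup function: to also track the most requested URLs per minute, http_request_1min needs a `top_urls_1000` column to hold the TopN summary (for example `ALTER TABLE http_request_1min ADD COLUMN top_urls_1000 JSONB;`). Then, in the psql console, copy and paste the following to update the rollup function: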
-- Roll raw http_request rows up into per-minute rows in http_request_1min.
-- It also accumulates an HLL sketch of distinct client IPs and a TopN summary
-- of requested URLs. Writes http_request_1min.distinct_ip_addresses and
-- http_request_1min.top_urls_1000, which must already exist.
-- latest_rollup.minute is the last minute already rolled up. Each run rolls up
-- every *complete* minute after it, up to and including the minute before
-- now(). The in-progress current minute is left for a later run: rolling it up
-- early and then advancing the watermark past it would lose the rows that
-- arrive during the rest of that minute.
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
  -- last complete minute; becomes the new watermark
  curr_rollup_time timestamptz := date_trunc('minute', now() - interval '1 minute');
  -- last minute rolled up by a previous run (read from the single-row
  -- latest_rollup table)
  last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_country, request_count,
    success_count, error_count, sum_response_time_msec,
    distinct_ip_addresses, top_urls_1000
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    request_country,
    COUNT(1) AS request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) AS success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) AS error_count,
    SUM(response_time_msec) AS sum_response_time_msec,
    hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses,
    topn_add_agg(url::text) AS top_urls_1000
  FROM http_request
  -- Roll up only the minutes in (last_rollup_time, curr_rollup_time]. Both
  -- bounds are minute-aligned, so in raw timestamps this is the half-open
  -- range [last_rollup_time + 1 minute, curr_rollup_time + 1 minute).
  -- Comparing the bare column lets the planner prune partitions.
  WHERE ingest_time >= last_rollup_time + interval '1 minute'
    AND ingest_time <  curr_rollup_time + interval '1 minute'
  GROUP BY site_id, date_trunc('minute', ingest_time), request_country
  -- if a row for this (site, minute, country) already exists, merge into it:
  -- counters are summed, HLL sketches and TopN summaries are unioned
  ON CONFLICT (site_id, ingest_time, request_country)
  DO UPDATE
  SET request_count = http_request_1min.request_count + excluded.request_count,
      success_count = http_request_1min.success_count + excluded.success_count,
      error_count = http_request_1min.error_count + excluded.error_count,
      sum_response_time_msec = http_request_1min.sum_response_time_msec + excluded.sum_response_time_msec,
      distinct_ip_addresses = hll_union(http_request_1min.distinct_ip_addresses, excluded.distinct_ip_addresses),
      top_urls_1000 = topn_union(http_request_1min.top_urls_1000, excluded.top_urls_1000);

  -- Advance the watermark so the next run only reads data newer than
  -- curr_rollup_time. Never move it backwards: that would make the next run
  -- re-add minutes that were already rolled up.
  UPDATE latest_rollup
     SET minute = curr_rollup_time
   WHERE minute < curr_rollup_time;
END;
$$ LANGUAGE plpgsql;
Dashboard query to get the top URL per minute over the last 5 minutes. Note that the query applies the topn() function to the top_urls_1000 column to return only the most requested URL for each minute.
-- Recent per-minute statistics, the estimated distinct-IP count, and the
-- single most requested URL of each minute (item and frequency expanded from
-- the TopN summary).
SELECT
  site_id,
  ingest_time AS minute,
  request_count,
  success_count,
  error_count,
  sum_response_time_msec / request_count AS average_response_time_msec,
  hll_cardinality(distinct_ip_addresses)::bigint AS distinct_ip_address_count,
  (topn(http_request_1min.top_urls_1000, 1)).*
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes'
LIMIT 15;
In the psql console, copy and paste the following to create a report of the top 10 URLs in the last 5 minutes. Note that the query uses topn_union_agg to merge the per-minute TopN summaries from the last 5 minutes into a single summary.
-- Merge the per-minute TopN summaries from the last 5 minutes into one
-- summary, then list its 10 most frequent URLs.
WITH recent_top_urls AS (
  SELECT topn_union_agg(http_request_1min.top_urls_1000) AS topn_agg
  FROM http_request_1min
  WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes'
)
SELECT (topn(topn_agg, 10)).item AS top_urls
FROM recent_top_urls;
real-time-analytics-hands-on-lab-hyperscale-citus's People