背景:
现有一个数据表task，目前数据量已达4000万条左右。由于表太大，影响了数据库的操作性能，所以考虑对task表中的数据进行重新分片。如果在数据库设计阶段就考虑到这个问题，可以采用PostgreSQL的分区表，通过表继承以及创建一些trigger或rules来实现。但目前task表中已有大量数据，直接对task表采用继承的方式来实现分片已经不太现实；另外，insert到task表时date_create已有值，但最终的表分区应该按照date_start进行，而date_start只有在后期才会更新，记录刚创建时date_start为空；再者，上层应用已经成型，不可能修改代码。综合考虑下来，采用传统的分区技术不太现实或者代价太高，无奈之下只能自己编写plpgsql函数，并通过crontab设置定时任务来对task表中的数据进行分块。具体代码如下：
-- do_create_partition(table_name)
-- Creates the partition table public.<table_name> with the same column list
-- as task and a primary key constraint named <table_name>_task_id.
-- Returns 'DONE'. Raises (CREATE TABLE error) if the table already exists;
-- the callers in this file check pg_tables before calling it.
CREATE OR REPLACE FUNCTION do_create_partition(table_name TEXT)
RETURNS TEXT AS
$BODY$
BEGIN
    RAISE NOTICE 'Begin to create patition ...';
    RAISE NOTICE 'partition table name = %', table_name;

    -- Table inheritance (INHERITS (task)) is deliberately not used, so the
    -- columns are spelled out here.
    -- NOTE(review): this list must stay in sync with task's definition.
    -- %I quotes the identifiers; the table is created in public because
    -- that is the schema every existence check in this file looks in.
    EXECUTE format(
        'CREATE TABLE public.%I (
             task_id     integer NOT NULL,
             task_name   text,
             task_type   text,
             priority    integer,
             expires     integer,
             countdown   integer,
             tube_name   text,
             kwargs      text,
             retries     integer,
             status      text,
             result      text,
             date_start  timestamp without time zone,
             date_done   timestamp without time zone,
             traceback   text,
             date_create timestamp without time zone,
             worker_id   integer,
             CONSTRAINT %I PRIMARY KEY (task_id)
         )',
        table_name,
        table_name || '_task_id');

    RAISE NOTICE '% created"', table_name;
    RETURN 'DONE';
END
$BODY$
LANGUAGE plpgsql;

-- create_partition_table()
-- Makes sure the partitions for today and tomorrow (task_YYYYMMDD, in the
-- session time zone) exist in public, creating whichever is missing.
-- Returns 'DONE'.
CREATE OR REPLACE FUNCTION create_partition_table()
RETURNS TEXT AS
$BODY$
DECLARE
    day_offset   integer;
    p_table_name text;
BEGIN
    RAISE NOTICE 'Begin to create partition table ...';

    -- Check both days on every run. Checking tomorrow only when today's
    -- table already existed meant a once-a-day run never pre-created
    -- tomorrow's partition.
    FOR day_offset IN 0..1 LOOP
        p_table_name := 'task_'
            || to_char((current_date + day_offset)::timestamp, 'YYYYMMDD');
        RAISE NOTICE 'partition table = %', p_table_name;

        IF NOT EXISTS (
            SELECT 1
            FROM pg_tables
            WHERE schemaname = 'public'
              AND tablename = p_table_name
        ) THEN
            IF day_offset = 0 THEN
                RAISE NOTICE 'CREATE current partition table...';
            ELSE
                RAISE NOTICE 'CREATE next partition table...';
            END IF;
            PERFORM do_create_partition(p_table_name);
        END IF;
    END LOOP;

    RAISE NOTICE 'DO..."';
    RETURN 'DONE';
END
$BODY$
LANGUAGE plpgsql;

-- slice_data()
-- Moves every task row whose status is SUCCESS or FAILURE and whose
-- date_start is before today into public.task_YYYYMMDD (keyed by
-- date_start's day), creating the partition if needed, and deletes the
-- moved rows from task. Returns 'DONE'.
--
-- Rows are moved one day at a time with a single DELETE ... RETURNING feeding
-- an INSERT, instead of one INSERT + DELETE per row. Values are copied as-is,
-- so NULL integers stay NULL and timestamps keep their fractional seconds;
-- the old text-building approach failed on NULL integers and truncated
-- timestamps to whole seconds.
-- Everything runs in the caller's single transaction.
CREATE OR REPLACE FUNCTION slice_data()
RETURNS TEXT AS
$BODY$
DECLARE
    cols CONSTANT text :=
        'task_id, task_name, task_type, priority, expires, countdown, '
        || 'tube_name, kwargs, retries, status, result, date_start, '
        || 'date_done, traceback, date_create, worker_id';
    day_start  timestamp;
    part_name  text;
    moved_rows bigint;
BEGIN
    RAISE NOTICE 'Begin to slice data ...';

    -- One iteration per calendar day that still has finished rows in task.
    FOR day_start IN
        SELECT DISTINCT date_trunc('day', date_start) AS day_start
        FROM task
        WHERE status IN ('SUCCESS', 'FAILURE')
          AND date_start IS NOT NULL
          AND date_start < current_date::timestamp
        ORDER BY day_start
    LOOP
        part_name := 'task_' || to_char(day_start, 'YYYYMMDD');

        IF NOT EXISTS (
            SELECT 1
            FROM pg_tables
            WHERE schemaname = 'public'
              AND tablename = part_name
        ) THEN
            PERFORM do_create_partition(part_name);
        END IF;

        -- Half-open range [day_start, day_start + 1 day) selects exactly the
        -- rows that belong in this partition; the delete and the insert are
        -- one statement, so a row cannot be removed without being copied.
        EXECUTE format(
            'WITH moved AS (
                 DELETE FROM task
                 WHERE status IN (''SUCCESS'', ''FAILURE'')
                   AND date_start >= $1
                   AND date_start <  $1 + interval ''1 day''
                 RETURNING %s
             )
             INSERT INTO public.%I (%s)
             SELECT %s FROM moved',
            cols, part_name, cols, cols)
        USING day_start;

        GET DIAGNOSTICS moved_rows = ROW_COUNT;
        RAISE NOTICE '% rows moved into %', moved_rows, part_name;
    END LOOP;

    RETURN 'DONE';
END
$BODY$
LANGUAGE plpgsql;
可以在crontab中添加如下定时任务（每天凌晨2:00执行），实现对task表中已有数据的分块操作：
# Call slice_data() in the taskmanager database every day at 02:00.
0 2 * * * /opt/pgsql9.1/bin/psql -d taskmanager -U postgres -c "SELECT slice_data();"