博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
postgresql下一种对已有数据进行重新分表的plpgsql脚本
阅读量:6610 次
发布时间:2019-06-24

本文共 5176 字,大约阅读时间需要 17 分钟。

背景:

  现有一个数据表task,目前已有数据位4000万左右,因为表太大,影响到数据库的操作性能,所以考虑对该task表中的数据进行重新分片。
如果是在数据库设计的时候就考虑到这样的问题,可以采用postgresql的分区表,通过表继承以及创建一些trigger或者rules来实现这样的要求。但目前task表中已有大量的数据,所以直接多task表采用继承的方式来实现分片已经不太现实,另外insert到task表中时date_create已有值,但最终的表分区应该按照date_start,date_start只有在后期才会更新,在date_create时date_start为空;上层的应用已经成型,不可能更改代码。所以综合考虑下来采用传统的分区技术已经不太现实或者代价太高,无奈之下只能自己动手写一写plpgsql函数,通过crontab制定定时任务来对task表中的数据进行分块。
具体代码如下:

View Code
--创建相对应的表 CREATE OR REPLACE FUNCTION do_create_partition(table_name TEXT) RETURNS TEXT AS $BODY$ DECLARE BEGIN     RAISE NOTICE 'Begin to create patition ...';     RAISE NOTICE 'partition table name = %', table_name; --创建分区     /*     EXECUTE 'CREATE TABLE '         || table_name         || '() INHERITS (task);'; */ EXECUTE  'CREATE TABLE ' || table_name || '(' || 'task_id integer NOT NULL,' || 'task_name text,' || 'task_type text,' || 'priority integer,' || 'expires integer,' || 'countdown integer,' || 'tube_name text,' || 'kwargs text,' || 'retries integer,' || 'status text,' || 'result text,' || 'date_start timestamp without time zone,' || 'date_done timestamp without time zone,' || 'traceback text,' || 'date_create timestamp without time zone,' || 'worker_id integer' || ')'; --创建主键     EXECUTE  'ALTER TABLE ' || table_name || ' ADD CONSTRAINT ' || table_name || '_task_id ' ' PRIMARY KEY(task_id);';     RAISE NOTICE '% created"', table_name; RETURN 'DONE'; END $BODY$ LANGUAGE 'plpgsql' --创建所需的分区表 CREATE OR REPLACE FUNCTION create_partition_table() RETURNS TEXT AS $BODY$ DECLARE     p_table_name TEXT;     now_dt TIMESTAMP;     table_exist int; BEGIN     RAISE NOTICE 'Begin to create partition table ...'; --INSERT INTO task_20111018 VALUES (r.*)     -- construct the table name     --table_name := 'task_' || to_char(r.date_start, 'YYYYMMDD');     --获取当前时间     SELECT CURRENT_TIMESTAMP INTO now_dt;     p_table_name := 'task_' || to_char(now_dt, 'YYYYMMDD');     RAISE NOTICE 'partition table = %', p_table_name; --检测所需的分区表是否存在     SELECT count(*) INTO table_exist FROM pg_tables WHERE schemaname='public' and  tablename=p_table_name; IF table_exist = 0 THEN --创建当前分区表         RAISE NOTICE 'CREATE current partition table...';         PERFORM do_create_partition(p_table_name); ELSE --检测创建第二天的分区表         now_dt := now_dt + interval '1 day';         p_table_name := 'task_' || to_char(now_dt, 'YYYYMMDD');         RAISE NOTICE 'partition table = %', p_table_name; --检测所需的分区表是否存在         SELECT count(*) INTO table_exist FROM pg_tables WHERE schemaname='public' and  tablename=p_table_name; IF table_exist = 0 THEN --创建当前分区表             RAISE NOTICE 'CREATE next partition table...';             PERFORM do_create_partition(p_table_name); END IF; END IF; --INSERT INTO table_name VALUES (r.*);     RAISE NOTICE 'DO..."'; RETURN 'DONE'; END $BODY$ LANGUAGE 'plpgsql' --将task中的已有数据根据date_start转移到相应的分区表中 CREATE OR REPLACE FUNCTION slice_data() RETURNS TEXT AS $BODY$ DECLARE     r task%rowtype;     table_name TEXT;     table_exist int;     date_format TEXT; BEGIN     RAISE NOTICE 'Begin to slice data ...'; --将date_start在今天之前的相关记录进行转储     FOR r IN SELECT * FROM task WHERE status in ('SUCCESS', 'FAILURE' ) and date_start is not NULL and date_start < current_date::timestamp --limit 1000000     LOOP --INSERT INTO task_20111018 VALUES (r.*)         -- construct the table name         table_name := 'task_' || to_char(r.date_start, 'YYYYMMDD'); --table_name = 'task_20111018';         RAISE NOTICE 'table_name = %', table_name; --INSERT INTO table_name VALUES (r.*);         --检测所需的分区表是否存在         SELECT count(*) INTO table_exist FROM pg_tables WHERE schemaname='public' and  tablename=table_name; IF table_exist = 0 THEN --创建所需分区             PERFORM do_create_partition(table_name); END IF;         date_format := 'YYYY-MM-DD HH24:MI:SS';         RAISE NOTICE 'do insert [task_id = %] ...  ', r.task_id; EXECUTE  'INSERT INTO ' || table_name || '(task_id, task_name, task_type, priority, expires, countdown, tube_name, kwargs, retries, status, result, date_start, date_done, traceback, date_create, worker_id)' || ' VALUES (' || r.task_id || ',' || COALESCE(quote_literal(r.task_name), 'DEFAULT') || ',' || COALESCE(quote_literal(r.task_type), 'DEFAULT') || ',' || r.priority || ',' || r.expires || ',' || r.countdown || ',' || COALESCE(quote_literal(r.tube_name), 'DEFAULT') || ',' || COALESCE(quote_literal(r.kwargs), 'DEFAULT') || ',' || r.retries || ',' || COALESCE(quote_literal(r.status), 'DEFAULT') || ',' || COALESCE(quote_literal(r.result), 'DEFAULT') || ',' || COALESCE(quote_literal(to_char(r.date_start, date_format)), 'DEFAULT') || ',' || COALESCE(quote_literal(to_char(r.date_done, date_format)), 'DEFAULT') || ',' || COALESCE(quote_literal(r.traceback), 'DEFAULT') || ',' || COALESCE(quote_literal(to_char(r.date_create, date_format)), 'DEFAULT') || ',' || COALESCE(r.worker_id, 0) || ')'; DELETE FROM task WHERE task_id = r.task_id;         RAISE NOTICE 'delete old record'; END LOOP; RETURN 'DONE'; END $BODY$ LANGUAGE 'plpgsql'

可以在crontab中添加如下任务来实现对task已有数据的分块操作

0 2 * * * /opt/pgsql9.1/bin/psql -d taskmanager -U postgres -c "select * slice_data();"

转载于:https://www.cnblogs.com/Jerryshome/archive/2012/02/15/2352734.html

你可能感兴趣的文章
Tinkphp
查看>>
How to temporally disable IDE tools (load manually)
查看>>
图片存储类型的种类、特点、区别
查看>>
temporary Object and destructor
查看>>
xcode - 移动手势
查看>>
细说浏览器特性检测(1)-jQuery1.4添加部分
查看>>
Java基础-算术运算符(Arithmetic Operators)
查看>>
C#编程(四十七)----------集合接口和类型
查看>>
【转】关于大型网站技术演进的思考(十二)--网站静态化处理—缓存(4)
查看>>
积跬步,聚小流------Bootstrap学习记录(1)
查看>>
HDUPhysical Examination(贪心)
查看>>
苹果公司的产品已用完后门与微软垄断,要检查起来,打架!
查看>>
Android官方架构组件LiveData: 观察者模式领域二三事
查看>>
你必须知道的HTTP基本概念
查看>>
使用OpenGrok搭建 可搜索可跳转的源码 阅读网站
查看>>
Android ContentProvider调用报错"Bad call:..."及相关Binder权限问题分析
查看>>
基本shell脚本的编辑及变量
查看>>
加密和解密 tar
查看>>
将datatable 保存为 Excel文件(高效率版本)
查看>>
C/C++五大内存分区(转)
查看>>