21июн
Как транспонировать таблицу в ORACLE?
Постановка задачи: Есть некая таблица, имеющая три колонки. Необходимо для одной колонки применить некую агрегатную функцию, а другую колону вынести в шапку таблицы.
Это все действо называется еще как создание кросстаблицы (crosstable), транспонирование таблицы, опорный запрос (pivot).
Например, select pole1,pole2,count(*) from table1 group by pole1,pole2 и к тому же pole2 вынести в шапку как это делается для сводной таблицы в excel.
Если необходимо транспонировать небольшое ограниченное множество значений определенного поля, то здесь есть простое решение. Для неограниченного множества здесь сложнее – тяжко написать нужный запрос. Но к счастью, есть готовый пакет, формирующий запрос на выборку данных для crosstab. Как его использовать, можно увидеть в примере. Этот пакет был предложен Alexus12 по адресу http://www.sql.ru/forum/actualthread.aspx?tid=409886&pg=-1
Думаю, что автор за основу взял метод, предложенный Томом Кайтом и модернизировал его для многих функций агрегирования (SUM,AVG,COUNT,MIN,MAX), с учетом формата транспонируемого поля и красивым обзыванием получаемых столбцов.
--Функция а-ля PivotTable в Excel, возвращает текст запроса для перекрестной таблицы
PACKAGE PK_CROSSTAB as
type refcursor is ref cursor;
type array is table of varchar2(30);
Function PivotSQL (
--вернуть текст запроса для получения crosstab
p_query in varchar2,--
p_rowfields in varchar2,
p_columnfield in varchar2,
p_function in varchar2,
p_functionfield in varchar2 )
return varchar2;
Procedure FormatParam (var_data in varchar2, var_type in number, out_decode in out varchar2, out_col in out varchar2);
end;
PACKAGE BODY PK_CROSSTAB as
Function PivotSQL (
--вернуть текст запроса для получения crosstab
p_query in varchar2,--
p_rowfields in varchar2,
p_columnfield in varchar2,
p_function in varchar2,
p_functionfield in varchar2
) return varchar2
as
l_max_cols number;
l_query long;
type array_varchar2 is table of varchar2(255);
l_columnnames array_varchar2 :=array_varchar2();
l_cursor refcursor;
tmp long;
--dbms_sql types:
l_theCursor integer default dbms_sql.open_cursor;--get col types
l_colCnt number default 0;
l_descTbl dbms_sql.desc_tab;
col_num number;
l_columnfieldtype number;
--decode names
o_decode varchar2(50);
o_col varchar2(50);
ft utl_file.file_type;
begin
--dbms_output.enable(20000000);
--check params
ft:=utl_file.fopen('TAX_LOAD_DIR','my_file.txt','W');
IF instr(p_columnfield,',')>0 THEN
raise_application_error (-20001, 'Can use only 1 columnfield');
ELSIF upper(p_function) not in ('SUM','AVG','COUNT','MIN','MAX') THEN
raise_application_error (-20001, 'Can use only standard aggregate functions');
END IF;
/* Шаг 2: проанализировать запрос, чтобы можно было получить описание его результатов. */
dbms_sql.parse(l_theCursor, p_query, dbms_sql.native);
/* Шаг З: получаем описание результатов запроса. */
dbms_sql.describe_columns(l_theCursor, l_colCnt, l_descTbl);
/*
* Following loop could simply be for j in 1..col_cnt loop.
* Here we are simply illustrating some of the PL/SQL table
* features.
*/
col_num := l_descTbl.first;
loop
exit when (col_num is null);
--find column field type
if l_descTbl(col_num).col_name=upper(p_columnfield) then
l_columnfieldtype:=l_descTbl(col_num).col_type;
--dbms_output.put_line('Col#:'||col_num||' Name:'||l_descTbl(col_num).col_name||' Type:'||l_descTbl(col_num).col_type);
end if;
col_num := l_descTbl.next(col_num);
end loop;
--return 'test ok';
-- figure out the column names we must support for horizontal cross
if (p_columnfield is not null) then
tmp:='SELECT DISTINCT ' || p_columnfield || ' FROM (' || p_query || ') ORDER BY ' || p_columnfield;
-- dbms_output.put_line('columns cursor:'||tmp);
OPEN l_cursor for tmp;
LOOP
l_columnnames.EXTEND;
FETCH l_cursor into l_columnnames(l_columnnames.COUNT);
--dbms_output.put_line('l_columnnames:'||l_columnnames(l_columnnames.COUNT));
EXIT WHEN l_cursor%NOTFOUND;
END LOOP;
CLOSE l_cursor;
-- execute immediate 'SELECT DISTINCT ' || p_columnfield || ' FROM (' || p_query || ')' bulk collect into l_columnnames ;
else
raise_application_error (-20001, 'Cannot figure out max cols');
end if;
-- Now, construct the query that can answer the question for us...
l_query := 'SELECT ' || p_rowfields ;
for i in 1 .. l_columnnames.count-1 loop
FormatParam(l_columnnames(i),l_columnfieldtype, o_decode, o_col);--format params
l_query := l_query || ',' || p_function || '(DECODE(' || p_columnfield || ',' || o_decode || ','|| p_functionfield ||',null)) as "'|| o_col ||'" ' ; --" для строк с пробелами
end loop;
l_query := l_query || ' FROM (' || p_query || ')';
l_query := l_query || ' GROUP BY ' || p_rowfields || ' ORDER BY ' || p_rowfields;
/* Step 9: закрываем курсор, чтобы освободить ресурсы. */
dbms_sql.close_cursor(l_theCursor);
utl_file.put_line(ft,'end_end');
utl_file.fCLOSE(ft);
-- and return it
--dbms_output.put_line('l_query:'||l_query);
return l_query;
--Поскольку вполне вероятно, что
--в условии запроса есть константы, мы включаем опцию cursor_sharing перед анализом
--запроса, чтобы принудительно использовались связываемые переменные, а затем отключаем ее.
/* execute immediate 'alter session set cursor_sharing=force';
open p_cursor for l_query;
execute immediate 'alter session set cursor_sharing=exact';
*/
EXCEPTION
WHEN OTHERS THEN
/* Step 9: закрываем курсор, чтобы освободить ресурсы. */
dbms_sql.close_cursor(l_theCursor);
raise_application_error (-20001,'Ошибка в PivotSQL:' || SQLERRM);
end;
--=========================
Procedure FormatParam (var_data in varchar2, var_type in number, out_decode in out varchar2, out_col in out varchar2)
--форматировать параметр в соотв с типом для PivotSQL
--принять текст параметра и его тип
-- выдать строки для decode и имени колонки
/* типы dbms_sql.describe_columns :
DATE Type:12
Varchar2 Type:1
Number Type:2
*/
IS
ft utl_file.file_type;
BEGIN
ft:=utl_file.fopen('TAX_LOAD_DIR','my_format.txt','W');
utl_file.put_line(ft,'VAR_DATA='||var_data);
utl_file.put_line(ft,'VAR_type='||var_type);
IF var_data is null THEN--если в колонку выпал null
out_decode:='NULL';
out_col:='==NULL==';
--данный case не перепутается с текстовым значением 'NULL' столбца varchar - будет две разных колонки
ELSIF var_type = 1 THEN -- Varchar2
out_decode:=''''||var_data||'''';--add quotes
out_col:=substr(var_data,1,30);
ELSIF var_type = 2 THEN --Number
out_decode:=var_data;--do nothing
out_col:=substr(var_data,1,30);
ELSIF var_type = 12 THEN --DATE
out_decode:='to_date('''||var_data||''',''dd.mm.yyyy'')';--format as internal date
out_col:=to_char(to_date(var_data));
utl_file.put_line(ft,'out_decode='||out_decode);
utl_file.put_line(ft,'out_col='||out_col);
utl_file.fCLOSE(ft);
ELSE
out_decode:='== UNDEFINED TYPE:'||var_type;
out_col:='== UNDEFINED TYPE';
END IF;
EXCEPTION
WHEN OTHERS THEN
raise_application_error (-20001,'Ошибка в FormatParam:' || SQLERRM);
END;
end;
Пример использования:
select pk_crosstab.PivotSQL('select owner,tablespace_name,table_name from all_tables','owner','tablespace_name','count','table_name') from dual
результат этого запроса :
SELECT owner,count(DECODE(tablespace_name,'ADMIN',table_name,null)) as "ADMIN"
,count(DECODE(tablespace_name,'ANALIZ',table_name,null)) as "ANALIZ"
,count(DECODE(tablespace_name,'ETALON',table_name,null)) as "ETALON"
,count(DECODE(tablespace_name,'SYSAUX',table_name,null)) as "SYSAUX" ,count(DECODE(tablespace_name,'SYSTEM',table_name,null)) as "SYSTEM"
,count(DECODE(tablespace_name,'USERS',table_name,null)) as "USERS"
,count(DECODE(tablespace_name,NULL,table_name,null)) as "==NULL==" FROM
(select owner,tablespace_name,table_name from all_tables) GROUP BY owner ORDER BY owner
Выполнив полученный запрос, получаем транспонированную таблицу.
OWNER ADMIN ANALIZ ETALON SYSAUX SYSTEM USERS ==NULL==
ANALIZ 0 95 0 0 0 0 5
CTXSYS 0 0 0 26 0 0 11
DBSNMP 0 0 0 17 0 0 4
DBSYS 0 0 0 0 0 116 0
DICT 0 0 0 0 0 1 1
DMSYS 0 0 0 2 0 0 0
ETALON 0 0 528 0 0 0 208
MDSYS 0 0 0 37 0 0 12
OLAP 0 0 0 0 0 202 0
OLAPSYS 0 0 0 114 0 0 12
ORDSYS 0 0 0 4 0 0 0
OUTLN 0 0 0 0 3 0 0
SYS 0 0 0 187 424 0 99
SYSMAN 0 0 0 311 0 0 26
SYSTEM 0 0 0 22 88 0 32
TSMSYS 0 0 0 1 0 0 0
WMSYS 0 0 0 35 0 0 5
XDB 0 0 0 11 0 0 0
А это пакет, предложенный Томом Кайтом, который мне не нравится, но вам может быть пригодится:
set echo on
create or replace package my_pkg
as
type refcursor is ref cursor;
type array is table of varchar2(30);
procedure pivot( p_max_cols in number default NULL,
p_max_cols_query in varchar2 default NULL,
p_query in varchar2,
p_anchor in array,
p_pivot in array,
p_cursor in out refcursor );
end;
/
create or replace package body my_pkg
as
procedure pivot( p_max_cols in number default NULL,
p_max_cols_query in varchar2 default NULL,
p_query in varchar2,
p_anchor in array,
p_pivot in array,
p_cursor in out refcursor )
as
l_max_cols number;
l_query long;
l_cnames array;
begin
-- figure out the number of columns we must support
-- we either KNOW this or we have a query that can tell us
if ( p_max_cols is not null )
then
l_max_cols := p_max_cols;
elsif ( p_max_cols_query is not null )
then
execute immediate p_max_cols_query into l_max_cols;
else
raise_application_error(-20001, 'Cannot figure out max cols');
end if;
-- Now, construct the query that can answer the question for us...
-- start with the C1, C2, ... CX columns:
l_query := 'select ';
for i in 1 .. p_anchor.count
loop
l_query := l_query || p_anchor(i) || ',';
end loop;
-- Now add in the C{x+1}... CN columns to be pivoted:
-- the format is "max(decode(rn,1,C{X+1},null)) cx+1_1"
for i in 1 .. l_max_cols
loop
for j in 1 .. p_pivot.count
loop
l_query := l_query ||
'max(decode(rn,'||i||','||
p_pivot(j)||',null)) ' ||
p_pivot(j) || '_' || i || ',';
end loop;
end loop;
-- Now just add in the original query
l_query := rtrim(l_query,',')||' from ( '||p_query||') group by ';
-- and then the group by columns...
for i in 1 .. p_anchor.count
loop
l_query := l_query || p_anchor(i) || ',';
end loop;
l_query := rtrim(l_query,',');
-- and return it
execute immediate 'alter session set cursor_sharing=force';
open p_cursor for l_query;
execute immediate 'alter session set cursor_sharing=exact';
end;
end;
/
variable x refcursor
set autoprint on
begin
my_pkg.pivot
(p_max_cols_query => 'select max(count(*)) from emp
group by deptno,job',
p_query => 'select deptno, job, ename, sal,
row_number() over (partition by deptno, job
order by sal, ename)
rn from emp a',
p_anchor => my_pkg.array('DEPTNO','JOB'),
p_pivot => my_pkg.array('ENAME', 'SAL'),
p_cursor => :x );
end;
/
begin
my_pkg.pivot
( p_max_cols_query => 'select max(count(*)) from emp group by mgr',
p_query => 'select a.ename mgr, b.ename,
row_number() over ( partition by a.ename order by b.ename ) rn
from emp a, emp b
where a.empno = b.mgr',
p_anchor => my_pkg.array( 'MGR' ),
p_pivot => my_pkg.array( 'ENAME' ),
p_cursor => :x );
end;
/
begin
my_pkg.pivot
(p_max_cols => 4,
p_query => 'select job, count(*) cnt, deptno,
row_number() over (partition by job order by deptno) rn
from emp
group by job, deptno',
p_anchor => my_pkg.array('JOB'),
p_pivot => my_pkg.array('DEPTNO', 'CNT'),
p_cursor => :x );
end;
/
А это простой запрос Тома Кайта для ограниченного количества значений транспонируемого поля:
select deptno,
max(decode(seq,1,ename,null)) highest_paid,
max(decode(seq,2,ename,null)) second_highest,
max(decode(seq,3,ename,null)) third_highest
from ( SELECT deptno, ename,
row_number() OVER
(PARTITION BY deptno
ORDER BY sal desc NULLS LAST ) seq
FROM emp )
where seq <= 3
group by deptno
Но я для простых случаев пользуюсь запросом подобным следующему:
select pole1,SUM(decode(pole2,z1,1,0)) z1,SUM(decode(pole2,z2,1,0)) z2,SUM(decode(pole2,z3,1,0)) z3
from table1 group by rollup(pole1) ORDER BY 1