====== Oracle 12c - Data Pump Export with PL/SQL - Starting the Import/Export from Within the Database ======

DB: 11g / 12c / 18c

**Task:** A schema is to be "copied"/"synchronized" between two databases on a regular basis.

**Solution:** PL/SQL code drives Data Pump directly from within the database, so the user needs no access to the operating system. The copy is performed over a database link.

For the complete source code in its latest version see => https://github.com/gpipperr/OraPowerShell/tree/master/Ora_SQLPlus_SQLcL_sql_scripts/datapump

----

==== Calling Data Pump over a DB link ====

=== Data Pump import over the network ===

From the target system, the source system is queried over a DB link and the data is inserted locally:

<code>
|--------|                     |--------|
| GPITST |  <==== DB Link ==== | GPIPRD |
|--------|                     |--------|
  source                       destination
</code>

The export runs from the "source database" into the "destination database", i.e. the DB link is created in the "destination database" and points to the "source database"!

For Data Pump in general see also => [[dba:datapump_import|Oracle Data Pump Schema Export und Import]]

----

==== Implementation ====

=== Create the test schemas ===

**Source** DB **GPITST**

<code sql>
create user BESTDBA identified by "xxxxxxxx";

grant connect, resource to BESTDBA;
grant DATAPUMP_EXP_FULL_DATABASE to BESTDBA;

connect BESTDBA/"xxxxxxxx"

-- create test data
SQL> create table t_all_objects as select * from all_objects;

Table created.

SQL> select count(*) from t_all_objects;

  COUNT(*)
----------
     86820
</code>

**Target/Destination** DB **GPIPRD**

<code sql>
create user BESTDBA identified by "xxxxxxxx";

grant connect, resource to BESTDBA;
grant DATAPUMP_IMP_FULL_DATABASE to BESTDBA;
grant create table, create procedure to BESTDBA;

create directory BACKUP as '/opt/oracle/acfs/import';
grant read, write on directory BACKUP to BESTDBA;

connect BESTDBA/"xxxxxxxx"

CREATE DATABASE LINK DP_TRANSFER
 CONNECT TO BESTDBA IDENTIFIED BY "xxxxxxxx"
 USING 'GPITSTDB';

SQL> select global_name from global_name@DP_TRANSFER;

GLOBAL_NAME
--------------------------------------------------------------------------------
GPITST
</code>

An existing user can be adjusted accordingly as follows!

----

=== Grant the user privileges ===

Grant the roles for the export/import of the database:

  * DATAPUMP_EXP_FULL_DATABASE
  * DATAPUMP_IMP_FULL_DATABASE

On the source database:
<code sql>
grant DATAPUMP_EXP_FULL_DATABASE to BESTDBA;
</code>

On the target database:
<code sql>
grant DATAPUMP_IMP_FULL_DATABASE to BESTDBA;
grant create table, create procedure to BESTDBA;
</code>

----

=== Create the DB link in the "destination/target database" pointing to the source ===

The export runs from the "source database" into the "destination database", i.e. the DB link is created in the "destination database" and points to the "source database"!

Create it along the following pattern, as the user that will later run the Data Pump job:

<code sql>
CREATE DATABASE LINK mylink
 CONNECT TO remote_user IDENTIFIED BY remote_pwd
 USING 'remote_db';
</code>

If the password of the user is not known, the DB link can be created via a helper function: [[dba:create_db_link_other_schema|Create a DB link in another schema]]

In an Oracle cluster environment make sure that the TNS alias also exists in the TNSNAMES.ora of the cluster (e.g. of the "Grid" user)!

----

=== Create a DB directory for the log file ===

So that we can read the log later, we import the log file as an external table.

Create the directory (in a cluster make sure that it can also be written by the Grid user and is reachable from all nodes of the cluster!) - here the directory BACKUP created above is reused:

<code sql>
create directory BACKUP as '/opt/oracle/acfs/import';
grant read, write on directory BACKUP to BESTDBA;
</code>

Grant the privileges on the directory to the user!
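Before starting the first job it can be worth checking that the database can actually write into this directory; a minimal sketch with UTL_FILE (the file name ''dir_check.txt'' is only an example):

<code sql>
-- minimal sketch: verify that the BACKUP directory is writable from inside the DB
DECLARE
   v_file   UTL_FILE.FILE_TYPE;
BEGIN
   v_file := UTL_FILE.FOPEN ('BACKUP', 'dir_check.txt', 'w');
   UTL_FILE.PUT_LINE (v_file, 'directory is writable');
   UTL_FILE.FCLOSE (v_file);
END;
/
</code>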
----

==== PL/SQL code to start Data Pump - import the complete schema ====

With this code a whole schema can then be synchronized:

<code sql>
CREATE OR REPLACE PROCEDURE dp_import_user_schema
IS
   v_dp_handle   NUMBER;
   PRAGMA AUTONOMOUS_TRANSACTION;

   v_db_directory    varchar2(200) := 'BACKUP';
   v_db_link         varchar2(200) := 'DP_TRANSFER';
   v_job_name        varchar2(256) := user || '_IMPORT' || TO_CHAR (SYSDATE, 'DD_HH24');
   --v_log_file_name varchar2(256) := user || '_' || TO_CHAR (SYSDATE, 'YYYYMMDD-HH24MISS') || '.log';
   -- use a fixed name so that the log can be read later via an external table
   v_log_file_name   varchar2(256) := 'db_import_plsql.log';
BEGIN
   dbms_output.put_line(' -- Import Parameter ------------' );
   dbms_output.put_line(' -- DB Link      :: ' || v_db_link );
   dbms_output.put_line(' -- DB DIRECTORY :: ' || v_db_directory);
   dbms_output.put_line(' -- DP JOB Name  :: ' || v_job_name);
   dbms_output.put_line(' -- DP Log File  :: ' || v_log_file_name);

   -- create Data Pump handle - "IMPORT" in this case
   v_dp_handle := DBMS_DATAPUMP.open (operation   => 'IMPORT'
                                    , job_mode    => 'SCHEMA'
                                    , job_name    => v_job_name
                                    , remote_link => v_db_link);

   -- no PARALLEL
   DBMS_DATAPUMP.set_parallel (handle => v_dp_handle, degree => 1);

   -- consistent export
   -- consistent to the start of the export with the timestamp of systimestamp
   -- DBMS_DATAPUMP.SET_PARAMETER( handle => v_dp_handle
   --                            , name   => 'FLASHBACK_TIME'
   --                            , value  => 'systimestamp' );

   -- import the complete schema
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'SCHEMA_EXPR'
                                , value  => 'IN (''' || user || ''')');

   -- log file
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                         , filename  => v_log_file_name
                         , filetype  => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE
                         , directory => v_db_directory
                         , reusefile => 1 -- overwrite existing files
                         , filesize  => '10000M');

   -- do it!
   DBMS_DATAPUMP.start_job (handle => v_dp_handle);

   COMMIT;

   DBMS_DATAPUMP.detach (handle => v_dp_handle);
END dp_import_user_schema;
/
</code>
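A simple test call from SQL*Plus; while the job is running it can be watched via ''user_datapump_jobs'':

<code sql>
SET SERVEROUTPUT ON

exec dp_import_user_schema

-- watch the running job
select job_name, state from user_datapump_jobs;
</code>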
==== Import only a single table ====

Code to import a single table of the schema:

<code sql>
CREATE OR REPLACE PROCEDURE dp_import_table (p_tablename varchar2
                                           , p_mode      varchar2)
--- +----------------------------------
--
-- testcall:  exec dp_import_table(p_tablename => 'T_ALL_OBJECTS', p_mode => 'REPLACE')
--
-- +----------------------------------
IS
   v_dp_handle   NUMBER;
   PRAGMA AUTONOMOUS_TRANSACTION;

   v_db_directory    varchar2(200) := 'BACKUP';
   v_db_link         varchar2(200) := 'DP_TRANSFER';
   v_job_name        varchar2(256) := user || '_IMPORT' || TO_CHAR (SYSDATE, 'DD_HH24');
   --v_log_file_name varchar2(256) := user || '_' || TO_CHAR (SYSDATE, 'YYYYMMDD-HH24MISS') || '.log';
   -- use the same name to read the log later via the external table
   v_log_file_name   varchar2(256) := 'db_import_plsql.log';
BEGIN
   dbms_output.put_line(' -- Import Parameter ------------' );
   dbms_output.put_line(' -- Tablename    :: ' || p_tablename );
   dbms_output.put_line(' -- Replace mode :: ' || p_mode);
   dbms_output.put_line(' -- DB Link      :: ' || v_db_link );
   dbms_output.put_line(' -- DB DIRECTORY :: ' || v_db_directory);
   dbms_output.put_line(' -- DP JOB Name  :: ' || v_job_name);
   dbms_output.put_line(' -- DP Log File  :: ' || v_log_file_name);

   if upper(p_mode) not in ('TRUNCATE', 'REPLACE', 'APPEND', 'SKIP') then
      RAISE_APPLICATION_ERROR (-20000, '-- Error :: This table mode is not supported :: ' || p_mode);
   end if;

   -- create Data Pump handle - "IMPORT" in this case
   v_dp_handle := DBMS_DATAPUMP.open (operation   => 'IMPORT'
                                    , job_mode    => 'TABLE'
                                    , job_name    => v_job_name
                                    , remote_link => v_db_link);

   -- no PARALLEL
   DBMS_DATAPUMP.set_parallel (handle => v_dp_handle, degree => 1);

   -- consistent export
   -- consistent to the start of the export with the timestamp of systimestamp
   -- DBMS_DATAPUMP.SET_PARAMETER( handle => v_dp_handle
   --                            , name   => 'FLASHBACK_TIME'
   --                            , value  => 'systimestamp' );

   -- TABLE_EXISTS_ACTION :: TRUNCATE, REPLACE, APPEND or SKIP
   DBMS_DATAPUMP.SET_PARAMETER( handle => v_dp_handle
                              , name   => 'TABLE_EXISTS_ACTION'
                              , value  => upper(p_mode) );

   -- import only this table
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'NAME_EXPR'
                                , value  => 'IN (''' || p_tablename || ''')');

   -- import from this schema
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'SCHEMA_EXPR'
                                , value  => 'IN (''' || user || ''')');

   -- log file
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                         , filename  => v_log_file_name
                         , filetype  => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE
                         , directory => v_db_directory
                         , reusefile => 1 -- overwrite existing files
                         , filesize  => '10000M');

   -- do it!
   DBMS_DATAPUMP.start_job (handle => v_dp_handle);

   COMMIT;

   DBMS_DATAPUMP.detach (handle => v_dp_handle);
END dp_import_table;
/
</code>
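Example call for the test table created above:

<code sql>
SET SERVEROUTPUT ON

exec dp_import_table(p_tablename => 'T_ALL_OBJECTS', p_mode => 'REPLACE')
</code>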
----

==== Problems during development ====

=== Problem ORA-31626: job does not exist ===

On the first call I get the error "ORA-31626: job does not exist".

Error:
<code>
ERROR at line 1:
ORA-31626: job does not exist
ORA-06512: at "SYS.DBMS_SYS_ERROR", line 79
ORA-06512: at "SYS.DBMS_DATAPUMP", line 1137
ORA-06512: at "SYS.DBMS_DATAPUMP", line 5285
</code>

Privileges? Solution:

<code sql>
-- target DB
grant create table, create procedure to bestdba;
</code>

The user needs the direct grants; a role is not sufficient, since after all this is PL/SQL running in the background!

See also https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532917900346934390

=== Problem ORA-39001: invalid argument value ===

Error:
<code>
ORA-39001: invalid argument value
ORA-06512: at "SYS.DBMS_SYS_ERROR", line 79
ORA-06512: at "SYS.DBMS_DATAPUMP", line 3507
ORA-06512: at "SYS.DBMS_DATAPUMP", line 5296
</code>

As a first step check the DB link again - in this case the name of the DB link was misspelled in the script 8-O.

See also => Error ORA-39001 When Using DBMS_DATAPUMP API Over A Network Link (Doc ID 1160207.1)

=== Problem DBMS_DATAPUMP.ATTACH ORA-31626: job does not exist ===

A job can no longer be deleted and remains in the state "DEFINING" - what now?

End the existing session and log on again! If something fails, the handle is blocked. Simply disconnect and reconnect and everything works again.

----

==== Reading the Data Pump log file inside the DB ====

An external table is used to read the log file.

<code sql>
drop table DP_DUMP_LOG;

CREATE TABLE DP_DUMP_LOG
(
   log_line VARCHAR2(4000)
)
ORGANIZATION EXTERNAL
(
   TYPE ORACLE_LOADER
   DEFAULT DIRECTORY backup
   ACCESS PARAMETERS
   (
      RECORDS DELIMITED BY NEWLINE
      FIELDS TERMINATED BY ','
      MISSING FIELD VALUES ARE NULL
      (
         log_line CHAR(4000)
      )
   )
   LOCATION ('db_import_plsql.log')
)
PARALLEL 1
REJECT LIMIT UNLIMITED;
</code>

Query it with:

<code sql>
select * from DP_DUMP_LOG;
</code>
<code>
Starting "BESTDBA"."BESTDBA_IMPORT16_09":
Estimate in progress using BLOCKS method...
Processing object type TABLE_EXPORT/TABLE/TABLE_DATA
Total estimation using BLOCKS method: 10 MB
Processing object type TABLE_EXPORT/TABLE/TABLE
. . imported "BESTDBA"."T_ALL_OBJECTS"   86820 rows
Processing object type TABLE_EXPORT/TABLE/STATISTICS/TABLE_STATISTICS
Job "BESTDBA"."BESTDBA_IMPORT16_09" successfully completed at Sat Feb 16 09:24:44 2019 elapsed 0 00:00:03
</code>
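To check a run for problems, the same external table can simply be filtered for error lines, for example:

<code sql>
-- show only the error lines of the last Data Pump run
select log_line
  from DP_DUMP_LOG
 where log_line like '%ORA-%';
</code>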
----

==== Aborting a job in the DB ====

<code sql>
CREATE OR REPLACE PROCEDURE dp_import_stop_job (p_job_name varchar2)
IS
   --- +----------------------------------
   --
   -- testcall:  exec dp_import_stop_job(p_job_name => 'MY_JOB')
   --
   -- +----------------------------------

   v_dp_handle       NUMBER;

   cursor c_act_jobs is
      select job_name
           , operation
           , job_mode
           , state
           , attached_sessions
        from user_datapump_jobs
       where job_name not like 'BIN$%'
       order by 1,2;

   v_job_exits       boolean := false;
   v_job_mode        varchar2(32);
   v_job_state       varchar2(32);
   v_real_job_name   varchar2(32);
   v_count           pls_integer;
   v_sts             ku$_Status;
   v_job_run_state   varchar2(2000);
BEGIN
   dbms_output.put_line(' -- Stop the job - parameter ------------' );
   dbms_output.put_line(' -- p_job_name :: ' || p_job_name );

   -- query all current jobs
   -- to show a list of candidates in case the job name is wrong
   for rec in c_act_jobs
   loop
      if rec.job_name = upper(p_job_name) then
         v_job_exits     := true;
         v_real_job_name := rec.job_name;
         v_job_mode      := rec.job_mode;
         v_job_state     := rec.state;
      end if;

      dbms_output.put_line('--- Found this Job :: ' || rec.job_name );
      dbms_output.put_line('+-- Operation :: ' || rec.operation );
      dbms_output.put_line('+-- Mode      :: ' || rec.job_mode );
      dbms_output.put_line('+-- State     :: ' || rec.state );
      dbms_output.put_line('+-- Sessions  :: ' || rec.attached_sessions );
   end loop;

   if v_job_exits then
      begin
         -- create Data Pump handle - "ATTACH" in this case
         v_dp_handle := DBMS_DATAPUMP.ATTACH( job_name  => v_real_job_name
                                            , job_owner => user);
      exception
         when DBMS_DATAPUMP.NO_SUCH_JOB then
            -- check if the old job master table still exists
            select count(*)
              into v_count
              from user_tables
             where upper(table_name) = upper(v_real_job_name);

            if v_count > 0 then
               execute immediate 'drop table ' || user || '."' || v_real_job_name || '"';
            end if;

            RAISE_APPLICATION_ERROR (-20003, '-- Error :: Job is not running anymore, check for other errors - no master table for ' || p_job_name || ' - got error :: ' || SQLERRM);
         when others then
            RAISE_APPLICATION_ERROR (-20002, '-- Error :: Not possible to attach to the job - Error :: ' || SQLERRM);
      end;

      if v_job_state in ('DEFINING') then
         -- the job is in the DEFINING state!
         -- abnormal situation, a normal stop is not possible
         -- use DBMS_DATAPUMP.START_JOB first to restart the job
         DBMS_DATAPUMP.START_JOB ( handle => v_dp_handle );
      end if;

      -- print the status
      dbms_datapump.get_status (handle    => v_dp_handle
                              , mask      => dbms_datapump.KU$_STATUS_WIP
                              , timeout   => 0
                              , job_state => v_job_run_state
                              , status    => v_sts );

      dbms_output.put_line('+-- Current state :: ' || v_job_run_state );

      -- stop the job
      DBMS_DATAPUMP.STOP_JOB ( handle      => v_dp_handle
                             , immediate   => 1    -- stop now
                             , keep_master => null -- default handling of the master table
                             , delay       => 5 ); -- give other attached sessions 5 seconds before they are detached
   else
      RAISE_APPLICATION_ERROR (-20000, '-- Error :: This job name was not found :: ' || p_job_name);
   end if;
END dp_import_stop_job;
/
</code>

----

==== Checking the jobs in the DB ====

See also https://github.com/gpipperr/OraPowerShell/blob/master/Ora_SQLPlus_SQLcL_sql_scripts/datapump.sql

<code sql>
--==============================================================================
-- GPI - Gunther Pippèrr
-- Desc: Get information about running Data Pump jobs
-- Date: November 2013
--==============================================================================
set linesize 130 pagesize 300

column owner_name format a10
column job_name   format a20
column state      format a12
column operation  like state
column job_mode   like state

ttitle "Datapump Jobs" SKIP 2

select owner_name
     , job_name
     , operation
     , job_mode
     , state
     , attached_sessions
  from dba_datapump_jobs
 where job_name not like 'BIN$%'
 order by 1,2
/

ttitle "Datapump Master Table" SKIP 2

column status      format a10
column object_id   format 99999999
column object_type format a12
column object_name format a25

select o.status
     , o.object_id
     , o.object_type
     , o.owner || '.' || object_name as object_name
  from dba_objects o
     , dba_datapump_jobs j
 where o.owner = j.owner_name
   and o.object_name = j.job_name
   and j.job_name not like 'BIN$%'
 order by 4,2
/

ttitle off

prompt ...
prompt ... check for "NOT RUNNING" jobs
prompt ...
</code>
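Master tables of jobs that show up as "NOT RUNNING" with no attached sessions are usually leftovers of aborted runs. A small sketch that only generates the cleanup commands (review the generated statements before running them!):

<code sql>
-- generate drop statements for orphaned Data Pump master tables
select 'drop table "' || owner_name || '"."' || job_name || '" purge;' as cleanup_stmt
  from dba_datapump_jobs
 where state = 'NOT RUNNING'
   and attached_sessions = 0
   and job_name not like 'BIN$%';
</code>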
----

==== Sources ====

  * https://docs.oracle.com/cd/B28359_01/appdev.111/b28419/d_datpmp.htm#i998298
  * https://docs.oracle.com/database/121/SUTIL/GUID-5AAC848B-5A2B-4FD1-97ED-D3A048263118.htm#SUTIL977

----

==== Code example for a "normal" Data Pump export call ====

<code sql>
-- export the GPIDB database
SET SERVEROUTPUT ON

ACCEPT export_dir CHAR PROMPT 'Enter the name of the directory for the export of the database:'

DECLARE
   CURSOR c_dir (p_dirname VARCHAR2)
   IS
      SELECT directory_path
        FROM dba_directories
       WHERE directory_name = p_dirname;

   v_dir   dba_directories.directory_path%TYPE;
BEGIN
   DBMS_OUTPUT.put_line ('check for directory GPIDB_EXPORT');

   OPEN c_dir ('GPIDB_EXPORT');
   FETCH c_dir INTO v_dir;

   IF c_dir%NOTFOUND
   THEN
      DBMS_OUTPUT.put_line ('create directory GPIDB_EXPORT');
      EXECUTE IMMEDIATE 'create directory GPIDB_export as ''&&export_dir''';
   ELSE
      IF v_dir NOT LIKE '&&export_dir'
      THEN
         DBMS_OUTPUT.put_line ('relink directory GPIDB_EXPORT');
         EXECUTE IMMEDIATE 'drop directory GPIDB_export';
         EXECUTE IMMEDIATE 'create directory GPIDB_export as ''&&export_dir''';
      END IF;
   END IF;

   CLOSE c_dir;
END;
/

SELECT directory_path
  FROM dba_directories
 WHERE directory_name = 'GPIDB_EXPORT';
</code>

Start Data Pump to export the database:

<code sql>
CREATE OR REPLACE PROCEDURE db_export_GPIDB
IS
   v_dp_handle   NUMBER;
   PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
   -- create Data Pump handle - "SCHEMA EXPORT" in this case
   v_dp_handle := DBMS_DATAPUMP.open (operation => 'EXPORT'
                                    , job_mode  => 'SCHEMA'
                                    , job_name  => 'GPIDB_EXPORT2' || TO_CHAR (SYSDATE, 'DD_HH24'));

   DBMS_DATAPUMP.set_parallel (handle => v_dp_handle, degree => 4);

   -- export the complete schema
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'SCHEMA_EXPR'
                                , value  => 'IN (''GPIDB'')');

   -- specify the target files - make them unique with a timestamp
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                         , filename  => 'GPIDB_' || TO_CHAR (SYSDATE, 'YYYYMMDD-HH24MISS') || '%U.dmp'
                         , directory => 'GPIDB_EXPORT'
                         , reusefile => 1 -- overwrite existing files
                         , filesize  => '50000M');

   -- log file
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                         , filename  => 'GPIDB_' || TO_CHAR (SYSDATE, 'YYYYMMDD-HH24MISS') || '.log'
                         , filetype  => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE
                         , directory => 'GPIDB_EXPORT'
                         , reusefile => 1 -- overwrite existing files
                         , filesize  => '10000M');

   -- MERGE => each partitioned table is re-created in the target database as an unpartitioned table
   -- DBMS_DATAPUMP.set_parameter (handle => v_dp_handle, name => 'PARTITION_OPTIONS', value => 'MERGE');

   -- do it!
   DBMS_DATAPUMP.start_job (handle => v_dp_handle);

   COMMIT;

   -- DBMS_DATAPUMP.detach (handle => v_dp_handle);
END;
/

BEGIN
   DBMS_OUTPUT.put_line (' create export at ' || TO_CHAR (SYSDATE, 'dd.mm HH24:MI'));
   db_export_GPIDB;
END;
/

PROMPT "to attach to the job please use:"

SELECT 'expdp "''/ as sysdba''" attach=GPIDB_EXPORT2' || TO_CHAR (SYSDATE, 'DD_HH24') FROM DUAL;

TTITLE OFF
</code>
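Instead of attaching with the expdp client, a second session can also attach via the API and simply wait until the job has finished; a minimal sketch (the job name must be the one built above, i.e. within the same hour):

<code sql>
-- minimal sketch: attach to the running export job from a second session
-- and block until it has completed or stopped
SET SERVEROUTPUT ON

DECLARE
   v_handle      NUMBER;
   v_job_state   VARCHAR2(30);
BEGIN
   v_handle := DBMS_DATAPUMP.attach (job_name  => 'GPIDB_EXPORT2' || TO_CHAR (SYSDATE, 'DD_HH24')
                                   , job_owner => user);

   -- wait for the job to finish
   DBMS_DATAPUMP.wait_for_job (handle => v_handle, job_state => v_job_state);

   DBMS_OUTPUT.put_line ('Job finished with state :: ' || v_job_state);
END;
/
</code>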