Inhaltsverzeichnis
Mit dem Oracle Scheduler 12c/18c/19c das Filesystem überwachen und bei Bedarf einen Job starten
Aufgabe:
Sobald Dateien in einem Ordner auf der Datenbank Maschine auftauchen, soll ein Job zur Verarbeitung der Daten gestartet werden.
Der DB Job soll erkennen ob neue Daten vorliegen und dann über das Betriebssystem verarbeiten/in die DB laden.
Mehr Details zum Scheduler siehe ⇒ Der Oracle Job Scheduler ab 10g
Architektur
In der Datenbank ist per Default ein Job „FILE_WATCHER“ angelegt, der alle 10 Minuten alle definierten „Datei Überwachungen“ prüft.
Vorbereitung DB und OS User
DB User
User für den Import der Daten anlegen und die notwendigen Rechte vergeben:
sqlplus / AS sysdba -- Create the Job User CREATE USER job_control IDENTIFIED BY job$100; GRANT CONNECT,resource TO job_control; GRANT unlimited tablespace TO job_control;
OS User
Linux User für den Datentransfer im OS der DB anlegen:
groupadd -g 1100 etl useradd -u 1102 -g etl job_control passwd job_control # Create directory for the ETL scripts mkdir /srv/job_control chown job_control:etl /srv/job_control
Die OS Credentials für den OS User in der Datenbank hinterlegen
sqlplus / AS sysdba -- Credentials anlegen BEGIN DBMS_SCHEDULER.CREATE_CREDENTIAL( 'FILE_WATCH_CRED' , 'job_control' , 'oracle'); END; / -- Rechte an dem Credential an Job Control User vergeben GRANT EXECUTE ON FILE_WATCH_CRED TO job_control; -- DD abfragen COLUMN credential_name format a32 COLUMN username format a32 SELECT credential_name ,username FROM user_credentials ORDER BY credential_name /
File Watcher Objekt in der DB definieren
BEGIN DBMS_SCHEDULER.CREATE_FILE_WATCHER( FILE_WATCHER_NAME => 'IMPORT_FILE_WATCHER' , DIRECTORY_PATH => '/srv/job_control' , FILE_NAME => 'import*.csv' , CREDENTIAL_NAME => 'FILE_WATCH_CRED' , DESTINATION => NULL , ENABLED => FALSE ); END; / --bei Fehlern entfernen und neu anlegen BEGIN DBMS_SCHEDULER.DROP_FILE_WATCHER ( file_watcher_name => 'IMPORT_FILE_WATCHER' ); END; / -- rechte an den DB User vergeben GRANT EXECUTE ON IMPORT_FILE_WATCHER TO job_control;
Was haben wir angelegt:
SET LINESIZE 100 COLUMN file_watcher_name FORMAT A20 COLUMN destination FORMAT A15 COLUMN directory_path FORMAT A15 COLUMN file_name FORMAT A10 COLUMN credential_name FORMAT A20 SELECT file_watcher_name , destination , directory_path , file_name , credential_name FROM dba_scheduler_file_watchers ;
Überwachungs-Interval bei Bedarf anpassen
Per Default läuft der File Watcher Job in der Datenbank alle 10 Minuten.
Anpassen auf z.B. 2 Minuten:
BEGIN DBMS_SCHEDULER.SET_ATTRIBUTE('FILE_WATCHER_SCHEDULE' , 'REPEAT_INTERVAL' ,'FREQ=MINUTELY;INTERVAL=2'); END; /
Eigentlichen Job für die Verarbeitung unter dem User JOB_CONTROL anlegen
Im ersten Schritt sollen in diesem Demo nur die Meta Daten der Datei in eine Log Tabelle geschrieben werden.
Log Tabelle:
CREATE TABLE CSV_IMPORT_FILE_WATCH_LOG ( message VARCHAR2(4000) );
PL/SQL Code Block um die Meta Daten auszuwerten:
CREATE OR REPLACE PROCEDURE CSV_INTERFACE_FILE_IMPORT (p_file_watch_result SYS.SCHEDULER_FILEWATCHER_RESULT) AS v_message CSV_IMPORT_FILE_WATCH_LOG.message%TYPE; BEGIN v_message := 'Pfad :' || p_file_watch_result.directory_path ||'-Name :'|| p_file_watch_result.actual_file_name ||'-Size :'|| p_file_watch_result.file_size ||'-Date :'|| p_file_watch_result.file_timestamp ||'-Epoch :'|| p_file_watch_result.ts_ms_from_epoch; INSERT INTO CSV_IMPORT_FILE_WATCH_LOG (message) VALUES (v_message); COMMIT; END; /
Oracle Scheduler Programm Definition:
BEGIN DBMS_SCHEDULER.create_program( program_name => 'CSV_INTERFACE_FILE_IMPORT_PROG', program_type => 'STORED_PROCEDURE', program_action => 'CSV_INTERFACE_FILE_IMPORT', number_of_arguments => 1, enabled => FALSE); END; / BEGIN DBMS_SCHEDULER.define_metadata_argument( program_name => 'CSV_INTERFACE_FILE_IMPORT_PROG', metadata_attribute => 'event_message', argument_position => 1); END; /
Den eigentlichen Job anlegen:
BEGIN DBMS_SCHEDULER.drop_job('job_control.CSV_INTERFACE_FILE_IMPORT_JOB'); END; / BEGIN DBMS_SCHEDULER.create_job( job_name => 'job_control.CSV_INTERFACE_FILE_IMPORT_JOB', program_name => 'job_control.CSV_INTERFACE_FILE_IMPORT_PROG', event_condition => NULL, queue_spec => 'IMPORT_FILE_WATCHER', auto_drop => FALSE, enabled => FALSE); END; /
Problem: ORA-24085: operation failed, queue IMPORT_FILE_WATCHER is invalid
ERROR at line 1: ORA-24085: operation failed, queue IMPORT_FILE_WATCHER IS invalid ORA-06512: at "SYS.DBMS_ISCHED", line 175 ORA-06512: at "SYS.DBMS_SCHEDULER", line 536 ORA-06512: at line 2
⇒ Lösung : als SYS den Job anlegen, hier fehlt noch ein Recht, aber nur welches? Direkt auf die AD Queue? Seltsam, bin noch am suchen.
Falls auch auf neue Parallel erstellte Dateien zu reagieren:
BEGIN DBMS_SCHEDULER.set_attribute('CSV_INTERFACE_FILE_IMPORT_JOB','PARALLEL_INSTANCES',TRUE); END; /
Alles aktivieren:
EXEC DBMS_SCHEDULER.enable('IMPORT_FILE_WATCHER'); EXEC DBMS_SCHEDULER.enable('CSV_INTERFACE_FILE_IMPORT_PROG'); EXEC DBMS_SCHEDULER.enable('CSV_INTERFACE_FILE_IMPORT_JOB');
Debug
Manueller Aufruf der Routine hinter dem File Watcher und Auslesen des Tracefiles:
oradebug setmypid oradebug tracefile_name ALTER system SET events '27401 trace name context forever, level 262144'; BEGIN dbms_ischedfw.file_watch_job; END; /
wie:
vi /opt/oracle/diag/rdbms/gpidb/GPIDB/trace/GPIDB_ora_51129.trc .. file_watcher:: 2021-05-16 21:52:10.679: Directory: /srv/job_control, ArrivalTime: 1621194686000, SteadyStDur: 30000 file_watcher:: 2021-05-16 21:52:10.680: LastChkTime for directory /srv/job_control changed to 1621194686000 file_watcher:: 2021-05-16 21:52:10.682: All checking done file_watcher:: 2021-05-16 21:52:10.683: Setting up results information file_watcher:: 2021-05-16 21:52:10.684: Directory: /srv/job_control file_watcher:: 2021-05-16 21:52:10.685: Result file name: import23.csv file_watcher:: 2021-05-16 21:52:10.687: Request pattern: import*.csv, Regex: import.*.csv file_watcher:: 2021-05-16 21:52:10.688: Match found, adding to list of matching requests file_watcher:: 2021-05-16 21:52:10.690: File watching done file_watcher:: 2021-05-16 21:52:10.709: Updating history file_watcher:: 2021-05-16 21:52:10.710: Iteration 1 file_watcher:: 2021-05-16 21:52:10.710: Updating history entry file_watcher:: 2021-05-16 21:52:10.710: Dir Path: /srv/job_control file_watcher:: 2021-05-16 21:52:10.710: Last Chk Time: 16-MAY-21 07.51.26.000000 PM +00:00 file_watcher:: 2021-05-16 21:52:10.710: Processing results file_watcher:: 2021-05-16 21:52:10.711: Iteration 1 file_watcher:: 2021-05-16 21:52:10.711: Dir Path: /srv/job_control file_watcher:: 2021-05-16 21:52:10.711: File Name: import23.csv file_watcher:: 2021-05-16 21:52:10.711: File Size: 22258 file_watcher:: 2021-05-16 21:52:10.711: File Tstamp: 16-MAY-21 07.51.26.000000 PM +00:00 file_watcher:: 2021-05-16 21:52:10.711: Matching Requests: file_watcher:: 2021-05-16 21:52:10.711: Request: SYS.IMPORT_FILE_WATCHER SCHED 05-16 21:52:10.713 1 00 51129 ora 0(sjssuWriteJssuFlags):Using Flags 9 ..
Quellen
Oracle Dokumentation:
Blogs: