#!/u01/app/python/current_version/bin/python3
|
|
|
|
from libs.ogg_libs import *
|
|
|
|
def parse_command_line_args():
    """Parse the command line of the OGG synchronisation tool.

    Options:
        -e/--extract   EXTRACT name (mandatory)
        -r/--replicat  REPLICAT name (mandatory)
        -s/--sync      action, one of: full, incremental, stop, start (mandatory)
        -f/--force     accepted, but its value is not returned by this function
        -d/--debug     internal debug mode

    Returns:
        A tuple (extract, replicat, sync, debug) where extract and replicat
        are lower-cased, sync is one of the allowed choices and debug is a
        boolean.

    argparse terminates the process itself (SystemExit) when a mandatory
    option is missing or --sync is given a value outside the choices.
    """
    parser = argparse.ArgumentParser(description="Oracle Golden Gate Syncronisation Tool")
    parser.add_argument("-e", "--extract", required=True,
                        help="EXTRACT name (ex: oedri1p)")
    parser.add_argument("-r", "--replicat", required=True,
                        help="REPLICAT name (ex: ordro1p)")
    parser.add_argument("-s", "--sync", required=True,
                        choices=['full', 'incremental', 'stop', 'start'],
                        help="ACTION (full or incremental sync, stop or start)")
    parser.add_argument('-f', '--force', action='store_true', default=False,
                        help='May the force be with you')
    parser.add_argument('-d', '--debug', action='store_true', default=False,
                        help='Used ONLY for internal debug purpose')

    args = parser.parse_args()

    # --sync is mandatory and restricted to lower-case choices, so it is
    # always a valid lower-case string here: no None fallback is needed.
    return (args.extract.lower(), args.replicat.lower(), args.sync, args.debug)
|
|
|
|
|
|
# Initialise coloured output (resets the default terminal colour after each
# usage of the colorama module).
init_pretty_color_printing()

extract, replicat, sync, debug = parse_command_line_args()
get_hostname()

# Location and name of this script; the log file lives in ./log next to it.
script_path = os.path.dirname(os.path.abspath(__file__))
script_name = os.path.basename(__file__)
logger = start_logging(script_path + "/log/ogg_sync.log")

# Log BEGIN of the execution
logger.info("BEGIN ogg_sync.py -e {} -r {} -s {}".format(extract, replicat, sync))

# Synchronisation object used by every action below.
ogg_sync = OGG_Sync(extract, replicat)
ogg_sync.create_remote_dirs()
|
|
if debug:
    # DEBUG ONLY: parse and build the prm files, dump the resulting state
    # to stdout, then leave without running any synchronisation action.
    ogg_sync.create_temporary_sync_directory()
    ogg_sync.parse_prm_headers()
    ogg_sync.parse_prm_tables()
    ogg_sync.build_extract_prm()
    ogg_sync.build_replicat_prm()
    ogg_sync.parse_prm_delta()

    print(ogg_sync.get_class_attributes_as_json())
    print(ogg_sync.get_config_params_as_json())
    exit()
|
|
def _wait_for_replicat_lag(max_iterations):
    """Poll the replicat until it reports no lag, with a bounded wait.

    One dot is printed and OGG_STATUS_SLEEP_TIME_WHEN_CHECK seconds are
    slept between two polls of ogg_sync.replicat_has_lag().

    Returns True as soon as no lag is reported, False when lag is still
    reported after more than max_iterations sleep cycles.  Returning an
    explicit result avoids comparing the iteration counter against a
    threshold after the loop, which cannot tell a timeout apart from a lag
    that cleared on the last poll.
    """
    iteration = 0
    while ogg_sync.replicat_has_lag():
        if iteration > max_iterations:
            return False
        print(White_On_Black("."), end='')
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        iteration += 1
    return True


def _print_extract_replicat_status():
    """Print the current extract and replicat status, one per line."""
    print(White_On_Black(TAB + "- extract " + ogg_sync.extract + " is " + ogg_sync.extract_status))
    print(White_On_Black(TAB + "- replicat " + ogg_sync.replicat + " is " + ogg_sync.replicat_status))


def _log_final_state():
    """Log the class attributes and config parameters for debug purpose."""
    logger.info(ogg_sync.get_class_attributes_as_json())
    logger.info(ogg_sync.get_config_params_as_json())


if sync == "full":
    # FULL sync
    print(White_On_Black("Start OGG Synchronisation ")
          + Yellow_On_Black(extract + "=>" + replicat)
          + White_On_Black(" in mode ")
          + Yellow_On_Black(sync.upper()))

    print(White_On_Black("Generate " + ogg_sync.extract + ".prm and " + ogg_sync.replicat
                         + ".prm files and upload them on OGG host "
                         + ogg_sync.config_params["ogg_host"] + "... "), end='')
    ogg_sync.create_temporary_sync_directory()
    ogg_sync.parse_prm_headers()
    ogg_sync.parse_prm_tables()
    ogg_sync.build_extract_prm()
    ogg_sync.build_replicat_prm()
    ogg_sync.upload_prm_files()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Remote run on " + ogg_sync.config_params["ogg_host"]
                         + ": /dbfs_tools/TOOLS/admin/sh/ogg_refresh_zfs.sh -e " + ogg_sync.extract
                         + " -r " + ogg_sync.replicat + " -v 12"))
    ogg_sync.sync_full_old()

    # Pause before querying the status of the freshly started processes.
    print(White_On_Black("Wait a moment..."), end='')
    time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK*2)
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Check if OGG Synchronisation is running... "), end='')
    ogg_sync.update_sync_status()
    if ogg_sync.extract_status != "RUNNING" or ogg_sync.replicat_status != "RUNNING":
        print(Red_On_Black("FAILED"))
        _print_extract_replicat_status()
        exit(ERROR)
    print(Green_On_Black("SUCCES"))

    # Log for debug purpose
    _log_final_state()

elif sync == "incremental":
    # INCREMENTAL sync
    print(White_On_Black("Start OGG Synchronisation ")
          + Yellow_On_Black(extract + "=>" + replicat)
          + White_On_Black(" in mode ")
          + Yellow_On_Black(sync.upper()))

    print(White_On_Black("Parse extract/replicat prm files... "), end='')
    ogg_sync.create_temporary_sync_directory()
    ogg_sync.parse_prm_headers()
    ogg_sync.parse_prm_tables()
    ogg_sync.parse_prm_delta()
    ogg_sync.generate_finals_tables_arrays()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Check if OGG Synchronisation is running... "), end='')
    ogg_sync.update_sync_status()
    if ogg_sync.extract_status != "RUNNING" or ogg_sync.replicat_status != "RUNNING":
        print(Red_On_Black("FAILED"))
        _print_extract_replicat_status()
        print(White_On_Black("Both extract/replicat should be RUNNING before adding new tables."))
        exit(ERROR)
    print(Green_On_Black("SUCCES"))

    # Bounded wait: give up when the lag is not caught in time.
    print(White_On_Black("Catch the lag of replication... "), end='')
    if not _wait_for_replicat_lag(OGG_LAG_MAX_ITERATION_CHECK):
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Too much lag to catch, please retry later"))
        exit(ERROR)
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Found new table(s) to add from " + ogg_sync.database_source + " database: ")
          + Yellow_On_Black(",".join(ogg_sync.array_new_tables_on_target_db)))

    print(White_On_Black("Check for Primary Key or Unique Index on source table(s)... "), end='')
    ogg_sync.check_pk_uk_on_source_db_delta_tables()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Extract Primary Key and Unique Index DDL from source table(s)... "), end='')
    ogg_sync.generate_ddl_source_delta_tables_pk_uk()
    print(Green_On_Black("SUCCES"))

    # Stop EXTRACT
    print(White_On_Black("Stop extract " + ogg_sync.extract + "... "), end='')
    ogg_sync.stop_extract()
    print(Green_On_Black("SUCCES"))

    # Wait REPLICAT for catching the LAG (bounded wait)
    print(White_On_Black("Wait replicat to catch the lag... "), end='')
    if not _wait_for_replicat_lag(OGG_LAG_MAX_ITERATION_CHECK):
        print(Red_On_Black("FAILED"))
        exit(ERROR)
    print(Green_On_Black("SUCCES"))

    # Stop REPLICAT
    print(White_On_Black("Stop replicat " + ogg_sync.replicat + "... "), end='')
    ogg_sync.stop_replicat()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Add Trandata on source table(s)... "), end='')
    ogg_sync.add_trandata_on_delta_tables()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Extract indexes from target delta tables if exist... "), end='')
    ogg_sync.generate_ddl_target_delta_tables_old_index()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("On target database " + ogg_sync.database_target + ", drop tables "
                         + ",".join(ogg_sync.array_new_tables_on_target_db) + " (if exist)... "), end='')
    ogg_sync.drop_new_tables_exists_on_target_db()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Switch logile and ckeckpoint on source " + ogg_sync.database_source + " database... "), end='')
    ogg_sync.switch_logfile_and_chekcpoint()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Define SCN for EXPDP on source " + ogg_sync.database_source + " database... "), end='')
    ogg_sync.define_scn_for_export()
    print(Green_On_Black("SUCCES"))
    print(White_On_Black("SCN=" + ogg_sync.scn_of_export + " will be used for exporting new table(s)"))

    # Backup current prm files
    print(White_On_Black("Backup current prm files... "), end='')
    zipname = ogg_sync.archive_prm()
    print(Green_On_Black("SUCCES"))
    print(White_On_Black("Current prm files backed up in " + zipname))

    # Add new tables in the extract prm file
    print(White_On_Black("Add new tables in "+ ogg_sync.extract + "_tables.prm file... "), end='')
    ogg_sync.add_new_tables_to_extract_prm()
    print(Green_On_Black("SUCCES"))

    # Add new tables in the replicat prm file (with a CSN filter on the export SCN)
    print(White_On_Black("Add new tables in "+ ogg_sync.replicat + "_tables.prm file... "), end='')
    ogg_sync.add_new_tables_to_replicat_prm_with_csn_filter(ogg_sync.scn_of_export)
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Generate " + ogg_sync.extract + ".prm and " + ogg_sync.replicat
                         + ".prm files and upload them on OGG host "
                         + ogg_sync.config_params["ogg_host"] + "... "), end='')
    ogg_sync.build_extract_prm()
    ogg_sync.build_replicat_prm()
    ogg_sync.upload_prm_files()
    print(Green_On_Black("SUCCES"))

    # Start EXTRACT
    print(White_On_Black("Start extract " + ogg_sync.extract + "... "), end='')
    ogg_sync.start_extract()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Start export of new table(s) as of SCN=" + ogg_sync.scn_of_export
                         + " on remote host " + ogg_sync.database_source_instances[1]) + ":")
    ogg_sync.export_delta_tables(ogg_sync.scn_of_export)

    # NOTE(review): this message names the *source* instance for the import
    # step as well - confirm whether the target host was intended.
    print(White_On_Black("Import export of new table(s) as of SCN=" + ogg_sync.scn_of_export
                         + " on remote host " + ogg_sync.database_source_instances[1]) + ":")
    ogg_sync.import_delta_tables()

    # Create indexes on target tables
    print(White_On_Black("Create indexes on target tables... "), end='')
    ogg_sync.create_delta_tables_pk_uk_on_target_database()
    ogg_sync.create_delta_tables_old_indexes_on_target_database()
    print(Green_On_Black("SUCCES"))

    # Start REPLICAT with the CSN filter
    print(White_On_Black("Start replicat " + ogg_sync.replicat + " using: "
                         + 'FILTER ( @GETENV ("TRANSACTION", "CSN") > ' + ogg_sync.scn_of_export)
          + "... ", end='')
    ogg_sync.start_replicat()
    print(Green_On_Black("SUCCES"))

    # Wait REPLICAT for catching the LAG (unbounded wait, as in the original)
    print(White_On_Black("Wait replicat to catch the lag... "), end='')
    while ogg_sync.replicat_has_lag():
        print(White_On_Black("."), end='')
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
    print(Green_On_Black("SUCCES"))

    # Stop REPLICAT
    print(White_On_Black("Stop replicat " + ogg_sync.replicat + "... "), end='')
    ogg_sync.stop_replicat()
    print(Green_On_Black("SUCCES"))

    # Remove replicat filters
    print(White_On_Black("Remove filter from " + ogg_sync.replicat
                         + ".prm files and upload them on OGG host "
                         + ogg_sync.config_params["ogg_host"] + "... "), end='')
    ogg_sync.remove_csn_filter_from_replicat_prm()
    ogg_sync.build_replicat_prm()
    ogg_sync.upload_replicat_prm()
    print(Green_On_Black("SUCCES"))

    # Start REPLICAT
    print(White_On_Black("Start replicat " + ogg_sync.replicat + "... "), end='')
    ogg_sync.start_replicat()
    print(Green_On_Black("SUCCES"))

    # Wait REPLICAT for catching the LAG (unbounded wait, as in the original)
    print(White_On_Black("Wait replicat to catch the lag... "), end='')
    while ogg_sync.replicat_has_lag():
        print(White_On_Black("."), end='')
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Backup *delta.prm files in .*delta.old..."), end='')
    ogg_sync.backup_and_empty_delta_prm()
    print(Green_On_Black("SUCCES"))

    # Log for debug purpose
    _log_final_state()

elif sync == "start":
    print(White_On_Black("Parse extract/replicat prm headers... "), end='')
    ogg_sync.parse_prm_headers()
    print(Green_On_Black("SUCCES"))

    # Start EXTRACT and REPLICAT
    print(White_On_Black("Start extract "+ ogg_sync.extract + " and replicat " + ogg_sync.replicat + "... "), end='')
    ogg_sync.start_extract()
    ogg_sync.start_replicat()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Wait a moment..."), end='')
    time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK*2)
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Check if OGG Synchronisation is running... "), end='')
    ogg_sync.update_sync_status()
    if ogg_sync.extract_status != "RUNNING" or ogg_sync.replicat_status != "RUNNING":
        print(Red_On_Black("FAILED"))
        _print_extract_replicat_status()
        exit(ERROR)
    print(Green_On_Black("SUCCES"))

    # Log for debug purpose
    _log_final_state()

elif sync == "stop":
    print(White_On_Black("Parse extract/replicat prm headers... "), end='')
    ogg_sync.parse_prm_headers()
    print(Green_On_Black("SUCCES"))

    # Stop EXTRACT and REPLICAT (replicat first, then extract)
    print(White_On_Black("Stop extract "+ ogg_sync.extract + " and replicat " + ogg_sync.replicat + "... "), end='')
    ogg_sync.stop_replicat()
    ogg_sync.stop_extract()
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Wait a moment..."), end='')
    time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK*2)
    print(Green_On_Black("SUCCES"))

    print(White_On_Black("Check if OGG Synchronisation is stopped... "), end='')
    ogg_sync.update_sync_status()
    # Here the failure condition is inverted: nothing may still be RUNNING.
    if ogg_sync.extract_status == "RUNNING" or ogg_sync.replicat_status == "RUNNING":
        print(Red_On_Black("FAILED"))
        _print_extract_replicat_status()
        exit(ERROR)
    print(Green_On_Black("SUCCES"))

    # Log for debug purpose
    _log_final_state()

# Log END of the execution
logger.info("END ogg_sync.py -e " + extract + " -r " + replicat + " -s " + sync)
|