#!/u01/app/python/current_version/bin/python3 import os import subprocess import logging import argparse import datetime import socket import shlex import cx_Oracle from colorama import init from colorama import Fore, Back import json import time import stat import shutil import zipfile from pathlib import Path import re # CONSTANTS OGG_SERVICE_DIR = "/u01/app/oracle/admin/OGG_Service" SUCCESS = 0 ERROR = 1 TAB = "\t" NEWLINE = "\n" OGG_STATUS_SLEEP_TIME_WHEN_CHECK = 5 OGG_STATUS_MAX_ITERATION_CHECK = 12 OGG_LAG_MAX_ITERATION_CHECK = 24 TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES = ["ORA-02260","ORA-00955","ORA-01408"] EXPDP_IGNORE_ERROR_MESSAGES = ["ORA-31693","ORA-02354","ORA-01466"] IMPDP_IGNORE_ERROR_MESSAGES = [] # GENERIC FUNCTIONS def start_logging(logfile): logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # create a file handler handler = logging.FileHandler(logfile) handler.setLevel(logging.INFO) # create a logging format formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) # add the handlers to the logger logger.addHandler(handler) return logger def get_hostname(): global hostname hostname = socket.gethostname() return def upload_file(source_file, target_host, target_file): shellcommand = "ssh oracle@" + target_host + " mkdir -p $(dirname " + target_file + ") ; "+ "scp -p " + source_file + " oracle@" + target_host + ":" + target_file cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) return def download_file(source_file, source_host, target_file): shellcommand = "scp -p " + source_host + ":" + source_file + " " + target_file cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) return def upload_directory(source_dir, target_host, target_dir): shellcommand = "scp -rp " + source_dir + " oracle@" + target_host + ":" + target_dir cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) return # Functions for pretty color printing def init_pretty_color_printing(): init(autoreset=True) return def Red_On_Black(string): return (Fore.RED + Back.BLACK + string) def Cyan_On_Black(string): return (Fore.CYAN + Back.BLACK + string) def Yellow_On_Black(string): return (Fore.YELLOW + Back.BLACK + string) def Magenta_On_Black(string): return (Fore.MAGENTA + Back.BLACK + string) def Green_On_Black(string): return (Fore.GREEN + Back.BLACK + string) def White_On_Black(string): return (Fore.WHITE + Back.BLACK + string) def intersection(lst1, lst2): lst3 = [value for value in lst1 if value in lst2] return lst3 def union(lst1, lst2): final_list = list(set(lst1) | set(lst2)) return final_list def union_keep_order(lst1, lst2): lst3 = lst1 + lst2 # Create an empty list to store unique elements uniqueList = [] # Iterate over the original list and for each element # add it to uniqueList, if its not already there. for element in lst3: if element not in uniqueList: uniqueList.append(element) # Return the list of unique elements return uniqueList def removeDuplicates(listofElements): # Create an empty list to store unique elements uniqueList = [] # Iterate over the original list and for each element # add it to uniqueList, if its not already there. for elem in listofElements: if elem not in uniqueList: uniqueList.append(elem) # Return the list of unique elements return uniqueList def concatenate_files(input_files, output_file): with open(output_file, 'w') as outfile: for fname in input_files: with open(fname) as infile: outfile.write(infile.read()) return def merge_dict(dict1, dict2): dict3 = dict(dict1, **dict2) return dict3 def remove_empty_lines(filename): with open(filename) as filehandle: lines = filehandle.readlines() with open(filename, 'w') as filehandle: lines = filter(lambda x: x.strip(), lines) filehandle.writelines(lines) return # CLASS OGG_Sync class OGG_Sync: def __init__(self, extract, replicat): # Constructor of the class self.runid = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S_%f') self.extract = extract self.replicat = replicat # Identify environement type (dev|prod) envinfo_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/env.info" try: contents_of_envinfo = open(envinfo_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file env.info not found.")) exit (ERROR) self.specific_params={} for line in contents_of_envinfo: line = line.rstrip() line = line.lstrip() if line !="": param, value = line.split("=", 1) param = param.rstrip() param = param.lstrip() value = value.rstrip() value = value.lstrip() self.specific_params[param] = value self.extract_addnative_option_array = [] # Load configuration configuration_file = OGG_SERVICE_DIR + "/etc/" + self.specific_params["type_environement"] + ".conf" contents_configuration_file = open(configuration_file, "r", encoding = "utf-8", errors = "replace").readlines() self.config_params={} for line in contents_configuration_file: line = line.rstrip() line = line.lstrip() if line !="": param, value = line.split("=", 1) param = param.rstrip() param = param.lstrip() value = value.rstrip() value = value.lstrip() self.config_params[param] = value return def get_class_attributes_as_json(self): # Display class attributes json_class_attributes = vars(self) return json.dumps(json_class_attributes, indent = 2) def get_config_params_as_json(self): # Display class attributes return json.dumps(self.config_params, indent = 2) def get_type_env(self): return def build_extract_prm(self): extract_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_header.prm" extract_tables_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_tables.prm" if os.path.isfile(extract_header_prm_filename) and os.path.isfile(extract_tables_prm_filename): extract_prm_filename = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.extract + ".prm" # concatenate header + table prm sections contents_of_prm = [] contents_of_prm.append("-- This file has been is generated from the following files on " + hostname + " :\n") contents_of_prm.append("-- " + extract_header_prm_filename + " \n") contents_of_prm.append("-- " + extract_tables_prm_filename) contents_of_prm.append(NEWLINE) contents_of_extract_header_prm = open(extract_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() contents_of_extract_tables_prm = open(extract_tables_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() for line in contents_of_extract_header_prm: auxline = line.rstrip() auxline = auxline.replace(" ", "") if auxline != "": contents_of_prm.append(line) contents_of_prm.append(NEWLINE) for line in contents_of_extract_tables_prm: auxline = line.rstrip() auxline = auxline.replace(" ", "") if auxline != "": contents_of_prm.append(line) f = open(extract_prm_filename, 'w') contents_of_prm="".join(contents_of_prm) f.write(contents_of_prm) f.close() # store extract_prm_filename in class for later usage self.extract_prm_filename = extract_prm_filename else: print ("OGG Sync EXTRACT configuration files does not exist: ") print (" " + extract_header_prm_filename) print (" " + extract_tables_prm_filename) exit (ERROR) return def build_replicat_prm(self): replicat_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_header.prm" replicat_tables_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_tables.prm" if os.path.isfile(replicat_header_prm_filename) and os.path.isfile(replicat_tables_prm_filename): # create temporary sync directory try: os.mkdir(OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat) # store the value in class for later usage except (FileExistsError) as e: pass replicat_prm_filename = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.replicat + ".prm" # concatename header + tables prm sections contents_of_prm = [] contents_of_prm.append("-- This file has been is generated from the following files on " + hostname + " :\n") contents_of_prm.append("-- " + replicat_header_prm_filename + " \n") contents_of_prm.append("-- " + replicat_tables_prm_filename) contents_of_prm.append(NEWLINE) contents_of_replicat_header_prm = open(replicat_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() contents_of_replicat_tables_prm = open(replicat_tables_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() for line in contents_of_replicat_header_prm: auxline = line.rstrip() auxline = auxline.replace(" ", "") if auxline != "": contents_of_prm.append(line) contents_of_prm.append(NEWLINE) for line in contents_of_replicat_tables_prm: auxline = line.rstrip().lstrip() auxline = auxline.replace(" ", "") auxline = re.sub(' +', ' ',auxline) if auxline != "": contents_of_prm.append(line) f = open(replicat_prm_filename, 'w') contents_of_prm="".join(contents_of_prm) f.write(contents_of_prm) f.close() # store replicat_prm_filename in class for later usage self.replicat_prm_filename = replicat_prm_filename else: print ("OGG Sync REPLICAT configuration files does not exist: ") print (" " + replicat_header_prm_filename) print (" " + replicat_tables_prm_filename) exit (ERROR) return def upload_extract_prm(self): upload_file(OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + self.extract + ".prm" , self.config_params["ogg_host"], self.config_params["ogg12_path"] + "/dirprm/" + self.extract + ".prm") return def upload_replicat_prm(self): upload_file(OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + self.replicat + ".prm" , self.config_params["ogg_host"], self.config_params["ogg12_path"] + "/dirprm/" + self.replicat + ".prm") return def upload_prm_files(self): self.upload_extract_prm() self.upload_replicat_prm() return def upload_temp_file_to_ogg_host(self, file_name): upload_file(OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + file_name, self.config_params["ogg_host"], self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + file_name) return def sync_full_old(self): shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " " + self.config_params["ogg_full_refresh_shell"] + " -e " + self.extract + " -r " + self.replicat + " -v 12" process = subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE) while True: output = process.stdout.readline() output = output.decode('utf-8') if output == '' and process.poll() is not None: break if output: print(Cyan_On_Black(output.strip())) rc = process.poll() return rc def parse_prm_headers(self): # Parsing EXTRACT header extract_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_header.prm" try: contents_of_extract_header_prm = open(extract_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.extract + "_header.prm not found.")) exit (ERROR) for line in contents_of_extract_header_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): if auxline.upper().startswith("USERID"): # GGSCI login on SOURCE database self.ggsci_db_source_login = auxline.replace("USERID ", "", 1) # Source DB informations (str1,str2) = auxline.split(",") (str3,str4) = str1.split("@") self.database_source = str4 self.database_source_instances = self.get_array_active_db_nodes(self.database_source) self.database_source_version = self.get_database_version(self.database_source) # Parsing REPLICAT header replicat_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_header.prm" try: contents_of_replicat_header_prm = open(replicat_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.replicat + "_header.prm not found.")) exit (ERROR) remap_tablespace_list = [] for line in contents_of_replicat_header_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): if auxline.upper().startswith("USERID"): # GGSCI login on SOURCE database self.ggsci_db_target_login = auxline.replace("USERID ", "", 1) # Source DB informations (str1,str2) = auxline.upper().split(",") (str3,str4) = str1.split("@") (str5,str6) = str4.split("_OGG") self.database_target = str5 + "EXA" if auxline.upper().startswith("DDLSUBST"): # Tablespace mapping auxline = auxline.replace("'", "") # Remove double spaces auxline = re.sub(' +', ' ',auxline) words_of_auxline = auxline.split(" ") index_of_word_with = words_of_auxline.index("with") remap_tablespace_clause = words_of_auxline[index_of_word_with -1] + ":" + words_of_auxline[index_of_word_with +1] remap_tablespace_list.append(remap_tablespace_clause) self.remap_tablespace = ",".join(remap_tablespace_list) self.database_target_instances = self.get_array_active_db_nodes(self.database_target) self.database_target_version = self.get_database_version(self.database_target) return def fast_parse_prm_headers(self): # For WWW presentation, where the standard parsing procedure is not fast enough # Parsing EXTRACT header extract_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_header.prm" try: contents_of_extract_header_prm = open(extract_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.extract + "_header.prm not found.")) exit (ERROR) for line in contents_of_extract_header_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): if auxline.upper().startswith("USERID"): # GGSCI login on SOURCE database self.ggsci_db_source_login = auxline.replace("USERID ", "", 1) # Source DB informations (str1,str2) = auxline.split(",") (str3,str4) = str1.split("@") self.database_source = str4 # Parsing REPLICAT header replicat_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_header.prm" try: contents_of_replicat_header_prm = open(replicat_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.replicat + "_header.prm not found.")) exit (ERROR) remap_tablespace_list = [] for line in contents_of_replicat_header_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): if auxline.upper().startswith("USERID"): # GGSCI login on SOURCE database self.ggsci_db_target_login = auxline.replace("USERID ", "", 1) # Source DB informations (str1,str2) = auxline.upper().split(",") (str3,str4) = str1.split("@") (str5,str6) = str4.split("_OGG") self.database_target = str5 + "EXA" return def parse_prm_delta(self): # EXTRACT DELTA extract_delta_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_delta.prm" try: contents_of_extract_delta_prm = open(extract_delta_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm not found.")) exit (ERROR) if len(contents_of_extract_delta_prm) == 0: print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm is empty.")) exit (ERROR) tab_owner_array=[] tab_name_array=[] # Identify tables delta_addnative_option_array = [] for line in contents_of_extract_delta_prm: auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): # eliminate double spaces and spaces auxline = re.sub(' +', ' ',auxline) auxline = auxline.replace(" ","") auxline = auxline.replace(";", "") auxline = auxline.upper() if auxline.upper().startswith("TABLE"): auxline = auxline.replace("TABLE", "", 1) auxline2 = auxline.split(",")[0] (tab_owner,tab_name) = auxline2.split(".") tab_owner_array.append(tab_owner) tab_name_array.append(tab_name) if ",_ADDNATIVE" in auxline: delta_addnative_option_array.append(tab_name) initial_addnative_option_array = self.extract_addnative_option_array self.extract_addnative_option_array = union_keep_order(initial_addnative_option_array, delta_addnative_option_array) # Check if we have only one owner self.new_table_owner_on_source_db = tab_owner_array[0] self.array_new_tables_on_source_db = tab_name_array for tab_owner in tab_owner_array: if tab_owner != self.new_table_owner_on_source_db: print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm not found.")) exit (ERROR) # REPLICAT DELTA replicat_delta_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_delta.prm" try: contents_of_replicat_delta_prm = open(replicat_delta_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm not found.")) exit (ERROR) if len(contents_of_extract_delta_prm) == 0: print (contents_of_replicat_delta_prm("ERROR: file " + self.replicat + "_delta.prm is empty.")) exit (ERROR) self.new_map_dictionary = {} table_array = [] for line in contents_of_replicat_delta_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): if auxline.upper().startswith("MAP") and "TARGET" in auxline.upper() and "J$" not in auxline.upper(): # Classic MAP section # Force uppercase and elilminate ";" auxline = auxline.upper() auxline = auxline.replace(";", "") (str1, str2) = auxline.split("TARGET ") str2 = str2.lstrip().rstrip() table_array.append(str2) if auxline.upper().startswith("MAP") and "TARGET" in auxline.upper() and "J$" in auxline.upper(): # J$ section auxline = auxline.upper() str1 = auxline.split(",")[1] str2 = str1.split(".")[1] self.new_map_dictionary[str2] = auxline array_new_owner_tables_on_target_db = table_array # Check if we have only one owner self.array_new_tables_on_target_db = [] self.new_table_owner_on_target_db = array_new_owner_tables_on_target_db[0].split(".")[0] for line in array_new_owner_tables_on_target_db: (owner, table_name) = line.split(".") self.array_new_tables_on_target_db.append(table_name) if owner != self.new_table_owner_on_target_db: print ("More than one table owners in REPLICAT delta parameter file.") exit (ERROR) return def check_pk_uk_on_source_db_delta_tables(self): try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA) cursor = db.cursor() # Check if source tables exist for table_name in self.array_new_tables_on_source_db: sql = "select count(1) from DBA_TABLES where OWNER='" + self.current_table_owner_on_source_db + "' and TABLE_NAME='" + table_name +"'" cursor.execute(sql) for row in cursor: count=row[0] if count == 0: print (NEWLINE + "ERROR: Table " + self.current_table_owner_on_source_db + "." + table_name + " does not exists in database " + self.database_source) exit (ERROR) # Check if the tables have PK or UK for table_name in self.array_new_tables_on_source_db: sql = "select count(1) from DBA_CONSTRAINTS where OWNER='" + self.current_table_owner_on_source_db + "' and TABLE_NAME='" + table_name +"' and CONSTRAINT_TYPE in ('P','U')" cursor.execute(sql) for row in cursor: count=row[0] if count == 0: print (NEWLINE +"ERROR: Table " + self.current_table_owner_on_source_db + "." + table_name + " in database " + self.database_source + " does not heve any PRIMARY KEY or UNIQUE INDEX") exit (ERROR) cursor.close() except cx_Oracle.DatabaseError as err: print (err) exit (ERROR) return def add_trandata_on_delta_tables(self): # Generate INFO TRANDATA obey file # compose file content contents_of_file = [] contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) for tab_name in self.array_new_tables_on_source_db: contents_of_file.append("INFO TRANDATA " + self.current_table_owner_on_source_db + "." + tab_name) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "delta_info_trandata.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("delta_info_trandata.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "delta_info_trandata.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') # Create an array for the tables with disabled TRANDATA array_new_tables_to_enable_trandata = [] for line in cmd_output.splitlines(): if "disabled" in line: # trandata is disabled table_name = line.split("for table ")[1].split(".")[1] array_new_tables_to_enable_trandata.append(table_name) # Generate ADD TRANDATA obey file # compose file content contents_of_file = [] contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) for tab_name in array_new_tables_to_enable_trandata: contents_of_file.append("ADD TRANDATA " + self.current_table_owner_on_source_db + "." + tab_name) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "delta_add_trandata.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("delta_add_trandata.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "delta_add_trandata.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') return def create_remote_dirs(self): shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " mkdir -p " + self.config_params["remote_directory"] +"/temp/" + self.extract + "_" + self.replicat cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') return def get_array_active_db_nodes(self, database): array_active_db_nodes = [] try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + database, mode=cx_Oracle.SYSDBA) cursor = db.cursor() sql = "select INSTANCE_NAME, HOST_NAME from GV$INSTANCE where STATUS='OPEN'" cursor.execute(sql) for row in cursor: instance_name, host_name=row short_host_name = host_name.replace(".france.intra.corp", "") array_active_db_nodes.append(instance_name) array_active_db_nodes.append(short_host_name) cursor.close() db.close() except cx_Oracle.DatabaseError as err: print (Red_On_Black("UNHANDELED ERROR: " + str(err))) exit (ERROR) return array_active_db_nodes def get_database_version(self, database): try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + database, mode=cx_Oracle.SYSDBA) cursor = db.cursor() sql = "select VERSION from V$INSTANCE" cursor.execute(sql) for row in cursor: version = row[0] cursor.close() db.close() except cx_Oracle.DatabaseError as err: print (Red_On_Black("UNHANDELED ERROR: " + str(err))) exit (ERROR) return version def drop_new_tables_exists_on_target_db(self): try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_target, mode=cx_Oracle.SYSDBA) except cx_Oracle.DatabaseError as err: print (Red_On_Black("UNHANDELED ERROR: " + str(err))) exit (ERROR) cursor = db.cursor() # Drop table on target database for table_name in self.array_new_tables_on_target_db: try: sql = "drop table " + self.current_table_owner_on_target_db + "." + table_name +" purge" cursor.execute(sql) except cx_Oracle.DatabaseError as err: ora_xxxxx = str(err).split(":")[0] if ora_xxxxx == "ORA-00942": # ORA-00942: table or view does not exist pass else: print (Red_On_Black("UNHANDELED ERROR: " + str(err))) exit (ERROR) cursor.close() return def get_running_ogg_host(self): ogg12_env = self.config_params["ogg12_env"] ogg12_path = self.config_params["ogg12_path"] # Get the ogg host list, remove spaces, and tokenize it by commas ',' ogg_host_list = self.config_params["ogg_host_list"] ogg_host_list = ogg_host_list.replace(' ','').split(',') # Check if, and which, node is running the ogg manager for ogg_host in ogg_host_list: shellcommand="ssh oracle@" + ogg_host + " \'" + ogg12_env +"; echo info manager | " + ogg12_path + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') for line in cmd_output.splitlines(): if ("MANAGER IS RUNNING" in line.upper()): self.config_params["ogg_host"] = ogg_host return ogg_host return def set_ogg_host(self, ogg_host_dev, ogg_host_prod): # Replace the ogg_host with the one received by parameter if self.specific_params["type_environement"].upper() == "DEV": self.config_params["ogg_host"] = ogg_host_dev elif self.specific_params["type_environement"].upper() == "PROD": self.config_params["ogg_host"] = ogg_host_prod return def get_extract_status(self): # Generate STOP EXTRACT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("info " + self.extract ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "info_extract.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("info_extract.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "info_extract.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') for line in cmd_output.splitlines(): if ("EXTRACT" in line.upper()) and (self.extract.upper() in line.upper()): extract_status = line.split("Status ")[1] return extract_status def stop_extract(self): # Generate STOP EXTRACT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("stop " + self.extract ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "stop_extract.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("stop_extract.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "stop_extract.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') # Check the status until is STOPPED status = self.get_extract_status() iteration = 0 while (status != "STOPPED" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK): time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK) status = self.get_extract_status() iteration += 1 if iteration == OGG_STATUS_MAX_ITERATION_CHECK: print(Red_On_Black("FAILED")) exit (ERROR) return def start_extract(self): # Generate START EXTRACT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("start " + self.extract ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "start_extract.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("start_extract.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "start_extract.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') # Wait a little while time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK) # Check the status until is RUNNING status = self.get_extract_status() iteration = 0 while (status != "RUNNING" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK): time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK) status = self.get_extract_status() iteration += 1 if iteration == OGG_STATUS_MAX_ITERATION_CHECK: print(Red_On_Black("FAILED")) exit (ERROR) return def get_replicat_status(self): replicat_status = "MISSING" # Generate STOP EXTRACT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("info " + self.replicat ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "info_replicat.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("info_replicat.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "info_replicat.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') for line in cmd_output.splitlines(): if ("REPLICAT" in line.upper()) and (self.replicat.upper() in line.upper()): replicat_status = line.split("Status ")[1] return replicat_status def update_sync_status(self): self.extract_status = self.get_extract_status() self.replicat_status = self.get_replicat_status() return def stop_replicat(self): # Generate STOP REPLICAT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("stop " + self.replicat ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "stop_replicat.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the obey file self.upload_temp_file_to_ogg_host("stop_replicat.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "stop_replicat.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') # Check the status until is STOPPED status = self.get_replicat_status() iteration = 0 while (status != "STOPPED" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK): time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK) status = self.get_replicat_status() iteration += 1 if iteration == OGG_STATUS_MAX_ITERATION_CHECK: print(Red_On_Black("FAILED")) exit (ERROR) return def start_replicat(self): # Generate START REPLICAT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("start " + self.replicat ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "start_replicat.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the obey file self.upload_temp_file_to_ogg_host("start_replicat.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "start_replicat.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) cmd_output=cmd.stdout.decode('utf-8') # Check the status until is RUNNING status = self.get_replicat_status() iteration = 0 while (status != "RUNNING" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK): time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK) status = self.get_replicat_status() iteration += 1 if iteration == OGG_STATUS_MAX_ITERATION_CHECK: print(Red_On_Black("FAILED")) exit (ERROR) return def replicat_has_lag(self): # Generate LAG REPLICAT obey file # compose file content contents_of_file = [] # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login) contents_of_file.append("lag " + self.replicat ) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contect to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "lag_replicat.obey" f = open(file_name, 'w') f.write(contents_of_file) f.close() # Upload and execute the INFO TRANDATA obey file self.upload_temp_file_to_ogg_host("lag_replicat.obey") remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "lag_replicat.obey" shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) has_lag = True cmd_output=cmd.stdout.decode('utf-8') for line in cmd_output.splitlines(): if "no more records to process" in line: has_lag = False break return has_lag def export_delta_tables(self, scn): # Create OGGSYNC directory on source database and pick one of the active hosts for the database try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA) cursor = db.cursor() sql = "create or replace directory OGGSYNC as '" + self.config_params["datapump_directory_path"]+ "'" cursor.execute(sql) sql = "select HOST_NAME from v$instance" cursor.execute(sql) for row in cursor: db_host_name=row[0] db_host_name = db_host_name.split(".")[0] cursor.close() except cx_Oracle.DatabaseError as err: print (err) exit (ERROR) # Generate EXPDP parfile # compose file content contents_of_file = [] contents_of_file.append("dumpfile = OGGSYNC:" + self.extract + "_"+ self.replicat + "_delta.dmp") contents_of_file.append("logfile = OGGSYNC:export_" + self.extract + "_"+ self.replicat + "_delta.log") contents_of_file.append("reuse_dumpfiles=Y") contents_of_file.append("exclude=GRANT,CONSTRAINT,INDEX,TRIGGER") if scn != None: contents_of_file.append("flashback_scn = " + scn) for table_name in self.array_new_tables_on_source_db: contents_of_file.append("tables = " + self.current_table_owner_on_source_db + "." + table_name) if self.database_source_version.startswith("12") and self.database_target_version.startswith("11"): contents_of_file.append("version=11.2") contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contents to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "export_delta.par" f = open(file_name, 'w') f.write(contents_of_file) f.close() expdp_parfile = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "export_delta.par" # Generate EXPDP shell # compose file contents contents_of_file = [] contents_of_file.append(". oraenv < /dev/null") contents_of_file.append(self.database_source) contents_of_file.append("EOF!") contents_of_file.append("export ORACLE_SID=" + self.database_source_instances[0]) contents_of_file.append("expdp userid=\"'/ as sysdba'\" parfile=" + expdp_parfile) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contents to file file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "export_delta.sh" f = open(file, 'w') f.write(contents_of_file) f.close() expdp_shell = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "export_delta.sh" # Perform chmod +x fir the EXPDP shell st = os.stat(file_name) os.chmod(file, st.st_mode | stat.S_IEXEC) # Upload and execute the EXPDP shell and parfile self.upload_temp_file_to_ogg_host("export_delta.par") self.upload_temp_file_to_ogg_host("export_delta.sh") # EXPDP command will be launched from une of source database host shellcommand="ssh oracle@" + self.database_source_instances[1] + " " + expdp_shell process = subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE) while True: output = process.stdout.readline() output = output.decode('utf-8') if output == '' and process.poll() is not None: break if output: print(Cyan_On_Black(output.strip())) rc = process.poll() # Check for errors download_file(self.config_params["datapump_directory_path"] + "/export_" + self.extract + "_" + self.replicat + "_delta.log", self.config_params["ogg_host"], OGG_SERVICE_DIR + "/temp/.") logfile = OGG_SERVICE_DIR + "/temp/export_" + self.extract + "_" + self.replicat + "_delta.log" contents_of_logfile = open(logfile, "r", encoding = "utf-8", errors = "replace").readlines() blocking_ora_error = 0 for line in contents_of_logfile: if line.startswith("ORA-"): # Line starts with an ORA-xxxx ora_error = line.split(":")[0] if ora_error not in EXPDP_IGNORE_ERROR_MESSAGES: # The ORA-xxxx message is not in the whitelist blocking_ora_error += 1 if blocking_ora_error > 0: print(Red_On_Black("FAILED")) print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile)) exit (ERROR) return def import_delta_tables(self): # Create OGGSYNC directory on target database and pick one of the active hosts for the database try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_target, mode=cx_Oracle.SYSDBA) cursor = db.cursor() sql = "create or replace directory OGGSYNC as '" + self.config_params["datapump_directory_path"]+ "'" cursor.execute(sql) sql = "select HOST_NAME from v$instance" cursor.execute(sql) for row in cursor: db_host_name=row[0] db_host_name = db_host_name.split(".")[0] cursor.close() except cx_Oracle.DatabaseError as err: print (err) exit (ERROR) # Generate IMPDP parfile # compose file content contents_of_file = [] contents_of_file.append("dumpfile = OGGSYNC:" + self.extract + "_"+ self.replicat + "_delta.dmp") contents_of_file.append("logfile = OGGSYNC:import_" + self.extract + "_"+ self.replicat + "_delta.log") for table_name in self.array_new_tables_on_source_db: contents_of_file.append("tables = " + self.current_table_owner_on_source_db + "." + table_name) contents_of_file.append("remap_schema = " + self.current_table_owner_on_source_db + ":" + self.current_table_owner_on_target_db) if self.remap_tablespace: contents_of_file.append("remap_tablespace=" + self.remap_tablespace) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contents to file file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "import_delta.par" f = open(file_name, 'w') f.write(contents_of_file) f.close() impdp_parfile = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "import_delta.par" # Generate IMPDP shell # compose file content contents_of_file = [] contents_of_file.append(". oraenv < /dev/null") contents_of_file.append(self.database_target) contents_of_file.append("EOF!") contents_of_file.append("export ORACLE_SID=" + self.database_target_instances[0]) contents_of_file.append("impdp userid=\"'/ as sysdba'\" parfile=" + impdp_parfile) contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contents to file file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "import_delta.sh" f = open(file, 'w') f.write(contents_of_file) f.close() impdp_shell = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "import_delta.sh" # Perform chmod +x for the IMPDP shell st = os.stat(file) os.chmod(file, st.st_mode | stat.S_IEXEC) # Upload and execute the IMPDP shell and parfile self.upload_temp_file_to_ogg_host("import_delta.par") self.upload_temp_file_to_ogg_host("import_delta.sh") # IMPDP command will be launched from une of target database host shellcommand="ssh oracle@" + self.database_target_instances[1] + " " + impdp_shell process = subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE) while True: output = process.stdout.readline() output = output.decode('utf-8') if output == '' and process.poll() is not None: break if output: print(Cyan_On_Black(output.strip())) rc = process.poll() # Check for errors download_file(self.config_params["datapump_directory_path"] + "/import_" + self.extract + "_" + self.replicat + "_delta.log", self.config_params["ogg_host"], OGG_SERVICE_DIR + "/temp/.") logfile = OGG_SERVICE_DIR + "/temp/import_" + self.extract + "_" + self.replicat + "_delta.log" contents_of_logfile = open(logfile, "r", encoding = "utf-8", errors = "replace").readlines() blocking_ora_error = 0 for line in contents_of_logfile: if line.startswith("ORA-"): # Line starts with an ORA-xxxx ora_error = line.split(":")[0] if ora_error not in EXPDP_IGNORE_ERROR_MESSAGES: # The ORA-xxxx message is not in the whitelist blocking_ora_error += 1 if blocking_ora_error > 0: print(Red_On_Black("FAILED")) print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile)) exit (ERROR) return def generate_ddl_source_delta_tables_pk_uk(self): shutil.copy(OGG_SERVICE_DIR + "/sql/" + self.config_params["sql_script_extract_ddl_pk_uk"], OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/") # Generate extract DDL script contents_of_file = [] contents_of_file.append("connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_source + " as sysdba") contents_of_file.append("spool " + self.database_target + "_create_pk_uk.sql") for line in self.array_new_tables_on_source_db: contents_of_file.append("@" + self.config_params["sql_script_extract_ddl_pk_uk"] + " " + self.current_table_owner_on_source_db + " " + line) contents_of_file.append("spool off") contents_of_file.append("exit") contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contents to file file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_source + "_extract_ddl_indexes.sql" f = open(file, 'w') f.write(contents_of_file) f.close() # Execute extract DDL script with SQL*Plus shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_source + "_extract_ddl_indexes.sql" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) # Add connection information and spool for the previous generated file file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_pk_uk.sql" contents_of_file = open(file, "r", encoding = "utf-8", errors = "replace").readlines() contents_of_file.insert(0, "spool " + self.database_target + "_create_pk_uk.log") contents_of_file.insert(1, NEWLINE + "connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba") contents_of_file.append("spool off" + NEWLINE) contents_of_file.append("exit" + NEWLINE) # Change OWNER as in target database contents_of_file_bis = [] for line in contents_of_file: #line = line.replace('\"', "") line = line.replace(self.current_table_owner_on_source_db + '"' + ".", self.current_table_owner_on_target_db + '"' + ".") contents_of_file_bis.append(line) contents_of_file_bis = "".join(contents_of_file_bis) f = open(file, 'w') f.write(contents_of_file_bis) f.close() return def create_delta_tables_pk_uk_on_target_database(self): # Execute extract DDL script with SQL*Plus shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_create_pk_uk.sql" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) # Check for anormal ORA- in logfile logfile_create_index = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_pk_uk.log" contents_of_logfile_create_index = open(logfile_create_index, "r", encoding = "utf-8", errors = "replace").readlines() blocking_ora_error = 0 for line in contents_of_logfile_create_index: if line.startswith("ORA-"): # Line starts with an ORA-xxxx ora_error = line.split(":")[0] if ora_error not in TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES: # The ORA-xxxx message is not in the whitelist blocking_ora_error += 1 if blocking_ora_error > 0: print(Red_On_Black("FAILED")) print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile_create_index)) exit (ERROR) return def create_temporary_sync_directory(self): # create temporary sync directory try: os.mkdir(OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat) except (FileExistsError) as e: pass return def generate_ddl_target_delta_tables_old_index(self): shutil.copy(OGG_SERVICE_DIR + "/sql/" + self.config_params["sql_script_extract_ddl_index"], OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/") # Generate extract DDL script contents_of_file = [] contents_of_file.append("connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba") contents_of_file.append("spool " + self.database_target + "_create_old_indexes.sql") for line in self.array_new_tables_on_target_db: contents_of_file.append("@" + self.config_params["sql_script_extract_ddl_index"] + " " + self.current_table_owner_on_target_db + " " + line) contents_of_file.append("spool off") contents_of_file.append("exit") contents_of_file.append(NEWLINE) contents_of_file = NEWLINE.join(contents_of_file) # write contents to file file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_extract_ddl_old_indexes.sql" f = open(file, 'w') f.write(contents_of_file) f.close() # Execute extract DDL script with SQL*Plus shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_extract_ddl_old_indexes.sql" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) # Add connection information and spool for the previous generated file file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_old_indexes.sql" contents_of_file = open(file, "r", encoding = "utf-8", errors = "replace").readlines() contents_of_file.insert(0, "spool " + self.database_target + "_create_old_indexes.log") contents_of_file.insert(1, NEWLINE + "connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba") contents_of_file.append("spool off" + NEWLINE) contents_of_file.append("exit" + NEWLINE) contents_of_file = "".join(contents_of_file) f = open(file, 'w') f.write(contents_of_file) f.close() # Archive SQL file archive_folder = OGG_SERVICE_DIR + "/archive" shutil.copy(file, archive_folder + "/" + self.extract + "_"+ self.replicat + "_" + self.database_target + "_create_old_indexes_" + self.runid + ".sql") return def create_delta_tables_old_indexes_on_target_database(self): # Execute extract DDL script with SQL*Plus shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_create_old_indexes.sql" cmd = subprocess.run( shellcommand, check=True, shell=True, stdout=subprocess.PIPE, ) # Check for anormal ORA- in logfile logfile_create_index = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_old_indexes.log" contents_of_logfile_create_index = open(logfile_create_index, "r", encoding = "utf-8", errors = "replace").readlines() blocking_ora_error = 0 for line in contents_of_logfile_create_index: if line.startswith("ORA-"): # Line starts with an ORA-xxxx ora_error = line.split(":")[0] if ora_error not in TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES: # The ORA-xxxx message is not in the whitelist blocking_ora_error += 1 if blocking_ora_error > 0: print(Red_On_Black("FAILED")) print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile_create_index)) return def add_new_tables_to_extract_prm(self): contents_of_file = [] for table_name in self.array_final_tables_on_extract: addnative_string = "" for table_with_add_native_option in self.extract_addnative_option_array: if table_name == table_with_add_native_option: addnative_string = ",_ADDNATIVE" contents_of_file.append("TABLE " + self.current_table_owner_on_source_db + "." + table_name + addnative_string + ";") contents_of_file = NEWLINE.join(contents_of_file) # Save content as prm extract tables file = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_tables.prm" f = open(file, 'w') f.write(contents_of_file) f.close() return def add_new_tables_to_replicat_prm_with_csn_filter(self, filter_transaction_csn): contents_of_file = [] # Classic MAP section for table_name in self.array_final_tables_on_replicat: if (table_name not in self.array_new_tables_on_target_db) or (filter_transaction_csn == None): contents_of_file.append("MAP " + self.current_table_owner_on_source_db + "." + table_name + ", TARGET " + self.current_table_owner_on_target_db + "." + table_name + ";") else: self.replicat_intermediate_filter_string = "FILTER ( @GETENV ('TRANSACTION', 'CSN') > " + filter_transaction_csn + ')' contents_of_file.append("MAP " + self.current_table_owner_on_source_db + "." + table_name + ", TARGET " + self.current_table_owner_on_target_db + "." + table_name + ", " + self.replicat_intermediate_filter_string + ";") # J$ section for key in self.current_map_dictionary.keys(): contents_of_file.append(self.current_map_dictionary[key]) contents_of_file = NEWLINE.join(contents_of_file) # Save content as prm replicat tables file = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_tables.prm" f = open(file, 'w') f.write(contents_of_file) f.close() return def switch_logfile_and_chekcpoint(self): try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA) cursor = db.cursor() sql = "alter system switch logfile" cursor.execute(sql) sql = "alter system checkpoint" cursor.execute(sql) cursor.close() except cx_Oracle.DatabaseError as err: print (err) exit (ERROR) return def define_scn_for_export(self): try: db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA) cursor = db.cursor() sql = """select min(scn) scn from( select min(current_scn) scn from gv$database union select min(t.start_scn) scn from gv$transaction t, gv$session s where s.saddr = t.ses_addr and s.inst_id = t.inst_id )""" cursor.execute(sql) for row in cursor: self.scn_of_export=str(row[0]) cursor.close() except cx_Oracle.DatabaseError as err: print (err) exit (ERROR) return def remove_csn_filter_from_replicat_prm(self): contents_of_file = [] for table_name in self.array_final_tables_on_replicat: contents_of_file.append("MAP " + self.current_table_owner_on_source_db + "." + table_name + ", TARGET " + self.current_table_owner_on_target_db + "." + table_name + ";") for key in self.final_map_dictionary.keys(): contents_of_file.append(self.final_map_dictionary[key]) contents_of_file = NEWLINE.join(contents_of_file) file = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_tables.prm" f = open(file, 'w') f.write(contents_of_file) f.close() return def backup_and_empty_delta_prm(self): for str1 in [self.extract, self.replicat]: file = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + str1 + "_delta.prm" file_old = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/." + str1 + "_delta.old" shutil.copyfile(file, file_old) # Empry file f = open(file, 'w') f.write("") f.close() return def archive_prm(self): prm_folder = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat archive_prm_folder = OGG_SERVICE_DIR + "/archive" zipname = self.extract + "_"+ self.replicat + "_" + self.runid +'.zip' fantasy_zip = zipfile.ZipFile(archive_prm_folder + '/'+ zipname, 'w') for folder, subfolders, files in os.walk(prm_folder): for file in files: if file.endswith('.prm'): fantasy_zip.write(os.path.join(folder, file), os.path.relpath(os.path.join(folder,file), prm_folder), compress_type = zipfile.ZIP_DEFLATED) fantasy_zip.close() return zipname def parse_prm_tables(self): # EXTRACT current ################# extract_current_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_tables.prm" try: contents_of_extract_current_prm = open(extract_current_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.extract + "_tables.prm not found.")) exit (ERROR) tab_owner_array=[] tab_name_array=[] # Identify tables for line in contents_of_extract_current_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # skip comments if not auxline.startswith("--"): # eliminate double spaces and spaces auxline = re.sub(' +', ' ',auxline) auxline = auxline.replace(" ","") auxline = auxline.replace(";", "") auxline = auxline.upper() if auxline.upper().startswith("TABLE"): auxline = auxline.replace("TABLE", "", 1) auxline2 = auxline.split(",")[0] (tab_owner,tab_name) = auxline2.split(".") tab_owner_array.append(tab_owner) tab_name_array.append(tab_name) if ",_ADDNATIVE" in auxline: self.extract_addnative_option_array.append(tab_name) # Check if we have only one owner self.current_table_owner_on_source_db = tab_owner_array[0] self.array_current_tables_on_source_db = tab_name_array for tab_owner in tab_owner_array: if tab_owner != self.current_table_owner_on_source_db: print ("More than one table owners in EXTRACT current parameter file.") exit (ERROR) # REPLICAT current ################## replicat_current_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_tables.prm" try: contents_of_replicat_current_prm = open(replicat_current_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() except FileNotFoundError as err: print (Red_On_Black("ERROR: file " + self.replicat + "_tables.prm not found.")) exit (ERROR) self.current_map_dictionary = {} table_array = [] for line in contents_of_replicat_current_prm: # supress spaces in begin/end line auxline = line.rstrip() auxline = auxline.lstrip() # Remove double spaces auxline = re.sub(' +', ' ',auxline) # skip comments if not auxline.startswith("--"): if auxline.upper().startswith("MAP") and "TARGET" in auxline.upper() and "J$" not in auxline.upper(): # Classic MAP section # Force uppercase and elilminate ";" auxline = auxline.upper() auxline = auxline.replace(";", "") (str1, str2) = auxline.split("TARGET ") str2 = str2.lstrip().rstrip() table_array.append(str2) if auxline.upper().startswith("MAP") and "TARGET" in auxline.upper() and "J$" in auxline.upper(): # J$ section auxline = auxline.upper() str1 = auxline.split(",")[1] str2 = str1.split(".")[1] self.current_map_dictionary[str2] = auxline array_current_owner_tables_on_target_db = table_array # Check if we have only one owner self.array_current_tables_on_target_db = [] self.current_table_owner_on_target_db = array_current_owner_tables_on_target_db[0].split(".")[0] for line in array_current_owner_tables_on_target_db: (owner, table_name) = line.split(".") # If a FILTER condition with CSN exists, we remove it if ("CSN" in table_name) and ("TRANSACTION" in table_name): pass else: self.array_current_tables_on_target_db.append(table_name) if owner != self.current_table_owner_on_target_db: print ("More than one table owners in REPLICAT current parameter file.") exit (ERROR) return def clean_local_prm_files(self): replicat_tables_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_tables.prm" contents_of_replicat_tables_prm = open(replicat_tables_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines() contents_of_replicat_tables_prm = removeDuplicates(contents_of_replicat_tables_prm) # Remove duplicate lines f = open(replicat_tables_prm_filename, 'w') contents_of_replicat_tables_prm="".join(contents_of_replicat_tables_prm) f.write(contents_of_replicat_tables_prm) f.close() # Remove empty lines remove_empty_lines(replicat_tables_prm_filename) return def generate_finals_tables_arrays(self): self.array_duplicate_tables_on_extract = intersection(self.array_current_tables_on_source_db, self.array_new_tables_on_source_db) self.array_final_tables_on_extract = union_keep_order(self.array_current_tables_on_source_db, self.array_new_tables_on_source_db) self.array_duplicate_tables_on_replicat = intersection(self.array_current_tables_on_target_db, self.array_new_tables_on_target_db) self.array_final_tables_on_replicat = union_keep_order(self.array_current_tables_on_target_db, self.array_new_tables_on_target_db) self.final_map_dictionary = merge_dict(self.current_map_dictionary, self.new_map_dictionary) self.tables_in_delta_exists_in_current = False if (len(self.array_duplicate_tables_on_extract) > 0) or (len(self.array_duplicate_tables_on_replicat) > 0): self.tables_in_delta_exists_in_current = False return