Files
notes/tiddlywiki/ogg_libs.py.txt
2026-03-12 22:01:38 +01:00

1575 lines
87 KiB
Python
Executable File

#!/u01/app/python/current_version/bin/python3
import os
import subprocess
import logging
import argparse
import datetime
import socket
import shlex
import cx_Oracle
from colorama import init
from colorama import Fore, Back
import json
import time
import stat
import shutil
import zipfile
from pathlib import Path
import re
# CONSTANTS
# Root directory of the OGG service tree; the sync.d/, etc/ and temp/ paths
# used below are all built from it.
OGG_SERVICE_DIR = "/u01/app/oracle/admin/OGG_Service"
# ERROR is the code handed to exit() on fatal errors.
SUCCESS = 0
ERROR = 1
TAB = "\t"
NEWLINE = "\n"
# Value passed to time.sleep() between two status polls (seconds).
OGG_STATUS_SLEEP_TIME_WHEN_CHECK = 5
# Bound on the number of status polls in the start/stop wait loops.
OGG_STATUS_MAX_ITERATION_CHECK = 12
# NOTE(review): not referenced in the reviewed part of the file; presumably the
# equivalent bound for lag checks - confirm against its users.
OGG_LAG_MAX_ITERATION_CHECK = 24
# NOTE(review): lists of ORA- error codes; judging by the names they are meant to
# be tolerated during index creation / expdp / impdp - confirm against their users.
TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES = ["ORA-02260","ORA-00955","ORA-01408"]
EXPDP_IGNORE_ERROR_MESSAGES = ["ORA-31693","ORA-02354","ORA-01466"]
IMPDP_IGNORE_ERROR_MESSAGES = []
# GENERIC FUNCTIONS
def start_logging(logfile):
    """Return this module's logger, configured to write INFO records to ``logfile``.

    Calling the function again with the same file does not attach a second
    handler (which would write every record twice); the already configured
    logger is returned instead.

    Args:
        logfile: path of the log file to append to.

    Returns:
        The ``logging.Logger`` named after this module.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    # FileHandler stores the absolute path in baseFilename, so compare against that
    wanted_path = os.path.abspath(logfile)
    for existing in logger.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == wanted_path:
            return logger
    # create a file handler
    handler = logging.FileHandler(logfile)
    handler.setLevel(logging.INFO)
    # create a logging format
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    # add the handler to the logger
    logger.addHandler(handler)
    return logger
def get_hostname():
    """Store the local host name in the module-level ``hostname`` and return it.

    Other code in this module reads the global ``hostname``, so this function
    has to be called once before that code runs.

    Returns:
        The value of ``socket.gethostname()``.
    """
    global hostname
    hostname = socket.gethostname()
    return hostname
def upload_file(source_file, target_host, target_file):
    """Copy a local file to ``target_file`` on ``target_host`` as user oracle.

    The parent directory of ``target_file`` is created on the remote host
    first (``mkdir -p`` over ssh), then the file is copied with ``scp -p``.

    Args:
        source_file: local path of the file to send.
        target_host: host name used for both ssh and scp.
        target_file: destination path on the remote host.

    Raises:
        subprocess.CalledProcessError: when the ssh or the scp step fails.
    """
    remote_account = "oracle@" + target_host
    # The two steps are chained with "&&": with ";" the shell reported only the
    # exit status of scp, so a failed mkdir was invisible to check=True.
    # $(dirname ...) is expanded by the local shell before ssh is started.
    # NOTE(review): the arguments are interpolated unquoted into a shell command;
    # paths containing spaces or shell metacharacters are not supported.
    shellcommand = (
        "ssh " + remote_account + " mkdir -p $(dirname " + target_file + ") && "
        + "scp -p " + source_file + " " + remote_account + ":" + target_file
    )
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    return
def download_file(source_file, source_host, target_file):
    """Fetch ``source_file`` from ``source_host`` into the local ``target_file`` with ``scp -p``.

    No remote user is put in front of the host name, so scp uses its own default.

    Raises:
        subprocess.CalledProcessError: when scp exits with a non-zero status.
    """
    remote_source = source_host + ":" + source_file
    shellcommand = " ".join(["scp", "-p", remote_source, target_file])
    # stdout is captured and discarded
    subprocess.run(shellcommand, check=True, shell=True, stdout=subprocess.PIPE)
    return
def upload_directory(source_dir, target_host, target_dir):
    """Recursively copy ``source_dir`` to ``target_dir`` on ``target_host`` as user oracle (``scp -rp``).

    Raises:
        subprocess.CalledProcessError: when scp exits with a non-zero status.
    """
    remote_target = "oracle@" + target_host + ":" + target_dir
    shellcommand = " ".join(["scp", "-rp", source_dir, remote_target])
    # stdout is captured and discarded
    subprocess.run(shellcommand, check=True, shell=True, stdout=subprocess.PIPE)
    return
# Functions for pretty color printing
def init_pretty_color_printing():
    """Initialise colorama with ``autoreset`` enabled; returns nothing."""
    init(autoreset=True)
def Red_On_Black(string):
    """Return ``string`` prefixed with the colorama codes for red text on a black background."""
    color_prefix = Fore.RED + Back.BLACK
    return color_prefix + string
def Cyan_On_Black(string):
    """Return ``string`` prefixed with the colorama codes for cyan text on a black background."""
    color_prefix = Fore.CYAN + Back.BLACK
    return color_prefix + string
def Yellow_On_Black(string):
    """Return ``string`` prefixed with the colorama codes for yellow text on a black background."""
    color_prefix = Fore.YELLOW + Back.BLACK
    return color_prefix + string
def Magenta_On_Black(string):
    """Return ``string`` prefixed with the colorama codes for magenta text on a black background."""
    color_prefix = Fore.MAGENTA + Back.BLACK
    return color_prefix + string
def Green_On_Black(string):
    """Return ``string`` prefixed with the colorama codes for green text on a black background."""
    color_prefix = Fore.GREEN + Back.BLACK
    return color_prefix + string
def White_On_Black(string):
    """Return ``string`` prefixed with the colorama codes for white text on a black background."""
    color_prefix = Fore.WHITE + Back.BLACK
    return color_prefix + string
def intersection(lst1, lst2):
    """Return the elements of ``lst1`` that also occur in ``lst2``.

    Order and duplicates of ``lst1`` are kept.
    """
    common = []
    for candidate in lst1:
        if candidate in lst2:
            common.append(candidate)
    return common
def union(lst1, lst2):
    """Return a list holding every distinct element of ``lst1`` and ``lst2``.

    The result is built from a set, so its order is unspecified and the
    elements must be hashable.
    """
    left_members = set(lst1)
    right_members = set(lst2)
    return list(left_members | right_members)
def union_keep_order(lst1, lst2):
    """Return the distinct elements of ``lst1 + lst2`` in order of first appearance.

    Membership is tested against the result list, so unhashable elements are
    accepted.
    """
    merged = []
    for item in lst1 + lst2:
        # keep only the first occurrence of each element
        if item in merged:
            continue
        merged.append(item)
    return merged
def removeDuplicates(listofElements):
    """Return a new list with the elements of ``listofElements``, duplicates dropped.

    The first occurrence of each element is kept, in its original position
    relative to the others.
    """
    kept = []
    for candidate in listofElements:
        if candidate in kept:
            continue
        kept.append(candidate)
    return kept
def concatenate_files(input_files, output_file):
    """Write the contents of every file in ``input_files``, in order, to ``output_file``.

    ``output_file`` is truncated first; files are opened in text mode with the
    default encoding.
    """
    with open(output_file, 'w') as destination:
        for source_path in input_files:
            with open(source_path) as source:
                content = source.read()
            destination.write(content)
    return
def merge_dict(dict1, dict2):
    """Return a new dict with the items of ``dict1`` updated by those of ``dict2``.

    Neither argument is modified. On duplicate keys the value of ``dict2``
    wins. Keys of any hashable type are accepted (the former
    ``dict(dict1, **dict2)`` form raised TypeError for non-string keys in
    ``dict2``).
    """
    merged = dict(dict1)
    merged.update(dict2)
    return merged
def remove_empty_lines(filename):
    """Rewrite ``filename`` in place without its blank (whitespace-only) lines."""
    with open(filename) as source:
        kept_lines = [text for text in source.readlines() if text.strip()]
    with open(filename, 'w') as target:
        target.writelines(kept_lines)
    return
# CLASS OGG_Sync
class OGG_Sync:
def __init__(self, extract, replicat): # Constructor of the class
    """Load the environment and configuration parameters of one EXTRACT/REPLICAT pair.

    Reads ``sync.d/<extract>_<replicat>/env.info`` into ``specific_params``
    and ``etc/<type_environement>.conf`` into ``config_params``; both files
    hold ``param = value`` lines.

    Args:
        extract: name of the EXTRACT process.
        replicat: name of the REPLICAT process.

    Raises:
        ValueError: when a non-blank line of either file contains no "=".
        KeyError: when env.info does not define ``type_environement``.

    Exits with ERROR when env.info does not exist.
    """
    def read_key_value_file(path):
        # Parse "param = value" lines into a dict; blank lines are skipped and
        # only the first "=" of a line separates the name from the value.
        params = {}
        with open(path, "r", encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if line != "":
                    param, value = line.split("=", 1)
                    params[param.strip()] = value.strip()
        return params

    self.runid = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S_%f')
    self.extract = extract
    self.replicat = replicat
    # Identify environement type (dev|prod)
    envinfo_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/env.info"
    try:
        self.specific_params = read_key_value_file(envinfo_filename)
    except FileNotFoundError:
        print(Red_On_Black("ERROR: file env.info not found."))
        exit(ERROR)
    self.extract_addnative_option_array = []
    # Load configuration
    configuration_file = OGG_SERVICE_DIR + "/etc/" + self.specific_params["type_environement"] + ".conf"
    self.config_params = read_key_value_file(configuration_file)
    return
def get_class_attributes_as_json(self): # Display class attributes
    """Return all instance attributes serialised as JSON with a 2-space indent.

    NOTE(review): ``config_params`` is one of the attributes and holds
    ``sysdba_password``, so the returned text contains that password.
    """
    return json.dumps(vars(self), indent=2)
def get_config_params_as_json(self): # Display configuration parameters
    """Return ``self.config_params`` serialised as JSON with a 2-space indent."""
    configuration = self.config_params
    return json.dumps(configuration, indent=2)
def get_type_env(self):
    """Return the environment type read from env.info.

    The method used to be an empty stub that always returned None.
    NOTE(review): the intended result is assumed to be the
    ``type_environement`` entry of ``specific_params`` - confirm.

    Returns:
        The value of ``specific_params["type_environement"]``, or None when
        the entry is absent.
    """
    return self.specific_params.get("type_environement")
def build_extract_prm(self):
    """Generate ``temp/<extract>_<replicat>/<extract>.prm`` from its header and tables sections.

    The generated file starts with a comment naming its two source files and
    the module-level ``hostname`` (which must have been set beforehand),
    followed by the non-blank lines of ``<extract>_header.prm``, an empty
    line, and the non-blank lines of ``<extract>_tables.prm``. The path of
    the generated file is stored in ``self.extract_prm_filename``.

    Exits with ERROR when one of the two source files does not exist.
    """
    sync_name = self.extract + "_" + self.replicat
    extract_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + sync_name + "/" + self.extract + "_header.prm"
    extract_tables_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + sync_name + "/" + self.extract + "_tables.prm"
    if not (os.path.isfile(extract_header_prm_filename) and os.path.isfile(extract_tables_prm_filename)):
        print ("OGG Sync EXTRACT configuration files does not exist: ")
        print (" " + extract_header_prm_filename)
        print (" " + extract_tables_prm_filename)
        exit (ERROR)
    # create the temporary sync directory if needed, so this method no longer
    # depends on build_replicat_prm having been called first
    temp_dir = OGG_SERVICE_DIR + "/temp/" + sync_name
    os.makedirs(temp_dir, exist_ok=True)
    extract_prm_filename = temp_dir + "/" + self.extract + ".prm"
    # concatenate header + table prm sections
    contents_of_prm = [
        "-- This file has been is generated from the following files on " + hostname + " :\n",
        "-- " + extract_header_prm_filename + " \n",
        "-- " + extract_tables_prm_filename,
        NEWLINE,
    ]
    with open(extract_header_prm_filename, "r", encoding="utf-8", errors="replace") as header_file:
        contents_of_prm.extend(line for line in header_file if line.strip() != "")
    contents_of_prm.append(NEWLINE)
    with open(extract_tables_prm_filename, "r", encoding="utf-8", errors="replace") as tables_file:
        contents_of_prm.extend(line for line in tables_file if line.strip() != "")
    with open(extract_prm_filename, 'w') as prm_file:
        prm_file.write("".join(contents_of_prm))
    # store extract_prm_filename in class for later usage
    self.extract_prm_filename = extract_prm_filename
    return
def build_replicat_prm(self):
    """Generate ``temp/<extract>_<replicat>/<replicat>.prm`` from its header and tables sections.

    The generated file starts with a comment naming its two source files and
    the module-level ``hostname`` (which must have been set beforehand),
    followed by the non-blank lines of ``<replicat>_header.prm``, an empty
    line, and the non-blank lines of ``<replicat>_tables.prm``. The path of
    the generated file is stored in ``self.replicat_prm_filename``.

    Exits with ERROR when one of the two source files does not exist.
    """
    sync_name = self.extract + "_" + self.replicat
    replicat_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + sync_name + "/" + self.replicat + "_header.prm"
    replicat_tables_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + sync_name + "/" + self.replicat + "_tables.prm"
    if not (os.path.isfile(replicat_header_prm_filename) and os.path.isfile(replicat_tables_prm_filename)):
        print ("OGG Sync REPLICAT configuration files does not exist: ")
        print (" " + replicat_header_prm_filename)
        print (" " + replicat_tables_prm_filename)
        exit (ERROR)
    # create temporary sync directory; makedirs also creates a missing parent
    # and tolerates an already existing directory
    temp_dir = OGG_SERVICE_DIR + "/temp/" + sync_name
    os.makedirs(temp_dir, exist_ok=True)
    replicat_prm_filename = temp_dir + "/" + self.replicat + ".prm"
    # concatenate header + tables prm sections
    contents_of_prm = [
        "-- This file has been is generated from the following files on " + hostname + " :\n",
        "-- " + replicat_header_prm_filename + " \n",
        "-- " + replicat_tables_prm_filename,
        NEWLINE,
    ]
    with open(replicat_header_prm_filename, "r", encoding="utf-8", errors="replace") as header_file:
        contents_of_prm.extend(line for line in header_file if line.strip() != "")
    contents_of_prm.append(NEWLINE)
    with open(replicat_tables_prm_filename, "r", encoding="utf-8", errors="replace") as tables_file:
        contents_of_prm.extend(line for line in tables_file if line.strip() != "")
    with open(replicat_prm_filename, 'w') as prm_file:
        prm_file.write("".join(contents_of_prm))
    # store replicat_prm_filename in class for later usage
    self.replicat_prm_filename = replicat_prm_filename
    return
def upload_extract_prm(self):
    """Upload the generated ``<extract>.prm`` from the local temp directory to ``dirprm`` on the OGG host."""
    prm_name = self.extract + ".prm"
    local_prm = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + prm_name
    target_host = self.config_params["ogg_host"]
    remote_prm = self.config_params["ogg12_path"] + "/dirprm/" + prm_name
    upload_file(local_prm, target_host, remote_prm)
    return
def upload_replicat_prm(self):
    """Upload the generated ``<replicat>.prm`` from the local temp directory to ``dirprm`` on the OGG host."""
    prm_name = self.replicat + ".prm"
    local_prm = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + prm_name
    target_host = self.config_params["ogg_host"]
    remote_prm = self.config_params["ogg12_path"] + "/dirprm/" + prm_name
    upload_file(local_prm, target_host, remote_prm)
    return
def upload_prm_files(self):
    """Upload the EXTRACT parameter file, then the REPLICAT parameter file."""
    for upload_step in (self.upload_extract_prm, self.upload_replicat_prm):
        upload_step()
    return
def upload_temp_file_to_ogg_host(self, file_name):
    """Upload ``file_name`` from the local temp directory of this sync to the matching temp directory on the OGG host.

    The remote directory is ``<remote_directory>/temp/<extract>_<replicat>``.
    """
    relative_path = "/temp/" + self.extract + "_" + self.replicat + "/" + file_name
    local_path = OGG_SERVICE_DIR + relative_path
    target_host = self.config_params["ogg_host"]
    remote_path = self.config_params["remote_directory"] + relative_path
    upload_file(local_path, target_host, remote_path)
    return
def sync_full_old(self):
    """Run the full-refresh shell on the OGG host over ssh and echo its output.

    Every line the remote command writes to stdout is printed in cyan as it
    arrives.

    Returns:
        The exit status of the ssh process.
    """
    shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " " + self.config_params["ogg_full_refresh_shell"] + " -e " + self.extract + " -r " + self.replicat + " -v 12"
    # The context manager closes the pipe; iterating the pipe ends at EOF, and
    # wait() then blocks for the exit status instead of spinning on poll().
    with subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE) as process:
        for raw_line in process.stdout:
            print(Cyan_On_Black(raw_line.decode('utf-8').strip()))
        rc = process.wait()
    return rc
def parse_prm_headers(self):
    """Parse the EXTRACT and REPLICAT header files and query both databases.

    From the USERID line of ``<extract>_header.prm`` it sets
    ``ggsci_db_source_login`` and ``database_source``; from the USERID line
    of ``<replicat>_header.prm`` it sets ``ggsci_db_target_login`` and
    ``database_target`` (the part of the connect identifier before "_OGG",
    upper-cased, plus "EXA"). DDLSUBST lines of the REPLICAT header are
    collected into ``remap_tablespace`` as comma separated ``old:new`` pairs.
    The active instances and the version of both databases are then read
    from the databases themselves.

    Raises:
        ValueError: when a USERID line has no single "@", when the target
            connect identifier does not contain exactly one "_OGG", or when
            a DDLSUBST line has no "with" word.
        AttributeError: when a header holds no USERID line.

    Exits with ERROR when a header file does not exist.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    # Parsing EXTRACT header
    extract_header_prm_filename = sync_dir + "/" + self.extract + "_header.prm"
    try:
        with open(extract_header_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_extract_header_prm = prm_file.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.extract + "_header.prm not found."))
        exit (ERROR)
    for line in contents_of_extract_header_prm:
        # suppress spaces in begin/end line
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        if auxline.upper().startswith("USERID"):
            # GGSCI login on SOURCE database; the keyword is cut off by length so
            # that a lower-case "userid" is removed as well
            self.ggsci_db_source_login = auxline[len("USERID"):].lstrip()
            # Source DB informations: only the text before the first comma is
            # "user@database"; further comma separated options may follow
            credentials = auxline.split(",", 1)[0]
            (_user, database) = credentials.split("@")
            self.database_source = database.strip()
    # NOTE(review): the reviewed copy had lost its indentation; these two
    # lookups are assumed to run once, after the loop.
    self.database_source_instances = self.get_array_active_db_nodes(self.database_source)
    self.database_source_version = self.get_database_version(self.database_source)
    # Parsing REPLICAT header
    replicat_header_prm_filename = sync_dir + "/" + self.replicat + "_header.prm"
    try:
        with open(replicat_header_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_replicat_header_prm = prm_file.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.replicat + "_header.prm not found."))
        exit (ERROR)
    remap_tablespace_list = []
    for line in contents_of_replicat_header_prm:
        # suppress spaces in begin/end line
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        if auxline.upper().startswith("USERID"):
            # GGSCI login on TARGET database
            self.ggsci_db_target_login = auxline[len("USERID"):].lstrip()
            # Target DB informations
            credentials = auxline.upper().split(",", 1)[0]
            (_user, connect_identifier) = credentials.split("@")
            (database_prefix, _suffix) = connect_identifier.split("_OGG")
            self.database_target = database_prefix + "EXA"
        if auxline.upper().startswith("DDLSUBST"):
            # Tablespace mapping: drop the quotes and split on any whitespace
            words_of_auxline = auxline.replace("'", "").split()
            # the keyword is searched case-insensitively ("with" / "WITH")
            index_of_word_with = [word.lower() for word in words_of_auxline].index("with")
            remap_tablespace_clause = words_of_auxline[index_of_word_with - 1] + ":" + words_of_auxline[index_of_word_with + 1]
            remap_tablespace_list.append(remap_tablespace_clause)
    self.remap_tablespace = ",".join(remap_tablespace_list)
    self.database_target_instances = self.get_array_active_db_nodes(self.database_target)
    self.database_target_version = self.get_database_version(self.database_target)
    return
def fast_parse_prm_headers(self):
    """Parse the USERID lines of both header files without querying any database.

    For WWW presentation, where the standard parsing procedure is not fast
    enough. Sets ``ggsci_db_source_login``, ``database_source``,
    ``ggsci_db_target_login`` and ``database_target`` (the part of the
    connect identifier before "_OGG", upper-cased, plus "EXA").

    Raises:
        ValueError: when a USERID line has no single "@", or when the target
            connect identifier does not contain exactly one "_OGG".

    Exits with ERROR when a header file does not exist.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    # Parsing EXTRACT header
    extract_header_prm_filename = sync_dir + "/" + self.extract + "_header.prm"
    try:
        with open(extract_header_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_extract_header_prm = prm_file.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.extract + "_header.prm not found."))
        exit (ERROR)
    for line in contents_of_extract_header_prm:
        # suppress spaces in begin/end line
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        if auxline.upper().startswith("USERID"):
            # GGSCI login on SOURCE database; the keyword is cut off by length so
            # that a lower-case "userid" is removed as well
            self.ggsci_db_source_login = auxline[len("USERID"):].lstrip()
            # Source DB informations: only the text before the first comma is
            # "user@database"; further comma separated options may follow
            credentials = auxline.split(",", 1)[0]
            (_user, database) = credentials.split("@")
            self.database_source = database.strip()
    # Parsing REPLICAT header
    replicat_header_prm_filename = sync_dir + "/" + self.replicat + "_header.prm"
    try:
        with open(replicat_header_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_replicat_header_prm = prm_file.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.replicat + "_header.prm not found."))
        exit (ERROR)
    for line in contents_of_replicat_header_prm:
        # suppress spaces in begin/end line
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        if auxline.upper().startswith("USERID"):
            # GGSCI login on TARGET database
            self.ggsci_db_target_login = auxline[len("USERID"):].lstrip()
            # Target DB informations
            credentials = auxline.upper().split(",", 1)[0]
            (_user, connect_identifier) = credentials.split("@")
            (database_prefix, _suffix) = connect_identifier.split("_OGG")
            self.database_target = database_prefix + "EXA"
    return
def parse_prm_delta(self):
    """Parse the EXTRACT and REPLICAT delta parameter files of this sync.

    From the TABLE lines of ``<extract>_delta.prm`` it sets
    ``new_table_owner_on_source_db`` and ``array_new_tables_on_source_db``
    and merges the tables flagged ``,_ADDNATIVE`` into
    ``extract_addnative_option_array``. From the MAP ... TARGET lines of
    ``<replicat>_delta.prm`` it sets ``new_table_owner_on_target_db`` and
    ``array_new_tables_on_target_db``; MAP lines mentioning "J$" are stored
    upper-cased in ``new_map_dictionary``, keyed by the text after the first
    "." of their second comma separated field.

    Exits with ERROR when a delta file is missing or empty, holds no TABLE /
    MAP entry, or names more than one owner.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    # EXTRACT DELTA
    extract_delta_prm_filename = sync_dir + "/" + self.extract + "_delta.prm"
    try:
        with open(extract_delta_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_extract_delta_prm = prm_file.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm not found."))
        exit (ERROR)
    if not contents_of_extract_delta_prm:
        print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm is empty."))
        exit (ERROR)
    tab_owner_array = []
    tab_name_array = []
    # Identify tables
    delta_addnative_option_array = []
    for line in contents_of_extract_delta_prm:
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        # eliminate every whitespace character and ";", force uppercase
        auxline = "".join(auxline.split()).replace(";", "").upper()
        if auxline.startswith("TABLE"):
            auxline = auxline.replace("TABLE", "", 1)
            (tab_owner, tab_name) = auxline.split(",")[0].split(".")
            tab_owner_array.append(tab_owner)
            tab_name_array.append(tab_name)
            if ",_ADDNATIVE" in auxline:
                delta_addnative_option_array.append(tab_name)
    initial_addnative_option_array = self.extract_addnative_option_array
    self.extract_addnative_option_array = union_keep_order(initial_addnative_option_array, delta_addnative_option_array)
    if not tab_owner_array:
        # previously an IndexError on tab_owner_array[0]
        print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm does not contain any TABLE entry."))
        exit (ERROR)
    # Check if we have only one owner
    self.new_table_owner_on_source_db = tab_owner_array[0]
    self.array_new_tables_on_source_db = tab_name_array
    if any(tab_owner != self.new_table_owner_on_source_db for tab_owner in tab_owner_array):
        # the message used to be a copy of the "file not found" one
        print (Red_On_Black("ERROR: More than one table owners in EXTRACT delta parameter file."))
        exit (ERROR)
    # REPLICAT DELTA
    replicat_delta_prm_filename = sync_dir + "/" + self.replicat + "_delta.prm"
    try:
        with open(replicat_delta_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_replicat_delta_prm = prm_file.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm not found."))
        exit (ERROR)
    # test the REPLICAT contents (the EXTRACT list was tested here by mistake)
    if not contents_of_replicat_delta_prm:
        print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm is empty."))
        exit (ERROR)
    self.new_map_dictionary = {}
    table_array = []
    for line in contents_of_replicat_delta_prm:
        # suppress spaces in begin/end line
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        upper_line = auxline.upper()
        if not (upper_line.startswith("MAP") and "TARGET" in upper_line):
            continue
        if "J$" not in upper_line:
            # Classic MAP section: force uppercase and eliminate ";"
            (_map_clause, target_table) = upper_line.replace(";", "").split("TARGET ")
            table_array.append(target_table.strip())
        else:
            # J$ section
            second_field = upper_line.split(",")[1]
            self.new_map_dictionary[second_field.split(".")[1]] = upper_line
    if not table_array:
        # previously an IndexError on table_array[0]
        print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm does not contain any MAP entry."))
        exit (ERROR)
    # Check if we have only one owner
    self.array_new_tables_on_target_db = []
    self.new_table_owner_on_target_db = table_array[0].split(".")[0]
    for owner_and_table in table_array:
        (owner, table_name) = owner_and_table.split(".")
        self.array_new_tables_on_target_db.append(table_name)
        if owner != self.new_table_owner_on_target_db:
            print ("More than one table owners in REPLICAT delta parameter file.")
            exit (ERROR)
    return
def check_pk_uk_on_source_db_delta_tables(self):
    """Verify that every new source table exists and has a primary or unique key constraint.

    The tables of ``array_new_tables_on_source_db`` are looked up in
    DBA_TABLES and DBA_CONSTRAINTS of the source database under the owner
    ``current_table_owner_on_source_db``.
    NOTE(review): that attribute is not assigned in the reviewed part of the
    file (parse_prm_delta sets ``new_table_owner_on_source_db``) - confirm
    that it is the intended owner.

    Exits with ERROR when a table is missing, has no such constraint, or on
    any database error.
    """
    owner = self.current_table_owner_on_source_db
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA)
        try:
            cursor = db.cursor()
            # Check if source tables exist (values are passed as bind variables)
            for table_name in self.array_new_tables_on_source_db:
                cursor.execute(
                    "select count(1) from DBA_TABLES where OWNER = :owner and TABLE_NAME = :table_name",
                    {"owner": owner, "table_name": table_name},
                )
                (count,) = cursor.fetchone()
                if count == 0:
                    print (NEWLINE + "ERROR: Table " + owner + "." + table_name + " does not exists in database " + self.database_source)
                    exit (ERROR)
            # Check if the tables have PK or UK
            for table_name in self.array_new_tables_on_source_db:
                cursor.execute(
                    "select count(1) from DBA_CONSTRAINTS where OWNER = :owner and TABLE_NAME = :table_name and CONSTRAINT_TYPE in ('P','U')",
                    {"owner": owner, "table_name": table_name},
                )
                (count,) = cursor.fetchone()
                if count == 0:
                    print (NEWLINE +"ERROR: Table " + owner + "." + table_name + " in database " + self.database_source + " does not heve any PRIMARY KEY or UNIQUE INDEX")
                    exit (ERROR)
            cursor.close()
        finally:
            # the connection used to be left open
            db.close()
    except cx_Oracle.DatabaseError as err:
        print (err)
        exit (ERROR)
    return
def add_trandata_on_delta_tables(self):
    """Enable TRANDATA on the new source tables for which GGSCI reports it as disabled.

    An INFO TRANDATA obey file is run through ggsci on the OGG host; every
    output line containing "disabled" and "for table " names a table that is
    then put into an ADD TRANDATA obey file, which is run the same way.
    Tables are qualified with ``current_table_owner_on_source_db``.

    Raises:
        subprocess.CalledProcessError: when an upload or ssh command fails.
    """
    sync_name = self.extract + "_" + self.replicat
    owner = self.current_table_owner_on_source_db

    def run_obey(obey_name, commands):
        # Write the commands to a local obey file, upload it, pipe it into
        # ggsci on the OGG host and return what ggsci printed.
        contents_of_file = NEWLINE.join(commands + [NEWLINE])
        with open(OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name, 'w') as obey_file:
            obey_file.write(contents_of_file)
        self.upload_temp_file_to_ogg_host(obey_name)
        remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
        shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " '" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
        cmd = subprocess.run(
            shellcommand,
            check=True,
            shell=True,
            stdout=subprocess.PIPE,
        )
        return cmd.stdout.decode('utf-8')

    # Generate and execute the INFO TRANDATA obey file
    info_commands = ["DBLOGIN USERID " + self.ggsci_db_source_login]
    info_commands.extend("INFO TRANDATA " + owner + "." + tab_name for tab_name in self.array_new_tables_on_source_db)
    cmd_output = run_obey("delta_info_trandata.obey", info_commands)
    # Create an array for the tables with disabled TRANDATA
    array_new_tables_to_enable_trandata = []
    for line in cmd_output.splitlines():
        # "for table " is required as well, so that an unrelated line containing
        # "disabled" no longer raises IndexError
        if "disabled" in line and "for table " in line:
            qualified_name = line.split("for table ")[1].split(".")
            if len(qualified_name) > 1:
                array_new_tables_to_enable_trandata.append(qualified_name[1])
    # Generate and execute the ADD TRANDATA obey file
    add_commands = ["DBLOGIN USERID " + self.ggsci_db_source_login]
    add_commands.extend("ADD TRANDATA " + owner + "." + tab_name for tab_name in array_new_tables_to_enable_trandata)
    run_obey("delta_add_trandata.obey", add_commands)
    return
def create_remote_dirs(self):
    """Create ``<remote_directory>/temp/<extract>_<replicat>`` on the OGG host (``mkdir -p`` over ssh).

    Raises:
        subprocess.CalledProcessError: when the ssh command fails.
    """
    target_host = self.config_params["ogg_host"]
    remote_temp_dir = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat
    shellcommand = " ".join(["ssh", "oracle@" + target_host, "mkdir", "-p", remote_temp_dir])
    completed = subprocess.run(shellcommand, check=True, shell=True, stdout=subprocess.PIPE)
    # the output is decoded but not used
    remote_output = completed.stdout.decode('utf-8')
    return
def get_array_active_db_nodes(self, database):
    """Return the open instances of ``database`` as a flat list.

    The list alternates instance name and host name
    (``[instance1, host1, instance2, host2, ...]``), one pair per row of
    GV$INSTANCE with STATUS 'OPEN'; ".france.intra.corp" is removed from the
    host names.

    Exits with ERROR on any database error.
    """
    active_nodes = []
    try:
        connection = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + database, mode=cx_Oracle.SYSDBA)
        instance_cursor = connection.cursor()
        instance_cursor.execute("select INSTANCE_NAME, HOST_NAME from GV$INSTANCE where STATUS='OPEN'")
        for (instance_name, host_name) in instance_cursor:
            active_nodes.extend((instance_name, host_name.replace(".france.intra.corp", "")))
        instance_cursor.close()
        connection.close()
    except cx_Oracle.DatabaseError as err:
        print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
        exit (ERROR)
    return active_nodes
def get_database_version(self, database):
    """Return the VERSION column of V$INSTANCE for ``database``.

    Exits with ERROR on any database error.
    """
    try:
        connection = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + database, mode=cx_Oracle.SYSDBA)
        version_cursor = connection.cursor()
        version_cursor.execute("select VERSION from V$INSTANCE")
        # the value of the last row fetched is the one returned
        for record in version_cursor:
            version = record[0]
        version_cursor.close()
        connection.close()
    except cx_Oracle.DatabaseError as err:
        print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
        exit (ERROR)
    return version
def drop_new_tables_exists_on_target_db(self):
    """Drop (with purge) every table of ``array_new_tables_on_target_db`` on the target database.

    Tables are qualified with ``current_table_owner_on_target_db``. A table
    that does not exist (ORA-00942) is silently skipped.

    Exits with ERROR when the connection fails or a drop fails with any other
    error.
    """
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_target, mode=cx_Oracle.SYSDBA)
    except cx_Oracle.DatabaseError as err:
        print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
        exit (ERROR)
    cursor = db.cursor()
    try:
        # Drop table on target database
        for table_name in self.array_new_tables_on_target_db:
            try:
                sql = "drop table " + self.current_table_owner_on_target_db + "." + table_name +" purge"
                cursor.execute(sql)
            except cx_Oracle.DatabaseError as err:
                ora_xxxxx = str(err).split(":")[0]
                # ORA-00942: table or view does not exist -> nothing to drop
                if ora_xxxxx != "ORA-00942":
                    print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
                    exit (ERROR)
    finally:
        cursor.close()
        # the connection used to be left open
        db.close()
    return
def get_running_ogg_host(self):
    """Find the host of ``ogg_host_list`` on which the OGG manager is running.

    ``info manager`` is sent to ggsci on each host in turn over ssh. The
    first host whose output contains "MANAGER IS RUNNING" (case-insensitive)
    is stored in ``config_params["ogg_host"]`` and returned. A host on which
    the ssh command fails is skipped, so that one unreachable node does not
    prevent probing the others.

    Returns:
        The host name, or None when no host reports a running manager.
    """
    ogg12_env = self.config_params["ogg12_env"]
    ogg12_path = self.config_params["ogg12_path"]
    # Get the ogg host list, remove spaces, and tokenize it by commas ','
    ogg_host_list = self.config_params["ogg_host_list"].replace(' ', '').split(',')
    # Check if, and which, node is running the ogg manager
    for ogg_host in ogg_host_list:
        shellcommand = "ssh oracle@" + ogg_host + " '" + ogg12_env + "; echo info manager | " + ogg12_path + "/ggsci" + "'"
        try:
            cmd = subprocess.run(
                shellcommand,
                check=True,
                shell=True,
                stdout=subprocess.PIPE,
            )
        except subprocess.CalledProcessError:
            # ssh or ggsci failed on this node: try the next one
            continue
        cmd_output = cmd.stdout.decode('utf-8')
        if any("MANAGER IS RUNNING" in line.upper() for line in cmd_output.splitlines()):
            self.config_params["ogg_host"] = ogg_host
            return ogg_host
    return None
def set_ogg_host(self, ogg_host_dev, ogg_host_prod):
    """Override ``config_params["ogg_host"]`` according to the environment type.

    ``ogg_host_dev`` is used when ``type_environement`` is "DEV",
    ``ogg_host_prod`` when it is "PROD" (case-insensitive); for any other
    value the configured host is left untouched.
    """
    environment = self.specific_params["type_environement"].upper()
    host_by_environment = {"DEV": ogg_host_dev, "PROD": ogg_host_prod}
    if environment in host_by_environment:
        self.config_params["ogg_host"] = host_by_environment[environment]
    return
def get_extract_status(self):
    """Return the status ggsci reports for the EXTRACT process of this sync.

    An obey file holding ``info <extract>`` is uploaded and piped into ggsci
    on the OGG host. The status is the text following "Status " on the last
    output line that mentions both "EXTRACT" and the extract name
    (case-insensitive).

    Returns:
        The status text, or "MISSING" when no such line is found (the same
        default get_replicat_status uses; previously the method failed with
        an unbound local variable in that case).

    Raises:
        subprocess.CalledProcessError: when an upload or ssh command fails.
    """
    extract_status = "MISSING"
    sync_name = self.extract + "_" + self.replicat
    obey_name = "info_extract.obey"
    # Generate INFO EXTRACT obey file
    contents_of_file = NEWLINE.join(["info " + self.extract, NEWLINE])
    with open(OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name, 'w') as obey_file:
        obey_file.write(contents_of_file)
    # Upload and execute the obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " '" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    for line in cmd.stdout.decode('utf-8').splitlines():
        upper_line = line.upper()
        # "Status " is required too: a matching line without it (an error
        # message, for instance) used to raise IndexError
        if "EXTRACT" in upper_line and self.extract.upper() in upper_line and "Status " in line:
            extract_status = line.split("Status ")[1].strip()
    return extract_status
def stop_extract(self):
    """Stop the EXTRACT process and wait until ggsci reports it as STOPPED.

    An obey file holding ``stop <extract>`` is uploaded and piped into ggsci
    on the OGG host. The status is then polled, sleeping
    OGG_STATUS_SLEEP_TIME_WHEN_CHECK seconds between polls, at most
    OGG_STATUS_MAX_ITERATION_CHECK more times after the first check.

    Exits with ERROR (after printing "FAILED") when the process is still not
    STOPPED after the last poll.

    Raises:
        subprocess.CalledProcessError: when an upload or ssh command fails.
    """
    sync_name = self.extract + "_" + self.replicat
    obey_name = "stop_extract.obey"
    # Generate STOP EXTRACT obey file
    contents_of_file = NEWLINE.join(["stop " + self.extract, NEWLINE])
    with open(OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name, 'w') as obey_file:
        obey_file.write(contents_of_file)
    # Upload and execute the obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " '" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Check the status until is STOPPED. The failure test is tied to the loop
    # itself: the former "iteration == MAX" test did not agree with the
    # "<= MAX" loop bound.
    polls_done = 0
    while self.get_extract_status() != "STOPPED":
        if polls_done >= OGG_STATUS_MAX_ITERATION_CHECK:
            print(Red_On_Black("FAILED"))
            exit (ERROR)
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        polls_done += 1
    return
def start_extract(self):
    """Start the EXTRACT process and wait until ggsci reports it as RUNNING.

    An obey file holding ``start <extract>`` is uploaded and piped into ggsci
    on the OGG host. After an initial pause the status is polled, sleeping
    OGG_STATUS_SLEEP_TIME_WHEN_CHECK seconds between polls, at most
    OGG_STATUS_MAX_ITERATION_CHECK more times after the first check.

    Exits with ERROR (after printing "FAILED") when the process is still not
    RUNNING after the last poll.

    Raises:
        subprocess.CalledProcessError: when an upload or ssh command fails.
    """
    sync_name = self.extract + "_" + self.replicat
    obey_name = "start_extract.obey"
    # Generate START EXTRACT obey file
    contents_of_file = NEWLINE.join(["start " + self.extract, NEWLINE])
    with open(OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name, 'w') as obey_file:
        obey_file.write(contents_of_file)
    # Upload and execute the obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " '" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Wait a little while
    time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
    # Check the status until is RUNNING. The failure test is tied to the loop
    # itself: the former "iteration == MAX" test did not agree with the
    # "<= MAX" loop bound.
    polls_done = 0
    while self.get_extract_status() != "RUNNING":
        if polls_done >= OGG_STATUS_MAX_ITERATION_CHECK:
            print(Red_On_Black("FAILED"))
            exit (ERROR)
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        polls_done += 1
    return
def get_replicat_status(self):
    """Return the status ggsci reports for the REPLICAT process of this sync.

    An obey file holding ``info <replicat>`` is uploaded and piped into ggsci
    on the OGG host. The status is the text following "Status " on the last
    output line that mentions both "REPLICAT" and the replicat name
    (case-insensitive).

    Returns:
        The status text, or "MISSING" when no such line is found.

    Raises:
        subprocess.CalledProcessError: when an upload or ssh command fails.
    """
    replicat_status = "MISSING"
    sync_name = self.extract + "_" + self.replicat
    obey_name = "info_replicat.obey"
    # Generate INFO REPLICAT obey file
    contents_of_file = NEWLINE.join(["info " + self.replicat, NEWLINE])
    with open(OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name, 'w') as obey_file:
        obey_file.write(contents_of_file)
    # Upload and execute the obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " '" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    for line in cmd.stdout.decode('utf-8').splitlines():
        upper_line = line.upper()
        # "Status " is required too: a matching line without it (an error
        # message, for instance) used to raise IndexError instead of leaving
        # the "MISSING" default in place
        if "REPLICAT" in upper_line and self.replicat.upper() in upper_line and "Status " in line:
            replicat_status = line.split("Status ")[1].strip()
    return replicat_status
def update_sync_status(self):
    """Refresh ``extract_status`` and then ``replicat_status`` from ggsci."""
    status_probes = (
        ("extract_status", self.get_extract_status),
        ("replicat_status", self.get_replicat_status),
    )
    for attribute_name, probe in status_probes:
        setattr(self, attribute_name, probe())
    return
def stop_replicat(self):
    """Stop the REPLICAT through GGSCI and wait until it reports STOPPED.

    An obey file containing ``stop <replicat>`` is written locally,
    uploaded to the OGG host and piped into ``ggsci`` over ssh.  The status
    is then polled with ``get_replicat_status()``: at most
    OGG_STATUS_MAX_ITERATION_CHECK re-checks, each preceded by a sleep of
    OGG_STATUS_SLEEP_TIME_WHEN_CHECK seconds.

    If the replicat is still not STOPPED after the last re-check, "FAILED"
    is printed and the process exits with ERROR.

    Raises:
        subprocess.CalledProcessError: if the ssh/ggsci command exits with
            a non-zero status.
    """
    sync_name = self.extract + "_" + self.replicat
    obey_name = "stop_replicat.obey"
    # The obey file holds the single GGSCI command followed by a blank line.
    obey_text = NEWLINE.join(["stop " + self.replicat, NEWLINE])
    local_obey_filename = OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name
    with open(local_obey_filename, 'w') as obey_file:
        obey_file.write(obey_text)
    # Upload and execute the obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = (
        "ssh oracle@" + self.config_params["ogg_host"]
        + " '" + self.config_params["ogg12_env"]
        + "; cat " + remote_obey_filename
        + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    )
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Poll until the replicat is STOPPED or the re-check budget is used up
    status = self.get_replicat_status()
    iteration = 0
    while status != "STOPPED" and iteration < OGG_STATUS_MAX_ITERATION_CHECK:
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        status = self.get_replicat_status()
        iteration += 1
    # Decide on the final status rather than on the counter, so a replicat
    # that reaches STOPPED on the very last re-check is not reported FAILED.
    if status != "STOPPED":
        print(Red_On_Black("FAILED"))
        exit(ERROR)
    return
def start_replicat(self):
    """Start the REPLICAT through GGSCI and wait until it reports RUNNING.

    An obey file containing ``start <replicat>`` is written locally,
    uploaded to the OGG host and piped into ``ggsci`` over ssh.  The status
    is then polled with ``get_replicat_status()``: at most
    OGG_STATUS_MAX_ITERATION_CHECK re-checks, each preceded by a sleep of
    OGG_STATUS_SLEEP_TIME_WHEN_CHECK seconds.

    If the replicat is still not RUNNING after the last re-check, "FAILED"
    is printed and the process exits with ERROR.

    Raises:
        subprocess.CalledProcessError: if the ssh/ggsci command exits with
            a non-zero status.
    """
    sync_name = self.extract + "_" + self.replicat
    obey_name = "start_replicat.obey"
    # The obey file holds the single GGSCI command followed by a blank line.
    obey_text = NEWLINE.join(["start " + self.replicat, NEWLINE])
    local_obey_filename = OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name
    with open(local_obey_filename, 'w') as obey_file:
        obey_file.write(obey_text)
    # Upload and execute the obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = (
        "ssh oracle@" + self.config_params["ogg_host"]
        + " '" + self.config_params["ogg12_env"]
        + "; cat " + remote_obey_filename
        + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    )
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Poll until the replicat is RUNNING or the re-check budget is used up
    status = self.get_replicat_status()
    iteration = 0
    while status != "RUNNING" and iteration < OGG_STATUS_MAX_ITERATION_CHECK:
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        status = self.get_replicat_status()
        iteration += 1
    # Decide on the final status rather than on the counter, so a replicat
    # that reaches RUNNING on the very last re-check is not reported FAILED.
    if status != "RUNNING":
        print(Red_On_Black("FAILED"))
        exit(ERROR)
    return
def replicat_has_lag(self):
    """Tell whether GGSCI still reports pending records for the REPLICAT.

    An obey file containing ``lag <replicat>`` is written locally, uploaded
    to the OGG host and piped into ``ggsci`` over ssh.

    Returns:
        False when some output line contains
        ``no more records to process``; True otherwise.

    Raises:
        subprocess.CalledProcessError: if the ssh/ggsci command exits with
            a non-zero status.
    """
    sync_name = self.extract + "_" + self.replicat
    obey_name = "lag_replicat.obey"
    # The obey file holds the single GGSCI command followed by a blank line.
    obey_text = NEWLINE.join(["lag " + self.replicat, NEWLINE])
    local_obey_filename = OGG_SERVICE_DIR + "/temp/" + sync_name + "/" + obey_name
    with open(local_obey_filename, 'w') as obey_file:
        obey_file.write(obey_text)
    # Upload and execute the LAG REPLICAT obey file
    self.upload_temp_file_to_ogg_host(obey_name)
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + sync_name + "/" + obey_name
    shellcommand = (
        "ssh oracle@" + self.config_params["ogg_host"]
        + " '" + self.config_params["ogg12_env"]
        + "; cat " + remote_obey_filename
        + " | " + self.config_params["ogg12_path"] + "/ggsci" + "'"
    )
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    output_lines = cmd.stdout.decode('utf-8').splitlines()
    caught_up = any("no more records to process" in line for line in output_lines)
    return not caught_up
def export_delta_tables(self, scn):
    """Export the new source tables with Data Pump (expdp).

    Steps:
      1. (Re)create the ``OGGSYNC`` directory object on the source database.
      2. Write ``export_delta.par`` and ``export_delta.sh`` into the local
         temp directory of this extract/replicat pair, and upload both with
         ``upload_temp_file_to_ogg_host``.
      3. Run the shell script over ssh on
         ``self.database_source_instances[1]``, echoing its output.
      4. Download the export log and count the lines starting with ``ORA-``
         whose code is not listed in EXPDP_IGNORE_ERROR_MESSAGES.

    Args:
        scn: value for the expdp ``flashback_scn`` parameter, or None to
            export without it.

    On a database error, or when at least one non-ignored ORA- line is
    found in the log, the process exits with ERROR.
    """
    sync_name = self.extract + "_" + self.replicat
    local_temp_dir = OGG_SERVICE_DIR + "/temp/" + sync_name + "/"
    remote_temp_dir = self.config_params["remote_directory"] + "/temp/" + sync_name + "/"

    # Create OGGSYNC directory on source database
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        sql = "create or replace directory OGGSYNC as '" + self.config_params["datapump_directory_path"] + "'"
        cursor.execute(sql)
        cursor.close()
    except cx_Oracle.DatabaseError as err:
        print(err)
        exit(ERROR)

    # Generate EXPDP parfile
    par_lines = [
        "dumpfile = OGGSYNC:" + sync_name + "_delta.dmp",
        "logfile = OGGSYNC:export_" + sync_name + "_delta.log",
        "reuse_dumpfiles=Y",
        "exclude=GRANT,CONSTRAINT,INDEX,TRIGGER",
    ]
    if scn is not None:
        # str() so that an integer SCN is accepted as well as a string
        par_lines.append("flashback_scn = " + str(scn))
    par_lines.extend(
        "tables = " + self.current_table_owner_on_source_db + "." + table_name
        for table_name in self.array_new_tables_on_source_db
    )
    if self.database_source_version.startswith("12") and self.database_target_version.startswith("11"):
        par_lines.append("version=11.2")
    par_lines.append(NEWLINE)
    with open(local_temp_dir + "export_delta.par", 'w') as par_file:
        par_file.write(NEWLINE.join(par_lines))
    expdp_parfile = remote_temp_dir + "export_delta.par"

    # Generate EXPDP shell
    shell_lines = [
        ". oraenv <<EOF! > /dev/null",
        self.database_source,
        "EOF!",
        "export ORACLE_SID=" + self.database_source_instances[0],
        "expdp userid=\"'/ as sysdba'\" parfile=" + expdp_parfile,
        NEWLINE,
    ]
    local_shell = local_temp_dir + "export_delta.sh"
    with open(local_shell, 'w') as shell_file:
        shell_file.write(NEWLINE.join(shell_lines))
    expdp_shell = remote_temp_dir + "export_delta.sh"
    # chmod +x: start from the mode of the shell script itself (not of the
    # parfile) so that only the owner-execute bit is added to it.
    st = os.stat(local_shell)
    os.chmod(local_shell, st.st_mode | stat.S_IEXEC)

    # Upload the EXPDP shell and parfile
    self.upload_temp_file_to_ogg_host("export_delta.par")
    self.upload_temp_file_to_ogg_host("export_delta.sh")

    # Launch EXPDP over ssh and echo its output line by line until EOF
    shellcommand = "ssh oracle@" + self.database_source_instances[1] + " " + expdp_shell
    with subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE) as process:
        for raw_line in iter(process.stdout.readline, b''):
            print(Cyan_On_Black(raw_line.decode('utf-8', errors='replace').strip()))
    # The exit status of expdp is not inspected; errors are detected from
    # the Data Pump log below.

    # Check for errors
    download_file(self.config_params["datapump_directory_path"] + "/export_" + sync_name + "_delta.log", self.config_params["ogg_host"], OGG_SERVICE_DIR + "/temp/.")
    logfile = OGG_SERVICE_DIR + "/temp/export_" + sync_name + "_delta.log"
    with open(logfile, "r", encoding="utf-8", errors="replace") as log:
        # A line is blocking when it starts with an ORA-xxxxx code that is
        # not in the ignore list.
        blocking_ora_error = sum(
            1
            for line in log
            if line.startswith("ORA-") and line.split(":")[0] not in EXPDP_IGNORE_ERROR_MESSAGES
        )
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile))
        exit(ERROR)
    return
def import_delta_tables(self):
    """Import the new tables into the target database with Data Pump (impdp).

    Steps:
      1. (Re)create the ``OGGSYNC`` directory object on the target database.
      2. Write ``import_delta.par`` and ``import_delta.sh`` into the local
         temp directory of this extract/replicat pair, and upload both with
         ``upload_temp_file_to_ogg_host``.
      3. Run the shell script over ssh on
         ``self.database_target_instances[1]``, echoing its output.
      4. Download the import log and count the lines starting with ``ORA-``
         whose code is not listed in IMPDP_IGNORE_ERROR_MESSAGES.

    On a database error, or when at least one non-ignored ORA- line is
    found in the log, the process exits with ERROR.
    """
    sync_name = self.extract + "_" + self.replicat
    local_temp_dir = OGG_SERVICE_DIR + "/temp/" + sync_name + "/"
    remote_temp_dir = self.config_params["remote_directory"] + "/temp/" + sync_name + "/"

    # Create OGGSYNC directory on target database
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_target, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        sql = "create or replace directory OGGSYNC as '" + self.config_params["datapump_directory_path"] + "'"
        cursor.execute(sql)
        cursor.close()
    except cx_Oracle.DatabaseError as err:
        print(err)
        exit(ERROR)

    # Generate IMPDP parfile
    par_lines = [
        "dumpfile = OGGSYNC:" + sync_name + "_delta.dmp",
        "logfile = OGGSYNC:import_" + sync_name + "_delta.log",
    ]
    par_lines.extend(
        "tables = " + self.current_table_owner_on_source_db + "." + table_name
        for table_name in self.array_new_tables_on_source_db
    )
    par_lines.append("remap_schema = " + self.current_table_owner_on_source_db + ":" + self.current_table_owner_on_target_db)
    if self.remap_tablespace:
        par_lines.append("remap_tablespace=" + self.remap_tablespace)
    par_lines.append(NEWLINE)
    with open(local_temp_dir + "import_delta.par", 'w') as par_file:
        par_file.write(NEWLINE.join(par_lines))
    impdp_parfile = remote_temp_dir + "import_delta.par"

    # Generate IMPDP shell
    shell_lines = [
        ". oraenv <<EOF! > /dev/null",
        self.database_target,
        "EOF!",
        "export ORACLE_SID=" + self.database_target_instances[0],
        "impdp userid=\"'/ as sysdba'\" parfile=" + impdp_parfile,
        NEWLINE,
    ]
    local_shell = local_temp_dir + "import_delta.sh"
    with open(local_shell, 'w') as shell_file:
        shell_file.write(NEWLINE.join(shell_lines))
    impdp_shell = remote_temp_dir + "import_delta.sh"
    # Perform chmod +x for the IMPDP shell
    st = os.stat(local_shell)
    os.chmod(local_shell, st.st_mode | stat.S_IEXEC)

    # Upload the IMPDP shell and parfile
    self.upload_temp_file_to_ogg_host("import_delta.par")
    self.upload_temp_file_to_ogg_host("import_delta.sh")

    # Launch IMPDP over ssh and echo its output line by line until EOF
    shellcommand = "ssh oracle@" + self.database_target_instances[1] + " " + impdp_shell
    with subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE) as process:
        for raw_line in iter(process.stdout.readline, b''):
            print(Cyan_On_Black(raw_line.decode('utf-8', errors='replace').strip()))
    # The exit status of impdp is not inspected; errors are detected from
    # the Data Pump log below.

    # Check for errors
    download_file(self.config_params["datapump_directory_path"] + "/import_" + sync_name + "_delta.log", self.config_params["ogg_host"], OGG_SERVICE_DIR + "/temp/.")
    logfile = OGG_SERVICE_DIR + "/temp/import_" + sync_name + "_delta.log"
    with open(logfile, "r", encoding="utf-8", errors="replace") as log:
        # Use the import-specific ignore list: codes tolerated during the
        # export must not be silently accepted in the import log.
        blocking_ora_error = sum(
            1
            for line in log
            if line.startswith("ORA-") and line.split(":")[0] not in IMPDP_IGNORE_ERROR_MESSAGES
        )
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile))
        exit(ERROR)
    return
def generate_ddl_source_delta_tables_pk_uk(self):
    """Build ``<target>_create_pk_uk.sql`` from the source database.

    The configured ``sql_script_extract_ddl_pk_uk`` script is copied into
    the local temp directory of this extract/replicat pair and run with
    SQL*Plus against the source database, once per table of
    ``array_new_tables_on_source_db``, spooling into
    ``<target>_create_pk_uk.sql``.  The spooled file is then rewritten in
    place: a ``spool``/``connect`` header for the target database and a
    ``spool off``/``exit`` footer are added, and every ``<source owner>".``
    is replaced by ``<target owner>".``.

    Raises:
        subprocess.CalledProcessError: if the SQL*Plus shell command exits
            with a non-zero status.
    """
    work_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/"
    ddl_script = self.config_params["sql_script_extract_ddl_pk_uk"]
    connect_prefix = "connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/"
    shutil.copy(OGG_SERVICE_DIR + "/sql/" + ddl_script, work_dir)

    # Generate extract DDL script
    driver_lines = [
        connect_prefix + self.database_source + " as sysdba",
        "spool " + self.database_target + "_create_pk_uk.sql",
    ]
    driver_lines.extend(
        "@" + ddl_script + " " + self.current_table_owner_on_source_db + " " + table_name
        for table_name in self.array_new_tables_on_source_db
    )
    driver_lines.extend(["spool off", "exit", NEWLINE])
    driver_name = self.database_source + "_extract_ddl_indexes.sql"
    with open(work_dir + driver_name, 'w') as driver_file:
        driver_file.write(NEWLINE.join(driver_lines))

    # Execute extract DDL script with SQL*Plus
    shellcommand = "source ~/.bash_profile; cd " + work_dir + "; " + self.config_params["local_sqlplus"] + " /nolog @" + driver_name
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Add connection information and spool to the generated file
    spooled_path = work_dir + self.database_target + "_create_pk_uk.sql"
    with open(spooled_path, "r", encoding="utf-8", errors="replace") as spooled:
        ddl_lines = spooled.readlines()
    # Make sure the footer starts on its own line
    if ddl_lines and not ddl_lines[-1].endswith(NEWLINE):
        ddl_lines[-1] += NEWLINE
    ddl_lines.insert(0, "spool " + self.database_target + "_create_pk_uk.log")
    # The connect line is terminated explicitly; otherwise it would be glued
    # to whatever line follows it (first DDL line, or "spool off" when the
    # spool is empty).
    ddl_lines.insert(1, NEWLINE + connect_prefix + self.database_target + " as sysdba" + NEWLINE)
    ddl_lines.append("spool off" + NEWLINE)
    ddl_lines.append("exit" + NEWLINE)

    # Change OWNER as in target database
    source_owner_token = self.current_table_owner_on_source_db + '"' + "."
    target_owner_token = self.current_table_owner_on_target_db + '"' + "."
    rewritten = "".join(line.replace(source_owner_token, target_owner_token) for line in ddl_lines)
    with open(spooled_path, 'w') as spooled:
        spooled.write(rewritten)
    return
def create_delta_tables_pk_uk_on_target_database(self):
    """Run ``<target>_create_pk_uk.sql`` with SQL*Plus and check its log.

    The script is executed from the local temp directory of this
    extract/replicat pair.  ``<target>_create_pk_uk.log`` is then scanned:
    every line starting with ``ORA-`` whose code is not listed in
    TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES counts as blocking.
    If any is found, "FAILED" is printed and the process exits with ERROR.

    Raises:
        subprocess.CalledProcessError: if the SQL*Plus shell command exits
            with a non-zero status.
    """
    work_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/"
    # Execute the DDL script with SQL*Plus
    shellcommand = "source ~/.bash_profile; cd " + work_dir + "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_create_pk_uk.sql"
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Check for unexpected ORA- errors in the logfile
    logfile_create_index = work_dir + self.database_target + "_create_pk_uk.log"
    with open(logfile_create_index, "r", encoding="utf-8", errors="replace") as log:
        blocking_ora_error = sum(
            1
            for line in log
            if line.startswith("ORA-") and line.split(":")[0] not in TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES
        )
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile_create_index))
        exit(ERROR)
    return
def create_temporary_sync_directory(self):
    """Create the local temp directory of this extract/replicat pair.

    The directory is ``<OGG_SERVICE_DIR>/temp/<extract>_<replicat>``.  A
    FileExistsError raised by ``os.mkdir`` is ignored.
    """
    sync_temp_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat
    try:
        os.mkdir(sync_temp_dir)
    except FileExistsError:
        # The path already exists: nothing to create.
        return
    return
def generate_ddl_target_delta_tables_old_index(self):
    """Build and archive ``<target>_create_old_indexes.sql``.

    The configured ``sql_script_extract_ddl_index`` script is copied into
    the local temp directory of this extract/replicat pair and run with
    SQL*Plus against the target database, once per table of
    ``array_new_tables_on_target_db``, spooling into
    ``<target>_create_old_indexes.sql``.  The spooled file is then
    rewritten in place with a ``spool``/``connect`` header and a
    ``spool off``/``exit`` footer, and a copy suffixed with ``self.runid``
    is stored in ``<OGG_SERVICE_DIR>/archive``.

    Raises:
        subprocess.CalledProcessError: if the SQL*Plus shell command exits
            with a non-zero status.
    """
    sync_name = self.extract + "_" + self.replicat
    work_dir = OGG_SERVICE_DIR + "/temp/" + sync_name + "/"
    ddl_script = self.config_params["sql_script_extract_ddl_index"]
    connect_line = "connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba"
    shutil.copy(OGG_SERVICE_DIR + "/sql/" + ddl_script, work_dir)

    # Generate extract DDL script
    driver_lines = [
        connect_line,
        "spool " + self.database_target + "_create_old_indexes.sql",
    ]
    driver_lines.extend(
        "@" + ddl_script + " " + self.current_table_owner_on_target_db + " " + table_name
        for table_name in self.array_new_tables_on_target_db
    )
    driver_lines.extend(["spool off", "exit", NEWLINE])
    driver_name = self.database_target + "_extract_ddl_old_indexes.sql"
    with open(work_dir + driver_name, 'w') as driver_file:
        driver_file.write(NEWLINE.join(driver_lines))

    # Execute extract DDL script with SQL*Plus
    shellcommand = "source ~/.bash_profile; cd " + work_dir + "; " + self.config_params["local_sqlplus"] + " /nolog @" + driver_name
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Add connection information and spool to the generated file
    spooled_path = work_dir + self.database_target + "_create_old_indexes.sql"
    with open(spooled_path, "r", encoding="utf-8", errors="replace") as spooled:
        ddl_lines = spooled.readlines()
    # Make sure the footer starts on its own line
    if ddl_lines and not ddl_lines[-1].endswith(NEWLINE):
        ddl_lines[-1] += NEWLINE
    ddl_lines.insert(0, "spool " + self.database_target + "_create_old_indexes.log")
    # The connect line is terminated explicitly; otherwise it would be glued
    # to whatever line follows it (first DDL line, or "spool off" when the
    # spool is empty).
    ddl_lines.insert(1, NEWLINE + connect_line + NEWLINE)
    ddl_lines.append("spool off" + NEWLINE)
    ddl_lines.append("exit" + NEWLINE)
    with open(spooled_path, 'w') as spooled:
        spooled.write("".join(ddl_lines))

    # Archive SQL file
    archive_folder = OGG_SERVICE_DIR + "/archive"
    shutil.copy(spooled_path, archive_folder + "/" + sync_name + "_" + self.database_target + "_create_old_indexes_" + self.runid + ".sql")
    return
def create_delta_tables_old_indexes_on_target_database(self):
    """Run ``<target>_create_old_indexes.sql`` with SQL*Plus and check its log.

    The script is executed from the local temp directory of this
    extract/replicat pair.  ``<target>_create_old_indexes.log`` is then
    scanned: every line starting with ``ORA-`` whose code is not listed in
    TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES counts as blocking.
    If any is found, "FAILED" and a hint are printed; the method then
    returns normally.

    Raises:
        subprocess.CalledProcessError: if the SQL*Plus shell command exits
            with a non-zero status.
    """
    work_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/"
    # Execute the DDL script with SQL*Plus
    shellcommand = "source ~/.bash_profile; cd " + work_dir + "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_create_old_indexes.sql"
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Check for unexpected ORA- errors in the logfile
    logfile_create_index = work_dir + self.database_target + "_create_old_indexes.log"
    with open(logfile_create_index, "r", encoding="utf-8", errors="replace") as log:
        blocking_ora_error = sum(
            1
            for line in log
            if line.startswith("ORA-") and line.split(":")[0] not in TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES
        )
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile_create_index))
        # NOTE(review): create_delta_tables_pk_uk_on_target_database exits
        # with ERROR at this point, this method does not - confirm whether
        # continuing after a failed index creation is intended.
    return
def add_new_tables_to_extract_prm(self):
    """Rewrite ``<extract>_tables.prm`` from ``array_final_tables_on_extract``.

    One ``TABLE <source owner>.<table>;`` line is written per table, in
    list order, joined by newlines (no trailing newline).  Tables listed in
    ``extract_addnative_option_array`` get ``,_ADDNATIVE`` inserted before
    the ``;``.  The file lives in
    ``<OGG_SERVICE_DIR>/sync.d/<extract>_<replicat>/``.
    """
    addnative_tables = self.extract_addnative_option_array
    owner_prefix = "TABLE " + self.current_table_owner_on_source_db + "."
    prm_lines = [
        owner_prefix + table_name + (",_ADDNATIVE" if table_name in addnative_tables else "") + ";"
        for table_name in self.array_final_tables_on_extract
    ]
    # Save content as prm extract tables
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.extract + "_tables.prm"
    with open(prm_path, 'w') as prm_file:
        prm_file.write(NEWLINE.join(prm_lines))
    return
def add_new_tables_to_replicat_prm_with_csn_filter(self, filter_transaction_csn):
    """Rewrite ``<replicat>_tables.prm``, filtering new tables by CSN.

    For every table of ``array_final_tables_on_replicat`` a
    ``MAP <source owner>.<table>, TARGET <target owner>.<table>;`` line is
    written.  Tables that are also in ``array_new_tables_on_target_db``
    additionally get a ``FILTER ( @GETENV ('TRANSACTION', 'CSN') > <csn>)``
    clause, unless ``filter_transaction_csn`` is None.  The values of
    ``current_map_dictionary`` are appended after the MAP lines.

    Args:
        filter_transaction_csn: CSN used in the FILTER clause, or None to
            write plain MAP lines only.

    Side effect: ``self.replicat_intermediate_filter_string`` is set to the
    FILTER clause whenever at least one filtered MAP line is produced.
    """
    contents_of_file = []
    # Classic MAP section
    for table_name in self.array_final_tables_on_replicat:
        map_clause = (
            "MAP " + self.current_table_owner_on_source_db + "." + table_name
            + ", TARGET " + self.current_table_owner_on_target_db + "." + table_name
        )
        if filter_transaction_csn is None or table_name not in self.array_new_tables_on_target_db:
            contents_of_file.append(map_clause + ";")
            continue
        # str() so that an integer CSN is accepted as well as a string
        self.replicat_intermediate_filter_string = "FILTER ( @GETENV ('TRANSACTION', 'CSN') > " + str(filter_transaction_csn) + ')'
        contents_of_file.append(map_clause + ", " + self.replicat_intermediate_filter_string + ";")
    # J$ section
    contents_of_file.extend(self.current_map_dictionary.values())
    # Save content as prm replicat tables
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.replicat + "_tables.prm"
    with open(prm_path, 'w') as prm_file:
        prm_file.write(NEWLINE.join(contents_of_file))
    return
def switch_logfile_and_chekcpoint(self):
    """Issue a log switch followed by a checkpoint on the source database.

    Connects as SYSDBA through the configured scan address.  On a
    cx_Oracle.DatabaseError the error is printed and the process exits
    with ERROR.
    """
    dsn = self.config_params["scan"] + "/" + self.database_source
    statements = ("alter system switch logfile", "alter system checkpoint")
    try:
        connection = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], dsn, mode=cx_Oracle.SYSDBA)
        cur = connection.cursor()
        for statement in statements:
            cur.execute(statement)
        cur.close()
    except cx_Oracle.DatabaseError as db_error:
        print(db_error)
        exit(ERROR)
def define_scn_for_export(self):
    """Store in ``self.scn_of_export`` the SCN to use for the export.

    The query returns the smallest value among the minimum ``current_scn``
    of gv$database and the minimum ``start_scn`` of the transactions in
    gv$transaction joined to gv$session.  The result is stored as a string.
    On a cx_Oracle.DatabaseError the error is printed and the process exits
    with ERROR.
    """
    scn_query = """select min(scn) scn from(
select min(current_scn) scn from gv$database
union
select min(t.start_scn) scn from gv$transaction t, gv$session s
where s.saddr = t.ses_addr
and s.inst_id = t.inst_id
)"""
    dsn = self.config_params["scan"] + "/" + self.database_source
    try:
        connection = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], dsn, mode=cx_Oracle.SYSDBA)
        cur = connection.cursor()
        cur.execute(scn_query)
        for record in cur:
            self.scn_of_export = str(record[0])
        cur.close()
    except cx_Oracle.DatabaseError as db_error:
        print(db_error)
        exit(ERROR)
def remove_csn_filter_from_replicat_prm(self):
    """Rewrite ``<replicat>_tables.prm`` with plain MAP lines only.

    One ``MAP <source owner>.<table>, TARGET <target owner>.<table>;`` line
    is written per table of ``array_final_tables_on_replicat``, followed by
    the values of ``final_map_dictionary``; lines are joined by newlines
    (no trailing newline).
    """
    source_owner = self.current_table_owner_on_source_db
    target_owner = self.current_table_owner_on_target_db
    prm_lines = [
        "MAP " + source_owner + "." + table_name + ", TARGET " + target_owner + "." + table_name + ";"
        for table_name in self.array_final_tables_on_replicat
    ]
    prm_lines.extend(self.final_map_dictionary.values())
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.replicat + "_tables.prm"
    with open(prm_path, 'w') as prm_file:
        prm_file.write(NEWLINE.join(prm_lines))
    return
def backup_and_empty_delta_prm(self):
    """Back up and then truncate the extract and replicat delta prm files.

    For both ``<extract>_delta.prm`` and ``<replicat>_delta.prm`` in
    ``<OGG_SERVICE_DIR>/sync.d/<extract>_<replicat>/``, the file is copied
    to the hidden ``.<name>_delta.old`` next to it and then emptied.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/"
    for process_name in [self.extract, self.replicat]:
        delta_path = sync_dir + process_name + "_delta.prm"
        backup_path = sync_dir + "." + process_name + "_delta.old"
        shutil.copyfile(delta_path, backup_path)
        # Opening in 'w' mode truncates the file; nothing needs to be written.
        with open(delta_path, 'w'):
            pass
    return
def archive_prm(self):
    """Zip every ``.prm`` file of this extract/replicat pair into the archive.

    The directory ``<OGG_SERVICE_DIR>/sync.d/<extract>_<replicat>`` is
    walked recursively; files ending in ``.prm`` are stored (deflated)
    under their path relative to that directory.

    Returns:
        The archive file name, ``<extract>_<replicat>_<runid>.zip``; the
        archive itself is written to ``<OGG_SERVICE_DIR>/archive``.
    """
    prm_folder = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    archive_prm_folder = OGG_SERVICE_DIR + "/archive"
    zipname = self.extract + "_" + self.replicat + "_" + self.runid + '.zip'
    # The context manager closes (and finalises) the archive even if adding
    # a member fails.
    with zipfile.ZipFile(archive_prm_folder + '/' + zipname, 'w') as prm_zip:
        for folder, _subfolders, filenames in os.walk(prm_folder):
            for filename in filenames:
                if not filename.endswith('.prm'):
                    continue
                member_path = os.path.join(folder, filename)
                prm_zip.write(member_path, os.path.relpath(member_path, prm_folder), compress_type=zipfile.ZIP_DEFLATED)
    return zipname
def parse_prm_tables(self):
    """Load the current table lists from the extract and replicat prm files.

    Reads ``<extract>_tables.prm`` and ``<replicat>_tables.prm`` from
    ``<OGG_SERVICE_DIR>/sync.d/<extract>_<replicat>/`` and sets on self:

      * ``current_table_owner_on_source_db`` - owner of the first TABLE
        entry of the extract file.
      * ``array_current_tables_on_source_db`` - table names of all TABLE
        entries (upper-cased).
      * ``extract_addnative_option_array`` - appended with each table whose
        TABLE entry contains ``,_ADDNATIVE`` (the list must already exist).
      * ``current_map_dictionary`` - upper-cased MAP lines containing
        ``J$``, keyed by the name that follows the owner in their second
        comma-separated field.
      * ``current_table_owner_on_target_db`` - owner of the first non-J$
        MAP target.
      * ``array_current_tables_on_target_db`` - targets of the non-J$ MAP
        lines, except those whose text contains both ``CSN`` and
        ``TRANSACTION``.

    Lines starting with ``--`` are ignored.  The process exits with ERROR
    when a file is missing, when a file has no usable entry, or when the
    entries of one file use more than one owner.
    """
    prm_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/"

    # EXTRACT current
    #################
    extract_current_prm_filename = prm_dir + self.extract + "_tables.prm"
    try:
        with open(extract_current_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_extract_current_prm = prm_file.readlines()
    except FileNotFoundError:
        print(Red_On_Black("ERROR: file " + self.extract + "_tables.prm not found."))
        exit(ERROR)
    tab_owner_array = []
    tab_name_array = []
    # Identify tables
    for line in contents_of_extract_current_prm:
        entry = line.strip()
        # skip comments
        if entry.startswith("--"):
            continue
        # drop every space and ";" and normalise the case
        entry = entry.replace(" ", "").replace(";", "").upper()
        if not entry.startswith("TABLE"):
            continue
        entry = entry.replace("TABLE", "", 1)
        # OWNER.NAME is the part before the first option separator
        (tab_owner, tab_name) = entry.split(",")[0].split(".")
        tab_owner_array.append(tab_owner)
        tab_name_array.append(tab_name)
        if ",_ADDNATIVE" in entry:
            self.extract_addnative_option_array.append(tab_name)
    # Without any TABLE entry there is no owner to read below
    if not tab_owner_array:
        print(Red_On_Black("ERROR: no TABLE entry found in " + self.extract + "_tables.prm."))
        exit(ERROR)
    # Check if we have only one owner
    self.current_table_owner_on_source_db = tab_owner_array[0]
    self.array_current_tables_on_source_db = tab_name_array
    for tab_owner in tab_owner_array:
        if tab_owner != self.current_table_owner_on_source_db:
            print("More than one table owners in EXTRACT current parameter file.")
            exit(ERROR)

    # REPLICAT current
    ##################
    replicat_current_prm_filename = prm_dir + self.replicat + "_tables.prm"
    try:
        with open(replicat_current_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
            contents_of_replicat_current_prm = prm_file.readlines()
    except FileNotFoundError:
        print(Red_On_Black("ERROR: file " + self.replicat + "_tables.prm not found."))
        exit(ERROR)
    self.current_map_dictionary = {}
    array_current_owner_tables_on_target_db = []
    for line in contents_of_replicat_current_prm:
        # trim the line and collapse runs of spaces
        entry = re.sub(' +', ' ', line.strip())
        # skip comments
        if entry.startswith("--"):
            continue
        entry = entry.upper()
        if not (entry.startswith("MAP") and "TARGET" in entry):
            continue
        if "J$" in entry:
            # J$ section: keep the whole (upper-cased) line
            map_key = entry.split(",")[1].split(".")[1]
            self.current_map_dictionary[map_key] = entry
        else:
            # Classic MAP section: keep what follows "TARGET ", without ";"
            (_map_part, target_part) = entry.replace(";", "").split("TARGET ")
            array_current_owner_tables_on_target_db.append(target_part.strip())
    # Without any classic MAP entry there is no owner to read below
    if not array_current_owner_tables_on_target_db:
        print(Red_On_Black("ERROR: no MAP entry found in " + self.replicat + "_tables.prm."))
        exit(ERROR)
    # Check if we have only one owner
    self.array_current_tables_on_target_db = []
    self.current_table_owner_on_target_db = array_current_owner_tables_on_target_db[0].split(".")[0]
    for target_spec in array_current_owner_tables_on_target_db:
        (owner, table_name) = target_spec.split(".")
        # Entries still carrying a FILTER condition on the transaction CSN
        # are left out of the current table list
        if not (("CSN" in table_name) and ("TRANSACTION" in table_name)):
            self.array_current_tables_on_target_db.append(table_name)
        if owner != self.current_table_owner_on_target_db:
            print("More than one table owners in REPLICAT current parameter file.")
            exit(ERROR)
    return
def clean_local_prm_files(self):
    """Remove duplicate and empty lines from ``<replicat>_tables.prm``.

    The file is read, passed through ``removeDuplicates``, written back,
    and finally handed to ``remove_empty_lines``.
    """
    replicat_tables_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.replicat + "_tables.prm"
    with open(replicat_tables_prm_filename, "r", encoding="utf-8", errors="replace") as prm_file:
        prm_lines = prm_file.readlines()
    # Remove duplicate lines; build the new content before the file is
    # reopened for writing, so a failure here leaves the file untouched.
    deduplicated_content = "".join(removeDuplicates(prm_lines))
    with open(replicat_tables_prm_filename, 'w') as prm_file:
        prm_file.write(deduplicated_content)
    # Remove empty lines
    remove_empty_lines(replicat_tables_prm_filename)
    return
def generate_finals_tables_arrays(self):
    """Combine the current and the new table lists into the final ones.

    Sets on self, for the extract (source) and the replicat (target) side:

      * ``array_duplicate_tables_on_*`` - ``intersection`` of the current
        and the new table lists.
      * ``array_final_tables_on_*`` - ``union_keep_order`` of the current
        and the new table lists.

    Also sets ``final_map_dictionary`` to the ``merge_dict`` of the current
    and new map dictionaries, and ``tables_in_delta_exists_in_current`` to
    True when either duplicate list is non-empty, False otherwise.
    """
    self.array_duplicate_tables_on_extract = intersection(self.array_current_tables_on_source_db, self.array_new_tables_on_source_db)
    self.array_final_tables_on_extract = union_keep_order(self.array_current_tables_on_source_db, self.array_new_tables_on_source_db)
    self.array_duplicate_tables_on_replicat = intersection(self.array_current_tables_on_target_db, self.array_new_tables_on_target_db)
    self.array_final_tables_on_replicat = union_keep_order(self.array_current_tables_on_target_db, self.array_new_tables_on_target_db)
    self.final_map_dictionary = merge_dict(self.current_map_dictionary, self.new_map_dictionary)
    # The flag must reflect the duplicate check; assigning False in both
    # branches made the check a no-op.
    self.tables_in_delta_exists_in_current = (
        len(self.array_duplicate_tables_on_extract) > 0
        or len(self.array_duplicate_tables_on_replicat) > 0
    )
    return