1575 lines
87 KiB
Python
Executable File
1575 lines
87 KiB
Python
Executable File
#!/u01/app/python/current_version/bin/python3
|
|
|
|
import os
|
|
import subprocess
|
|
import logging
|
|
import argparse
|
|
import datetime
|
|
import socket
|
|
import shlex
|
|
import cx_Oracle
|
|
from colorama import init
|
|
from colorama import Fore, Back
|
|
import json
|
|
import time
|
|
import stat
|
|
import shutil
|
|
import zipfile
|
|
from pathlib import Path
|
|
import re
|
|
|
|
|
|
|
|
# CONSTANTS

# Base directory of the OGG synchronisation service (config, temp files, obey scripts).
OGG_SERVICE_DIR = "/u01/app/oracle/admin/OGG_Service"
# Process exit codes used with exit().
SUCCESS = 0
ERROR = 1
# Separators used when composing generated files.
TAB = "\t"
NEWLINE = "\n"
# Seconds to sleep between two ggsci status polls.
OGG_STATUS_SLEEP_TIME_WHEN_CHECK = 5
# Maximum number of polls when waiting for an extract/replicat status change.
OGG_STATUS_MAX_ITERATION_CHECK = 12
# Maximum number of polls when waiting for replication lag to drain.
OGG_LAG_MAX_ITERATION_CHECK = 24
# ORA-xxxxx errors tolerated when (re)creating indexes on the target database
# (already-exists style errors).
TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES = ["ORA-02260","ORA-00955","ORA-01408"]
# ORA-xxxxx errors tolerated during Data Pump export.
EXPDP_IGNORE_ERROR_MESSAGES = ["ORA-31693","ORA-02354","ORA-01466"]
# ORA-xxxxx errors tolerated during Data Pump import (none at present).
IMPDP_IGNORE_ERROR_MESSAGES = []
|
|
|
|
|
|
|
|
# GENERIC FUNCTIONS
|
|
|
|
def start_logging(logfile):
    """Configure and return the module logger, writing INFO records to *logfile*.

    Fix: previously every call attached a fresh FileHandler, so calling
    start_logging() more than once duplicated every log record. A handler
    for the same file is now attached only once.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    target = os.path.abspath(logfile)
    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == target
        for h in logger.handlers
    )
    if not already_attached:
        # create a file handler with the standard timestamped format
        handler = logging.FileHandler(logfile)
        handler.setLevel(logging.INFO)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
    return logger
|
|
|
|
|
|
def get_hostname():
    """Resolve the local machine name into the module-level ``hostname`` global."""
    global hostname
    hostname = socket.gethostname()
|
|
|
|
def upload_file(source_file, target_host, target_file):
    """Copy *source_file* to oracle@*target_host*:*target_file*.

    Creates the remote parent directory first, then scp's the file.
    """
    shellcommand = (
        "ssh oracle@{h} mkdir -p $(dirname {t}) ; "
        "scp -p {s} oracle@{h}:{t}"
    ).format(h=target_host, t=target_file, s=source_file)
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
|
|
|
|
def download_file(source_file, source_host, target_file):
    """Fetch *source_file* from *source_host* via scp into local *target_file*."""
    shellcommand = "scp -p {}:{} {}".format(source_host, source_file, target_file)
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
|
|
|
|
def upload_directory(source_dir, target_host, target_dir):
    """Recursively copy *source_dir* to oracle@*target_host*:*target_dir* via scp."""
    shellcommand = "scp -rp {} oracle@{}:{}".format(source_dir, target_host, target_dir)
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
|
|
|
|
|
|
# Functions for pretty color printing
|
|
def init_pretty_color_printing():
    """Initialise colorama so colours reset automatically after each print."""
    init(autoreset=True)
|
|
|
|
def Red_On_Black(string):
    """Wrap *string* in red-on-black ANSI colour codes."""
    return Fore.RED + Back.BLACK + string
|
|
|
|
def Cyan_On_Black(string):
    """Wrap *string* in cyan-on-black ANSI colour codes."""
    return Fore.CYAN + Back.BLACK + string
|
|
|
|
def Yellow_On_Black(string):
    """Wrap *string* in yellow-on-black ANSI colour codes."""
    return Fore.YELLOW + Back.BLACK + string
|
|
|
|
def Magenta_On_Black(string):
    """Wrap *string* in magenta-on-black ANSI colour codes."""
    return Fore.MAGENTA + Back.BLACK + string
|
|
|
|
def Green_On_Black(string):
    """Wrap *string* in green-on-black ANSI colour codes."""
    return Fore.GREEN + Back.BLACK + string
|
|
|
|
def White_On_Black(string):
    """Wrap *string* in white-on-black ANSI colour codes."""
    return Fore.WHITE + Back.BLACK + string
|
|
|
|
def intersection(lst1, lst2):
    """Return the elements of lst1 also present in lst2.

    Preserves lst1's order and duplicates; lst2 membership is tested with
    plain ``in`` so unhashable elements are supported.
    """
    common = []
    for candidate in lst1:
        if candidate in lst2:
            common.append(candidate)
    return common
|
|
|
|
def union(lst1, lst2):
    """Return the de-duplicated union of both lists (order unspecified)."""
    return list(set(lst1).union(lst2))
|
|
|
|
def union_keep_order(lst1, lst2):
    """Union of the two lists, keeping first-seen order and dropping duplicates.

    Uses list membership rather than a set so unhashable elements work.
    """
    ordered = []
    for element in lst1 + lst2:
        # keep only the first occurrence of each element
        if element not in ordered:
            ordered.append(element)
    return ordered
|
|
|
|
|
|
def removeDuplicates(listofElements):
    """Return a copy of the list with later duplicates removed (first occurrence wins)."""
    deduped = []
    for item in listofElements:
        if item not in deduped:
            deduped.append(item)
    return deduped
|
|
|
|
def concatenate_files(input_files, output_file):
    """Write the contents of *input_files*, in order, into *output_file* (overwriting it)."""
    with open(output_file, 'w') as sink:
        for source_name in input_files:
            with open(source_name) as source:
                shutil.copyfileobj(source, sink)
|
|
|
|
def merge_dict(dict1, dict2):
    """Return a new dict with dict1's entries, overridden by dict2's on key clash.

    Fix: the old ``dict(dict1, **dict2)`` form raises TypeError when dict2
    has non-string keys; ``{**dict1, **dict2}`` merges any key types.
    Neither input is modified.
    """
    return {**dict1, **dict2}
|
|
|
|
def remove_empty_lines(filename):
    """Rewrite *filename* in place, dropping empty or whitespace-only lines."""
    with open(filename) as reader:
        kept = [ln for ln in reader if ln.strip()]
    with open(filename, 'w') as writer:
        writer.writelines(kept)
|
|
|
|
# CLASS OGG_Sync
|
|
class OGG_Sync:
|
|
def __init__(self, extract, replicat): # Constructor of the class
    """Initialise a sync pair and load its configuration.

    Reads sync.d/<extract>_<replicat>/env.info into self.specific_params
    ("param = value" lines), then loads etc/<type_environement>.conf into
    self.config_params. Exits with ERROR if env.info is missing.
    """
    # Unique identifier for this run, usable to tag generated artefacts.
    self.runid = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S_%f')
    self.extract = extract
    self.replicat = replicat
    # Identify environement type (dev|prod) from the pair's env.info file.
    envinfo_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/env.info"
    try:
        contents_of_envinfo = open(envinfo_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file env.info not found."))
        exit (ERROR)

    # Parse "param = value" lines (whitespace-tolerant) into specific_params.
    self.specific_params={}
    for line in contents_of_envinfo:
        line = line.rstrip()
        line = line.lstrip()
        if line !="":
            # split on the FIRST '=' only, so values may contain '='
            param, value = line.split("=", 1)
            param = param.rstrip()
            param = param.lstrip()
            value = value.rstrip()
            value = value.lstrip()
            self.specific_params[param] = value

    # Tables flagged _ADDNATIVE; filled later by parse_prm_delta().
    self.extract_addnative_option_array = []
    # Load configuration for this environment type (e.g. etc/dev.conf).
    configuration_file = OGG_SERVICE_DIR + "/etc/" + self.specific_params["type_environement"] + ".conf"
    contents_configuration_file = open(configuration_file, "r", encoding = "utf-8", errors = "replace").readlines()
    self.config_params={}
    for line in contents_configuration_file:
        line = line.rstrip()
        line = line.lstrip()
        if line !="":
            param, value = line.split("=", 1)
            param = param.rstrip()
            param = param.lstrip()
            value = value.rstrip()
            value = value.lstrip()
            self.config_params[param] = value
    return
|
|
|
|
def get_class_attributes_as_json(self): # Display class attributes
|
|
json_class_attributes = vars(self)
|
|
return json.dumps(json_class_attributes, indent = 2)
|
|
|
|
def get_config_params_as_json(self): # Display class attributes
|
|
return json.dumps(self.config_params, indent = 2)
|
|
|
|
def get_type_env(self):
    # NOTE(review): stub — always returns None. The environment type lives in
    # self.specific_params["type_environement"]; confirm whether this accessor
    # was meant to return it before relying on it anywhere.
    return
|
|
|
|
def build_extract_prm(self):
    """Assemble <extract>.prm in the temp sync dir from the header and tables fragments.

    Concatenates <extract>_header.prm and <extract>_tables.prm (blank lines
    dropped) under a generated banner, stores the result's path in
    self.extract_prm_filename, and exits with ERROR if a fragment is missing.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    extract_header_prm_filename = sync_dir + "/" + self.extract + "_header.prm"
    extract_tables_prm_filename = sync_dir + "/" + self.extract + "_tables.prm"
    if not (os.path.isfile(extract_header_prm_filename) and os.path.isfile(extract_tables_prm_filename)):
        print ("OGG Sync EXTRACT configuration files does not exist: ")
        print (" " + extract_header_prm_filename)
        print (" " + extract_tables_prm_filename)
        exit (ERROR)

    # Fix: create the temp directory ourselves instead of relying on
    # build_replicat_prm() having been called first (only it used to mkdir).
    temp_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat
    os.makedirs(temp_dir, exist_ok=True)
    extract_prm_filename = temp_dir + "/" + self.extract + ".prm"

    # concatenate header + table prm sections under a provenance banner
    contents_of_prm = []
    contents_of_prm.append("-- This file has been is generated from the following files on " + hostname + " :\n")
    contents_of_prm.append("-- " + extract_header_prm_filename + " \n")
    contents_of_prm.append("-- " + extract_tables_prm_filename)
    contents_of_prm.append(NEWLINE)
    # use context managers so the handles are closed deterministically
    with open(extract_header_prm_filename, "r", encoding="utf-8", errors="replace") as f:
        header_lines = f.readlines()
    with open(extract_tables_prm_filename, "r", encoding="utf-8", errors="replace") as f:
        table_lines = f.readlines()

    # keep only non-blank lines (a line of spaces counts as blank)
    for line in header_lines:
        if line.rstrip().replace(" ", "") != "":
            contents_of_prm.append(line)
    contents_of_prm.append(NEWLINE)
    for line in table_lines:
        if line.rstrip().replace(" ", "") != "":
            contents_of_prm.append(line)

    with open(extract_prm_filename, "w") as f:
        f.write("".join(contents_of_prm))
    # store extract_prm_filename in class for later usage
    self.extract_prm_filename = extract_prm_filename
    return
|
|
|
|
def build_replicat_prm(self):
    """Assemble <replicat>.prm in the temp sync dir from the header and tables fragments.

    Creates the temp directory if needed, concatenates <replicat>_header.prm
    and <replicat>_tables.prm (blank lines dropped) under a generated banner,
    stores the result's path in self.replicat_prm_filename, and exits with
    ERROR if a fragment is missing.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    replicat_header_prm_filename = sync_dir + "/" + self.replicat + "_header.prm"
    replicat_tables_prm_filename = sync_dir + "/" + self.replicat + "_tables.prm"
    if not (os.path.isfile(replicat_header_prm_filename) and os.path.isfile(replicat_tables_prm_filename)):
        print ("OGG Sync REPLICAT configuration files does not exist: ")
        print (" " + replicat_header_prm_filename)
        print (" " + replicat_tables_prm_filename)
        exit (ERROR)

    # create temporary sync directory (idempotent; replaces mkdir + FileExistsError pass)
    temp_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat
    os.makedirs(temp_dir, exist_ok=True)
    replicat_prm_filename = temp_dir + "/" + self.replicat + ".prm"

    # concatenate header + tables prm sections under a provenance banner
    contents_of_prm = []
    contents_of_prm.append("-- This file has been is generated from the following files on " + hostname + " :\n")
    contents_of_prm.append("-- " + replicat_header_prm_filename + " \n")
    contents_of_prm.append("-- " + replicat_tables_prm_filename)
    contents_of_prm.append(NEWLINE)
    # use context managers so the handles are closed deterministically
    with open(replicat_header_prm_filename, "r", encoding="utf-8", errors="replace") as f:
        header_lines = f.readlines()
    with open(replicat_tables_prm_filename, "r", encoding="utf-8", errors="replace") as f:
        table_lines = f.readlines()

    # keep only non-blank lines (a line of spaces counts as blank)
    for line in header_lines:
        if line.rstrip().replace(" ", "") != "":
            contents_of_prm.append(line)
    contents_of_prm.append(NEWLINE)
    for line in table_lines:
        if line.rstrip().replace(" ", "") != "":
            contents_of_prm.append(line)

    with open(replicat_prm_filename, "w") as f:
        f.write("".join(contents_of_prm))
    # store replicat_prm_filename in class for later usage
    self.replicat_prm_filename = replicat_prm_filename
    return
|
|
|
|
def upload_extract_prm(self):
    """Push the generated EXTRACT .prm file to the OGG host's dirprm directory."""
    local_prm = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + self.extract + ".prm"
    remote_prm = self.config_params["ogg12_path"] + "/dirprm/" + self.extract + ".prm"
    upload_file(local_prm, self.config_params["ogg_host"], remote_prm)
|
|
|
|
def upload_replicat_prm(self):
    """Push the generated REPLICAT .prm file to the OGG host's dirprm directory."""
    local_prm = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + self.replicat + ".prm"
    remote_prm = self.config_params["ogg12_path"] + "/dirprm/" + self.replicat + ".prm"
    upload_file(local_prm, self.config_params["ogg_host"], remote_prm)
|
|
|
|
def upload_prm_files(self):
|
|
self.upload_extract_prm()
|
|
self.upload_replicat_prm()
|
|
return
|
|
|
|
def upload_temp_file_to_ogg_host(self, file_name):
    """Copy *file_name* from the local temp sync dir to the matching remote temp dir."""
    rel_path = "/temp/" + self.extract + "_" + self.replicat + "/" + file_name
    upload_file(OGG_SERVICE_DIR + rel_path, self.config_params["ogg_host"], self.config_params["remote_directory"] + rel_path)
|
|
|
|
def sync_full_old(self):
    """Run the legacy full-refresh shell script on the OGG host over ssh.

    Streams the remote stdout line by line, echoing each line in cyan, and
    returns the remote process's exit code.
    """
    shellcommand = ("ssh oracle@" + self.config_params["ogg_host"] + " "
                    + self.config_params["ogg_full_refresh_shell"]
                    + " -e " + self.extract + " -r " + self.replicat + " -v 12")
    process = subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE)
    while True:
        text = process.stdout.readline().decode('utf-8')
        # EOF plus a finished process means the stream is fully drained
        if text == '' and process.poll() is not None:
            break
        if text:
            print(Cyan_On_Black(text.strip()))
    return process.poll()
|
|
|
|
def parse_prm_headers(self):
    """Parse the EXTRACT and REPLICAT *_header.prm files.

    From the EXTRACT header (its USERID line) derives:
      self.ggsci_db_source_login, self.database_source,
      self.database_source_instances, self.database_source_version.
    From the REPLICAT header (USERID and DDLSUBST lines) derives:
      self.ggsci_db_target_login, self.database_target, self.remap_tablespace,
      self.database_target_instances, self.database_target_version.
    Exits with ERROR when either header file is missing.
    """
    # Parsing EXTRACT header
    extract_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_header.prm"
    try:
        contents_of_extract_header_prm = open(extract_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file " + self.extract + "_header.prm not found."))
        exit (ERROR)

    for line in contents_of_extract_header_prm:
        # suppress spaces at begin/end of line
        auxline = line.rstrip()
        auxline = auxline.lstrip()
        # skip comments
        if not auxline.startswith("--"):
            if auxline.upper().startswith("USERID"):
                # GGSCI login on SOURCE database (everything after "USERID ")
                self.ggsci_db_source_login = auxline.replace("USERID ", "", 1)
                # Source DB name: "USERID user@db, ..." -> text after '@' before ','
                (str1,str2) = auxline.split(",")
                (str3,str4) = str1.split("@")
                self.database_source = str4
                self.database_source_instances = self.get_array_active_db_nodes(self.database_source)
                self.database_source_version = self.get_database_version(self.database_source)

    # Parsing REPLICAT header
    replicat_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_header.prm"
    try:
        contents_of_replicat_header_prm = open(replicat_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file " + self.replicat + "_header.prm not found."))
        exit (ERROR)

    remap_tablespace_list = []
    for line in contents_of_replicat_header_prm:
        # suppress spaces at begin/end of line
        auxline = line.rstrip()
        auxline = auxline.lstrip()
        # skip comments
        if not auxline.startswith("--"):
            if auxline.upper().startswith("USERID"):
                # GGSCI login on TARGET database (everything after "USERID ")
                self.ggsci_db_target_login = auxline.replace("USERID ", "", 1)
                # Target DB name: "<db>_OGG..." -> part before "_OGG", suffixed with "EXA"
                (str1,str2) = auxline.upper().split(",")
                (str3,str4) = str1.split("@")
                (str5,str6) = str4.split("_OGG")
                self.database_target = str5 + "EXA"
            if auxline.upper().startswith("DDLSUBST"):
                # Tablespace mapping: "... <src_tbs> with <tgt_tbs> ..." -> "src:tgt"
                auxline = auxline.replace("'", "")
                # Collapse runs of spaces so word indexing is reliable
                auxline = re.sub(' +', ' ',auxline)
                words_of_auxline = auxline.split(" ")
                # the words just before/after "with" are the source/target tablespaces
                index_of_word_with = words_of_auxline.index("with")
                remap_tablespace_clause = words_of_auxline[index_of_word_with -1] + ":" + words_of_auxline[index_of_word_with +1]
                remap_tablespace_list.append(remap_tablespace_clause)

    # Comma-separated "src:tgt" pairs, Data Pump REMAP_TABLESPACE style
    self.remap_tablespace = ",".join(remap_tablespace_list)
    self.database_target_instances = self.get_array_active_db_nodes(self.database_target)
    self.database_target_version = self.get_database_version(self.database_target)
    return
|
|
|
|
def fast_parse_prm_headers(self):
    """Lightweight variant of parse_prm_headers for the WWW presentation layer.

    Derives only self.ggsci_db_source_login / self.database_source and
    self.ggsci_db_target_login / self.database_target — it skips the
    database round-trips (instances, versions) that make the full parser
    slow. Exits with ERROR when either header file is missing.
    """
    # Parsing EXTRACT header
    extract_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_header.prm"
    try:
        contents_of_extract_header_prm = open(extract_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file " + self.extract + "_header.prm not found."))
        exit (ERROR)

    for line in contents_of_extract_header_prm:
        # suppress spaces at begin/end of line
        auxline = line.rstrip()
        auxline = auxline.lstrip()
        # skip comments
        if not auxline.startswith("--"):
            if auxline.upper().startswith("USERID"):
                # GGSCI login on SOURCE database (everything after "USERID ")
                self.ggsci_db_source_login = auxline.replace("USERID ", "", 1)
                # Source DB name: "USERID user@db, ..." -> text after '@' before ','
                (str1,str2) = auxline.split(",")
                (str3,str4) = str1.split("@")
                self.database_source = str4

    # Parsing REPLICAT header
    replicat_header_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_header.prm"
    try:
        contents_of_replicat_header_prm = open(replicat_header_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file " + self.replicat + "_header.prm not found."))
        exit (ERROR)

    # NOTE(review): remap_tablespace_list is initialised but never used in this
    # fast variant (the DDLSUBST parsing lives only in parse_prm_headers).
    remap_tablespace_list = []
    for line in contents_of_replicat_header_prm:
        # suppress spaces at begin/end of line
        auxline = line.rstrip()
        auxline = auxline.lstrip()
        # skip comments
        if not auxline.startswith("--"):
            if auxline.upper().startswith("USERID"):
                # GGSCI login on TARGET database (everything after "USERID ")
                self.ggsci_db_target_login = auxline.replace("USERID ", "", 1)
                # Target DB name: "<db>_OGG..." -> part before "_OGG", suffixed with "EXA"
                (str1,str2) = auxline.upper().split(",")
                (str3,str4) = str1.split("@")
                (str5,str6) = str4.split("_OGG")
                self.database_target = str5 + "EXA"

    return
|
|
|
|
|
|
def parse_prm_delta(self):
    """Parse <extract>_delta.prm and <replicat>_delta.prm for a delta sync.

    Populates:
      self.new_table_owner_on_source_db / self.array_new_tables_on_source_db
      self.new_table_owner_on_target_db / self.array_new_tables_on_target_db
      self.extract_addnative_option_array (tables carrying _ADDNATIVE)
      self.new_map_dictionary (J$ MAP statements keyed by table name)
    Exits with ERROR when a file is missing/empty or more than one owner is
    used on either side.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat

    # ---------- EXTRACT DELTA ----------
    extract_delta_prm_filename = sync_dir + "/" + self.extract + "_delta.prm"
    try:
        with open(extract_delta_prm_filename, "r", encoding="utf-8", errors="replace") as f:
            contents_of_extract_delta_prm = f.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm not found."))
        exit (ERROR)

    if len(contents_of_extract_delta_prm) == 0:
        print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm is empty."))
        exit (ERROR)

    tab_owner_array = []
    tab_name_array = []
    delta_addnative_option_array = []
    for line in contents_of_extract_delta_prm:
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        # normalise: drop spaces and ';', force uppercase
        auxline = auxline.replace(" ", "").replace(";", "").upper()
        if auxline.startswith("TABLE"):
            auxline = auxline.replace("TABLE", "", 1)
            (tab_owner, tab_name) = auxline.split(",")[0].split(".")
            tab_owner_array.append(tab_owner)
            tab_name_array.append(tab_name)
            # remember tables carrying the _ADDNATIVE option
            if ",_ADDNATIVE" in auxline:
                delta_addnative_option_array.append(tab_name)

    self.extract_addnative_option_array = union_keep_order(
        self.extract_addnative_option_array, delta_addnative_option_array)

    # Robustness: fail cleanly when no TABLE statement was found (the old code
    # crashed with IndexError on tab_owner_array[0]).
    if not tab_owner_array:
        print (Red_On_Black("ERROR: file " + self.extract + "_delta.prm contains no TABLE statement."))
        exit (ERROR)

    # Exactly one owner is allowed on the source side.
    self.new_table_owner_on_source_db = tab_owner_array[0]
    self.array_new_tables_on_source_db = tab_name_array
    for tab_owner in tab_owner_array:
        if tab_owner != self.new_table_owner_on_source_db:
            # Bug fix: the old message wrongly claimed the replicat delta file
            # was missing.
            print (Red_On_Black("More than one table owners in EXTRACT delta parameter file."))
            exit (ERROR)

    # ---------- REPLICAT DELTA ----------
    replicat_delta_prm_filename = sync_dir + "/" + self.replicat + "_delta.prm"
    try:
        with open(replicat_delta_prm_filename, "r", encoding="utf-8", errors="replace") as f:
            contents_of_replicat_delta_prm = f.readlines()
    except FileNotFoundError:
        print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm not found."))
        exit (ERROR)

    # Bug fix: the old code re-tested the EXTRACT list here, and "printed" the
    # message by calling the list object, which raised TypeError.
    if len(contents_of_replicat_delta_prm) == 0:
        print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm is empty."))
        exit (ERROR)

    self.new_map_dictionary = {}
    table_array = []
    for line in contents_of_replicat_delta_prm:
        auxline = line.strip()
        # skip comments
        if auxline.startswith("--"):
            continue
        upper_line = auxline.upper()
        if upper_line.startswith("MAP") and "TARGET" in upper_line:
            if "J$" not in upper_line:
                # Classic MAP statement: record the "OWNER.TABLE" after TARGET
                target_spec = upper_line.replace(";", "").split("TARGET ")[1].strip()
                table_array.append(target_spec)
            else:
                # J$ (journal) MAP statement: keep the whole statement keyed by
                # its table name (second comma field, part after the dot)
                table_key = upper_line.split(",")[1].split(".")[1]
                self.new_map_dictionary[table_key] = upper_line

    # Robustness: same empty guard as the extract side.
    if not table_array:
        print (Red_On_Black("ERROR: file " + self.replicat + "_delta.prm contains no MAP statement."))
        exit (ERROR)

    # Exactly one owner is allowed on the target side as well.
    self.array_new_tables_on_target_db = []
    self.new_table_owner_on_target_db = table_array[0].split(".")[0]
    for entry in table_array:
        (owner, table_name) = entry.split(".")
        self.array_new_tables_on_target_db.append(table_name)
        if owner != self.new_table_owner_on_target_db:
            print ("More than one table owners in REPLICAT delta parameter file.")
            exit (ERROR)
    return
|
|
|
|
|
|
|
|
def check_pk_uk_on_source_db_delta_tables(self):
    """Verify each delta table exists on the source DB and has a PK or UK.

    Connects as SYSDBA, checks DBA_TABLES then DBA_CONSTRAINTS for every
    table in self.array_new_tables_on_source_db; exits with ERROR on the
    first missing table / missing constraint, or on any database error.
    """
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        owner = self.current_table_owner_on_source_db
        # Check if source tables exist (security/idiom fix: bind variables
        # instead of string-concatenated SQL).
        for table_name in self.array_new_tables_on_source_db:
            cursor.execute(
                "select count(1) from DBA_TABLES where OWNER = :own and TABLE_NAME = :tab",
                own=owner, tab=table_name)
            count = cursor.fetchone()[0]
            if count == 0:
                print (NEWLINE + "ERROR: Table " + owner + "." + table_name + " does not exists in database " + self.database_source)
                exit (ERROR)
        # Check if the tables have a PRIMARY KEY or UNIQUE constraint
        for table_name in self.array_new_tables_on_source_db:
            cursor.execute(
                "select count(1) from DBA_CONSTRAINTS where OWNER = :own and TABLE_NAME = :tab and CONSTRAINT_TYPE in ('P','U')",
                own=owner, tab=table_name)
            count = cursor.fetchone()[0]
            if count == 0:
                print (NEWLINE +"ERROR: Table " + owner + "." + table_name + " in database " + self.database_source + " does not heve any PRIMARY KEY or UNIQUE INDEX")
                exit (ERROR)

        cursor.close()
        db.close()  # fix: the connection was previously left open
    except cx_Oracle.DatabaseError as err:
        print (err)
        exit (ERROR)
    return
|
|
|
|
|
|
def add_trandata_on_delta_tables(self):
    """Enable supplemental logging (TRANDATA) for delta tables that lack it.

    Step 1: run an INFO TRANDATA obey file through ggsci on the OGG host and
    scan its output for tables reported "disabled". Step 2: run an ADD
    TRANDATA obey file for exactly those tables.
    """
    # Generate INFO TRANDATA obey file
    # compose file content
    contents_of_file = []
    contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login)
    for tab_name in self.array_new_tables_on_source_db:
        contents_of_file.append("INFO TRANDATA " + self.current_table_owner_on_source_db + "." + tab_name)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)
    # write content to file
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "delta_info_trandata.obey"
    f = open(file_name, 'w')
    f.write(contents_of_file)
    f.close()
    # Upload and execute the INFO TRANDATA obey file (piped into ggsci over ssh)
    self.upload_temp_file_to_ogg_host("delta_info_trandata.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "delta_info_trandata.obey"
    shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    cmd_output=cmd.stdout.decode('utf-8')

    # Create an array of the tables whose TRANDATA is currently disabled
    array_new_tables_to_enable_trandata = []
    for line in cmd_output.splitlines():
        if "disabled" in line:
            # trandata is disabled: "... for table OWNER.TABLE" -> TABLE
            table_name = line.split("for table ")[1].split(".")[1]
            array_new_tables_to_enable_trandata.append(table_name)

    # Generate ADD TRANDATA obey file
    # compose file content
    contents_of_file = []
    contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login)
    for tab_name in array_new_tables_to_enable_trandata:
        contents_of_file.append("ADD TRANDATA " + self.current_table_owner_on_source_db + "." + tab_name)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)
    # write content to file
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "delta_add_trandata.obey"
    f = open(file_name, 'w')
    f.write(contents_of_file)
    f.close()
    # Upload and execute the ADD TRANDATA obey file
    self.upload_temp_file_to_ogg_host("delta_add_trandata.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "delta_add_trandata.obey"
    shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    cmd_output=cmd.stdout.decode('utf-8')
    return
|
|
|
|
def create_remote_dirs(self):
    """Ensure the remote temp working directory for this sync pair exists on the OGG host."""
    remote_temp = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " mkdir -p " + remote_temp
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
|
|
|
|
|
|
def get_array_active_db_nodes(self, database):
    """Return a flat list alternating instance name / short host name for every OPEN instance of *database*.

    Host names have the ".france.intra.corp" domain suffix stripped.
    Exits with ERROR on any database error.
    """
    nodes = []
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + database, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        cursor.execute("select INSTANCE_NAME, HOST_NAME from GV$INSTANCE where STATUS='OPEN'")
        for instance_name, host_name in cursor:
            nodes.append(instance_name)
            nodes.append(host_name.replace(".france.intra.corp", ""))
        cursor.close()
        db.close()
    except cx_Oracle.DatabaseError as err:
        print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
        exit (ERROR)
    return nodes
|
|
|
|
def get_database_version(self, database):
    """Return the Oracle VERSION string of *database* (from V$INSTANCE).

    Fix: ``version`` is now initialised, so an empty result set returns None
    instead of raising UnboundLocalError. Exits with ERROR on database errors.
    """
    version = None
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + database, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        cursor.execute("select VERSION from V$INSTANCE")
        for row in cursor:
            version = row[0]
        cursor.close()
        db.close()
    except cx_Oracle.DatabaseError as err:
        print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
        exit (ERROR)
    return version
|
|
|
|
def drop_new_tables_exists_on_target_db(self):
    """Drop each delta table on the target database if it exists.

    ORA-00942 (table or view does not exist) is expected and ignored; any
    other database error aborts with ERROR.
    """
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_target, mode=cx_Oracle.SYSDBA)
    except cx_Oracle.DatabaseError as err:
        print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
        exit (ERROR)

    cursor = db.cursor()
    # Drop table on target database
    for table_name in self.array_new_tables_on_target_db:
        try:
            # NOTE: identifiers cannot be bound in DDL; owner/table come from
            # the parsed .prm files, not user input.
            sql = "drop table " + self.current_table_owner_on_target_db + "." + table_name +" purge"
            cursor.execute(sql)
        except cx_Oracle.DatabaseError as err:
            ora_code = str(err).split(":")[0]
            if ora_code != "ORA-00942":
                # anything other than "table or view does not exist" is fatal
                print (Red_On_Black("UNHANDELED ERROR: " + str(err)))
                exit (ERROR)
    cursor.close()
    db.close()  # fix: connection leak — the connection was never closed
    return
|
|
|
|
|
|
def get_running_ogg_host(self):
    """Probe each host in ogg_host_list and return the one whose OGG manager is running.

    The winner is also stored in self.config_params["ogg_host"]. Returns
    None when no host reports a running manager.
    """
    ogg12_env = self.config_params["ogg12_env"]
    ogg12_path = self.config_params["ogg12_path"]

    # Get the ogg host list, remove spaces, and tokenize it by commas ','
    candidates = self.config_params["ogg_host_list"].replace(' ','').split(',')

    # Ask each node's ggsci whether the manager is up; first hit wins.
    for ogg_host in candidates:
        shellcommand="ssh oracle@" + ogg_host + " \'" + ogg12_env +"; echo info manager | " + ogg12_path + "/ggsci" +"\'"
        cmd = subprocess.run(
            shellcommand,
            check=True,
            shell=True,
            stdout=subprocess.PIPE,
        )
        report = cmd.stdout.decode('utf-8')
        if any("MANAGER IS RUNNING" in line.upper() for line in report.splitlines()):
            self.config_params["ogg_host"] = ogg_host
            return ogg_host

    return
|
|
|
|
|
|
def set_ogg_host(self, ogg_host_dev, ogg_host_prod):
|
|
# Replace the ogg_host with the one received by parameter
|
|
if self.specific_params["type_environement"].upper() == "DEV":
|
|
self.config_params["ogg_host"] = ogg_host_dev
|
|
elif self.specific_params["type_environement"].upper() == "PROD":
|
|
self.config_params["ogg_host"] = ogg_host_prod
|
|
return
|
|
|
|
|
|
def get_extract_status(self):
    """Return the extract's status string as reported by ggsci INFO.

    Writes an "info <extract>" obey file, uploads it, pipes it into ggsci on
    the OGG host, and returns the text after "Status " on the matching
    "EXTRACT <name>" output line. Returns None implicitly when no matching
    line is found.
    """
    # Generate INFO EXTRACT obey file
    # compose file content
    contents_of_file = []
    # contents_of_file.append("DBLOGIN USERID " + self.ggsci_db_source_login)
    contents_of_file.append("info " + self.extract )
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)
    # write content to file
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "info_extract.obey"
    f = open(file_name, 'w')
    f.write(contents_of_file)
    f.close()
    # Upload and execute the INFO EXTRACT obey file (piped into ggsci over ssh)
    self.upload_temp_file_to_ogg_host("info_extract.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "info_extract.obey"
    shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    cmd_output=cmd.stdout.decode('utf-8')
    for line in cmd_output.splitlines():
        if ("EXTRACT" in line.upper()) and (self.extract.upper() in line.upper()):
            # e.g. "EXTRACT  EXT1  Last Started ...  Status RUNNING"
            extract_status = line.split("Status ")[1]
            return extract_status
|
|
|
|
def stop_extract(self):
    """Issue STOP <extract> via ggsci and wait until the extract reports STOPPED.

    Polls get_extract_status() every OGG_STATUS_SLEEP_TIME_WHEN_CHECK seconds,
    at most OGG_STATUS_MAX_ITERATION_CHECK times; exits with ERROR when the
    extract is still not stopped afterwards.
    """
    # Generate STOP EXTRACT obey file
    contents_of_file = NEWLINE.join(["stop " + self.extract, NEWLINE])
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + "stop_extract.obey"
    with open(file_name, 'w') as f:
        f.write(contents_of_file)

    # Upload and execute the STOP obey file (piped into ggsci over ssh)
    self.upload_temp_file_to_ogg_host("stop_extract.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "stop_extract.obey"
    shellcommand="ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] +"; cat "+ remote_obey_filename+" | " + self.config_params["ogg12_path"] + "/ggsci" +"\'"
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Poll the status until STOPPED or the retry budget is exhausted.
    status = self.get_extract_status()
    iteration = 0
    while status != "STOPPED" and iteration < OGG_STATUS_MAX_ITERATION_CHECK:
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        status = self.get_extract_status()
        iteration += 1
    # Bug fix: the old code compared the counter to the maximum AFTER a final
    # increment, so a timed-out stop was silently reported as success (and a
    # stop completing on the last iteration was reported as FAILED). Decide on
    # the status itself instead.
    if status != "STOPPED":
        print(Red_On_Black("FAILED"))
        exit (ERROR)
    return
|
|
|
|
def start_extract(self):
    """Start the OGG extract process and wait until it reports RUNNING.

    Generates a "start <extract>" obey file locally, uploads it to the OGG
    host, pipes it into ggsci over ssh, then polls get_extract_status()
    until the extract is RUNNING or the polling budget
    (OGG_STATUS_MAX_ITERATION_CHECK) is exhausted.

    Exits the program with ERROR if the extract does not start in time.
    """
    # Compose the obey file content
    contents_of_file = []
    contents_of_file.append("start " + self.extract)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # Write content to the local temp sync directory
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "start_extract.obey"
    with open(file_name, 'w') as f:
        f.write(contents_of_file)

    # Upload the obey file and execute it with ggsci on the OGG host
    self.upload_temp_file_to_ogg_host("start_extract.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "start_extract.obey"
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "\'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    cmd_output = cmd.stdout.decode('utf-8')

    # Give the process a moment before the first status probe
    time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)

    # Poll the status until it is RUNNING or we run out of retries
    status = self.get_extract_status()
    iteration = 0
    while (status != "RUNNING" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK):
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        status = self.get_extract_status()
        iteration += 1
    # BUGFIX: decide success/failure on the final status rather than on the
    # iteration counter.  The old "iteration == MAX" test reported FAILED
    # when the extract came up on the very last retry, and silently returned
    # success on a real timeout (the loop then ends with iteration == MAX+1).
    if status != "RUNNING":
        print(Red_On_Black("FAILED"))
        exit(ERROR)
    return
|
|
|
|
|
|
def get_replicat_status(self):
    """Return the replicat's status string as reported by ggsci INFO.

    Builds an "info <replicat>" obey file, ships it to the OGG host, runs
    it through ggsci over ssh, and parses the output line mentioning the
    replicat.  Returns "MISSING" when no matching status line is found.
    """
    replicat_status = "MISSING"

    # Build and write the obey file locally
    obey_text = NEWLINE.join(["info " + self.replicat, NEWLINE])
    local_obey = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "info_replicat.obey"
    with open(local_obey, 'w') as obey_file:
        obey_file.write(obey_text)

    # Push the obey file to the OGG host and feed it into ggsci
    self.upload_temp_file_to_ogg_host("info_replicat.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "info_replicat.obey"
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "\'"
    result = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Scan ggsci output for the "REPLICAT <name> ... Status <state>" line
    for line in result.stdout.decode('utf-8').splitlines():
        upper_line = line.upper()
        if ("REPLICAT" in upper_line) and (self.replicat.upper() in upper_line):
            replicat_status = line.split("Status ")[1]
    return replicat_status
|
|
|
|
def update_sync_status(self):
    """Refresh the cached extract and replicat statuses from ggsci."""
    self.extract_status = self.get_extract_status()
    self.replicat_status = self.get_replicat_status()
|
|
|
|
|
|
def stop_replicat(self):
    """Stop the OGG replicat process and wait until it reports STOPPED.

    Generates a "stop <replicat>" obey file locally, uploads it to the OGG
    host, pipes it into ggsci over ssh, then polls get_replicat_status()
    until the replicat is STOPPED or the polling budget
    (OGG_STATUS_MAX_ITERATION_CHECK) is exhausted.

    Exits the program with ERROR if the replicat does not stop in time.
    """
    # Compose the obey file content
    contents_of_file = []
    contents_of_file.append("stop " + self.replicat)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # Write content to the local temp sync directory
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "stop_replicat.obey"
    with open(file_name, 'w') as f:
        f.write(contents_of_file)

    # Upload the obey file and execute it with ggsci on the OGG host
    self.upload_temp_file_to_ogg_host("stop_replicat.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "stop_replicat.obey"
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "\'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    cmd_output = cmd.stdout.decode('utf-8')

    # Poll the status until it is STOPPED or we run out of retries
    status = self.get_replicat_status()
    iteration = 0
    while (status != "STOPPED" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK):
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        status = self.get_replicat_status()
        iteration += 1
    # BUGFIX: decide success/failure on the final status rather than on the
    # iteration counter.  The old "iteration == MAX" test reported FAILED
    # when the replicat stopped on the very last retry, and silently
    # returned success on a real timeout (iteration ends at MAX+1).
    if status != "STOPPED":
        print(Red_On_Black("FAILED"))
        exit(ERROR)
    return
|
|
|
|
def start_replicat(self):
    """Start the OGG replicat process and wait until it reports RUNNING.

    Generates a "start <replicat>" obey file locally, uploads it to the
    OGG host, pipes it into ggsci over ssh, then polls
    get_replicat_status() until the replicat is RUNNING or the polling
    budget (OGG_STATUS_MAX_ITERATION_CHECK) is exhausted.

    Exits the program with ERROR if the replicat does not start in time.
    """
    # Compose the obey file content
    contents_of_file = []
    contents_of_file.append("start " + self.replicat)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # Write content to the local temp sync directory
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "start_replicat.obey"
    with open(file_name, 'w') as f:
        f.write(contents_of_file)

    # Upload the obey file and execute it with ggsci on the OGG host
    self.upload_temp_file_to_ogg_host("start_replicat.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "start_replicat.obey"
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "\'"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    cmd_output = cmd.stdout.decode('utf-8')

    # Poll the status until it is RUNNING or we run out of retries
    status = self.get_replicat_status()
    iteration = 0
    while (status != "RUNNING" and iteration <= OGG_STATUS_MAX_ITERATION_CHECK):
        time.sleep(OGG_STATUS_SLEEP_TIME_WHEN_CHECK)
        status = self.get_replicat_status()
        iteration += 1
    # BUGFIX: decide success/failure on the final status rather than on the
    # iteration counter.  The old "iteration == MAX" test reported FAILED
    # when the replicat came up on the very last retry, and silently
    # returned success on a real timeout (iteration ends at MAX+1).
    if status != "RUNNING":
        print(Red_On_Black("FAILED"))
        exit(ERROR)
    return
|
|
|
|
|
|
def replicat_has_lag(self):
    """Return True unless ggsci reports "no more records to process".

    Writes a "lag <replicat>" obey file, uploads it and feeds it into
    ggsci on the OGG host over ssh, then scans the output for the phrase
    that indicates the replicat has caught up.
    """
    # Build and write the obey file locally
    obey_text = NEWLINE.join(["lag " + self.replicat, NEWLINE])
    local_obey = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "lag_replicat.obey"
    with open(local_obey, 'w') as obey_file:
        obey_file.write(obey_text)

    # Push the obey file to the OGG host and feed it into ggsci
    self.upload_temp_file_to_ogg_host("lag_replicat.obey")
    remote_obey_filename = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "lag_replicat.obey"
    shellcommand = "ssh oracle@" + self.config_params["ogg_host"] + " \'" + self.config_params["ogg12_env"] + "; cat " + remote_obey_filename + " | " + self.config_params["ogg12_path"] + "/ggsci" + "\'"
    result = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # No lag exactly when ggsci printed the "caught up" phrase
    return not any(
        "no more records to process" in line
        for line in result.stdout.decode('utf-8').splitlines()
    )
|
|
|
|
def export_delta_tables(self, scn):
    """Export the delta tables from the source database with Data Pump.

    Steps:
      1. Create the OGGSYNC directory object on the source database.
      2. Generate an expdp parfile (grants/constraints/indexes/triggers
         excluded; optional flashback_scn; version=11.2 when exporting
         from 12c for an 11g target) plus a small driver shell script.
      3. Upload both to the OGG host and run expdp over ssh on a source
         database host, streaming its output.
      4. Download the expdp log and abort with ERROR if it contains any
         ORA- error not in EXPDP_IGNORE_ERROR_MESSAGES.

    scn: flashback SCN (string) for a consistent export, or None to skip
    the flashback clause.
    """
    # Create OGGSYNC directory on the source database and fetch the
    # instance host name
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_source, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        sql = "create or replace directory OGGSYNC as '" + self.config_params["datapump_directory_path"] + "'"
        cursor.execute(sql)

        sql = "select HOST_NAME from v$instance"
        cursor.execute(sql)
        for row in cursor:
            db_host_name = row[0]
            db_host_name = db_host_name.split(".")[0]

        cursor.close()
    except cx_Oracle.DatabaseError as err:
        print(err)
        exit(ERROR)

    # Generate the expdp parfile
    contents_of_file = []
    contents_of_file.append("dumpfile = OGGSYNC:" + self.extract + "_" + self.replicat + "_delta.dmp")
    contents_of_file.append("logfile = OGGSYNC:export_" + self.extract + "_" + self.replicat + "_delta.log")
    contents_of_file.append("reuse_dumpfiles=Y")
    contents_of_file.append("exclude=GRANT,CONSTRAINT,INDEX,TRIGGER")

    if scn is not None:
        contents_of_file.append("flashback_scn = " + scn)
    for table_name in self.array_new_tables_on_source_db:
        contents_of_file.append("tables = " + self.current_table_owner_on_source_db + "." + table_name)

    # An 11g target cannot import a 12c-format dump without an explicit
    # version clause
    if self.database_source_version.startswith("12") and self.database_target_version.startswith("11"):
        contents_of_file.append("version=11.2")

    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # Write the parfile locally
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "export_delta.par"
    with open(file_name, 'w') as f:
        f.write(contents_of_file)

    expdp_parfile = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "export_delta.par"

    # Generate the expdp driver shell script
    contents_of_file = []
    contents_of_file.append(". oraenv <<EOF! > /dev/null")
    contents_of_file.append(self.database_source)
    contents_of_file.append("EOF!")
    contents_of_file.append("export ORACLE_SID=" + self.database_source_instances[0])
    contents_of_file.append("expdp userid=\"'/ as sysdba'\" parfile=" + expdp_parfile)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # Write the shell script locally
    file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "export_delta.sh"
    with open(file, 'w') as f:
        f.write(contents_of_file)

    expdp_shell = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "export_delta.sh"
    # Perform chmod +x for the expdp shell script.
    # BUGFIX: stat the shell script itself (was os.stat(file_name), i.e.
    # the parfile), so the executable bit is ORed into the script's own
    # mode and not the parfile's.
    st = os.stat(file)
    os.chmod(file, st.st_mode | stat.S_IEXEC)

    # Upload the expdp shell and parfile
    self.upload_temp_file_to_ogg_host("export_delta.par")
    self.upload_temp_file_to_ogg_host("export_delta.sh")

    # expdp is launched from one of the source database hosts; stream its
    # output as it runs
    shellcommand = "ssh oracle@" + self.database_source_instances[1] + " " + expdp_shell
    process = subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE)
    while True:
        output = process.stdout.readline()
        output = output.decode('utf-8')
        if output == '' and process.poll() is not None:
            break
        if output:
            print(Cyan_On_Black(output.strip()))
    rc = process.poll()

    # Download the expdp log and fail on non-whitelisted ORA- errors
    download_file(self.config_params["datapump_directory_path"] + "/export_" + self.extract + "_" + self.replicat + "_delta.log", self.config_params["ogg_host"], OGG_SERVICE_DIR + "/temp/.")
    logfile = OGG_SERVICE_DIR + "/temp/export_" + self.extract + "_" + self.replicat + "_delta.log"
    with open(logfile, "r", encoding="utf-8", errors="replace") as log:
        contents_of_logfile = log.readlines()
    blocking_ora_error = 0
    for line in contents_of_logfile:
        if line.startswith("ORA-"):
            # Keep only the ORA-xxxxx code before the first colon
            ora_error = line.split(":")[0]
            if ora_error not in EXPDP_IGNORE_ERROR_MESSAGES:
                blocking_ora_error += 1
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile))
        exit(ERROR)
    return
|
|
|
|
def import_delta_tables(self):
    """Import the delta tables into the target database with Data Pump.

    Steps:
      1. Create the OGGSYNC directory object on the target database.
      2. Generate an impdp parfile (tables remapped from the source schema
         to the target schema, optional remap_tablespace) plus a small
         driver shell script.
      3. Upload both to the OGG host and run impdp over ssh on a target
         database host, streaming its output.
      4. Download the impdp log and abort with ERROR if it contains any
         ORA- error not in IMPDP_IGNORE_ERROR_MESSAGES.
    """
    # Create OGGSYNC directory on the target database and fetch the
    # instance host name
    try:
        db = cx_Oracle.connect(self.config_params["sysdba_user"], self.config_params["sysdba_password"], self.config_params["scan"] + "/" + self.database_target, mode=cx_Oracle.SYSDBA)
        cursor = db.cursor()
        sql = "create or replace directory OGGSYNC as '" + self.config_params["datapump_directory_path"] + "'"
        cursor.execute(sql)

        sql = "select HOST_NAME from v$instance"
        cursor.execute(sql)
        for row in cursor:
            db_host_name = row[0]
            db_host_name = db_host_name.split(".")[0]

        cursor.close()
    except cx_Oracle.DatabaseError as err:
        print(err)
        exit(ERROR)

    # Generate the impdp parfile
    contents_of_file = []
    contents_of_file.append("dumpfile = OGGSYNC:" + self.extract + "_" + self.replicat + "_delta.dmp")
    contents_of_file.append("logfile = OGGSYNC:import_" + self.extract + "_" + self.replicat + "_delta.log")

    for table_name in self.array_new_tables_on_source_db:
        contents_of_file.append("tables = " + self.current_table_owner_on_source_db + "." + table_name)

    contents_of_file.append("remap_schema = " + self.current_table_owner_on_source_db + ":" + self.current_table_owner_on_target_db)

    if self.remap_tablespace:
        contents_of_file.append("remap_tablespace=" + self.remap_tablespace)
    contents_of_file.append(NEWLINE)

    contents_of_file = NEWLINE.join(contents_of_file)

    # Write the parfile locally
    file_name = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "import_delta.par"
    with open(file_name, 'w') as f:
        f.write(contents_of_file)

    impdp_parfile = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "import_delta.par"

    # Generate the impdp driver shell script
    contents_of_file = []
    contents_of_file.append(". oraenv <<EOF! > /dev/null")
    contents_of_file.append(self.database_target)
    contents_of_file.append("EOF!")
    contents_of_file.append("export ORACLE_SID=" + self.database_target_instances[0])
    contents_of_file.append("impdp userid=\"'/ as sysdba'\" parfile=" + impdp_parfile)
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # Write the shell script locally
    file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/" + "import_delta.sh"
    with open(file, 'w') as f:
        f.write(contents_of_file)

    impdp_shell = self.config_params["remote_directory"] + "/temp/" + self.extract + "_" + self.replicat + "/" + "import_delta.sh"
    # Perform chmod +x for the impdp shell script
    st = os.stat(file)
    os.chmod(file, st.st_mode | stat.S_IEXEC)

    # Upload the impdp shell and parfile
    self.upload_temp_file_to_ogg_host("import_delta.par")
    self.upload_temp_file_to_ogg_host("import_delta.sh")

    # impdp is launched from one of the target database hosts; stream its
    # output as it runs
    shellcommand = "ssh oracle@" + self.database_target_instances[1] + " " + impdp_shell
    process = subprocess.Popen(shlex.split(shellcommand), stdout=subprocess.PIPE)
    while True:
        output = process.stdout.readline()
        output = output.decode('utf-8')
        if output == '' and process.poll() is not None:
            break
        if output:
            print(Cyan_On_Black(output.strip()))
    rc = process.poll()

    # Download the impdp log and fail on non-whitelisted ORA- errors
    download_file(self.config_params["datapump_directory_path"] + "/import_" + self.extract + "_" + self.replicat + "_delta.log", self.config_params["ogg_host"], OGG_SERVICE_DIR + "/temp/.")
    logfile = OGG_SERVICE_DIR + "/temp/import_" + self.extract + "_" + self.replicat + "_delta.log"
    with open(logfile, "r", encoding="utf-8", errors="replace") as log:
        contents_of_logfile = log.readlines()
    blocking_ora_error = 0
    for line in contents_of_logfile:
        if line.startswith("ORA-"):
            # Keep only the ORA-xxxxx code before the first colon
            ora_error = line.split(":")[0]
            # BUGFIX: check the impdp log against the impdp whitelist — the
            # original mistakenly reused EXPDP_IGNORE_ERROR_MESSAGES here.
            if ora_error not in IMPDP_IGNORE_ERROR_MESSAGES:
                blocking_ora_error += 1
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile))
        exit(ERROR)
    return
|
|
|
|
|
|
def generate_ddl_source_delta_tables_pk_uk(self):
    """Extract PK/UK DDL for the new delta tables from the source database.

    Copies the DDL-extraction SQL helper into the temp sync directory,
    builds and runs (via SQL*Plus) a driver script that spools
    <target>_create_pk_uk.sql, then post-processes that spool file:
    prepends spool/connect headers pointing at the TARGET database,
    appends spool off/exit, and remaps the quoted table owner from the
    source schema to the target schema.
    """
    shutil.copy(OGG_SERVICE_DIR + "/sql/" + self.config_params["sql_script_extract_ddl_pk_uk"], OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/")
    # Generate the DDL-extraction driver script (runs against the SOURCE db)
    contents_of_file = []
    contents_of_file.append("connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_source + " as sysdba")
    contents_of_file.append("spool " + self.database_target + "_create_pk_uk.sql")
    for line in self.array_new_tables_on_source_db:
        # One helper-script invocation per new table: @script OWNER TABLE
        contents_of_file.append("@" + self.config_params["sql_script_extract_ddl_pk_uk"] + " " + self.current_table_owner_on_source_db + " " + line)

    contents_of_file.append("spool off")
    contents_of_file.append("exit")
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # write contents to file
    file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_source + "_extract_ddl_indexes.sql"
    f = open(file, 'w')
    f.write(contents_of_file)
    f.close()

    # Execute the driver script with SQL*Plus (spools the PK/UK DDL)
    shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_source + "_extract_ddl_indexes.sql"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Add spool/connect header (target database this time) and footer to
    # the spooled DDL file so it can be executed as-is later
    file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_pk_uk.sql"
    contents_of_file = open(file, "r", encoding = "utf-8", errors = "replace").readlines()
    contents_of_file.insert(0, "spool " + self.database_target + "_create_pk_uk.log")
    contents_of_file.insert(1, NEWLINE + "connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba")
    contents_of_file.append("spool off" + NEWLINE)
    contents_of_file.append("exit" + NEWLINE)
    # Remap the table OWNER to the target schema.  The '"' + "." pattern
    # matches the closing quote of a quoted identifier: OWNER"."  — only
    # quoted owner references are rewritten.
    contents_of_file_bis = []
    for line in contents_of_file:
        line = line.replace(self.current_table_owner_on_source_db + '"' + ".", self.current_table_owner_on_target_db + '"' + ".")
        contents_of_file_bis.append(line)
    contents_of_file_bis = "".join(contents_of_file_bis)
    f = open(file, 'w')
    f.write(contents_of_file_bis)
    f.close()
    return
|
|
|
|
def create_delta_tables_pk_uk_on_target_database(self):
    """Run the generated PK/UK creation script against the target database.

    Executes <target>_create_pk_uk.sql with SQL*Plus from the temp sync
    directory, then scans the spooled log for ORA- errors that are not in
    TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES; aborts the program
    with ERROR when any blocking error is found.
    """
    work_dir = OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat + "/"

    # Run the DDL script through SQL*Plus
    shellcommand = "source ~/.bash_profile; cd " + work_dir + "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_create_pk_uk.sql"
    subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Count ORA- errors outside the whitelist in the spooled logfile
    logfile_create_index = work_dir + self.database_target + "_create_pk_uk.log"
    with open(logfile_create_index, "r", encoding="utf-8", errors="replace") as log:
        log_lines = log.readlines()
    blocking_ora_error = sum(
        1
        for log_line in log_lines
        if log_line.startswith("ORA-")
        and log_line.split(":")[0] not in TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES
    )
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile_create_index))
        exit(ERROR)
    return
|
|
|
|
|
|
def create_temporary_sync_directory(self):
    """Create the per-sync temp directory <OGG_SERVICE_DIR>/temp/<extract>_<replicat>.

    Idempotent: an already-existing directory is not an error.  Using
    os.makedirs(exist_ok=True) replaces the mkdir + FileExistsError dance
    and also creates a missing parent temp/ directory instead of failing.
    """
    os.makedirs(OGG_SERVICE_DIR + "/temp/" + self.extract + "_" + self.replicat, exist_ok=True)
    return
|
|
|
|
def generate_ddl_target_delta_tables_old_index(self):
    """Extract the existing-index DDL of the delta tables from the target DB.

    Copies the index-DDL helper SQL into the temp sync directory, builds
    and runs (via SQL*Plus) a driver script that spools
    <target>_create_old_indexes.sql, then prepends spool/connect headers
    and appends spool off/exit so the file can be replayed later.  A copy
    of the final SQL, tagged with the run id, is stored in the archive
    folder.
    """
    shutil.copy(OGG_SERVICE_DIR + "/sql/" + self.config_params["sql_script_extract_ddl_index"], OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/")
    # Generate the DDL-extraction driver script (runs against the target db)
    contents_of_file = []
    contents_of_file.append("connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba")
    contents_of_file.append("spool " + self.database_target + "_create_old_indexes.sql")
    for line in self.array_new_tables_on_target_db:
        # One helper-script invocation per new table: @script OWNER TABLE
        contents_of_file.append("@" + self.config_params["sql_script_extract_ddl_index"] + " " + self.current_table_owner_on_target_db + " " + line)

    contents_of_file.append("spool off")
    contents_of_file.append("exit")
    contents_of_file.append(NEWLINE)
    contents_of_file = NEWLINE.join(contents_of_file)

    # write contents to file
    file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_extract_ddl_old_indexes.sql"
    f = open(file, 'w')
    f.write(contents_of_file)
    f.close()

    # Execute the driver script with SQL*Plus (spools the index DDL)
    shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_extract_ddl_old_indexes.sql"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )

    # Add spool/connect header and footer to the spooled DDL file
    file = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_old_indexes.sql"
    contents_of_file = open(file, "r", encoding = "utf-8", errors = "replace").readlines()
    contents_of_file.insert(0, "spool " + self.database_target + "_create_old_indexes.log")
    contents_of_file.insert(1, NEWLINE + "connect " + self.config_params["sysdba_user"] + "/" + self.config_params["sysdba_password"] + "@" + self.config_params["scan"] + "/" + self.database_target + " as sysdba")
    contents_of_file.append("spool off" + NEWLINE)
    contents_of_file.append("exit" + NEWLINE)
    contents_of_file = "".join(contents_of_file)
    f = open(file, 'w')
    f.write(contents_of_file)
    f.close()
    # Archive a run-id-tagged copy of the generated SQL file
    archive_folder = OGG_SERVICE_DIR + "/archive"
    shutil.copy(file, archive_folder + "/" + self.extract + "_"+ self.replicat + "_" + self.database_target + "_create_old_indexes_" + self.runid + ".sql")
    return
|
|
|
|
def create_delta_tables_old_indexes_on_target_database(self):
    """Run the generated old-index creation script on the target database.

    Executes <target>_create_old_indexes.sql via SQL*Plus from the temp
    sync directory, then scans the spooled log for ORA- errors outside
    TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES and prints a FAILED
    banner when any are found.

    NOTE(review): unlike create_delta_tables_pk_uk_on_target_database this
    method does NOT exit(ERROR) after printing FAILED — confirm whether
    index-creation errors are intentionally non-fatal here.
    """
    # Execute the DDL script with SQL*Plus
    shellcommand = "source ~/.bash_profile; cd "+ OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/"+ "; " + self.config_params["local_sqlplus"] + " /nolog @" + self.database_target + "_create_old_indexes.sql"
    cmd = subprocess.run(
        shellcommand,
        check=True,
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Check for abnormal ORA- messages in the spooled logfile
    logfile_create_index = OGG_SERVICE_DIR + "/temp/" + self.extract + "_"+ self.replicat + "/" + self.database_target + "_create_old_indexes.log"
    contents_of_logfile_create_index = open(logfile_create_index, "r", encoding = "utf-8", errors = "replace").readlines()
    blocking_ora_error = 0
    for line in contents_of_logfile_create_index:
        if line.startswith("ORA-"):
            # Keep only the ORA-xxxxx code before the first colon
            ora_error = line.split(":")[0]
            if ora_error not in TARGET_DATABASE_CREATE_INDEX_IGNORE_ERROR_MESSAGES:
                # The ORA-xxxx message is not in the whitelist
                blocking_ora_error += 1
    if blocking_ora_error > 0:
        print(Red_On_Black("FAILED"))
        print(White_On_Black("Please check the " + str(blocking_ora_error) + " error message(s) (ORA-xxxxx) in " + logfile_create_index))
    return
|
|
|
|
|
|
def add_new_tables_to_extract_prm(self):
    """Rewrite <extract>_tables.prm with one TABLE line per final table.

    Tables listed in extract_addnative_option_array get the ",_ADDNATIVE"
    suffix on their TABLE line.
    """
    prm_lines = []
    for table_name in self.array_final_tables_on_extract:
        suffix = ",_ADDNATIVE" if table_name in self.extract_addnative_option_array else ""
        prm_lines.append("TABLE " + self.current_table_owner_on_source_db + "." + table_name + suffix + ";")

    # Overwrite the extract tables prm file
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.extract + "_tables.prm"
    with open(prm_path, 'w') as prm_file:
        prm_file.write(NEWLINE.join(prm_lines))
    return
|
|
|
|
def add_new_tables_to_replicat_prm_with_csn_filter(self, filter_transaction_csn):
    """Rewrite <replicat>_tables.prm, CSN-filtering the newly added tables.

    Tables that are not new on the target (or every table when no CSN is
    given) get a plain MAP line; newly added target tables get a FILTER on
    the transaction CSN so changes already covered by the initial load are
    skipped.  J$ MAP entries from the current configuration are appended
    unchanged.

    filter_transaction_csn: CSN string, or None to disable filtering.
    """
    prm_lines = []

    # Classic MAP section
    for table_name in self.array_final_tables_on_replicat:
        map_prefix = ("MAP " + self.current_table_owner_on_source_db + "." + table_name
                      + ", TARGET " + self.current_table_owner_on_target_db + "." + table_name)
        if (table_name not in self.array_new_tables_on_target_db) or (filter_transaction_csn is None):
            prm_lines.append(map_prefix + ";")
        else:
            # Remember the filter string so it can be removed later
            self.replicat_intermediate_filter_string = "FILTER ( @GETENV ('TRANSACTION', 'CSN') > " + filter_transaction_csn + ')'
            prm_lines.append(map_prefix + ", " + self.replicat_intermediate_filter_string + ";")

    # J$ section
    for key in self.current_map_dictionary.keys():
        prm_lines.append(self.current_map_dictionary[key])

    # Overwrite the replicat tables prm file
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.replicat + "_tables.prm"
    with open(prm_path, 'w') as prm_file:
        prm_file.write(NEWLINE.join(prm_lines))
    return
|
|
|
|
def switch_logfile_and_chekcpoint(self):
    """Force a redo log switch followed by a checkpoint on the source DB.

    Connects as SYSDBA through the scan address; any database error is
    printed and aborts the program with ERROR.
    """
    try:
        connection = cx_Oracle.connect(self.config_params["sysdba_user"],
                                       self.config_params["sysdba_password"],
                                       self.config_params["scan"] + "/" + self.database_source,
                                       mode=cx_Oracle.SYSDBA)
        cursor = connection.cursor()
        # Switch first, then checkpoint — order preserved
        for statement in ("alter system switch logfile", "alter system checkpoint"):
            cursor.execute(statement)
        cursor.close()
    except cx_Oracle.DatabaseError as err:
        print(err)
        exit(ERROR)
    return
|
|
|
|
def define_scn_for_export(self):
    """Compute the flashback SCN for the consistent delta export.

    Takes the minimum of the database current SCN and the start SCN of all
    in-flight transactions (so open transactions are not lost) and stores
    it as a string in self.scn_of_export.  Aborts with ERROR on a database
    error.
    """
    sql = """select min(scn) scn from(
    select min(current_scn) scn from gv$database
    union
    select min(t.start_scn) scn from gv$transaction t, gv$session s
    where s.saddr = t.ses_addr
    and s.inst_id = t.inst_id
    )"""
    try:
        connection = cx_Oracle.connect(self.config_params["sysdba_user"],
                                       self.config_params["sysdba_password"],
                                       self.config_params["scan"] + "/" + self.database_source,
                                       mode=cx_Oracle.SYSDBA)
        cursor = connection.cursor()
        cursor.execute(sql)
        for row in cursor:
            self.scn_of_export = str(row[0])
        cursor.close()
    except cx_Oracle.DatabaseError as err:
        print(err)
        exit(ERROR)
    return
|
|
|
|
|
|
def remove_csn_filter_from_replicat_prm(self):
    """Rewrite <replicat>_tables.prm with plain MAP lines (no CSN FILTER).

    Emits one unfiltered MAP per final replicat table followed by the
    merged J$ MAP entries from final_map_dictionary.
    """
    prm_lines = [
        "MAP " + self.current_table_owner_on_source_db + "." + table_name
        + ", TARGET " + self.current_table_owner_on_target_db + "." + table_name + ";"
        for table_name in self.array_final_tables_on_replicat
    ]
    # Append the J$ MAP entries
    for key in self.final_map_dictionary.keys():
        prm_lines.append(self.final_map_dictionary[key])

    # Overwrite the replicat tables prm file
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.replicat + "_tables.prm"
    with open(prm_path, 'w') as prm_file:
        prm_file.write(NEWLINE.join(prm_lines))
    return
|
|
|
|
def backup_and_empty_delta_prm(self):
    """Back up the extract and replicat *_delta.prm files, then empty them.

    Each delta prm is copied to a hidden ".<name>_delta.old" file in the
    same sync directory before being truncated.
    """
    sync_dir = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    for process_name in (self.extract, self.replicat):
        delta_prm = sync_dir + "/" + process_name + "_delta.prm"
        backup_prm = sync_dir + "/." + process_name + "_delta.old"
        shutil.copyfile(delta_prm, backup_prm)
        # Truncate the live delta prm
        with open(delta_prm, 'w') as prm_file:
            prm_file.write("")
    return
|
|
|
|
|
|
def archive_prm(self):
    """Zip all .prm files of the sync directory into the archive folder.

    The archive name embeds the extract/replicat pair and the run id.

    Returns:
        The zip file's basename.
    """
    prm_folder = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat
    archive_prm_folder = OGG_SERVICE_DIR + "/archive"
    zipname = self.extract + "_" + self.replicat + "_" + self.runid + '.zip'
    # Use a context manager so the archive is closed (and flushed) even if
    # a write() raises — the original leaked the handle on error.
    with zipfile.ZipFile(archive_prm_folder + '/' + zipname, 'w') as prm_zip:
        for folder, subfolders, files in os.walk(prm_folder):
            for file in files:
                if file.endswith('.prm'):
                    # Store paths relative to the sync directory
                    prm_zip.write(os.path.join(folder, file),
                                  os.path.relpath(os.path.join(folder, file), prm_folder),
                                  compress_type=zipfile.ZIP_DEFLATED)
    return zipname
|
|
|
|
def parse_prm_tables(self):
    """Parse the current extract and replicat *_tables.prm files.

    From <extract>_tables.prm: collects the source owner and table names
    (current_table_owner_on_source_db, array_current_tables_on_source_db)
    and records tables flagged with ",_ADDNATIVE" in
    extract_addnative_option_array.

    From <replicat>_tables.prm: collects the target owner and table names
    (current_table_owner_on_target_db, array_current_tables_on_target_db)
    and keeps the J$ MAP lines in current_map_dictionary keyed by the J$
    table name.

    Aborts with ERROR when a prm file is missing or when more than one
    owner appears in either file.
    """
    # EXTRACT current
    #################
    extract_current_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.extract + "_tables.prm"
    try:
        contents_of_extract_current_prm = open(extract_current_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file " + self.extract + "_tables.prm not found."))
        exit (ERROR)

    tab_owner_array=[]
    tab_name_array=[]
    # Identify tables
    for line in contents_of_extract_current_prm:
        # strip leading/trailing whitespace
        auxline = line.rstrip()
        auxline = auxline.lstrip()
        # skip comments
        if not auxline.startswith("--"):
            # collapse and then remove all spaces, drop ";", force uppercase
            auxline = re.sub(' +', ' ',auxline)
            auxline = auxline.replace(" ","")
            auxline = auxline.replace(";", "")
            auxline = auxline.upper()
            if auxline.upper().startswith("TABLE"):
                # line is now "TABLEOWNER.NAME[,OPTIONS...]"
                auxline = auxline.replace("TABLE", "", 1)
                auxline2 = auxline.split(",")[0]
                (tab_owner,tab_name) = auxline2.split(".")
                tab_owner_array.append(tab_owner)
                tab_name_array.append(tab_name)
                if ",_ADDNATIVE" in auxline:
                    self.extract_addnative_option_array.append(tab_name)

    # Check if we have only one owner
    # NOTE(review): assumes at least one TABLE line exists — an empty prm
    # would raise IndexError here; confirm whether that can occur.
    self.current_table_owner_on_source_db = tab_owner_array[0]
    self.array_current_tables_on_source_db = tab_name_array
    for tab_owner in tab_owner_array:
        if tab_owner != self.current_table_owner_on_source_db:
            print ("More than one table owners in EXTRACT current parameter file.")
            exit (ERROR)

    # REPLICAT current
    ##################
    replicat_current_prm_filename = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_"+ self.replicat + "/" + self.replicat + "_tables.prm"
    try:
        contents_of_replicat_current_prm = open(replicat_current_prm_filename, "r", encoding = "utf-8", errors = "replace").readlines()
    except FileNotFoundError as err:
        print (Red_On_Black("ERROR: file " + self.replicat + "_tables.prm not found."))
        exit (ERROR)

    self.current_map_dictionary = {}
    table_array = []
    for line in contents_of_replicat_current_prm:
        # strip leading/trailing whitespace
        auxline = line.rstrip()
        auxline = auxline.lstrip()
        # collapse runs of spaces
        auxline = re.sub(' +', ' ',auxline)
        # skip comments
        if not auxline.startswith("--"):
            if auxline.upper().startswith("MAP") and "TARGET" in auxline.upper() and "J$" not in auxline.upper():
                # Classic MAP section: keep the "TARGET owner.table" part
                # Force uppercase and eliminate ";"
                auxline = auxline.upper()
                auxline = auxline.replace(";", "")
                (str1, str2) = auxline.split("TARGET ")
                str2 = str2.lstrip().rstrip()
                table_array.append(str2)
            if auxline.upper().startswith("MAP") and "TARGET" in auxline.upper() and "J$" in auxline.upper():
                # J$ section: key the whole MAP line by the J$ table name
                # (second dotted component of the TARGET clause)
                auxline = auxline.upper()
                str1 = auxline.split(",")[1]
                str2 = str1.split(".")[1]
                self.current_map_dictionary[str2] = auxline

    array_current_owner_tables_on_target_db = table_array

    # Check if we have only one owner
    self.array_current_tables_on_target_db = []
    self.current_table_owner_on_target_db = array_current_owner_tables_on_target_db[0].split(".")[0]
    for line in array_current_owner_tables_on_target_db:
        (owner, table_name) = line.split(".")
        # Entries carrying a CSN FILTER clause are skipped (the filter text
        # ends up in the "table name" part after the TARGET split)
        if ("CSN" in table_name) and ("TRANSACTION" in table_name):
            pass
        else:
            self.array_current_tables_on_target_db.append(table_name)
        if owner != self.current_table_owner_on_target_db:
            print ("More than one table owners in REPLICAT current parameter file.")
            exit (ERROR)

    return
|
|
|
|
def clean_local_prm_files(self):
    """De-duplicate and strip empty lines from <replicat>_tables.prm."""
    prm_path = OGG_SERVICE_DIR + "/sync.d/" + self.extract + "_" + self.replicat + "/" + self.replicat + "_tables.prm"
    with open(prm_path, "r", encoding="utf-8", errors="replace") as prm_file:
        prm_lines = prm_file.readlines()

    # Drop duplicate lines via the file-level helper, then rewrite the file
    prm_lines = removeDuplicates(prm_lines)
    with open(prm_path, 'w') as prm_file:
        prm_file.write("".join(prm_lines))

    # Strip blank lines in place
    remove_empty_lines(prm_path)
    return
|
|
|
|
|
|
def generate_finals_tables_arrays(self):
    """Combine current and new table lists into the final extract/replicat arrays.

    Populates:
      - array_duplicate_tables_on_extract / _replicat: tables present in
        both the current prm configuration and the requested delta.
      - array_final_tables_on_extract / _replicat: order-preserving union.
      - final_map_dictionary: current J$ MAP entries merged with the new ones.
      - tables_in_delta_exists_in_current: True when the delta overlaps the
        current configuration on either side.
    """
    self.array_duplicate_tables_on_extract = intersection(self.array_current_tables_on_source_db, self.array_new_tables_on_source_db)
    self.array_final_tables_on_extract = union_keep_order(self.array_current_tables_on_source_db, self.array_new_tables_on_source_db)
    self.array_duplicate_tables_on_replicat = intersection(self.array_current_tables_on_target_db, self.array_new_tables_on_target_db)
    self.array_final_tables_on_replicat = union_keep_order(self.array_current_tables_on_target_db, self.array_new_tables_on_target_db)
    self.final_map_dictionary = merge_dict(self.current_map_dictionary, self.new_map_dictionary)

    self.tables_in_delta_exists_in_current = False
    if (len(self.array_duplicate_tables_on_extract) > 0) or (len(self.array_duplicate_tables_on_replicat) > 0):
        # BUGFIX: this branch previously (re)assigned False, so the flag
        # could never become True; set True when duplicates were found.
        self.tables_in_delta_exists_in_current = True
    return