Coverage for primary.py: 99%
73 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 09:44 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 09:44 +0000
1import dataclasses
2import secrets
3import string
4import subprocess
6from database import Database
9class Primary:
10 def __init__(self, db: Database, list_schema_excluded):
11 self.db = db
12 self.db_infos = self._retrieve_db_infos(list_schema_excluded)
14 def _retrieve_db_infos(self, list_schema_excluded) -> DbInfos:
15 schema_excluded_str = ""
16 if list_schema_excluded is None:
17 schema_query = "SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT ILIKE 'pg_%'"
18 else:
19 schema_excluded_str = ",".join(
20 [f"'{schema}'" for schema in list_schema_excluded])
21 schema_query = f"SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT ILIKE 'pg_%' AND schema_name NOT IN ({schema_excluded_str});"
22 results = self.db.execute_query(schema_query)
23 db_schemas = None
24 if results and results[0]:
25 db_schemas = [schema[0] for schema in results]
27 results = self.db.execute_query(
28 f"SELECT pg_size_pretty(pg_database_size('{self.db.db_name}'))")
29 db_size = None
30 if results and results[0]:
31 db_size = results[0][0]
33 results = self.db.execute_query(
34 "SELECT count(*) from pg_stat_user_tables")
35 db_tables = None
36 if results and results[0]:
37 db_tables = results[0][0]
39 print(f"Starting pg_dump from server {self.db.conn_string} database {self.db.db_name} {db_size}")
40 print(f"db_schemas : {db_schemas}")
41 print(f"db_size : {db_size}")
42 print(f"db_tables : {db_tables}")
44 return DbInfos(db_schemas, db_size, db_tables, schema_excluded_str)
46 def create_publication(self, unique_name: str):
47 print(
48 f"Create publication on primary {self.db.conn_string} database {self.db.db_name}")
50 self.db.execute_query(f"CREATE PUBLICATION publication_{unique_name};",
51 fetch=False)
52 # Add tables to publication
53 query_publication = f"select schemaname, relname from pg_stat_user_tables where relname <> 'spatial_ref_sys'"
54 if self.db_infos.schema_excluded_str != "":
55 query_publication = query_publication + f" AND schemaname NOT IN ({self.db_infos.schema_excluded_str})"
56 results = self.db.execute_query(query_publication)
57 if results: 57 ↛ exitline 57 didn't return from function 'create_publication' because the condition on line 57 was always true
58 for schema, table in results:
59 print(
60 f"Add table {schema}.{table} to publication {unique_name}")
61 self.db.execute_query(f"ALTER PUBLICATION publication_{unique_name} ADD TABLE {schema}.{table};",
62 fetch=False)
64 def create_replication_user(self):
65 print(f" create replication user on {self.db.conn_string}")
66 # Verify if replication user already exist
67 results = self.db.execute_query("SELECT count(rolname) FROM pg_roles WHERE rolname ='replication'")
68 if results and results[0][0] > 0:
69 print(f"user replication already exist")
70 else:
71 replication_password = generate_password()
72 self.db.execute_query(f"CREATE USER replication LOGIN ENCRYPTED PASSWORD '{replication_password}'; "
73 f"ALTER ROLE replication WITH REPLICATION", fetch=False)
74 print(f"user replication created")
76 for schema in self.db_infos.db_schemas:
77 # Grant privileges on the schema
78 self.db.execute_query(f"GRANT SELECT ON ALL TABLES IN SCHEMA {schema} TO replication; "
79 f"GRANT USAGE ON SCHEMA {schema} TO replication", fetch=False)
80 print(f"GRANT right on {schema} to replication user")
83 def execute_dump(self, section: str):
84 command = [
85 "pg_dump",
86 "-d", self.db.conn_string,
87 "-Fp",
88 "-T", "public.spatial_ref_sys",
89 "--no-acl",
90 "--no-owner",
91 f"--section={section}",
92 "-N", "information_schema"
93 ]
94 for schema in self.db_infos.db_schemas:
95 command.append("-n")
96 command.append(schema)
98 print(f" dump section {section}")
99 print(" ".join(command))
101 return subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
104@dataclasses.dataclass
105class DbInfos:
106 def __init__(self, db_schemas, db_size, db_tables, schema_excluded_str):
107 self.db_schemas = db_schemas
108 self.db_size = db_size
109 self.db_tables = db_tables
110 self.schema_excluded_str = schema_excluded_str
113def generate_password(length=32):
114 characters = string.ascii_letters + string.digits
115 password = ''.join(secrets.choice(characters) for _ in range(length))
116 return password