Coverage for primary.py: 99%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 09:44 +0000

1import dataclasses 

2import secrets 

3import string 

4import subprocess 

5 

6from database import Database 

7 

8 

class Primary:
    """Operations run against the primary PostgreSQL database.

    Wraps a ``Database`` handle and, on construction, collects the schema
    list, the pretty-printed database size and the user-table count into
    ``self.db_infos``. The remaining methods create a publication, create
    and grant a ``replication`` role, and start ``pg_dump``.
    """

    def __init__(self, db: "Database", list_schema_excluded):
        """Store ``db`` and query it immediately for the database summary.

        Args:
            db: handle exposing ``execute_query``, ``conn_string`` and
                ``db_name``.
            list_schema_excluded: iterable of schema names to leave out, or
                ``None`` (an empty iterable is treated the same way).
        """
        self.db = db
        self.db_infos = self._retrieve_db_infos(list_schema_excluded)

    @staticmethod
    def _quote_literal(value) -> str:
        """Return ``value`` as a single-quoted SQL literal, doubling quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _quote_ident(name) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _first_value(results):
        """Return the first column of the first row, or ``None`` if absent."""
        if results and results[0]:
            return results[0][0]
        return None

    @classmethod
    def _build_schema_query(cls, list_schema_excluded):
        """Build the schema-listing query.

        Returns:
            ``(query, excluded_str)`` where ``excluded_str`` is the
            comma-separated list of quoted literals placed inside
            ``NOT IN (...)``, or ``""`` when nothing is excluded.
        """
        # NOTE(review): "_" is a LIKE wildcard, so this pattern also drops
        # schemas such as "pgfoo" -- confirm whether that is intended.
        base = ("SELECT schema_name FROM information_schema.schemata "
                "WHERE schema_name NOT ILIKE 'pg_%'")
        # An empty list must not produce "NOT IN ()", which is invalid SQL.
        if not list_schema_excluded:
            return base, ""
        excluded_str = ",".join(
            cls._quote_literal(schema) for schema in list_schema_excluded)
        return f"{base} AND schema_name NOT IN ({excluded_str});", excluded_str

    def _retrieve_db_infos(self, list_schema_excluded) -> "DbInfos":
        """Query the database and return a ``DbInfos`` summary.

        Runs three queries (schemas, database size, user-table count),
        prints the values, and packs them with the exclusion string. Each
        value is ``None`` when its query returned no usable row.
        """
        schema_query, schema_excluded_str = self._build_schema_query(
            list_schema_excluded)

        results = self.db.execute_query(schema_query)
        db_schemas = None
        if results and results[0]:
            db_schemas = [row[0] for row in results]

        db_size = self._first_value(self.db.execute_query(
            "SELECT pg_size_pretty(pg_database_size("
            f"{self._quote_literal(self.db.db_name)}))"))

        db_tables = self._first_value(self.db.execute_query(
            "SELECT count(*) from pg_stat_user_tables"))

        # NOTE(review): conn_string is printed verbatim here and in other
        # methods; if it can embed credentials they end up in the logs.
        print(f"Starting pg_dump from server {self.db.conn_string} database {self.db.db_name} {db_size}")
        print(f"db_schemas : {db_schemas}")
        print(f"db_size : {db_size}")
        print(f"db_tables : {db_tables}")

        return DbInfos(db_schemas, db_size, db_tables, schema_excluded_str)

    def create_publication(self, unique_name: str):
        """Create ``publication_<unique_name>`` and add every user table.

        Tables come from ``pg_stat_user_tables``, skipping
        ``spatial_ref_sys`` and any schema in the stored exclusion string.
        """
        print(
            f"Create publication on primary {self.db.conn_string} database {self.db.db_name}")

        # The publication name is deliberately left unquoted so its case
        # folding stays the same as before for anything referring to it.
        self.db.execute_query(f"CREATE PUBLICATION publication_{unique_name};",
                              fetch=False)

        query_publication = ("select schemaname, relname from pg_stat_user_tables "
                             "where relname <> 'spatial_ref_sys'")
        if self.db_infos.schema_excluded_str != "":
            query_publication += (
                f" AND schemaname NOT IN ({self.db_infos.schema_excluded_str})")

        results = self.db.execute_query(query_publication)
        for schema, table in results or ():
            print(f"Add table {schema}.{table} to publication {unique_name}")
            # Catalog names are exact, so they are quoted to survive mixed
            # case and special characters.
            qualified = f"{self._quote_ident(schema)}.{self._quote_ident(table)}"
            self.db.execute_query(
                f"ALTER PUBLICATION publication_{unique_name} ADD TABLE {qualified};",
                fetch=False)

    def create_replication_user(self):
        """Ensure the ``replication`` role exists and grant it read access.

        The role is created with a freshly generated password when
        ``pg_roles`` has no such role. SELECT on all tables and USAGE are
        then granted for every schema in ``self.db_infos.db_schemas``.
        """
        print(f" create replication user on {self.db.conn_string}")
        results = self.db.execute_query(
            "SELECT count(rolname) FROM pg_roles WHERE rolname ='replication'")
        if results and results[0][0] > 0:
            print("user replication already exist")
        else:
            # NOTE(review): the generated password is neither returned nor
            # stored by this method -- confirm how it is meant to be used.
            replication_password = generate_password()
            self.db.execute_query(
                f"CREATE USER replication LOGIN ENCRYPTED PASSWORD '{replication_password}'; "
                "ALTER ROLE replication WITH REPLICATION", fetch=False)
            print("user replication created")

        # db_schemas is None when the schema query returned no rows.
        for schema in self.db_infos.db_schemas or []:
            quoted = self._quote_ident(schema)
            self.db.execute_query(
                f"GRANT SELECT ON ALL TABLES IN SCHEMA {quoted} TO replication; "
                f"GRANT USAGE ON SCHEMA {quoted} TO replication", fetch=False)
            print(f"GRANT right on {schema} to replication user")

    def execute_dump(self, section: str):
        """Start ``pg_dump`` for one section and return the process.

        Args:
            section: value passed as ``--section=<section>``.

        Returns:
            The ``subprocess.Popen`` object, with stdout piped in text mode.
        """
        command = [
            "pg_dump",
            "-d", self.db.conn_string,
            "-Fp",
            "-T", "public.spatial_ref_sys",
            "--no-acl",
            "--no-owner",
            f"--section={section}",
            "-N", "information_schema",
        ]
        # NOTE(review): db_schemas may be None (no schema found), in which
        # case this loop raises TypeError as it did before -- confirm the
        # desired behaviour for that case.
        for schema in self.db_infos.db_schemas:
            command.extend(("-n", schema))

        print(f" dump section {section}")
        print(" ".join(command))

        return subprocess.Popen(command, stdout=subprocess.PIPE, text=True)

102 

103 

104@dataclasses.dataclass 

105class DbInfos: 

106 def __init__(self, db_schemas, db_size, db_tables, schema_excluded_str): 

107 self.db_schemas = db_schemas 

108 self.db_size = db_size 

109 self.db_tables = db_tables 

110 self.schema_excluded_str = schema_excluded_str 

111 

112 

def generate_password(length=32):
    """Return a random password of ``length`` ASCII letters and digits.

    Each character is drawn independently with ``secrets.choice``.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)

117