Coverage for replication.py: 100%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 09:44 +0000

1import datetime 

2 

3from primary import Primary 

4from secondary import Secondary 

5 

6 

7class Replication: 

8 def __init__(self, primary: Primary, secondary: Secondary): 

9 self.primary = primary 

10 self.secondary = secondary 

11 

12 def run(self, name, today: str): 

13 subscription_name = self.secondary.get_subscription_name(self.primary.db.db_name) 

14 

15 # Check if replication is already started 

16 if subscription_name == "": 

17 print("Replication not in progress") 

18 print(f"{today} - Starting process : {name} {self.primary.db.conn_string} {self.primary.db.db_name} - {self.secondary.db.conn_string} database {self.secondary.db.db_name}") 

19 

20 self.primary.create_replication_user() 

21 

22 # Section pre-data 

23 print(f"pg_restore pre begin") 

24 with self.primary.execute_dump("pre-data") as dump: 

25 self.secondary.execute_pre_data_dump(dump) 

26 print(f"run_dump_restore_pre end") 

27 

28 # Section post-data 

29 print(f"pg_restore post begin") 

30 with self.primary.execute_dump("post-data") as dump: 

31 self.secondary.execute_post_data_dump_only_pk(dump) 

32 print(f"run_dump_restore_post end") 

33 

34 date_start = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 

35 unique_name = f"{self.primary.db.db_name}_{date_start}" 

36 self.primary.create_publication(unique_name) 

37 self.secondary.create_subscription(unique_name) 

38 

39 subscription_name = self.secondary.get_subscription_name(self.primary.db.db_name) 

40 

41 if subscription_name: 

42 print( 

43 f"Check if first step of replication is done - db {self.secondary.db.db_name} on host {self.secondary.db.conn_string} from {self.primary.db.conn_string} database {self.primary.db.db_name}") 

44 self.secondary.wait_first_step_of_replication() 

45 

46 # Disable subscription 

47 self.secondary.disable_subscription(subscription_name) 

48 

49 # Restore post section without primary keys 

50 print(f"pg_restore post (without PK) begin") 

51 with self.primary.execute_dump("post-data") as dump: 

52 self.secondary.execute_post_data_dump_without_pk(dump) 

53 print(f"run_dump_restore_post_without_pk end") 

54 

55 # Enable subscription 

56 self.secondary.enable_subscription(subscription_name) 

57 

58 end_time = datetime.datetime.now().strftime("%Y%m%d-%H-%M-%S") 

59 print(f"end={end_time}") 

60 else: 

61 print("No replication running, exiting") 

62 

63 print("end")