Coverage for replication.py: 100%
39 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 09:44 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 09:44 +0000
1import datetime
3from primary import Primary
4from secondary import Secondary
class Replication:
    """Set up publication/subscription replication from a primary to a secondary.

    The class only sequences calls on the two collaborators it is given;
    all database work is delegated to ``primary`` and ``secondary``.
    """

    def __init__(self, primary: Primary, secondary: Secondary):
        """Remember the source (``primary``) and target (``secondary``) endpoints."""
        self.primary = primary
        self.secondary = secondary

    def run(self, name, today: str):
        """Start replication if none exists, then complete the post-data restore.

        Args:
            name: Label of the process; only used in the start-up log line.
            today: Date string; only used as prefix of the start-up log line.
        """
        # NOTE(review): only the empty string is treated as "not started" here,
        # whereas the check further down is a plain truthiness test. If
        # get_subscription_name can return None, setup is skipped -- confirm.
        if self._lookup_subscription() == "":
            self._start_replication(name, today)

        # Looked up again because the step above may just have created it.
        active_subscription = self._lookup_subscription()
        if not active_subscription:
            print("No replication running, exiting")
        else:
            self._finish_replication(active_subscription)

        print("end")

    def _lookup_subscription(self):
        """Ask the secondary for the subscription tied to the primary's database."""
        return self.secondary.get_subscription_name(self.primary.db.db_name)

    def _start_replication(self, name, today):
        """Restore schema sections on the secondary and create publication/subscription."""
        print("Replication not in progress")
        print(
            f"{today} - Starting process : {name} "
            f"{self.primary.db.conn_string} {self.primary.db.db_name} - "
            f"{self.secondary.db.conn_string} database {self.secondary.db.db_name}"
        )

        self.primary.create_replication_user()

        # Section pre-data
        print("pg_restore pre begin")
        with self.primary.execute_dump("pre-data") as pre_dump:
            self.secondary.execute_pre_data_dump(pre_dump)
        print("run_dump_restore_pre end")

        # Section post-data, restricted to primary keys at this stage.
        print("pg_restore post begin")
        with self.primary.execute_dump("post-data") as pk_dump:
            self.secondary.execute_post_data_dump_only_pk(pk_dump)
        print("run_dump_restore_post end")

        # Publication and subscription share one timestamped name.
        started_at = datetime.datetime.now()
        stamp = started_at.strftime("%Y%m%d_%H%M%S")
        unique_name = f"{self.primary.db.db_name}_{stamp}"
        self.primary.create_publication(unique_name)
        self.secondary.create_subscription(unique_name)

    def _finish_replication(self, subscription_name):
        """Wait for the first replication step, then restore the rest of post-data.

        The subscription is disabled for the duration of the restore and
        enabled again afterwards.
        """
        print(
            "Check if first step of replication is done - "
            f"db {self.secondary.db.db_name} on host {self.secondary.db.conn_string} "
            f"from {self.primary.db.conn_string} database {self.primary.db.db_name}"
        )
        self.secondary.wait_first_step_of_replication()

        # Disable subscription
        self.secondary.disable_subscription(subscription_name)

        # Restore post section without primary keys
        print("pg_restore post (without PK) begin")
        with self.primary.execute_dump("post-data") as rest_dump:
            self.secondary.execute_post_data_dump_without_pk(rest_dump)
        print("run_dump_restore_post_without_pk end")

        # Enable subscription
        self.secondary.enable_subscription(subscription_name)

        finished_at = datetime.datetime.now()
        end_time = finished_at.strftime("%Y%m%d-%H-%M-%S")
        print(f"end={end_time}")