Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.

Commit 8ba9274

Browse files
authored
Merge pull request #1267 from marcosvm/marcosvm/deploy-replication
thank you!
2 parents 97eb9c2 + adfffdf commit 8ba9274

3 files changed

Lines changed: 65 additions & 3 deletions

File tree

go/http/api.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,6 +1577,31 @@ func (this *HttpAPI) DisableSemiSyncReplica(params martini.Params, r render.Rend
15771577
this.setSemiSyncReplica(params, r, req, user, false)
15781578
}
15791579

1580+
// DelayReplication delays replication on given instance with given seconds
1581+
func (this *HttpAPI) DelayReplication(params martini.Params, r render.Render, req *http.Request, user auth.User) {
1582+
if !isAuthorizedForAction(req, user) {
1583+
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
1584+
return
1585+
}
1586+
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
1587+
if err != nil {
1588+
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
1589+
return
1590+
}
1591+
seconds, err := strconv.Atoi(params["seconds"])
1592+
if err != nil {
1593+
Respond(r, &APIResponse{Code: ERROR, Message: "Invalid value provided for seconds"})
1594+
return
1595+
}
1596+
err = inst.DelayReplication(&instanceKey, seconds)
1597+
if err != nil {
1598+
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
1599+
return
1600+
}
1601+
1602+
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Replication delayed: %+v", instanceKey), Details: seconds})
1603+
}
1604+
15801605
// SetReadOnly sets the global read_only variable
15811606
func (this *HttpAPI) SetReadOnly(params martini.Params, r render.Render, req *http.Request, user auth.User) {
15821607
if !isAuthorizedForAction(req, user) {
@@ -3724,6 +3749,7 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
37243749
this.registerAPIRequest(m, "disable-semi-sync-master/:host/:port", this.DisableSemiSyncMaster)
37253750
this.registerAPIRequest(m, "enable-semi-sync-replica/:host/:port", this.EnableSemiSyncReplica)
37263751
this.registerAPIRequest(m, "disable-semi-sync-replica/:host/:port", this.DisableSemiSyncReplica)
3752+
this.registerAPIRequest(m, "delay-replication/:host/:port/:seconds", this.DelayReplication)
37273753

37283754
// Replication information:
37293755
this.registerAPIRequest(m, "can-replicate-from/:host/:port/:belowHost/:belowPort", this.CanReplicateFrom)

go/inst/instance_topology_dao.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,28 @@ func EnableSemiSync(instanceKey *InstanceKey, master, replica bool) error {
595595
return err
596596
}
597597

598+
// DelayReplication set the replication delay given seconds
599+
// keeping the current state of the replication threads.
600+
func DelayReplication(instanceKey *InstanceKey, seconds int) error {
601+
if seconds < 0 {
602+
return fmt.Errorf("invalid seconds: %d, it should be greater or equal to 0", seconds)
603+
}
604+
query := fmt.Sprintf("change master to master_delay=%d", seconds)
605+
statements, err := GetReplicationRestartPreserveStatements(instanceKey, query)
606+
if err != nil {
607+
return err
608+
}
609+
for _, cmd := range statements {
610+
if _, err := ExecInstance(instanceKey, cmd); err != nil {
611+
return log.Errorf("%+v: DelayReplication: '%q' failed: %+v", *instanceKey, cmd, err)
612+
} else {
613+
log.Infof("DelayReplication: %s on %+v", cmd, *instanceKey)
614+
}
615+
}
616+
AuditOperation("delay-replication", instanceKey, fmt.Sprintf("set to %d", seconds))
617+
return nil
618+
}
619+
598620
// ChangeMasterCredentials issues a CHANGE MASTER TO... MASTER_USER=, MASTER_PASSWORD=...
599621
func ChangeMasterCredentials(instanceKey *InstanceKey, creds *ReplicationCredentials) (*Instance, error) {
600622
instance, err := ReadTopologyInstance(instanceKey)

resources/bin/orchestrator-client

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ api_path=
6363
basic_auth="${ORCHESTRATOR_AUTH_USER:-}:${ORCHESTRATOR_AUTH_PASSWORD:-}"
6464
headers_auth="${ORCHESTRATOR_AUTH_USER_HEADER}"
6565
binlog=
66+
seconds=
6667

6768
instance_hostport=
6869
destination_hostport=
@@ -92,11 +93,12 @@ for arg in "$@"; do
9293
"-auth"|"--auth") set -- "$@" "-b" ;;
9394
"-headers-auth"|"--headers-auth") set -- "$@" "-e" ;;
9495
"-binlog"|"--binlog") set -- "$@" "-n" ;;
96+
"-seconds"|"--seconds") set -- "$@" "-S" ;;
9597
*) set -- "$@" "$arg"
9698
esac
9799
done
98100

99-
while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h" OPTION
101+
while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:h:S:" OPTION
100102
do
101103
case $OPTION in
102104
h) command="help" ;;
@@ -118,7 +120,8 @@ do
118120
b) basic_auth="$OPTARG" ;;
119121
e) headers_auth="$OPTARG" ;;
120122
n) binlog="$OPTARG" ;;
121-
q) query="$OPTARG"
123+
q) query="$OPTARG" ;;
124+
S) seconds="$OPTARG"
122125
esac
123126
done
124127

@@ -372,6 +375,8 @@ function prompt_help {
372375
pool name for pool related commands
373376
-H <hostname> -h <hostname>
374377
indicate host for resolve and raft operations
378+
-S <seconds> --seconds
379+
seconds for delaying replication
375380
"
376381

377382
cat "$0" | universal_sed -n '/run_command/,/esac/p' | egrep '".*"[)].*;;' | universal_sed -r -e 's/"(.*?)".*#(.*)/\1~\2/' | column -t -s "~"
@@ -756,6 +761,15 @@ function general_instance_command {
756761
print_details | filter_key | print_key
757762
}
758763

764+
function delay_replication_command {
765+
path="${1:-$command}"
766+
767+
assert_nonempty "instance" "$instance_hostport"
768+
assert_nonempty "seconds" "$seconds"
769+
api "$path/$instance_hostport/$seconds"
770+
print_details
771+
}
772+
759773
function replication_analysis {
760774
api "replication-analysis"
761775
print_details | jq -r '.[] |
@@ -978,7 +992,7 @@ function run_command {
978992
"enable-semi-sync-replica") general_instance_command ;; # Enable semi-sync (replica-side)
979993
"disable-semi-sync-replica") general_instance_command ;; # Disable semi-sync (replica-side)
980994
"restart-replica-statements") restart_replica_statements ;; # Given `-q "<query>"` that requires replication restart to apply, wrap query with stop/start slave statements as required to restore instance to same replication state. Print out set of statements
981-
995+
"delay-replication") delay_replication_command ;; # Issue a CHANGE MASTER TO DELAY=seconds preserving the replication threads state
982996
"can-replicate-from") can_replicate_from ;; # Check if an instance can potentially replicate from another, according to replication rules
983997
"can-replicate-from-gtid") can_replicate_from_gtid ;; # Check if an instance can potentially replicate from another, according to replication rules and assuming Oracle GTID
984998
"is-replicating") is_replicating ;; # Check if an instance is replicating at this time (both SQL and IO threads running)

0 commit comments

Comments
 (0)