From 995e5f18babd25ca98e63014618b4ced3208a487 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 21 May 2017 09:32:23 +0300 Subject: [PATCH 1/6] concurrent readson ReadTopologyInstanceBufferable --- go/inst/instance_dao.go | 182 ++++++++++++++++++++++++---------------- 1 file changed, 109 insertions(+), 73 deletions(-) diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 47bf0be6e..a122ac1e2 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -26,6 +26,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/openark/golib/log" @@ -248,6 +249,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } }() + var waitGroup sync.WaitGroup readingStartTime := time.Now() instance := NewInstance() instanceFound := false @@ -340,36 +342,44 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") { - var masterInfoRepositoryOnTable bool - // Stuff only supported on Oracle MySQL >= 5.6 - // ... - // @@gtid_mode only available in Orcale MySQL >= 5.6 - // Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't. - latency.Start("instance") - _ = db.QueryRow("select @@global.gtid_mode = 'ON', @@global.server_uuid, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.SupportsOracleGTID, &instance.ServerUUID, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage) - if masterInfoRepositoryOnTable { - _ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable) - } - latency.Stop("instance") + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var masterInfoRepositoryOnTable bool + // Stuff only supported on Oracle MySQL >= 5.6 + // ... + // @@gtid_mode only available in Orcale MySQL >= 5.6 + // Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't. + latency.Start("instance") + _ = db.QueryRow("select @@global.gtid_mode = 'ON', @@global.server_uuid, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.SupportsOracleGTID, &instance.ServerUUID, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage) + if masterInfoRepositoryOnTable { + _ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable) + } + latency.Stop("instance") + }() } } { - var dummy string - // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) - latency.Start("instance") - err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) - latency.Stop("instance") - - if err != nil { - logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var dummy string + // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) + latency.Start("instance") + err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) + latency.Stop("instance") - // We do not "goto Cleanup" here, although it should be the correct flow. - // Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables. - // There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult. - // I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important - // so as to completely fail reading a 5.7 instance. - // This is supposed to be fixed in 5.7.9 - } + if err != nil { + logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) + + // We do not "goto Cleanup" here, although it should be the correct flow. + // Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables. + // There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult. + // I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important + // so as to completely fail reading a 5.7 instance. + // This is supposed to be fixed in 5.7.9 + } + }() } if resolvedHostname != instance.Key.Hostname { latency.Start("backend") @@ -464,17 +474,18 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } if instance.LogBinEnabled { - latency.Start("instance") - err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { - var err error - instance.SelfBinlogCoordinates.LogFile = m.GetString("File") - instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") - return err - }) - latency.Stop("instance") - if err != nil { - goto Cleanup - } + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { + var err error + instance.SelfBinlogCoordinates.LogFile = m.GetString("File") + instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") + return err + }) + latency.Stop("instance") + }() } instanceFound = true @@ -546,61 +557,85 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, instance.UsingPseudoGTID = false if config.Config.DetectPseudoGTIDQuery != "" && !isMaxScale { - latency.Start("instance") - if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil { - if len(resultData) > 0 { - if len(resultData[0]) > 0 { - if resultData[0][0].Valid && resultData[0][0].String == "1" { - instance.UsingPseudoGTID = true + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil { + if len(resultData) > 0 { + if len(resultData[0]) > 0 { + if resultData[0][0].Valid && resultData[0][0].String == "1" { + instance.UsingPseudoGTID = true + } } } + } else { + logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err) } - } else { - logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err) - } - latency.Stop("instance") + latency.Stop("instance") + }() } if config.Config.SlaveLagQuery != "" && !isMaxScale { - latency.Start("instance") - if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil { - if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 { - log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64) - instance.SlaveLagSeconds.Int64 = 0 + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil { + if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 { + log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64) + instance.SlaveLagSeconds.Int64 = 0 + } + } else { + instance.SlaveLagSeconds = instance.SecondsBehindMaster + logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err) } - } else { - instance.SlaveLagSeconds = instance.SecondsBehindMaster - logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err) - } - latency.Stop("instance") + latency.Stop("instance") + }() } if config.Config.DetectDataCenterQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err) + }() } if config.Config.DetectPhysicalEnvironmentQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err) + }() } if config.Config.DetectInstanceAliasQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err) + }() } if config.Config.DetectSemiSyncEnforcedQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) + }() } { @@ -669,6 +704,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } Cleanup: + waitGroup.Wait() readTopologyInstanceCounter.Inc(1) // logReadTopologyInstanceError(instanceKey, "ReadTopologyInstanceBufferable", err) // don't write here and a few lines later. if instanceFound { From 93ee4826f8cb7dd0b6db0488bddf9d8375b083af Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 21 May 2017 09:51:15 +0300 Subject: [PATCH 2/6] ensuring correctness of ReplicationLagQuery/SlaveLagQuery --- go/config/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/config/config.go b/go/config/config.go index eb5fe5787..ffc120285 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -435,8 +435,11 @@ func (this *Configuration) postReadAdjustments() error { if this.ReplicationLagQuery != "" && this.SlaveLagQuery != "" && this.ReplicationLagQuery != this.SlaveLagQuery { return fmt.Errorf("config's ReplicationLagQuery and SlaveLagQuery are synonyms and cannot both be defined") } + // Make sure both are turned identical: if this.SlaveLagQuery != "" { this.ReplicationLagQuery = this.SlaveLagQuery + } else { + this.SlaveLagQuery = this.ReplicationLagQuery } } From 58b8cc873ff065cd3f182cf3fdbd174a87aa9c6d Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 21 May 2017 10:04:57 +0300 Subject: [PATCH 3/6] beautify --- go/inst/instance_dao.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index a122ac1e2..0aa5059f9 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -534,13 +534,13 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // Discover by information_schema.processlist latency.Start("instance") err := sqlutils.QueryRowsMap(db, ` - select - substring_index(host, ':', 1) as slave_hostname - from - information_schema.processlist - where - command IN ('Binlog Dump', 'Binlog Dump GTID') - `, + select + substring_index(host, ':', 1) as slave_hostname + from + information_schema.processlist + where + command IN ('Binlog Dump', 'Binlog Dump GTID') + `, func(m sqlutils.RowMap) error { cname, resolveErr := ResolveHostname(m.GetString("slave_hostname")) if resolveErr != nil { From 2fba7669298677c234e5501a221261fa68d56d64 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 21 May 2017 18:31:04 +0300 Subject: [PATCH 4/6] even more concurrency --- go/inst/instance_dao.go | 64 +++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 0aa5059f9..8296d23f7 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -532,8 +532,11 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, if !foundByShowSlaveHosts && !isMaxScale { // Either not configured to read SHOW SLAVE HOSTS or nothing was there. // Discover by information_schema.processlist - latency.Start("instance") - err := sqlutils.QueryRowsMap(db, ` + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := sqlutils.QueryRowsMap(db, ` select substring_index(host, ':', 1) as slave_hostname from @@ -541,18 +544,19 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, where command IN ('Binlog Dump', 'Binlog Dump GTID') `, - func(m sqlutils.RowMap) error { - cname, resolveErr := ResolveHostname(m.GetString("slave_hostname")) - if resolveErr != nil { - logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr) - } - replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port} - instance.AddReplicaKey(&replicaKey) - return err - }) - latency.Stop("instance") + func(m sqlutils.RowMap) error { + cname, resolveErr := ResolveHostname(m.GetString("slave_hostname")) + if resolveErr != nil { + logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr) + } + replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port} + instance.AddReplicaKey(&replicaKey) + return err + }) + latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "processlist", err) + logReadTopologyInstanceError(instanceKey, "processlist", err) + }() } instance.UsingPseudoGTID = false @@ -656,21 +660,25 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // We'll set it here on their behalf so there's no race between the first // time an instance is discovered, and setting a rule like "must_not". if config.Config.DetectPromotionRuleQuery != "" && !isMaxScale { - var value string - latency.Start("instance") - err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value) - logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err) - promotionRule, err := ParseCandidatePromotionRule(value) - logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err) - if err == nil { - // We need to update candidate_database_instance. - // We register the rule even if it hasn't changed, - // to bump the last_suggested time. - instance.PromotionRule = promotionRule - err = RegisterCandidateInstance(instanceKey, promotionRule) - logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err) - } - latency.Stop("instance") + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var value string + latency.Start("instance") + err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value) + logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err) + promotionRule, err := ParseCandidatePromotionRule(value) + logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err) + if err == nil { + // We need to update candidate_database_instance. + // We register the rule even if it hasn't changed, + // to bump the last_suggested time. + instance.PromotionRule = promotionRule + err = RegisterCandidateInstance(instanceKey, promotionRule) + logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err) + } + latency.Stop("instance") + }() } ReadClusterAliasOverride(instance) From 1cb9736d1b80413a97323f7a4a8d9a02d2d33fcc Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 24 May 2017 14:11:06 +0300 Subject: [PATCH 5/6] rearranged concurrent functions to run first --- go/inst/instance_dao.go | 155 ++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 76 deletions(-) diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 8296d23f7..919650d4a 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -318,6 +318,85 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency.Stop("instance") } else { // NOT MaxScale + + // We begin with a few operations we can run concurrently, and which do not depend on anything + { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var dummy string + // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) + latency.Start("instance") + err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) + latency.Stop("instance") + + if err != nil { + logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) + + // We do not "goto Cleanup" here, although it should be the correct flow. + // Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables. + // There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult. + // I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important + // so as to completely fail reading a 5.7 instance. + // This is supposed to be fixed in 5.7.9 + } + }() + } + + if instance.LogBinEnabled { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { + var err error + instance.SelfBinlogCoordinates.LogFile = m.GetString("File") + instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") + return err + }) + latency.Stop("instance") + }() + } + + instance.UsingPseudoGTID = false + if config.Config.DetectPseudoGTIDQuery != "" { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil { + if len(resultData) > 0 { + if len(resultData[0]) > 0 { + if resultData[0][0].Valid && resultData[0][0].String == "1" { + instance.UsingPseudoGTID = true + } + } + } + } else { + logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err) + } + latency.Stop("instance") + }() + } + + if config.Config.SlaveLagQuery != "" { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil { + if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 { + log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64) + instance.SlaveLagSeconds.Int64 = 0 + } + } else { + instance.SlaveLagSeconds = instance.SecondsBehindMaster + logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err) + } + latency.Stop("instance") + }() + } + var mysqlHostname, mysqlReportHost string latency.Start("instance") err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan( @@ -359,28 +438,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, }() } } - { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - var dummy string - // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) - latency.Start("instance") - err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) - latency.Stop("instance") - - if err != nil { - logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) - - // We do not "goto Cleanup" here, although it should be the correct flow. - // Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables. - // There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult. - // I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important - // so as to completely fail reading a 5.7 instance. - // This is supposed to be fixed in 5.7.9 - } - }() - } if resolvedHostname != instance.Key.Hostname { latency.Start("backend") UpdateResolvedHostname(instance.Key.Hostname, resolvedHostname) @@ -473,21 +530,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, goto Cleanup } - if instance.LogBinEnabled { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - latency.Start("instance") - err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { - var err error - instance.SelfBinlogCoordinates.LogFile = m.GetString("File") - instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") - return err - }) - latency.Stop("instance") - }() - } - instanceFound = true // ------------------------------------------------------------------------- @@ -559,45 +601,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, }() } - instance.UsingPseudoGTID = false - if config.Config.DetectPseudoGTIDQuery != "" && !isMaxScale { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - latency.Start("instance") - if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil { - if len(resultData) > 0 { - if len(resultData[0]) > 0 { - if resultData[0][0].Valid && resultData[0][0].String == "1" { - instance.UsingPseudoGTID = true - } - } - } - } else { - logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err) - } - latency.Stop("instance") - }() - } - - if config.Config.SlaveLagQuery != "" && !isMaxScale { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - latency.Start("instance") - if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil { - if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 { - log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64) - instance.SlaveLagSeconds.Int64 = 0 - } - } else { - instance.SlaveLagSeconds = instance.SecondsBehindMaster - logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err) - } - latency.Stop("instance") - }() - } - if config.Config.DetectDataCenterQuery != "" && !isMaxScale { waitGroup.Add(1) go func() { From cfde9e11b7896836e649795c8cca6d18f93e3a55 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 29 May 2017 12:05:45 +0300 Subject: [PATCH 6/6] show master status -- moved to safe place --- go/inst/instance_dao.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 919650d4a..34d909f2d 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -343,21 +343,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, }() } - if instance.LogBinEnabled { - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - latency.Start("instance") - err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { - var err error - instance.SelfBinlogCoordinates.LogFile = m.GetString("File") - instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") - return err - }) - latency.Stop("instance") - }() - } - instance.UsingPseudoGTID = false if config.Config.DetectPseudoGTIDQuery != "" { waitGroup.Add(1) @@ -420,6 +405,21 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, resolvedHostname = instance.Key.Hostname } + if instance.LogBinEnabled { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { + var err error + instance.SelfBinlogCoordinates.LogFile = m.GetString("File") + instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") + return err + }) + latency.Stop("instance") + }() + } + if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") { waitGroup.Add(1) go func() {