diff --git a/go/config/config.go b/go/config/config.go index eb5fe5787..ffc120285 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -435,8 +435,11 @@ func (this *Configuration) postReadAdjustments() error { if this.ReplicationLagQuery != "" && this.SlaveLagQuery != "" && this.ReplicationLagQuery != this.SlaveLagQuery { return fmt.Errorf("config's ReplicationLagQuery and SlaveLagQuery are synonyms and cannot both be defined") } + // Make sure both are turned identical: if this.SlaveLagQuery != "" { this.ReplicationLagQuery = this.SlaveLagQuery + } else { + this.SlaveLagQuery = this.ReplicationLagQuery } } diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 47bf0be6e..34d909f2d 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -26,6 +26,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/openark/golib/log" @@ -248,6 +249,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } }() + var waitGroup sync.WaitGroup readingStartTime := time.Now() instance := NewInstance() instanceFound := false @@ -316,6 +318,70 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency.Stop("instance") } else { // NOT MaxScale + + // We begin with a few operations we can run concurrently, and which do not depend on anything + { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var dummy string + // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) + latency.Start("instance") + err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) + latency.Stop("instance") + + if err != nil { + logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) + + // We do not "goto Cleanup" here, although it should be the correct flow. + // Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables. + // There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult. + // I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important + // so as to completely fail reading a 5.7 instance. + // This is supposed to be fixed in 5.7.9 + } + }() + } + + instance.UsingPseudoGTID = false + if config.Config.DetectPseudoGTIDQuery != "" { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil { + if len(resultData) > 0 { + if len(resultData[0]) > 0 { + if resultData[0][0].Valid && resultData[0][0].String == "1" { + instance.UsingPseudoGTID = true + } + } + } + } else { + logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err) + } + latency.Stop("instance") + }() + } + + if config.Config.SlaveLagQuery != "" { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil { + if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 { + log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64) + instance.SlaveLagSeconds.Int64 = 0 + } + } else { + instance.SlaveLagSeconds = instance.SecondsBehindMaster + logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err) + } + latency.Stop("instance") + }() + } + var mysqlHostname, mysqlReportHost string latency.Start("instance") err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan( @@ -339,36 +405,37 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, resolvedHostname = instance.Key.Hostname } - if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") { - var masterInfoRepositoryOnTable bool - // Stuff only supported on Oracle MySQL >= 5.6 - // ... - // @@gtid_mode only available in Orcale MySQL >= 5.6 - // Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't. - latency.Start("instance") - _ = db.QueryRow("select @@global.gtid_mode = 'ON', @@global.server_uuid, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.SupportsOracleGTID, &instance.ServerUUID, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage) - if masterInfoRepositoryOnTable { - _ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable) - } - latency.Stop("instance") + if instance.LogBinEnabled { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { + var err error + instance.SelfBinlogCoordinates.LogFile = m.GetString("File") + instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") + return err + }) + latency.Stop("instance") + }() } - } - { - var dummy string - // show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema) - latency.Start("instance") - err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime) - latency.Stop("instance") - - if err != nil { - logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err) - // We do not "goto Cleanup" here, although it should be the correct flow. - // Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables. - // There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult. - // I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important - // so as to completely fail reading a 5.7 instance. - // This is supposed to be fixed in 5.7.9 + if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var masterInfoRepositoryOnTable bool + // Stuff only supported on Oracle MySQL >= 5.6 + // ... + // @@gtid_mode only available in Orcale MySQL >= 5.6 + // Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't. + latency.Start("instance") + _ = db.QueryRow("select @@global.gtid_mode = 'ON', @@global.server_uuid, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.SupportsOracleGTID, &instance.ServerUUID, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage) + if masterInfoRepositoryOnTable { + _ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable) + } + latency.Stop("instance") + }() } } if resolvedHostname != instance.Key.Hostname { @@ -463,20 +530,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, goto Cleanup } - if instance.LogBinEnabled { - latency.Start("instance") - err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error { - var err error - instance.SelfBinlogCoordinates.LogFile = m.GetString("File") - instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position") - return err - }) - latency.Stop("instance") - if err != nil { - goto Cleanup - } - } - instanceFound = true // ------------------------------------------------------------------------- @@ -521,86 +574,75 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, if !foundByShowSlaveHosts && !isMaxScale { // Either not configured to read SHOW SLAVE HOSTS or nothing was there. // Discover by information_schema.processlist - latency.Start("instance") - err := sqlutils.QueryRowsMap(db, ` - select - substring_index(host, ':', 1) as slave_hostname - from - information_schema.processlist - where - command IN ('Binlog Dump', 'Binlog Dump GTID') - `, - func(m sqlutils.RowMap) error { - cname, resolveErr := ResolveHostname(m.GetString("slave_hostname")) - if resolveErr != nil { - logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr) - } - replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port} - instance.AddReplicaKey(&replicaKey) - return err - }) - latency.Stop("instance") - - logReadTopologyInstanceError(instanceKey, "processlist", err) - } - - instance.UsingPseudoGTID = false - if config.Config.DetectPseudoGTIDQuery != "" && !isMaxScale { - latency.Start("instance") - if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil { - if len(resultData) > 0 { - if len(resultData[0]) > 0 { - if resultData[0][0].Valid && resultData[0][0].String == "1" { - instance.UsingPseudoGTID = true + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := sqlutils.QueryRowsMap(db, ` + select + substring_index(host, ':', 1) as slave_hostname + from + information_schema.processlist + where + command IN ('Binlog Dump', 'Binlog Dump GTID') + `, + func(m sqlutils.RowMap) error { + cname, resolveErr := ResolveHostname(m.GetString("slave_hostname")) + if resolveErr != nil { + logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr) } - } - } - } else { - logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err) - } - latency.Stop("instance") - } + replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port} + instance.AddReplicaKey(&replicaKey) + return err + }) + latency.Stop("instance") - if config.Config.SlaveLagQuery != "" && !isMaxScale { - latency.Start("instance") - if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil { - if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 { - log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64) - instance.SlaveLagSeconds.Int64 = 0 - } - } else { - instance.SlaveLagSeconds = instance.SecondsBehindMaster - logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err) - } - latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "processlist", err) + }() } if config.Config.DetectDataCenterQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err) + }() } if config.Config.DetectPhysicalEnvironmentQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err) + }() } if config.Config.DetectInstanceAliasQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err) + }() } if config.Config.DetectSemiSyncEnforcedQuery != "" && !isMaxScale { - latency.Start("instance") - err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) - latency.Stop("instance") - logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + latency.Start("instance") + err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced) + latency.Stop("instance") + logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err) + }() } { @@ -621,21 +663,25 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, // We'll set it here on their behalf so there's no race between the first // time an instance is discovered, and setting a rule like "must_not". if config.Config.DetectPromotionRuleQuery != "" && !isMaxScale { - var value string - latency.Start("instance") - err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value) - logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err) - promotionRule, err := ParseCandidatePromotionRule(value) - logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err) - if err == nil { - // We need to update candidate_database_instance. - // We register the rule even if it hasn't changed, - // to bump the last_suggested time. - instance.PromotionRule = promotionRule - err = RegisterCandidateInstance(instanceKey, promotionRule) - logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err) - } - latency.Stop("instance") + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + var value string + latency.Start("instance") + err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value) + logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err) + promotionRule, err := ParseCandidatePromotionRule(value) + logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err) + if err == nil { + // We need to update candidate_database_instance. + // We register the rule even if it hasn't changed, + // to bump the last_suggested time. + instance.PromotionRule = promotionRule + err = RegisterCandidateInstance(instanceKey, promotionRule) + logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err) + } + latency.Stop("instance") + }() } ReadClusterAliasOverride(instance) @@ -669,6 +715,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, } Cleanup: + waitGroup.Wait() readTopologyInstanceCounter.Inc(1) // logReadTopologyInstanceError(instanceKey, "ReadTopologyInstanceBufferable", err) // don't write here and a few lines later. if instanceFound {