Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,11 @@ func (this *Configuration) postReadAdjustments() error {
if this.ReplicationLagQuery != "" && this.SlaveLagQuery != "" && this.ReplicationLagQuery != this.SlaveLagQuery {
return fmt.Errorf("config's ReplicationLagQuery and SlaveLagQuery are synonyms and cannot both be defined")
}
// Make sure both settings end up identical:
if this.SlaveLagQuery != "" {
this.ReplicationLagQuery = this.SlaveLagQuery
} else {
this.SlaveLagQuery = this.ReplicationLagQuery
}
}

Expand Down
293 changes: 170 additions & 123 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/openark/golib/log"
Expand Down Expand Up @@ -248,6 +249,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
}
}()

var waitGroup sync.WaitGroup
readingStartTime := time.Now()
instance := NewInstance()
instanceFound := false
Expand Down Expand Up @@ -316,6 +318,70 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
latency.Stop("instance")
} else {
// NOT MaxScale

// We begin with a few operations we can run concurrently, and which do not depend on anything
{
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
var dummy string
// show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema)
latency.Start("instance")
err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)
latency.Stop("instance")

if err != nil {
logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err)

// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables.
// This design reflects poor decision-making, and the migration path to 5.7 will be difficult.
// I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important
// so as to completely fail reading a 5.7 instance.
// This is supposed to be fixed in 5.7.9
}
}()
}

instance.UsingPseudoGTID = false
if config.Config.DetectPseudoGTIDQuery != "" {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil {
if len(resultData) > 0 {
if len(resultData[0]) > 0 {
if resultData[0][0].Valid && resultData[0][0].String == "1" {
instance.UsingPseudoGTID = true
}
}
}
} else {
logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err)
}
latency.Stop("instance")
}()
}

if config.Config.SlaveLagQuery != "" {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil {
if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 {
log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64)
instance.SlaveLagSeconds.Int64 = 0
}
} else {
instance.SlaveLagSeconds = instance.SecondsBehindMaster
logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err)
}
latency.Stop("instance")
}()
}

var mysqlHostname, mysqlReportHost string
latency.Start("instance")
err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan(
Expand All @@ -339,36 +405,37 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
resolvedHostname = instance.Key.Hostname
}

if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") {
var masterInfoRepositoryOnTable bool
// Stuff only supported on Oracle MySQL >= 5.6
// ...
// @@gtid_mode only available in Oracle MySQL >= 5.6
// Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't.
latency.Start("instance")
_ = db.QueryRow("select @@global.gtid_mode = 'ON', @@global.server_uuid, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.SupportsOracleGTID, &instance.ServerUUID, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage)
if masterInfoRepositoryOnTable {
_ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable)
}
latency.Stop("instance")
if instance.LogBinEnabled {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error {
var err error
instance.SelfBinlogCoordinates.LogFile = m.GetString("File")
instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position")
return err
})
latency.Stop("instance")
}()
}
}
{
var dummy string
// show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema)
latency.Start("instance")
err = db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)
latency.Stop("instance")

if err != nil {
logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err)

// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables.
// There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult.
// I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important
// so as to completely fail reading a 5.7 instance.
// This is supposed to be fixed in 5.7.9
if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
var masterInfoRepositoryOnTable bool
// Stuff only supported on Oracle MySQL >= 5.6
// ...
// @@gtid_mode only available in Oracle MySQL >= 5.6
// Previous version just issued this query brute-force, but I don't like errors being issued where they shouldn't.
latency.Start("instance")
_ = db.QueryRow("select @@global.gtid_mode = 'ON', @@global.server_uuid, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.SupportsOracleGTID, &instance.ServerUUID, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage)
if masterInfoRepositoryOnTable {
_ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable)
}
latency.Stop("instance")
}()
}
}
if resolvedHostname != instance.Key.Hostname {
Expand Down Expand Up @@ -463,20 +530,6 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
goto Cleanup
}

if instance.LogBinEnabled {
latency.Start("instance")
err = sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error {
var err error
instance.SelfBinlogCoordinates.LogFile = m.GetString("File")
instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position")
return err
})
latency.Stop("instance")
if err != nil {
goto Cleanup
}
}

instanceFound = true

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -521,86 +574,75 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
if !foundByShowSlaveHosts && !isMaxScale {
// Either not configured to read SHOW SLAVE HOSTS or nothing was there.
// Discover by information_schema.processlist
latency.Start("instance")
err := sqlutils.QueryRowsMap(db, `
select
substring_index(host, ':', 1) as slave_hostname
from
information_schema.processlist
where
command IN ('Binlog Dump', 'Binlog Dump GTID')
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("slave_hostname"))
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr)
}
replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port}
instance.AddReplicaKey(&replicaKey)
return err
})
latency.Stop("instance")

logReadTopologyInstanceError(instanceKey, "processlist", err)
}

instance.UsingPseudoGTID = false
if config.Config.DetectPseudoGTIDQuery != "" && !isMaxScale {
latency.Start("instance")
if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil {
if len(resultData) > 0 {
if len(resultData[0]) > 0 {
if resultData[0][0].Valid && resultData[0][0].String == "1" {
instance.UsingPseudoGTID = true
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
err := sqlutils.QueryRowsMap(db, `
select
substring_index(host, ':', 1) as slave_hostname
from
information_schema.processlist
where
command IN ('Binlog Dump', 'Binlog Dump GTID')
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("slave_hostname"))
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr)
}
}
}
} else {
logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err)
}
latency.Stop("instance")
}
replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port}
instance.AddReplicaKey(&replicaKey)
return err
})
latency.Stop("instance")

if config.Config.SlaveLagQuery != "" && !isMaxScale {
latency.Start("instance")
if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil {
if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 {
log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64)
instance.SlaveLagSeconds.Int64 = 0
}
} else {
instance.SlaveLagSeconds = instance.SecondsBehindMaster
logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err)
}
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "processlist", err)
}()
}

if config.Config.DetectDataCenterQuery != "" && !isMaxScale {
latency.Start("instance")
err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err)
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err)
}()
}

if config.Config.DetectPhysicalEnvironmentQuery != "" && !isMaxScale {
latency.Start("instance")
err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err)
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err)
}()
}

if config.Config.DetectInstanceAliasQuery != "" && !isMaxScale {
latency.Start("instance")
err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err)
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err)
}()
}

if config.Config.DetectSemiSyncEnforcedQuery != "" && !isMaxScale {
latency.Start("instance")
err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err)
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
latency.Start("instance")
err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncEnforced)
latency.Stop("instance")
logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err)
}()
}

{
Expand All @@ -621,21 +663,25 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
// We'll set it here on their behalf so there's no race between the first
// time an instance is discovered, and setting a rule like "must_not".
if config.Config.DetectPromotionRuleQuery != "" && !isMaxScale {
var value string
latency.Start("instance")
err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value)
logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err)
promotionRule, err := ParseCandidatePromotionRule(value)
logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err)
if err == nil {
// We need to update candidate_database_instance.
// We register the rule even if it hasn't changed,
// to bump the last_suggested time.
instance.PromotionRule = promotionRule
err = RegisterCandidateInstance(instanceKey, promotionRule)
logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err)
}
latency.Stop("instance")
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
var value string
latency.Start("instance")
err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value)
logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err)
promotionRule, err := ParseCandidatePromotionRule(value)
logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err)
if err == nil {
// We need to update candidate_database_instance.
// We register the rule even if it hasn't changed,
// to bump the last_suggested time.
instance.PromotionRule = promotionRule
err = RegisterCandidateInstance(instanceKey, promotionRule)
logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err)
}
latency.Stop("instance")
}()
}

ReadClusterAliasOverride(instance)
Expand Down Expand Up @@ -669,6 +715,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool,
}

Cleanup:
waitGroup.Wait()
readTopologyInstanceCounter.Inc(1)
// logReadTopologyInstanceError(instanceKey, "ReadTopologyInstanceBufferable", err) // don't write here and a few lines later.
if instanceFound {
Expand Down