Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

All MySQL DBs limited to max 3 concurrent/idle connections #15 #931

Merged
merged 25 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9b2a04d
Merge pull request #2 from openark/workflow-upload-artifact
shlomi-noach Jul 28, 2020
9ccde4f
Merge pull request #5 from openark/parse-alter-statement
shlomi-noach Jul 29, 2020
b59a8ed
merged conflict
shlomi-noach Aug 2, 2020
6012e80
Merge pull request #8 from openark/ajm188-handle_driver_timeout_error
shlomi-noach Aug 2, 2020
ae22d84
v1.1.0
shlomi-noach Aug 5, 2020
ca0ca5a
Merge remote-tracking branch 'upstream/master' into updates-from-upst…
shlomi-noach Oct 18, 2020
e9f9af2
Merge pull request #11 from openark/updates-from-upstream-2020-10
shlomi-noach Oct 18, 2020
294d43b
WIP: copying AUTO_INCREMENT value to ghost table
shlomi-noach Dec 31, 2020
26f7602
greping for 'expect_table_structure' content
shlomi-noach Dec 31, 2020
75009db
Adding simple test for 'expect_table_structure' scenario
shlomi-noach Dec 31, 2020
eeab264
adding tests for AUTO_INCREMENT value after row deletes. Should initi…
shlomi-noach Dec 31, 2020
2d0281f
clear event beforehand
shlomi-noach Dec 31, 2020
af20211
parsing AUTO_INCREMENT from alter query, reading AUTO_INCREMENT from …
shlomi-noach Dec 31, 2020
31069ae
support GetUint64
shlomi-noach Dec 31, 2020
3d4dfaa
minor update to test
shlomi-noach Dec 31, 2020
63219ab
adding test for user defined AUTO_INCREMENT statement
shlomi-noach Dec 31, 2020
525a80d
Merge branch 'master' into copy-auto-increment
shlomi-noach Jan 5, 2021
ff82140
Merge pull request #12 from openark/copy-auto-increment
shlomi-noach Jan 5, 2021
7202076
Generated column as part of UNIQUE (or PRIMARY) KEY
shlomi-noach Jan 19, 2021
b7b3bfb
skip analysis of generated column data type in unique key
shlomi-noach Jan 19, 2021
253658d
Merge pull request #13 from openark/unique-key-generated-column
shlomi-noach Jan 27, 2021
4a36e24
Merge pull request #14 from ccoffey/cathal/safer_cut_over
shlomi-noach Feb 7, 2021
710c9dd
All MySQL DBs limited to max 3 concurrent/idle connections
shlomi-noach Feb 18, 2021
dea8d54
Merge branch 'master' into limit-mysql-connetions
shlomi-noach Feb 22, 2021
e8c6a4e
Merge branch 'master' into limit-mysql-connetions
shlomi-noach May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type MigrationContext struct {
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
Expand Down
5 changes: 3 additions & 2 deletions go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

gosql "database/sql"

"github.com/github/gh-ost/go/mysql"
)

Expand Down Expand Up @@ -62,7 +63,7 @@ func StringContainsAll(s string, substrings ...string) bool {
return nonEmptyStringsFound
}

func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext) (string, error) {
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
versionQuery := `select @@global.version`
var port, extraPort int
var version string
Expand All @@ -86,7 +87,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
}

if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
migrationContext.Log.Infof("connection validated on %+v", connectionConfig.Key)
migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
return version, nil
} else if extraPort == 0 {
return "", fmt.Errorf("Unexpected database port reported: %+v", port)
Expand Down
25 changes: 23 additions & 2 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ type Applier struct {
singletonDB *gosql.DB
migrationContext *base.MigrationContext
finishedMigrating int64
name string
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
return &Applier{
connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext,
finishedMigrating: 0,
name: "applier",
}
}

Expand All @@ -78,11 +80,11 @@ func (this *Applier) InitDBConnections() (err error) {
return err
}
this.singletonDB.SetMaxOpenConns(1)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
if err != nil {
return err
}
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext); err != nil {
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil {
return err
}
this.migrationContext.ApplierMySQLVersion = version
Expand Down Expand Up @@ -205,6 +207,25 @@ func (this *Applier) AlterGhost() error {
return nil
}

// AlterGhost applies `alter` statement on ghost table
func (this *Applier) AlterGhostAutoIncrement() error {
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s AUTO_INCREMENT=%d`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
this.migrationContext.OriginalTableAutoIncrement,
)
this.migrationContext.Log.Infof("Altering ghost table AUTO_INCREMENT value %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
this.migrationContext.Log.Debugf("AUTO_INCREMENT ALTER statement: %s", query)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Ghost table AUTO_INCREMENT altered")
return nil
}

// CreateChangelogTable creates the changelog table on the applier host
func (this *Applier) CreateChangelogTable() error {
if err := this.DropChangelogTable(); err != nil {
Expand Down
30 changes: 29 additions & 1 deletion go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type Inspector struct {
db *gosql.DB
informationSchemaDb *gosql.DB
migrationContext *base.MigrationContext
name string
}

func NewInspector(migrationContext *base.MigrationContext) *Inspector {
return &Inspector{
connectionConfig: migrationContext.InspectorConnectionConfig,
migrationContext: migrationContext,
name: "inspector",
}
}

Expand Down Expand Up @@ -109,6 +111,10 @@ func (this *Inspector) InspectOriginalTable() (err error) {
if err != nil {
return err
}
this.migrationContext.OriginalTableAutoIncrement, err = this.getAutoIncrementValue(this.migrationContext.OriginalTableName)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -184,6 +190,10 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
}

for _, column := range this.migrationContext.UniqueKey.Columns.Columns() {
if this.migrationContext.GhostTableVirtualColumns.GetColumn(column.Name) != nil {
// this is a virtual column
continue
}
if this.migrationContext.MappedSharedColumns.HasTimezoneConversion(column.Name) {
return fmt.Errorf("No support at this time for converting a column from DATETIME to TIMESTAMP that is also part of the chosen unique key. Column: %s, key: %s", column.Name, this.migrationContext.UniqueKey.Name)
}
Expand All @@ -198,7 +208,7 @@ func (this *Inspector) validateConnection() error {
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
}

version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
this.migrationContext.InspectorMySQLVersion = version
return err
}
Expand Down Expand Up @@ -589,6 +599,24 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
return err
}

// getAutoIncrementValue get's the original table's AUTO_INCREMENT value, if exists (0 value if not exists)
func (this *Inspector) getAutoIncrementValue(tableName string) (autoIncrement uint64, err error) {
query := `
SELECT
AUTO_INCREMENT
FROM INFORMATION_SCHEMA.TABLES
WHERE
TABLES.TABLE_SCHEMA = ?
AND TABLES.TABLE_NAME = ?
AND AUTO_INCREMENT IS NOT NULL
`
err = sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
autoIncrement = m.GetUint64("AUTO_INCREMENT")
return nil
}, this.migrationContext.DatabaseName, tableName)
return autoIncrement, err
}

// getCandidateUniqueKeys investigates a table and returns the list of unique keys
// candidate for chunking
func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*sql.UniqueKey), err error) {
Expand Down
8 changes: 8 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,14 @@ func (this *Migrator) initiateApplier() error {
return err
}

if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() {
// Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override,
// so we should copy AUTO_INCREMENT value onto our ghost table.
if err := this.applier.AlterGhostAutoIncrement(); err != nil {
this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out")
return err
}
}
this.applier.WriteChangelogState(string(GhostTableMigrated))
go this.applier.InitiateHeartbeat()
return nil
Expand Down
4 changes: 3 additions & 1 deletion go/logic/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type EventsStreamer struct {
listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry
binlogReader *binlog.GoMySQLReader
name string
}

func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
Expand All @@ -51,6 +52,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer
listeners: [](*BinlogEventListener){},
listenersMutex: &sync.Mutex{},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
name: "streamer",
}
}

Expand Down Expand Up @@ -106,7 +108,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
return err
}
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext); err != nil {
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil {
return err
}
if err := this.readCurrentBinlogCoordinates(); err != nil {
Expand Down
7 changes: 5 additions & 2 deletions go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ func (this *Throttler) collectControlReplicasLag() {
dbUri := connectionConfig.GetDBUri("information_schema")

var heartbeatValue string
if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil {
db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri)
if err != nil {
return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
}

if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}

Expand Down
30 changes: 16 additions & 14 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"github.com/outbrain/golib/sqlutils"
)

const MaxTableNameLength = 64
const MaxReplicationPasswordLength = 32
const (
MaxTableNameLength = 64
MaxReplicationPasswordLength = 32
MaxDBPoolConnections = 3
)

type ReplicationLagResult struct {
Key InstanceKey
Expand All @@ -39,23 +42,22 @@ func (this *ReplicationLagResult) HasLag() bool {
var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
var knownDBsMutex = &sync.Mutex{}

func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, err error) {
cacheKey := migrationUuid + ":" + mysql_uri

knownDBsMutex.Lock()
defer func() {
knownDBsMutex.Unlock()
}()

var exists bool
if _, exists = knownDBs[cacheKey]; !exists {
if db, err := gosql.Open("mysql", mysql_uri); err == nil {
knownDBs[cacheKey] = db
} else {
return db, exists, err
defer knownDBsMutex.Unlock()

if db, exists = knownDBs[cacheKey]; !exists {
db, err = gosql.Open("mysql", mysql_uri)
if err != nil {
return nil, false, err
}
db.SetMaxOpenConns(MaxDBPoolConnections)
db.SetMaxIdleConns(MaxDBPoolConnections)
knownDBs[cacheKey] = db
}
return knownDBs[cacheKey], exists, nil
return db, exists, nil
}

// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
Expand Down
19 changes: 16 additions & 3 deletions go/sql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`)
renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`)
autoIncrementRegexp = regexp.MustCompile(`(?i)\bauto_increment[\s]*=[\s]*([0-9]+)`)
alterTableExplicitSchemaTableRegexps = []*regexp.Regexp{
// ALTER TABLE `scm`.`tbl` something
regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`),
Expand All @@ -35,9 +36,10 @@ var (
)

type AlterTableParser struct {
columnRenameMap map[string]string
droppedColumns map[string]bool
isRenameTable bool
columnRenameMap map[string]string
droppedColumns map[string]bool
isRenameTable bool
isAutoIncrementDefined bool

alterStatementOptions string
alterTokens []string
Expand Down Expand Up @@ -122,6 +124,12 @@ func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) {
this.isRenameTable = true
}
}
{
// auto_increment
if autoIncrementRegexp.MatchString(alterToken) {
this.isAutoIncrementDefined = true
}
}
return nil
}

Expand Down Expand Up @@ -173,6 +181,11 @@ func (this *AlterTableParser) DroppedColumnsMap() map[string]bool {
func (this *AlterTableParser) IsRenameTable() bool {
return this.isRenameTable
}

func (this *AlterTableParser) IsAutoIncrementDefined() bool {
return this.isAutoIncrementDefined
}

func (this *AlterTableParser) GetExplicitSchema() string {
return this.explicitSchema
}
Expand Down
24 changes: 24 additions & 0 deletions go/sql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestParseAlterStatement(t *testing.T) {
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(parser.alterStatementOptions, statement)
test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
test.S(t).ExpectFalse(parser.IsAutoIncrementDefined())
}

func TestParseAlterStatementTrivialRename(t *testing.T) {
Expand All @@ -33,17 +34,39 @@ func TestParseAlterStatementTrivialRename(t *testing.T) {
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(parser.alterStatementOptions, statement)
test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
test.S(t).ExpectFalse(parser.IsAutoIncrementDefined())
test.S(t).ExpectEquals(len(parser.columnRenameMap), 1)
test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts")
}

func TestParseAlterStatementWithAutoIncrement(t *testing.T) {

statements := []string{
"auto_increment=7",
"auto_increment = 7",
"AUTO_INCREMENT = 71",
"add column t int, change ts ts timestamp, auto_increment=7 engine=innodb",
"add column t int, change ts ts timestamp, auto_increment =7 engine=innodb",
"add column t int, change ts ts timestamp, AUTO_INCREMENT = 7 engine=innodb",
"add column t int, change ts ts timestamp, engine=innodb auto_increment=73425",
}
for _, statement := range statements {
parser := NewAlterTableParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(parser.alterStatementOptions, statement)
test.S(t).ExpectTrue(parser.IsAutoIncrementDefined())
}
}

func TestParseAlterStatementTrivialRenames(t *testing.T) {
statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb"
parser := NewAlterTableParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(parser.alterStatementOptions, statement)
test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
test.S(t).ExpectFalse(parser.IsAutoIncrementDefined())
test.S(t).ExpectEquals(len(parser.columnRenameMap), 2)
test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts")
test.S(t).ExpectEquals(parser.columnRenameMap["f"], "f")
Expand All @@ -64,6 +87,7 @@ func TestParseAlterStatementNonTrivial(t *testing.T) {
parser := NewAlterTableParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectFalse(parser.IsAutoIncrementDefined())
test.S(t).ExpectEquals(parser.alterStatementOptions, statement)
renames := parser.GetNonTrivialRenames()
test.S(t).ExpectEquals(len(renames), 2)
Expand Down
17 changes: 17 additions & 0 deletions localtests/autoinc-copy-deletes-user-defined/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
drop event if exists gh_ost_test;

drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
primary key(id)
) auto_increment=1;

insert into gh_ost_test values (NULL, 11);
insert into gh_ost_test values (NULL, 13);
insert into gh_ost_test values (NULL, 17);
insert into gh_ost_test values (NULL, 23);
insert into gh_ost_test values (NULL, 29);
insert into gh_ost_test values (NULL, 31);
insert into gh_ost_test values (NULL, 37);
delete from gh_ost_test where id>=5;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AUTO_INCREMENT=7
1 change: 1 addition & 0 deletions localtests/autoinc-copy-deletes-user-defined/extra_args
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--alter='AUTO_INCREMENT=7'
Loading