Add commands for force failover to a region #509

Merged: 4 commits merged on Mar 12, 2024
Changes from 1 commit
feat: allow changing replicationFactor when scaling
deepthidevaki committed Mar 7, 2024
commit 5745c04370a7d9b4174120093c7f5cea0c2e6a67
24 changes: 14 additions & 10 deletions go-chaos/cmd/cluster.go
@@ -69,6 +69,7 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
 	waitCommand.MarkFlagRequired("changeId")
 	clusterCommand.AddCommand(scaleCommand)
 	scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to")
+	scaleCommand.Flags().Int32Var(&flags.replicationFactor, "replicationFactor", -1, "The new replication factor")
 	scaleCommand.MarkFlagRequired("brokers")
 	forceFailoverCommand.Flags().Int32Var(&flags.regions, "regions", 1, "The number of regions in the cluster")
 	forceFailoverCommand.Flags().Int32Var(&flags.regionId, "regionId", 0, "The id of the region to failover to")
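For context, a minimal, self-contained sketch of the flag pattern added above: an Int32 flag that defaults to -1 so "not set" can be distinguished from an explicitly requested replication factor. The command and variable names below are illustrative only, not taken from this PR.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var replicationFactor int32

	scaleCmd := &cobra.Command{
		Use: "scale",
		RunE: func(cmd *cobra.Command, args []string) error {
			// A positive value means the caller asked for a new replication factor;
			// the default of -1 means "leave it unchanged".
			if replicationFactor > 0 {
				fmt.Printf("scale and change the replication factor to %d\n", replicationFactor)
				return nil
			}
			fmt.Println("scale without changing the replication factor")
			return nil
		},
	}
	scaleCmd.Flags().Int32Var(&replicationFactor, "replicationFactor", -1, "The new replication factor")

	// Simulate an invocation with the flag set.
	scaleCmd.SetArgs([]string{"--replicationFactor", "3"})
	if err := scaleCmd.Execute(); err != nil {
		fmt.Println(err)
	}
}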
@@ -97,9 +98,9 @@ func scaleCluster(flags *Flags) error {
 	}
 
 	if len(currentTopology.Brokers) > flags.brokers {
-		_, err = scaleDownBrokers(k8Client, port, flags.brokers)
+		_, err = scaleDownBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
 	} else if len(currentTopology.Brokers) < flags.brokers {
-		_, err = scaleUpBrokers(k8Client, port, flags.brokers)
+		_, err = scaleUpBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
 	} else {
 		return fmt.Errorf("cluster is already at size %d", flags.brokers)
 	}
@@ -108,17 +109,17 @@ func scaleCluster(flags *Flags) error {
 	return nil
 }
 
-func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
-	changeResponse, err := requestBrokerScaling(port, brokers)
+func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
+	changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
 	ensureNoError(err)
 	_, err = k8Client.ScaleZeebeCluster(brokers)
 	ensureNoError(err)
 	waitForChange(port, changeResponse.ChangeId)
 	return changeResponse, nil
 }
 
-func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
-	changeResponse, err := requestBrokerScaling(port, brokers)
+func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
+	changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
 	ensureNoError(err)
 
 	// Wait for brokers to leave before scaling down
@@ -130,20 +131,23 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*Chang
 	return changeResponse, nil
 }
 
-func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) {
+func requestBrokerScaling(port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
 	brokerIds := make([]int32, brokers)
 	for i := 0; i < brokers; i++ {
 		brokerIds[i] = int32(i)
 	}
-	return sendScaleRequest(port, brokerIds, false)
+	return sendScaleRequest(port, brokerIds, false, replicationFactor)
 }
 
-func sendScaleRequest(port int, brokerIds []int32, force bool) (*ChangeResponse, error) {
+func sendScaleRequest(port int, brokerIds []int32, force bool, replicationFactor int32) (*ChangeResponse, error) {
 	forceParam := "false"
 	if force {
 		forceParam = "true"
 	}
 	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam)
+	if replicationFactor > 0 {
+		url = url + fmt.Sprintf("&replicationFactor=%d", replicationFactor)
+	}
 	request, err := json.Marshal(brokerIds)
 	if err != nil {
 		return nil, err
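The hunk above defines the shape of the scaling request: a POST to the cluster actuator endpoint with the target broker ids as a JSON array in the body, plus an optional replicationFactor query parameter. A minimal sketch of an equivalent standalone request follows; the management port (9600), the broker ids, and the requested replication factor are assumptions for the example, not values taken from this PR.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed example values: port 9600, a three-broker target, replication factor 3.
	port := 9600
	brokerIds := []int32{0, 1, 2}
	replicationFactor := int32(3)

	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=false", port)
	if replicationFactor > 0 {
		url += fmt.Sprintf("&replicationFactor=%d", replicationFactor)
	}

	// The request body is just the JSON array of target broker ids.
	body, err := json.Marshal(brokerIds)
	if err != nil {
		panic(err)
	}

	resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	payload, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(payload))
}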
@@ -257,7 +261,7 @@ func forceFailover(flags *Flags) error {
 
 	brokersInRegion := getBrokers(currentTopology, flags.regions, flags.regionId)
 
-	changeResponse, err := sendScaleRequest(port, brokersInRegion, true)
+	changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1)
 	ensureNoError(err)
 
 	err = waitForChange(port, changeResponse.ChangeId)
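Note that forceFailover passes -1 for replicationFactor, and sendScaleRequest only appends the query parameter when the value is positive, so a force failover leaves the replication factor unchanged. A small sketch of that guard is below; it restates the URL construction from the diff with an assumed port of 9600.

package main

import "fmt"

// buildScaleURL mirrors the URL construction shown above: the replicationFactor
// parameter is only added when a positive value is requested.
func buildScaleURL(port int, force bool, replicationFactor int32) string {
	forceParam := "false"
	if force {
		forceParam = "true"
	}
	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam)
	if replicationFactor > 0 {
		url += fmt.Sprintf("&replicationFactor=%d", replicationFactor)
	}
	return url
}

func main() {
	// Force failover passes -1, so only the force flag appears in the URL.
	fmt.Println(buildScaleURL(9600, true, -1))
	// A scale request with a requested replication factor adds the extra parameter.
	fmt.Println(buildScaleURL(9600, false, 3))
}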
9 changes: 5 additions & 4 deletions go-chaos/cmd/root.go
@@ -78,10 +78,11 @@ type Flags struct {
 	jobType string
 
 	// cluster
-	changeId int64
-	brokers  int
-	regionId int32
-	regions  int32
+	changeId          int64
+	brokers           int
+	regionId          int32
+	regions           int32
+	replicationFactor int32
 }
 
 var Version = "development"