diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e1ea588c..a2d90152 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: test010: runs-on: ubuntu-latest container: - image: cimg/go:1.19 + image: cimg/go:1.21 env: GO111MODULE: "on" KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181 @@ -39,7 +39,7 @@ jobs: - name: Go setup uses: actions/setup-go@v3 with: - go-version: 1.19 + go-version: 1.21 - name: Display Go version run: go version - name: Run tests @@ -132,7 +132,7 @@ jobs: test270: runs-on: ubuntu-latest container: - image: cimg/go:1.19 + image: cimg/go:1.21 env: GO111MODULE: "on" KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181 @@ -143,7 +143,7 @@ jobs: - name: Go setup uses: actions/setup-go@v3 with: - go-version: 1.19 + go-version: 1.21 - name: Display Go version run: go version - name: Run tests diff --git a/README.md b/README.md index 00bafb0f..7a121b72 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ docker-compose up -d 3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics): ``` -topicctl apply --skip-confirm examples/local-cluster/topics/*yaml +topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml ``` 4. Send some test messages to the `topic-default` topic: @@ -225,13 +225,49 @@ subcommands interactively. topicctl reset-offsets [topic] [group] [flags] ``` -The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets: +The `reset-offsets` subcommand allows resetting the offsets +for a consumer group in a topic. +There are a few typical approaches for setting the offsets: -1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration. +1. Use `--partitions` and combine it with one of the offset operators: + `--delete`, `--offset`, `--to-earliest` or `--to-latest`. +2. Use `--partition-offset-map` to pass specific offsets per partition. + For example, `1=5,2=10` means that the consumer group offset + for partition 1 must be set to 5, and partition 2 to offset 10. + This is mainly used for replays of specific traffic, + such as when a deploy has mishandled or corrupted state, + and the prior release must be rerun + starting at a specific offset per partition. + This is the most flexible approach for offset setting. -2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags. +Note that `--partition-offset-map` flag is standalone +and cannot be coupled with other flags. +##### Partition selection flags +At most one of the following may be selected: + +* `--partitions` specifies a comma-separated list of partitions IDs. + +If none of these are specified, +the command defaults to selecting ALL of the partitions. + +##### Offset selection flags + +At most one of the following may be selected: + +* `--delete` removes stored group offsets. + This will generally have the same effect as `--to-earliest` or `--to-latest`, + depending on the consumer group configuration. + However, `--delete` is more reliable and convenient, + since `--to-earliest` in particular involves a race with message retention + that may require numerous attempts. +* `--offset` indicates the specific value that all selected + consumer group partitions will be set to. +* `--to-earliest` resets group offsets to oldest still-retained per partition. +* `--to-latest` resets group offsets to newest per partitions. + +If none of these are specified, `--to-earliest` will be the default. #### tail diff --git a/cmd/topicctl/subcmd/reset.go b/cmd/topicctl/subcmd/reset.go index acd9a049..64e73317 100644 --- a/cmd/topicctl/subcmd/reset.go +++ b/cmd/topicctl/subcmd/reset.go @@ -6,17 +6,19 @@ import ( "fmt" "strconv" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/cli" "github.com/segmentio/topicctl/pkg/groups" "github.com/segmentio/topicctl/pkg/util" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" ) var resetOffsetsCmd = &cobra.Command{ - Use: "reset-offsets [topic name] [group name]", + Use: "reset-offsets ", Short: "reset consumer group offsets", - Args: cobra.MinimumNArgs(2), + Args: cobra.ExactArgs(2), PreRunE: resetOffsetsPreRun, RunE: resetOffsetsRun, } @@ -27,6 +29,7 @@ type resetOffsetsCmdConfig struct { partitionOffsetMap map[string]int64 toEarliest bool toLatest bool + delete bool shared sharedOptions } @@ -62,40 +65,58 @@ func init() { "to-latest", false, "Resets offsets of consumer group members to latest offsets of partitions") + resetOffsetsCmd.Flags().BoolVar( + &resetOffsetsConfig.delete, + "delete", + false, + "Deletes offsets for the given consumer group") addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared) RootCmd.AddCommand(resetOffsetsCmd) } func resetOffsetsPreRun(cmd *cobra.Command, args []string) error { - resetOffsetSpecification := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset." - offsetMapSpecification := "--partition-offset-map option cannot be coupled with any of the following options: --partitions, --to-earliest, --to-latest, --offset." - - if len(resetOffsetsConfig.partitionOffsetMap) > 0 && (cmd.Flags().Changed("offset") || - len(resetOffsetsConfig.partitions) > 0 || - resetOffsetsConfig.toEarliest || - resetOffsetsConfig.toLatest) { - return errors.New(offsetMapSpecification) + resetOffsetSpec := "You must choose only one of the following " + + "reset-offset specifications: --delete, --to-earliest, --to-latest, " + + "--offset, or --partition-offset-map." + offsetMapSpec := "--partition-offset-map option cannot be used with --partitions." + + cfg := resetOffsetsConfig + + numOffsetSpecs := numTrue( + cfg.toEarliest, + cfg.toLatest, + cfg.delete, + cmd.Flags().Changed("offset"), + len(cfg.partitionOffsetMap) > 0, + ) - } else if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest { - return errors.New(resetOffsetSpecification) + if numOffsetSpecs > 1 { + return errors.New(resetOffsetSpec) + } - } else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) { - return errors.New(resetOffsetSpecification) + if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 { + return errors.New(offsetMapSpec) } - return resetOffsetsConfig.shared.validate() + + return cfg.shared.validate() } func resetOffsetsRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, true) + cfg := resetOffsetsConfig + + adminClient, err := cfg.shared.getAdminClient(ctx, nil, true) if err != nil { return err } + defer adminClient.Close() + connector := adminClient.GetConnector() + topic := args[0] group := args[1] @@ -103,69 +124,66 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { if err != nil { return err } - partitionIDsMap := map[int]struct{}{} + + partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions)) for _, partitionInfo := range topicInfo.Partitions { partitionIDsMap[partitionInfo.ID] = struct{}{} } - var resetOffsetsStrategy string - if resetOffsetsConfig.toLatest { - resetOffsetsStrategy = groups.LatestResetOffsetsStrategy - } else if resetOffsetsConfig.toEarliest { - resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy + + var strategy string + + switch { + case resetOffsetsConfig.toLatest: + strategy = groups.LatestResetOffsetsStrategy + case resetOffsetsConfig.toEarliest: + strategy = groups.EarliestResetOffsetsStrategy } - partitionOffsets := map[int]int64{} - if len(resetOffsetsConfig.partitionOffsetMap) > 0 { - for partition, offset := range resetOffsetsConfig.partitionOffsetMap { - var partitionID int - if partitionID, err = strconv.Atoi(partition); err != nil { - return fmt.Errorf("Partition value %s must be a number", partition) - } - if _, ok := partitionIDsMap[partitionID]; !ok { - return fmt.Errorf("Partition %d not found in topic %s", partitionID, topic) - } + // If explicit per-partition offsets were specified, set them now. + partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap) + if err != nil { + return err + } - partitionOffsets[partitionID] = offset + // Set explicit partitions (without offsets) if specified, + // otherwise operate on fetched partition info; + // these will only take effect of per-partition offsets were not specified. + partitions := cfg.partitions + if len(partitions) == 0 && len(partitionOffsets) == 0 { + convert := func(info admin.PartitionInfo) int { return info.ID } + partitions = convertSlice(topicInfo.Partitions, convert) + } + for _, partition := range partitions { + _, ok := partitionIDsMap[partition] + if !ok { + format := "Partition %d not found in topic %s" + return fmt.Errorf(format, partition, topic) } - } else if len(resetOffsetsConfig.partitions) > 0 { - for _, partition := range resetOffsetsConfig.partitions { - if _, ok := partitionIDsMap[partition]; !ok { - return fmt.Errorf("Partition %d not found in topic %s", partition, topic) - } - if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest { - partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition) - if err != nil { - return err - } - } else { - partitionOffsets[partition] = resetOffsetsConfig.offset - } - + if strategy == "" { + partitionOffsets[partition] = cfg.offset + return nil } - } else { - for _, partitionInfo := range topicInfo.Partitions { - if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest { - partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID) - if err != nil { - return err - } - } else { - partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset - } + + offset, err := groups.GetEarliestOrLatestOffset(ctx, connector, topic, strategy, partition) + if err != nil { + return err } + + partitionOffsets[partition] = offset } log.Infof( - "This will reset the offsets for the following partitions in topic %s for group %s:\n%s", + "This will reset the offsets for the following partitions "+ + "in topic %s for group %s:\n%s", topic, group, groups.FormatPartitionOffsets(partitionOffsets), ) - log.Info( - "Please ensure that all other consumers are stopped, otherwise the reset might be overridden.", - ) + + log.Info("Please ensure that all other consumers are stopped, " + + "otherwise the reset might be overridden.") ok, _ := util.Confirm("OK to continue?", false) if !ok { @@ -173,10 +191,59 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { } cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) - return cliRunner.ResetOffsets( - ctx, - topic, - group, - partitionOffsets, - ) + + if resetOffsetsConfig.delete { + input := groups.DeleteOffsetsInput{ + GroupID: group, + Topic: topic, + Partitions: partitions, + } + + return cliRunner.DeleteOffsets(ctx, &input) + } + + return cliRunner.ResetOffsets(ctx, topic, group, partitionOffsets) +} + +func numTrue(bools ...bool) int { + var n int + for _, b := range bools { + if b { + n++ + } + } + + return n +} + +func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 { + out := make([]T2, len(input)) + + for i, v := range input { + out[i] = fn(v) + } + + return out +} + +func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) { + out := make(map[int]int64, len(input)) + + for partition, offset := range input { + partitionID, err := strconv.Atoi(partition) + if err != nil { + format := "Partition value %s must be an integer" + return nil, fmt.Errorf(format, partition) + } + + _, ok := partitionIDsMap[partitionID] + if !ok { + format := "Partition %d not found" + return nil, fmt.Errorf(format, partitionID) + } + + out[partitionID] = offset + } + + return out, nil } diff --git a/go.mod b/go.mod index 66f80b7d..90517120 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/segmentio/topicctl -go 1.18 +go 1.21 require ( github.com/aws/aws-sdk-go v1.44.208 diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 783963b8..efd76938 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -594,6 +594,11 @@ func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error { return nil } +// DeleteOffsets removes offsets for a single consumer group / topic combination. +func (c *CLIRunner) DeleteOffsets(ctx context.Context, input *groups.DeleteOffsetsInput) error { + return invoke(ctx, c, input, groups.DeleteOffsets) +} + // ResetOffsets resets the offsets for a single consumer group / topic combination. func (c *CLIRunner) ResetOffsets( ctx context.Context, @@ -649,6 +654,7 @@ func (c *CLIRunner) Tail( 10e3, 10e6, ) + stats, err := tailer.LogMessages(ctx, maxMessages, filterRegexp, raw, headers) filtered := filterRegexp != "" @@ -689,6 +695,22 @@ func (c *CLIRunner) stopSpinner() { } } +type invokeFunc[T any] func(context.Context, *admin.Connector, T) error + +func invoke[T any](ctx context.Context, c *CLIRunner, v T, fn invokeFunc[T]) error { + c.startSpinner() + + err := fn(ctx, c.adminClient.GetConnector(), v) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Success") + + return nil +} + func stringsToInts(strs []string) ([]int, error) { ints := []int{} diff --git a/pkg/groups/groups.go b/pkg/groups/groups.go index b3b78439..251d5105 100644 --- a/pkg/groups/groups.go +++ b/pkg/groups/groups.go @@ -191,6 +191,40 @@ func GetMemberLags( return partitionLags, nil } +// DeleteOffsetsInput configures a call to [DeleteOffsets]. +type DeleteOffsetsInput struct { + GroupID string + Topic string + Partitions []int +} + +// DeleteOffsets removes a consumer group's offsets +// on the given topic-partition combinations. +func DeleteOffsets(ctx context.Context, connector *admin.Connector, input *DeleteOffsetsInput) error { + req := kafka.OffsetDeleteRequest{ + Addr: connector.KafkaClient.Addr, + GroupID: input.GroupID, + Topics: map[string][]int{input.Topic: input.Partitions}, + } + + resp, err := connector.KafkaClient.OffsetDelete(ctx, &req) + if err != nil { + return err + } + + var errs []error + + for _, results := range resp.Topics { + for _, result := range results { + if result.Error != nil { + errs = append(errs, result.Error) + } + } + } + + return errors.Join(errs...) +} + // ResetOffsets updates the offsets for a given topic / group combination. func ResetOffsets( ctx context.Context,