Skip to content

extract pod representation from backend/metrics to backend #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/epp/backend/metrics/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// FakePodMetrics is an implementation of PodMetrics that doesn't run the async refresh loop.
type FakePodMetrics struct {
Pod *Pod
Pod *backend.Pod
Metrics *Metrics
}

// String renders the fake as "Pod: <pod>; Metrics: <metrics>", using the
// values returned by GetPod and GetMetrics.
func (fpm *FakePodMetrics) String() string {
	pod, metrics := fpm.GetPod(), fpm.GetMetrics()
	return fmt.Sprintf("Pod: %v; Metrics: %v", pod, metrics)
}

func (fpm *FakePodMetrics) GetPod() *Pod {
func (fpm *FakePodMetrics) GetPod() *backend.Pod {
return fpm.Pod
}
func (fpm *FakePodMetrics) GetMetrics() *Metrics {
Expand All @@ -55,7 +56,7 @@ type FakePodMetricsClient struct {
Res map[types.NamespacedName]*Metrics
}

func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *Pod, existing *Metrics, port int32) (*Metrics, error) {
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *backend.Pod, existing *Metrics, port int32) (*Metrics, error) {
f.errMu.RLock()
err, ok := f.Err[pod.NamespacedName]
f.errMu.RUnlock()
Expand Down
12 changes: 3 additions & 9 deletions pkg/epp/backend/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/multierr"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
)

const (
Expand All @@ -39,15 +40,8 @@ type PodMetricsClientImpl struct {
MetricMapping *MetricMapping
}

// FetchMetrics fetches metrics from a given pod, clones the existing metrics object and returns an
// updated one.
func (p *PodMetricsClientImpl) FetchMetrics(
ctx context.Context,
pod *Pod,
existing *Metrics,
port int32,
) (*Metrics, error) {

// FetchMetrics fetches metrics from a given pod, clones the existing metrics object and returns an updated one.
func (p *PodMetricsClientImpl) FetchMetrics(ctx context.Context, pod *backend.Pod, existing *Metrics, port int32) (*Metrics, error) {
// Currently the metrics endpoint is hard-coded, which works with vLLM.
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config.
url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics"
Expand Down
3 changes: 2 additions & 1 deletion pkg/epp/backend/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand Down Expand Up @@ -486,7 +487,7 @@ func TestPromToPodMetrics(t *testing.T) {
// there's no server running on the specified port.
func TestFetchMetrics(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
pod := &Pod{
pod := &backend.Pod{
Address: "127.0.0.1",
NamespacedName: types.NamespacedName{
Namespace: "test",
Expand Down
11 changes: 6 additions & 5 deletions pkg/epp/backend/metrics/pod_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand All @@ -35,7 +36,7 @@ const (
)

type podMetrics struct {
pod atomic.Pointer[Pod]
pod atomic.Pointer[backend.Pod]
metrics atomic.Pointer[Metrics]
pmc PodMetricsClient
ds Datastore
Expand All @@ -48,14 +49,14 @@ type podMetrics struct {
}

type PodMetricsClient interface {
FetchMetrics(ctx context.Context, pod *Pod, existing *Metrics, port int32) (*Metrics, error)
FetchMetrics(ctx context.Context, pod *backend.Pod, existing *Metrics, port int32) (*Metrics, error)
}

// String renders the current snapshot as "Pod: <pod>; Metrics: <metrics>",
// using the values returned by GetPod and GetMetrics.
func (pm *podMetrics) String() string {
	pod, metrics := pm.GetPod(), pm.GetMetrics()
	return fmt.Sprintf("Pod: %v; Metrics: %v", pod, metrics)
}

func (pm *podMetrics) GetPod() *Pod {
func (pm *podMetrics) GetPod() *backend.Pod {
return pm.pod.Load()
}

Expand All @@ -67,8 +68,8 @@ func (pm *podMetrics) UpdatePod(in *corev1.Pod) {
pm.pod.Store(toInternalPod(in))
}

func toInternalPod(in *corev1.Pod) *Pod {
return &Pod{
func toInternalPod(in *corev1.Pod) *backend.Pod {
return &backend.Pod{
NamespacedName: types.NamespacedName{
Name: in.Name,
Namespace: in.Namespace,
Expand Down
29 changes: 2 additions & 27 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
)

func NewPodMetricsFactory(pmc PodMetricsClient, refreshMetricsInterval time.Duration) *PodMetricsFactory {
Expand Down Expand Up @@ -58,38 +58,13 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.
}

type PodMetrics interface {
GetPod() *Pod
GetPod() *backend.Pod
GetMetrics() *Metrics
UpdatePod(*corev1.Pod)
StopRefreshLoop()
String() string
}

type Pod struct {
NamespacedName types.NamespacedName
Address string
}

func (p *Pod) String() string {
if p == nil {
return ""
}
return fmt.Sprintf("%+v", *p)
}

func (p *Pod) Clone() *Pod {
if p == nil {
return nil
}
return &Pod{
NamespacedName: types.NamespacedName{
Name: p.NamespacedName.Name,
Namespace: p.NamespacedName.Namespace,
},
Address: p.Address,
}
}

type Metrics struct {
// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
ActiveModels map[string]int
Expand Down
45 changes: 45 additions & 0 deletions pkg/epp/backend/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package backend

import (
"fmt"

"k8s.io/apimachinery/pkg/types"
)

// Pod is the backend package's representation of a pod: the namespaced name
// that identifies it plus the address at which it can be reached.
type Pod struct {
// NamespacedName holds the pod's name and namespace.
NamespacedName types.NamespacedName
// Address is the pod's network address as a string.
// NOTE(review): presumably the pod IP, used as the host when scraping
// metrics — confirm against the code that populates it.
Address string
}

// String formats the pod's fields with "%+v". A nil receiver yields the
// empty string rather than panicking on the dereference.
func (p *Pod) String() string {
	var out string
	if p != nil {
		out = fmt.Sprintf("%+v", *p)
	}
	return out
}

// Clone returns a newly allocated Pod carrying the same name, namespace and
// address as the receiver. A nil receiver yields nil.
func (p *Pod) Clone() *Pod {
	if p == nil {
		return nil
	}
	clone := &Pod{Address: p.Address}
	clone.NamespacedName.Name = p.NamespacedName.Name
	clone.NamespacedName.Namespace = p.NamespacedName.Namespace
	return clone
}
4 changes: 2 additions & 2 deletions pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
Expand Down Expand Up @@ -447,7 +447,7 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
return ""
}

func GetRandomPod(ds datastore.Datastore) *backendmetrics.Pod {
func GetRandomPod(ds datastore.Datastore) *backend.Pod {
pods := ds.PodGetAll()
if len(pods) == 0 {
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/epp/scheduling/plugins/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/go-cmp/cmp"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
Expand Down Expand Up @@ -227,7 +228,7 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
// Test setup: One affinity pod and one available pod
pods := []types.Pod{
&types.PodMetrics{
Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "affinity-pod"}},
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "affinity-pod"}},
Metrics: &backendmetrics.Metrics{
MaxActiveModels: 2,
ActiveModels: map[string]int{
Expand All @@ -236,7 +237,7 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
},
},
&types.PodMetrics{
Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "available-pod"}},
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "available-pod"}},
Metrics: &backendmetrics.Metrics{
MaxActiveModels: 2,
ActiveModels: map[string]int{},
Expand Down
Loading