Skip to content

Initial EntrySetFilterWithComparator #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions coherence/cache.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Copyright (c) 2022, 2025 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
Expand Down Expand Up @@ -93,7 +93,7 @@ type NamedMap[K comparable, V any] interface {
IsEmpty(ctx context.Context) (bool, error)

// EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained.
// Each entry in the channel is of type *StreamEntry which basically wraps an error and the result.
// Each entry in the channel is of type *[StreamedEntry] which wraps an error and the result.
// As always, the result must be accessed (and will be valid) only if the error is nil.
EntrySetFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedEntry[K, V]

Expand All @@ -106,7 +106,7 @@ type NamedMap[K comparable, V any] interface {
Get(ctx context.Context, key K) (*V, error)

// GetAll returns a channel from which entries satisfying the specified filter can be obtained.
// Each entry in the channel is of type *StreamEntry which basically wraps an error and the result.
// Each entry in the channel is of type *[StreamedEntry] which wraps an error and the result.
// As always, the result must be accessed (and will be valid) only of the error is nil.
GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

Expand All @@ -120,7 +120,7 @@ type NamedMap[K comparable, V any] interface {
InvokeAll(ctx context.Context, keysOrFilter any, proc processors.Processor) <-chan *StreamedValue[V]

// KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained.
// Each entry in the channel is of type *StreamEntry which basically wraps an error and the key.
// Each entry in the channel is of type *[StreamedEntry] which wraps an error and the key.
// As always, the result must be accessed (and will be valid) only of the error is nil.
KeySetFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedKey[K]

Expand Down
58 changes: 39 additions & 19 deletions coherence/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func unregisterLifecycleListener[K comparable, V any](baseClient *baseClient[K,
}

// executeAddIndex executes the add index operation against a baseClient.
func executeAddIndex[K comparable, V, T, E any](ctx context.Context, bc *baseClient[K, V], extractor extractors.ValueExtractor[T, E], sorted bool, comparator extractors.ValueExtractor[T, E]) error {
func executeAddIndex[K comparable, V, T, E any](ctx context.Context, bc *baseClient[K, V], extractor extractors.ValueExtractor[T, E], sorted bool, comparator extractors.Comparator[E]) error {
var (
extractorSerializer = NewSerializer[any](bc.format)
binExtractor []byte
Expand Down Expand Up @@ -778,8 +778,8 @@ func executeInvokeAllFilterOrKeysV1Value[K comparable, V any, R any](ctx context
}

// executeInvokeAllFilterOrKeysV1 executes an invokeAll() when connected to v1 gRPC proxy.
func executeInvokeEntrySetFilterV1[K comparable, V any](ctx context.Context, bc *baseClient[K, V], binFilter []byte, ch chan *StreamedEntry[K, V]) {
chInvoke, err := bc.session.v1StreamManagerCache.entrySetFilter(ctx, bc.name, binFilter)
func executeInvokeEntrySetFilterV1[K comparable, V any](ctx context.Context, bc *baseClient[K, V], binFilter []byte, binComparator []byte, ch chan *StreamedEntry[K, V]) {
chInvoke, err := bc.session.v1StreamManagerCache.entrySetFilter(ctx, bc.name, binFilter, binComparator)

if err != nil {
ch <- &StreamedEntry[K, V]{Err: err}
Expand All @@ -802,8 +802,8 @@ func executeKeySetFilterV1[K comparable, V any](ctx context.Context, bc *baseCli
}

// executeValuesFilterV1 executes an values with filter when connected to v1 gRPC proxy.
func executeValuesFilterV1[K comparable, V any](ctx context.Context, bc *baseClient[K, V], binFilter []byte, ch chan *StreamedValue[V]) {
chInvoke, err := bc.session.v1StreamManagerCache.valuesFilter(ctx, bc.name, binFilter)
func executeValuesFilterV1[K comparable, V any](ctx context.Context, bc *baseClient[K, V], binFilter []byte, binComparator []byte, ch chan *StreamedValue[V]) {
chInvoke, err := bc.session.v1StreamManagerCache.valuesFilter(ctx, bc.name, binFilter, binComparator)

if err != nil {
ch <- &StreamedValue[V]{Err: err}
Expand Down Expand Up @@ -1858,11 +1858,13 @@ func executeEntrySet[K comparable, V any](ctx context.Context, bc *baseClient[K,
return ch
}

func executeEntrySetFilter[K comparable, V any](ctx context.Context, bc *baseClient[K, V], fltr filters.Filter) <-chan *StreamedEntry[K, V] {
func executeEntrySetFilter[K comparable, V any, E any](ctx context.Context, bc *baseClient[K, V], fltr filters.Filter, comparator extractors.Comparator[E]) <-chan *StreamedEntry[K, V] {
var (
err = bc.ensureClientConnection()
binFilter = make([]byte, 0)
ch = make(chan *StreamedEntry[K, V])
err = bc.ensureClientConnection()
binFilter = make([]byte, 0)
binComparator = make([]byte, 0)
ch = make(chan *StreamedEntry[K, V])
serializer = NewSerializer[any](bc.format)
)

if err != nil {
Expand All @@ -1875,26 +1877,34 @@ func executeEntrySetFilter[K comparable, V any](ctx context.Context, bc *baseCli
if fltr == nil {
fltr = filters.Always()
}
binFilter, err = NewSerializer[any](bc.format).Serialize(fltr)
binFilter, err = serializer.Serialize(fltr)
if err != nil {
ch <- &StreamedEntry[K, V]{Err: err}
return ch
}

if comparator != nil {
binComparator, err = serializer.Serialize(comparator)
if err != nil {
ch <- &StreamedEntry[K, V]{Err: err}
return ch
}
}

go func() {
if cancel != nil {
defer cancel()
}

if bc.session.GetProtocolVersion() > 0 {
executeInvokeEntrySetFilterV1(ctx, bc, binFilter, ch)
executeInvokeEntrySetFilterV1(ctx, bc, binFilter, binComparator, ch)
close(ch)
return
}

var (
request = pb.EntrySetRequest{Cache: bc.name, Filter: binFilter,
Format: bc.format, Scope: bc.sessionOpts.Scope}
Format: bc.format, Scope: bc.sessionOpts.Scope, Comparator: binComparator}
key *K
value *V
)
Expand Down Expand Up @@ -1942,11 +1952,13 @@ func executeEntrySetFilter[K comparable, V any](ctx context.Context, bc *baseCli
}

// executeValues executes the Values operation against a baseClient.
func executeValues[K comparable, V any](ctx context.Context, bc *baseClient[K, V], fltr filters.Filter) <-chan *StreamedValue[V] {
func executeValues[K comparable, V any, E any](ctx context.Context, bc *baseClient[K, V], fltr filters.Filter, comparator extractors.Comparator[E]) <-chan *StreamedValue[V] {
var (
err = bc.ensureClientConnection()
binFilter = make([]byte, 0)
ch = make(chan *StreamedValue[V])
err = bc.ensureClientConnection()
binFilter = make([]byte, 0)
binComparator = make([]byte, 0)
ch = make(chan *StreamedValue[V])
serializer = NewSerializer[any](bc.format)
)

if err != nil {
Expand All @@ -1959,24 +1971,32 @@ func executeValues[K comparable, V any](ctx context.Context, bc *baseClient[K, V
if fltr == nil {
fltr = filters.Always()
}
binFilter, err = NewSerializer[any](bc.format).Serialize(fltr)
binFilter, err = serializer.Serialize(fltr)
if err != nil {
ch <- &StreamedValue[V]{Err: err}
return ch
}

if comparator != nil {
binComparator, err = serializer.Serialize(comparator)
if err != nil {
ch <- &StreamedValue[V]{Err: err}
return ch
}
}

go func() {
if cancel != nil {
defer cancel()
}

if bc.session.GetProtocolVersion() > 0 {
executeValuesFilterV1(ctx, bc, binFilter, ch)
executeValuesFilterV1(ctx, bc, binFilter, binComparator, ch)
close(ch)
return
}
request := pb.ValuesRequest{Cache: bc.name, Filter: binFilter,
Format: bc.format, Scope: bc.sessionOpts.Scope}
Format: bc.format, Scope: bc.sessionOpts.Scope, Comparator: binComparator}
valuesClient, err1 := bc.client.Values(newCtx, &request)

if err1 != nil {
Expand Down
20 changes: 20 additions & 0 deletions coherence/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,26 @@ EntrySet, KeySet, Values, InvokeAll and InvokeAllFilter.
// we can also do more complex filtering such as looking for people > 30 and where there name begins with 'T'
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20).And(filters.Like(name, "T%", true)))

If you want to sort the results from the EntrySetFilter command you can use the following function
[EntrySetFilterWithComparator]. Due generics limitations in Go, this is not a function call off the [NamedMap]
or [NamedCache] interface, but a function call that takes a [NamedMap] or [NamedCache].

age := extractors.Extract[int]("age")

fmt.Println("Retrieve the people between the age of 17 and 21 and order by age ascending")
ch := coherence.EntrySetFilterWithComparator(ctx, namedMap, filters.Between(age, 17, 21), extractors.ExtractorComparator(age, true))
for result := range ch {
if result.Err != nil {
panic(err)
}
fmt.Printf("Key: %v, Value: %s\n", result.Key, result.Value.String())
}

Note: the entries are sorted internally on the gRPC proxy to avoid excessive memory usage, but you need to be careful
when running this operation against NamedCaches with large number of entries.

Sorting via a [Comparator] is only available when connecting to Coherence server versions CE 25.03+ and commercial 14.1.2.0+.

# Using entry processors for in-place processing

A Processor is an object that allows you to process (update) one or more [NamedMap] entries on the [NamedMap] itself,
Expand Down
76 changes: 55 additions & 21 deletions coherence/extractors/extractors.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Copyright (c) 2022, 2025 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
Expand All @@ -11,18 +11,18 @@ import (
)

const (
comparatorsBasePackage = "comparator."
comparatorsBasePackage = "util.comparator."
extractorPackage = "extractor."

safeComparatorType = comparatorsBasePackage + "SafeComparator"
inverseComparatorType = comparatorsBasePackage + "InverseComparator"
extractorComparatorType = comparatorsBasePackage + "ExtractorComparator"
entryComparatorType = comparatorsBasePackage + "EntryComparator"

universalExtractorType = extractorPackage + "UniversalExtractor"
chainedExtractorType = extractorPackage + "ChainedExtractor"
identityExtractorType = extractorPackage + "IdentityExtractor"
multiExtractorType = extractorPackage + "MultiExtractor"
queueKeyExtractorType = "internal.net.queue.extractor.QueueKeyExtractor"
)

// ValueExtractor extracts a value from a given object.
Expand All @@ -33,6 +33,11 @@ type ValueExtractor[T, E any] interface {
Extract(obj T) (E, error)
}

// Comparator allows sorting of result sets.
type Comparator[T any] interface {
Compare(obj T, comparator T) (int, error)
}

type abstractExtractor struct {
Type string `json:"@class,omitempty"`
Name string `json:"name,omitempty"`
Expand All @@ -44,11 +49,22 @@ type SafeComparator[E any] struct {
Comparator *extractorComparator[E] `json:"comparator,omitempty"`
}

// Compare compares two values for sorting. Only used on the server.
// This is an internal type (exported only for serialization purpose).
func (sc *SafeComparator[T]) Compare(_ T, _ T) (int, error) {
return 0, nil
}

type extractorComparator[E any] struct {
Type string `json:"@class,omitempty"`
Extractor *ValueExtractor[any, E] `json:"extractor"`
}

// Compare is an internal type (exported only for serialization purpose).
func (ec *extractorComparator[T]) Compare(_ T, _ T) (int, error) {
return 0, nil
}

// NewSafeComparator returns a new safe Comparator.
// The type parameter is E = the type of value that will be extracted.
func NewSafeComparator[E any](extractor ValueExtractor[any, E], ascending bool) *SafeComparator[E] {
Expand All @@ -60,6 +76,17 @@ func NewSafeComparator[E any](extractor ValueExtractor[any, E], ascending bool)
return &SafeComparator[E]{Type: comparatorType, Comparator: ec}
}

// ExtractorComparator returns a [Comparator] which will compare the extracted value and sort the results
// based upon the value of the ascending parameter.
func ExtractorComparator[E any](extractor ValueExtractor[any, E], ascending bool) Comparator[E] {
comparatorType := safeComparatorType
if !ascending {
comparatorType = inverseComparatorType
}
ec := &extractorComparator[E]{Type: extractorComparatorType, Extractor: &extractor}
return &SafeComparator[E]{Type: comparatorType, Comparator: ec}
}

type universalExtractor[T, E any] struct {
abstractExtractor
}
Expand Down Expand Up @@ -146,13 +173,28 @@ func (ue *chainedExtractor[T, E]) Extract(_ T) (E, error) {
return zeroValue, nil
}

// Compare is an internal type (exported only for serialization purpose).
func (ue *chainedExtractor[T, E]) Compare(_ T, _ T) (int, error) {
return 0, nil
}

// Extract a value from the object.
// This is an internal type (exported only for serialization purpose).
func (ue *compositeExtractor[T, E]) Extract(_ T) (E, error) {
var zeroValue E
return zeroValue, nil
}

// Compare is an internal type (exported only for serialization purpose).
func (ue *compositeExtractor[T, E]) Compare(_ T, _ T) (int, error) {
return 0, nil
}

// Compare is an internal type (exported only for serialization purpose).
func (ue *universalExtractor[T, E]) Compare(_ T, _ T) (int, error) {
return 0, nil
}

// Extract creates a ValueExtractor from an entry's value. If the property
// contains a "." (period), then a chained extractor is created otherwise a
// UniversalExtractor is created. The type parameter is E = type of extracted value.
Expand All @@ -164,6 +206,11 @@ func Extract[E any](property string) (extractor ValueExtractor[any, E]) {
return newUniversalExtractor[any, E](property)
}

// Universal returns a UniversalExtractor.
func Universal[E any](property string) ValueExtractor[any, E] {
return newUniversalExtractor[any, E](property)
}

type identityExtractor[T, E any] struct {
abstractExtractor
}
Expand All @@ -173,6 +220,11 @@ func (ue *identityExtractor[T, E]) Extract(_ T) (E, error) {
return zeroValue, nil
}

// Compare is an internal type (exported only for serialization purpose).
func (ue *identityExtractor[T, E]) Compare(_ T, _ T) (int, error) {
return 0, nil
}

// Identity returns a ValueExtractor that extracts the objects identity or key.
func Identity[V any]() ValueExtractor[any, V] {
ie := &identityExtractor[any, V]{
Expand All @@ -182,21 +234,3 @@ func Identity[V any]() ValueExtractor[any, V] {
}
return ie
}

type queueKeyExtractor[T, E any] struct {
abstractExtractor
}

func (ue *queueKeyExtractor[T, E]) Extract(_ T) (E, error) {
var zeroValue E
return zeroValue, nil
}

func QueueKeyExtractor[V any]() ValueExtractor[any, V] {
ie := &queueKeyExtractor[any, V]{
abstractExtractor: abstractExtractor{
Type: queueKeyExtractorType,
},
}
return ie
}
Loading
Loading