Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client.enabled property. #721

Merged
merged 2 commits into from
Mar 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions packages/core/src/Coordinator.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { socketConnector } from './connectors/socket.js';
import { PreAggregator } from './preagg/PreAggregator.js';
import { queryFieldInfo } from './util/field-info.js';
import { QueryResult } from './util/query-result.js';
import { voidLogger } from './util/void-logger.js';
import { MosaicClient } from './MosaicClient.js';
@@ -236,26 +235,12 @@ export class Coordinator {
client.coordinator = this;

// initialize client lifecycle
client._pending = this.initializeClient(client);
client.initialize();

// connect filter selection
connectSelection(this, client.filterBy, client);
}

async initializeClient(client) {
// retrieve field statistics
const fields = client.fields();
if (fields?.length) {
client.fieldInfo(await queryFieldInfo(this, fields));
}

// prepare the client
await client.prepare();

// request data query
return client.requestQuery();
}

/**
* Disconnect a client from the coordinator.
* @param {MosaicClient} client The Mosaic client to disconnect.
@@ -316,7 +301,9 @@ function activateSelection(mc, selection, clause) {
const { preaggregator, filterGroups } = mc;
const { clients } = filterGroups.get(selection);
for (const client of clients) {
preaggregator.request(client, selection, clause);
if (client.enabled) {
preaggregator.request(client, selection, clause);
}
}
}

@@ -332,6 +319,7 @@ function updateSelection(mc, selection) {
const { clients } = filterGroups.get(selection);
const { active } = selection;
return Promise.allSettled(Array.from(clients, client => {
if (!client.enabled) return client.requestQuery();
const info = preaggregator.request(client, selection, active);
const filter = info ? null : selection.predicate(client);

123 changes: 105 additions & 18 deletions packages/core/src/MosaicClient.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
import { Coordinator } from './Coordinator.js';
import { Selection } from './Selection.js';
/** @import { Query } from '@uwdata/mosaic-sql' */
/** @import { Coordinator } from './Coordinator.js' */
/** @import { Selection } from './Selection.js' */
import { queryFieldInfo } from './util/field-info.js';
import { throttle } from './util/throttle.js';

/**
* Base class for Mosaic clients.
* A Mosaic client is a data consumer that indicates its data needs to a
* Mosaic coordinator via the query method. The coordinator is responsible
* for issuing queries and returning results to the client.
*
* The client life-cycle consists of connection to a coordinator,
* initialization (potentially involving queries for data schema and summary
* statistic information), and then interactive queries that may be driven by
* an associated selection. When no longer needed, a client should be
* disconnected from the coordinator.
*
* When enabled, a client will initialize and respond to query update requests.
* If disabled, the client will delay initialization and not respond to queries
* until enabled again. Disabling a client can improve system performance when
* associated interface elements are offscreen or disabled.
*/
export class MosaicClient {
/**
* Constructor.
* @param {*} filterSelection An optional selection to interactively filter
* this client's data. If provided, a coordinator will re-query and update
* the client when the selection updates.
* Create a new client instance.
* @param {Selection} [filterSelection] An optional selection to
* interactively filter this client's data. If provided, a coordinator
* will re-query and update the client when the selection updates.
*/
constructor(filterSelection) {
/** @type {Selection} */
@@ -20,6 +35,12 @@ export class MosaicClient {
this._coordinator = null;
/** @type {Promise<any>} */
this._pending = Promise.resolve();
/** @type {boolean} */
this._enabled = true;
/** @type {boolean} */
this._initialized = false;
/** @type {Query | boolean} */
this._request = null;
}

/**
@@ -36,6 +57,33 @@ export class MosaicClient {
this._coordinator = coordinator;
}

/**
* Return this client's enabled state.
*/
get enabled() {
return this._enabled;
}

/**
* Set this client's enabled state;
*/
set enabled(state) {
state = !!state; // ensure boolean
if (this._enabled !== state) {
this._enabled = state;
if (state) {
if (!this._initialized) {
// initialization includes a query request
this.initialize();
} else if (this._request) {
// request query now if requested while disabled
this.requestQuery(this._request === true ? undefined : this._request);
}
this._request = null;
}
}
}

/**
* Return a Promise that resolves once the client has updated.
*/
@@ -122,24 +170,40 @@ export class MosaicClient {

/**
* Request the coordinator to execute a query for this client.
* If an explicit query is not provided, the client query method will
* be called, filtered by the current filterBy selection. This method
* has no effect if the client is not registered with a coordinator.
* If an explicit query is not provided, the client `query` method will
* be called, filtered by the current `filterBy` selection. This method has
* no effect if the client is not connected to a coordinator. If the client
* is connected by currently disabled, the request will be serviced if the
* client is later enabled.
* @param {Query} [query] The query to request. If unspecified, the query
* will be determind by the client's `query` method and the current
* `filterBy` selection state.
* @returns {Promise}
*/
requestQuery(query) {
const q = query || this.query(this.filterBy?.predicate(this));
return this._coordinator?.requestQuery(this, q);
if (this._enabled) {
const q = query || this.query(this.filterBy?.predicate(this));
return this._coordinator?.requestQuery(this, q);
} else {
this._request = query ?? true;
return null;
}
}

/**
* Request that the coordinator perform a throttled update of this client
* using the default query. Unlike requestQuery, for which every call will
* result in an executed query, multiple calls to requestUpdate may be
* consolidated into a single update.
* using the default query. Unlike requestQuery, for which every call results
* in an executed query, multiple calls to requestUpdate may be consolidated
* into a single update. This method has no effect if the client is not
* connected to a coordinator. If the client is connected but currently
* disabled, the request will be serviced if the client is later enabled.
*/
requestUpdate() {
this._requestUpdate();
if (this._enabled) {
this._requestUpdate();
} else {
this.requestQuery();
}
}

/**
@@ -148,8 +212,15 @@ export class MosaicClient {
* registered with a coordinator.
* @returns {Promise}
*/
initialize() {
return this._coordinator?.initializeClient(this);
async initialize() {
if (!this._enabled) {
// clear flag so we initialize when enabled again
this._initialized = false;
} else if (this._coordinator) {
// if connected, let's initialize
this._initialized = true;
this._pending = initialize(this);
}
}

/**
@@ -161,3 +232,19 @@ export class MosaicClient {
return this;
}
}

/**
* Perform client initialization. This method has been broken out so we can
* capture the resulting promise and set it as the client's pending promise.
* @param {MosaicClient} client The Mosaic client to initialize.
* @returns {Promise} A Promise that resolves when initialization completes.
*/
async function initialize(client) {
// retrieve field statistics
const fields = client.fields();
if (fields?.length) {
client.fieldInfo(await queryFieldInfo(client.coordinator, fields));
}
await client.prepare(); // perform custom preparation
return client.requestQuery(); // request data query
}
78 changes: 51 additions & 27 deletions packages/core/src/make-client.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,88 @@
import { MosaicClient } from "./MosaicClient.js";
import {
coordinator as defaultCoordinator,
} from "./Coordinator.js";
/** @import { Coordinator } from './Coordinator.js' */
/** @import { Selection } from './Selection.js' */
import { MosaicClient } from './MosaicClient.js';
import { coordinator as defaultCoordinator } from './Coordinator.js';

/**
* @typedef {Object} MakeClientOptions
* @property {import('./Coordinator.js').Coordinator} [coordinator] - Mosaic coordinator. Default to the global coordinator.
* @property {import('./Selection.js').Selection|null} [selection] - A selection whose predicates will be fed into the query function to produce the SQL query.
* @property {function(): Promise<void>} [prepare] - An async function to prepare the client before running queries.
* @property {function(any): any} query - A function that returns a query from a list of selection predicates.
* @property {function(any): void} [queryResult] - Called by the coordinator to return a query result.
* @property {function(): void} [queryPending] - Called by the coordinator to report a query execution error.
* @property {function(any): void} [queryError] - Called by the coordinator to inform the client that a query is pending.
* @property {Coordinator} [coordinator] Mosaic coordinator.
* Defaults to the global coordinator.
* @property {Selection|null} [selection] A selection whose predicates are
* fed into the query function to produce the SQL query.
* @property {boolean} [enabled] A flag (default `true`) indicating if the
* client should initially be enabled or not.
* @property {function(): Promise<void>} [prepare]
* An async function to prepare the client before running queries.
* @property {function(any): any} [query]
* A function that returns a query from a list of selection predicates.
* @property {function(any): void} [queryResult]
* Called by the coordinator to return a query result.
* @property {function(): void} [queryPending]
* Called by the coordinator to report a query execution error.
* @property {function(any): void} [queryError]
* Called by the coordinator to inform the client that a query is pending.
*/

/** Make a new client with the given options, and connect the client to the provided coordinator.
* @param {MakeClientOptions} options - The options for making the client
* @returns {MosaicClient & { destroy: () => void }} - The result object with methods to request an update or destroy the client.
/**
* Make a new client with the given options, and connect the client to the
* provided coordinator.
* @param {MakeClientOptions} options The options for making the client.
* @returns {MosaicClient & { destroy: () => void }} The resulting client,
* along with a method to destroy the client when no longer needed.
*/
export function makeClient(options) {
const coordinator = options.coordinator ?? defaultCoordinator();
const client = new ProxyClient({ ...options, coordinator });
const {
coordinator = defaultCoordinator(),
...clientOptions
} = options;
const client = new ProxyClient(clientOptions);
coordinator.connect(client);
return client;
}

/** An internal class used to implement the makeClient API */
/**
* An internal class used to implement the makeClient API.
*/
class ProxyClient extends MosaicClient {
/** @param {MakeClientOptions} options */
constructor(options) {
super(options.selection);
/**
* @param {MakeClientOptions} options The options for making the client.
*/
constructor({
selection = undefined,
enabled = true,
...methods
}) {
super(selection);
this.enabled = enabled;

/** @type {MakeClientOptions} */
this._options = { ...options };
this._methods = methods;
}

async prepare() {
await this._options.prepare?.();
await this._methods.prepare?.();
}

query(filter) {
return this._options.query(filter);
return this._methods.query?.(filter) ?? null;
}

queryResult(data) {
this._options.queryResult?.(data);
this._methods.queryResult?.(data);
return this;
}

queryPending() {
this._options.queryPending?.();
this._methods.queryPending?.();
return this;
}

queryError(error) {
this._options.queryError?.(error);
this._methods.queryError?.(error);
return this;
}

destroy() {
this._options.coordinator.disconnect(this);
this.coordinator.disconnect(this);
}
}
55 changes: 55 additions & 0 deletions packages/core/test/client.test.js
Original file line number Diff line number Diff line change
@@ -117,4 +117,59 @@ describe('MosaicClient', () => {
]);
pending = [];
});

it('respects enabled status', async () => {
// instantiate coordinator to use node.js DuckDB
// disable logging and preaggregation
const coord = new Coordinator(nodeConnector(), {
logger: null,
preagg: { enabled: false }
});

let prepared = false;
let queried = false;
let result = null;

class TestClient extends MosaicClient {
constructor() { super(undefined); this.enabled = false; }
prepare() { prepared = true; }
query() { queried = true; return Query.select({ foo: 1 }); }
queryResult(data) { result = data; }
}

// client is disabled, lifecycle methods should defer
const client = new TestClient();
coord.connect(client);
await client.pending;
expect(prepared).toBe(false);
expect(queried).toBe(false);
expect(result).toBe(null);

// enable client, initialization and query should proceed
client.enabled = true;
await client.pending;
expect(prepared).toBe(true);
expect(queried).toBe(true);
expect(result == null).toBe(false);

// clear state and disable client
prepared = false;
queried = false;
result = null;
client.enabled = false;

// request re-initialization, methods should defer
client.initialize();
await client.pending;
expect(prepared).toBe(false);
expect(queried).toBe(false);
expect(result).toBe(null);

// re-enable client, lifecycle methods should proceed
client.enabled = true;
await client.pending;
expect(prepared).toBe(true);
expect(queried).toBe(true);
expect(result == null).toBe(false);
});
});