-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add reactive programming and major optimizations #25
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
In this commit, the codebase is restructured to move the event-driven implementations to the 'events' module. This reorganization sets the stage for the upcoming integration of reactive programming features. The objective is to segregate event handling and reactive programming functionalities into separate modules aligned with their specific domains, enhancing code clarity and facilitating future development.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
dependencies
Pull requests that update a dependency
enhancement
Suggests an improvement or enhancement to an existing feature
feature
New feature or request
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Initial Checks
Description
This pull request introduces significant improvements to Pyventus, including the addition of the reactive programming paradigm and major optimizations to the previous event-driven features.
Breaking Changes
The
typing-extensions
package is now a required dependency for Pyventus. This dependency has been added to support advanced typing features in older versions of Python.All previous event-driven features must now be imported from the inner package
pyventus.events
instead of directly frompyventus
. These changes were necessary due to the integration of the reactive programming paradigm. This series of refactors were implemented not only to ensure an organized codebase but also to establish a clear boundary between event handling and reactive programming features, enabling optimized imports based on the required paradigm.The inheritance structure of the
EventEmitter
has been replaced with composition using the newProcessingService
interface. This interface establishes a common ground for processing calls, such as event emissions, in a decoupled and flexible manner. TheEventEmitter
is now a concrete class that requires aProcessingService
instance, referred to asevent_processor
, for initialization. These changes not only preserve the overall behavior and workflow of theEventEmitter
but also enhance its modularity and flexibility.The
EventLinker
class has experienced multiple method renames and return type modifications to align with the new redesigned codebase, enhancing the API for improved usability and intuitiveness. Below is a detailed list of the breaking changes made to theEventLinker
class:EventLinkageWrapper
was renamed toEventLinkerSubCtx
for consistency with the new concept of a subscription context and now extends from the base classSubscriptionContext
. The overall workflow remains roughly the same but was reworked to align with its base class and introduce optimizations based on user needs.get_events()
now returns aset
of all registered events instead of alist
with non-duplicated events.get_event_handlers()
was renamed toget_subscribers()
for consistency, and it now returns aset
of all registered subscribers instead of alist
with non-duplicated subscribers.get_events_by_event_handler()
was renamed toget_events_from_subscribers()
for consistency. It now returns aset
of events associated with the specified subscribers instead of alist
of non-duplicated events associated with the specified subscribers. This method now also supports retrieving events from multiple subscribers instead of only one at a time.get_event_handlers_by_events()
was renamed toget_subscribers_from_events()
for consistency. It now returns aset
of subscribers associated with the specified events instead of alist
of non-duplicated subscribers associated with the provided events. Additionally, a new flag calledpop_onetime_subscribers
was added to remove and return those subscribers that are one-time subscriptions.unsubscribe()
was renamed toremove()
for consistency with the new concept of subscription and the encapsulation of the unsubscription process through theteardown_callback
. This method now allows you to remove one event and subscriber from the registry at a time instead of multiple events of the given subscriber.remove_event_handler()
was renamed toremove_subscriber()
for consistency.event_handler
were renamed tosubscriber
for consistency.The
EventHandler
has been refactored from a class to an interface, outlining the overall workflow and essential protocols for event handling. However, the previous implementation of theEventHandler
has been transitioned to one of its concrete classes namedEventSubscriber
. This newEventSubscriber
class not only implements theEventHandler
interface but also combines it with theSubscription
base class, providing a convenient way to both handle event responses and manage the subscription lifecycle.Added
Added a reactive programming module to expand Python's event-driven capabilities for data-oriented processes and the ability to react efficiently to state changes over time.
Observable
base class, which defines a lazy push-style notification mechanism for streaming data to subscribers.ObservableTask
class, an observable subclass that encapsulates a unit of work and offers a mechanism for streaming its results reactively.as_observable_task()
decorator to easily convert any given callable into an observable task.Observer
interface, which defines the overall workflow and essential protocols for responding to notifications from an observable.Subscriber
class, which combines theObserver
interface with theSubscription
base class to provide a convenient way to respond to state changes emitted by an observable and manage the subscription lifecycle.Added the
Unsubscribable
interface, which provides a standardized method for objects to unsubscribe from a source and release any associated resources.Added the
Subscription
base class to simplify subscription management and resource cleanup with ateardown_callback
that is called during unsubscription.Added the
SubscriptionContext
base class, which defines the overall workflow for subscription contexts, allowing the user to define step-by-step the object that will later be subscribed to the specified source.Added a new package global exception called
PyventusImportException
, which is a custom Pyventus exception for handling missing imports within the library.Added the
MultiBidict
data structure, a generic multikeyed, multivalued bidirectional dictionary that offers a flexible mapping structure for efficient lookups, updates, and deletions of keys and their corresponding values.Introduced the
ProcessingService
interface to define flexible execution strategies for various use cases. This release includes the following concrete implementations:AsyncIOProcessingService
: A processing service that utilizes theAsyncIO
framework to handle the execution of calls.CeleryProcessingService
: A processing service that utilizes theCelery
framework to handle the execution of calls.ExecutorProcessingService
: A processing service that utilizes the Python'sExecutor
to handle the execution of calls.FastAPIProcessingService
: A processing service that uses FastAPI'sBackgroundTasks
to handle the execution of calls.RedisProcessingService
: A processing service that utilizes theRedis Queue
framework to handle the execution of calls.Added the
CallableWrapper
class to encapsulate callables and provide a unified asynchronous interface for their execution.Added new features to the
EventLinker
class, including the following methods:get_valid_subscriber()
method for a centralized mechanism to validate event subscribers.is_empty()
method to efficiently check if the main registry is empty.get_event_count()
method to return the total number of events in the registry.get_subscriber_count()
method to return the total number of subscribers in the registry.get_event_count_from_subscriber()
method to return the number of events for a specific subscriber.get_subscriber_count_from_event()
method to return the number of subscribers for a specific event.contains_event()
method to check if a specific event is present in the registry.contains_subscriber()
method to check if a specific subscriber is present in the registry.are_linked()
method to determine if a specific event is linked to a given subscriber.stateful_subctx
parameter in theonce()
andon()
methods to configure theEventLinkerSubCtx
behavior and optimize the subscription context based on user needs.Introduced the
EventSubscriber
class, which combines event handling capabilities with subscription lifecycle management.A new
benchmarks
package has been added to thetests
directory for performance evaluation of Pyventus. This release introduces theEventEmitterBenchmark
, which measures the efficiency of theEventEmitter
in handling event emissions.Added a set of utilities for creating preconfigured event emitter instances, making the setup process easier. These utilities also provide retro compatibility with the previous class-based design.
Changed
Enhanced the
emit()
method in theEventEmitter
to support the emission of global events (...
).Moved callback utilities to a dedicated module within
pyventus.core
for improved organization and reusability.Standardized the structure of Python classes and their representation, including the use of the
@override()
decorator for consistency andmypy
static type checking.Standardized the structure of the
pyproject.toml
file.Enhanced Pyventus logs by adding process and thread IDs for better debugging.
Switched from the
Black
formatter toRuff
for improved development efficiency and enhanced code quality.Refactored all project docstrings to follow a standardized format, enhancing consistency and clarity in the documentation.
Upgraded several development dependencies in the
pyproject.toml
, includingpytest-asyncio
from version0.21.0
to0.24.0
, to enable global configuration of theasyncio_mode
.Simplified the
EventCallbackType
type alias by removing the unnecessaryParamSpec
.Refactored the test suite to improve validation across all package features, ensuring correctness and achieving 100% code coverage.
Optimized
The time complexity of the$O(E \cdot S^2)$ to $O(E \cdot S)$ , where:
emit()
method in theEventEmitter
class has been significantly optimized. It has been reduced fromThis optimization results from the more efficient management of one-time subscribers by the event linker during event emission. Instead of traversing the entire event linker registry to remove each one-time subscriber involved in the event emission, it now iterates solely through the linked events.
Major optimizations have been implemented for the
EventLinker
class through the integration of theMultiBidict
data structure. This data structure is a multikeyed, multivalued bidirectional dictionary implementation that enables efficient lookups, updates, and deletions of events and their corresponding subscribers. Despite utilizing a bidirectional mapping structure, its memory footprint remains minimal due to the use of references between keys and values instead of duplication, which limits the impact to the additional dictionary and set data structures.The time complexity of the method$O(E \cdot S)$ to $O(S)$ , where:
get_subscribers()
, previously known asget_event_handlers()
, has been reduced fromThe time complexity of the$O(E \cdot S)$ to $O(E)$ , where:
get_events_from_subscribers()
, previously known asget_events_by_event_handler()
, has been reduced fromThe time complexity of the method$O(S)$ to a constant time complexity of $O(1)$ , where:
remove()
, previously known asunsubscribe()
, has been enhanced fromThe time complexity of the method$O(E \cdot S)$ to $O(E)$ , where:
remove_subscriber()
, previously known asremove_event_handler()
, has been reduced fromIntroduced
__slots__
in several classes to optimize memory usage and enhance attribute access speed.Benchmarks
Finally, to provide a quick visualization of the overall improvements and illustrate the time complexity enhancements of this upcoming release, a series of benchmarks were conducted. These benchmarks were specifically designed for the event emission process, as it encompasses all event-driven features and provides a clear overview of the improvements. Following this, a detailed explanation of the benchmarks and their results is presented.
Methodology: The benchmarks utilized a volume testing approach to assess how different subscription counts affect the event emission time.
Environment: The benchmarks were conducted in the following environment:
Setup: The benchmark setup consisted of two key components: a main Python script that managed the overall workflow of the benchmarks and the
EventEmitterBenchmark
class, which was essential for standardizing and organizing the performance tests. Following this, the main workflow of the benchmarks will be outlined, including the required packages and the adjustable settings available for different tests.Benchmark Workflow and Settings (
main.py
)Before running the benchmarks, make sure all necessary packages are installed. You can install them using the following
pip
command:pip install pyventus[tests]
Additionally, an extra package is required for visualizing the results:
matplotlib
. You can also install it with apip
command as follows:pip install matplotlib
Once the packages are installed, you can configure the benchmarks in the
main.py
script as needed and execute it. The script will first install Pyventusv0.6.0
, run the benchmarks in a separate process, and then uninstall it. Next, it will install the current version of Pyventus and run the benchmarks again in another process. Finally, the results for each version will be saved in aJSON
file, and plots will be generated based on the benchmark reports.As shown in the previous script, the
EventEmitterBenchmark
class provides several parameters that can be configured to tailor the performance tests for various scenarios. This flexibility enables a comprehensive evaluation of the event emission process. Below are the key configurations available in theEventEmitterBenchmark
class:Event Subscription Mode: This parameter defines how event subscriptions behave during the benchmark.
One-Time Subscription Mode: This parameter defines how one-time subscriptions are handled in the benchmark.
once
property is always set toFalse
.once
property, allowing for a mix of one-time and regular subscriptions.once
property is always set toTrue
.Subscription Sizes: This parameter specifies a list of varying subscription sizes used in the benchmark.
A mathematical formula is employed to proportionally subdivide the number of events and subscribers based on the specified subscription size.
Note that depending on the selected event subscription mode, the number of subscribers per event may exceed the calculated proportion. For instance, if the event subscription mode is set to
ALL
, each subscriber will be registered across all event sets, resulting in each event set's length being equal to the subscription size.Number of Repeats: This parameter indicates how many times the benchmark is repeated.
Number of Executions: This parameter represents the total number of executions performed during the benchmark.
Metrics Collected: The main metric used to assess the efficiency of the event emission process during the performance tests is the Event Emission Time. This metric tracks the time (in seconds) that it takes to complete the event emission process for each subscription size.
Calculation Method: To determine the event emission time for a given subscription size, the event emission process is executed multiple times (as specified by
num_executions
) and repeated for a number of iterations (as declared bynum_repeats
). The elapsed time for each execution is recorded, and the median of these recorded times is calculated to establish the event emission time for that repetition, which helps reduce the impact of outliers. Once all repetitions are completed, the final event emission time is determined by calculating the mean of the medians from each repetition. Additionally, to minimize timing noise, the garbage collector is disabled during the tests, and theperf_counter
function is used for accurate measurements.Results:
Event Emitter Benchmark 01: This benchmark was configured with the event subscription mode set to Single and the one-time subscription mode set to None. The test was repeated 5 times, with 1,250 executions for each benchmark repetition.
Benchmark Report (
JSON
format)pyventus_v060_eeb_report.json
pyventus_v070_eeb_report.json
v0.6.0
v0.7.0
Event Emitter Benchmark 02: This benchmark was configured with the event subscription mode set to Single and the one-time subscription mode set to All. The test was repeated 5 times, with 1,250 executions for each benchmark repetition.
Benchmark Report (
JSON
format)pyventus_v060_eeb_report.json
pyventus_v070_eeb_report.json
v0.6.0
v0.7.0
Event Emitter Benchmark 03: This benchmark was configured with the event subscription mode set to All and the one-time subscription mode set to All. The test was repeated 5 times, with 1,250 executions for each benchmark repetition.
Benchmark Report (
JSON
format)pyventus_v060_eeb_report.json
pyventus_v070_eeb_report.json
v0.6.0
v0.7.0
Event Emitter Benchmark 04: This benchmark was configured with both the event and one-time subscription modes set to Random. The test was repeated 3 times, with 500 executions for each benchmark repetition.
Benchmark Report (
JSON
format)pyventus_v060_eeb_report.json
pyventus_v070_eeb_report.json
v0.6.0
v0.7.0
As shown in the previous tables, reports, and charts, the improvements made to the event-driven features are significant, especially when one-time subscribers are involved and randomized. It's important to note that these optimizations apply not only to the event emission process but also to other components, such as the
EventLinker
.