Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(producer): handle drop_if_high_mem option #75

Merged
merged 10 commits into from
Feb 28, 2025

Conversation

thalesmg
Copy link
Contributor

@thalesmg thalesmg marked this pull request as draft February 27, 2025 13:57
@thalesmg thalesmg force-pushed the 20250227-oom-drop branch 7 times, most recently from 2dee413 to 8591941 Compare February 27, 2025 18:12
@thalesmg thalesmg marked this pull request as ready for review February 27, 2025 18:28
@thalesmg thalesmg requested a review from zmstone February 27, 2025 18:30
@thalesmg thalesmg force-pushed the 20250227-oom-drop branch 4 times, most recently from cc471c1 to 059026a Compare February 27, 2025 20:04
@@ -85,12 +87,15 @@
-define(MAX_REQ_ID, 4294836225).
-define(MAX_SEQ_ID, 18445618199572250625).

-define(DEFAULT_MAX_INFLIGHT, 1000).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe easier to understand and use if this is to control the max number of in-flight batches, instead of max inflight of calls ?
and by default 100 should be good.
like max_send_ahead for wolff_producer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems to already be the number of inflight messages, despite the name

Copy link
Member

@zmstone zmstone Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for Kafka, emqx uses default value max_inflight = 10 in the schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... as it counts individual messages, not calls:

FinalBatchSize = length(FinalBatch),
send_batch_payload(FinalBatch, SequenceId, State0),
Requests = Requests0#{SequenceId => ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize)},
InflightCalls = InflightCalls0 + FinalBatchSize,
pulsar_metrics:inflight_set(State1, InflightCalls),
State2 = State1#{requests := Requests, inflight_calls := InflightCalls},

Copy link
Contributor Author

@thalesmg thalesmg Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I tested with 10, and it lead to a lot of queuing, hurting throughput

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... though, I just tested again, and it worked fine with 10 🤔

end.

stop(SockPid) when is_pid(SockPid) ->
link(SockPid),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will cause caller to exit if caller is not trapping exit. but it has to happen, it's the remote-shell process (when trying to manually stop).

@thalesmg thalesmg merged commit 8983b48 into emqx:master Feb 28, 2025
2 checks passed
@thalesmg thalesmg deleted the 20250227-oom-drop branch February 28, 2025 12:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants