-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(producer): handle drop_if_high_mem
option
#75
Conversation
2dee413
to
8591941
Compare
8591941
to
d31b857
Compare
cc471c1
to
059026a
Compare
…nary before sending
059026a
to
3f774bc
Compare
src/pulsar_producer.erl
Outdated
@@ -85,12 +87,15 @@ | |||
-define(MAX_REQ_ID, 4294836225). | |||
-define(MAX_SEQ_ID, 18445618199572250625). | |||
|
|||
-define(DEFAULT_MAX_INFLIGHT, 1000). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe easier to understand and use if this is to control the max number of in-flight batches, instead of max inflight of calls ?
and by default 100 should be good.
like max_send_ahead for wolff_producer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems to already be the number of inflight messages, despite the name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for Kafka, emqx uses default value max_inflight = 10 in the schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... as it counts individual messages, not calls:
pulsar-client-erl/src/pulsar_producer.erl
Lines 825 to 830 in 1865307
FinalBatchSize = length(FinalBatch), | |
send_batch_payload(FinalBatch, SequenceId, State0), | |
Requests = Requests0#{SequenceId => ?INFLIGHT_REQ(QAckRef, FromsToMessages, FinalBatchSize)}, | |
InflightCalls = InflightCalls0 + FinalBatchSize, | |
pulsar_metrics:inflight_set(State1, InflightCalls), | |
State2 = State1#{requests := Requests, inflight_calls := InflightCalls}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and I tested with 10, and it lead to a lot of queuing, hurting throughput
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... though, I just tested again, and it worked fine with 10 🤔
9befb58
to
8dd54d3
Compare
8dd54d3
to
28cde0e
Compare
28cde0e
to
5e19632
Compare
113405b
to
c0dd39a
Compare
c0dd39a
to
08ca110
Compare
end. | ||
|
||
stop(SockPid) when is_pid(SockPid) -> | ||
link(SockPid), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will cause caller to exit if caller is not trapping exit. but it has to happen, it's the remote-shell process (when trying to manually stop).
Fixes https://emqx.atlassian.net/browse/EMQX-13853