-
Notifications
You must be signed in to change notification settings - Fork 405
Fix memory leak when fetching metadata for a single topic #1130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
mszabo-wikia
wants to merge
1
commit into
Blizzard:master
Choose a base branch
from
mszabo-wikia:fix-gettopic-leak
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
When requesting metadata for a single topic, Connection::GetMetadata() calls Connection::CreateTopic() to resolve the provided topic name into a Topic, but fails to deallocate it. To reproduce, compile node-rdkafka and librdkafka with ASAN, then run the following: ```js const { KafkaConsumer } = require('.'); const consumer = new KafkaConsumer({ 'group.id': 'kafka', 'metadata.broker.list': 'localhost:9092', }, {}); consumer.connect({ timeout: 2000 }, function (err) { if (err) { console.error('Error connecting to Kafka:', err); return; } consumer.getMetadata({ topic: 'test' }, function (metadataErr, metadata) { if (metadataErr) { console.error('Error fetching metadata:', metadataErr); } else { console.log(`Metadata: ${JSON.stringify(metadata, null, 2)}`); } consumer.disconnect(); }); }) ``` ASAN will report a leak from GetMetadata(): ``` Indirect leak of 1048 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fa037 in __interceptor_calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154 Blizzard#1 0x7f9dab530394 in rd_calloc /node-rdkafka/deps/librdkafka/src/rd.h:134 Blizzard#2 0x7f9dab530394 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:349 Blizzard#3 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 Blizzard#4 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 Blizzard#5 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#6 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#7 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#8 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#9 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#10 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#11 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 128 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fa1f8 in __interceptor_realloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:164 Blizzard#1 0x7f9dab6bf9eb in rd_realloc /node-rdkafka/deps/librdkafka/src/rd.h:146 Blizzard#2 0x7f9dab6bf9eb in rd_list_grow /node-rdkafka/deps/librdkafka/src/rdlist.c:49 Blizzard#3 0x7f9dab6bfa9f in rd_list_init /node-rdkafka/deps/librdkafka/src/rdlist.c:57 Blizzard#4 0x7f9dab530dd7 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:478 Blizzard#5 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 Blizzard#6 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 32 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fb647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99 Blizzard#1 0x7f9dd1f47743 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:84 Blizzard#2 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#3 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#4 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#5 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#6 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#7 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#8 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 23 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63f9e8f in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:145 Blizzard#1 0x7f9dab5303ea in rd_malloc /node-rdkafka/deps/librdkafka/src/rd.h:140 Blizzard#2 0x7f9dab5303ea in rd_kafkap_str_new /node-rdkafka/deps/librdkafka/src/rdkafka_proto.h:315 Blizzard#3 0x7f9dab5303ea in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:353 Blizzard#4 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 Blizzard#5 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 Blizzard#6 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#7 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#8 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#9 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#10 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#11 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#12 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 20 byte(s) in 2 object(s) allocated from: #0 0x7f9dd63a7817 in __interceptor_strdup ../../../../src/libsanitizer/asan/asan_interceptors.cpp:452 Blizzard#1 0x7f9dab537302 in rd_strdup /node-rdkafka/deps/librdkafka/src/rd.h:157 Blizzard#2 0x7f9dab537302 in rd_kafka_anyconf_set_prop0 /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:1827 Blizzard#3 0x7f9dab537c63 in rd_kafka_defaultconf_set /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2273 Blizzard#4 0x7f9dab5394fd in rd_kafka_topic_conf_new /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2293 Blizzard#5 0x7f9dab539e9f in rd_kafka_topic_conf_dup /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2725 Blizzard#6 0x7f9dd1f4794f in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:89 Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) ``` The main issue seems to be that `Baton` does not take ownership of pointers it receives, requiring callers to manually dispose of the data on an ad-hoc basis. So, introduce a new typed RAII wrapper class suitable for wrapping the results of a librdkafka operation, and convert CreateTopic() to return it instead. As a potential followup, other methods that currently return a `Baton` could also be incrementally migrated to the new wrapper to reduce the amount of manual memory management required.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
When requesting metadata for a single topic, Connection::GetMetadata() calls Connection::CreateTopic() to resolve the provided topic name into a Topic, but fails to deallocate it.
To reproduce, compile node-rdkafka and librdkafka with ASAN, then run the following:
ASAN will report a leak from GetMetadata():
The main issue seems to be that
Baton
does not take ownership of pointers it receives, requiring callers to manually dispose of the data on an ad-hoc basis. So, introduce a new typed RAII wrapper class suitable for wrapping the results of a librdkafka operation, and convert CreateTopic() to return it instead.As a potential followup, other methods that currently return a
Baton
could also be incrementally migrated to the new wrapper to reduce the amount of manual memory management required.