Skip to content

Fix memory leak when fetching metadata for a single topic #1130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

mszabo-wikia
Copy link

When requesting metadata for a single topic, Connection::GetMetadata() calls Connection::CreateTopic() to resolve the provided topic name into a Topic, but fails to deallocate it.

To reproduce, compile node-rdkafka and librdkafka with ASAN, then run the following:

const { KafkaConsumer } = require('.');

const consumer = new KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
}, {});

consumer.connect({ timeout: 2000 }, function (err) {
  if (err) {
    console.error('Error connecting to Kafka:', err);
    return;
  }

  consumer.getMetadata({ topic: 'test' }, function (metadataErr, metadata) {
    if (metadataErr) {
      console.error('Error fetching metadata:', metadataErr);
    } else {
      console.log(`Metadata: ${JSON.stringify(metadata, null, 2)}`);
    }

    consumer.disconnect();
  });
})

ASAN will report a leak from GetMetadata():

Indirect leak of 1048 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fa037 in __interceptor_calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154
    #1 0x7f9dab530394 in rd_calloc /node-rdkafka/deps/librdkafka/src/rd.h:134
    #2 0x7f9dab530394 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:349
    #3 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    #4 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    #5 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    #6 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    #7 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    #8 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    #9 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    #10 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    #11 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fa1f8 in __interceptor_realloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:164
    #1 0x7f9dab6bf9eb in rd_realloc /node-rdkafka/deps/librdkafka/src/rd.h:146
    #2 0x7f9dab6bf9eb in rd_list_grow /node-rdkafka/deps/librdkafka/src/rdlist.c:49
    #3 0x7f9dab6bfa9f in rd_list_init /node-rdkafka/deps/librdkafka/src/rdlist.c:57
    #4 0x7f9dab530dd7 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:478
    #5 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    #6 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    #7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    #8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    #9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    #10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    #11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    #12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    #13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 32 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fb647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99
    #1 0x7f9dd1f47743 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:84
    #2 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    #3 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    #4 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    #5 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    #6 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    #7 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    #8 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 23 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63f9e8f in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:145
    #1 0x7f9dab5303ea in rd_malloc /node-rdkafka/deps/librdkafka/src/rd.h:140
    #2 0x7f9dab5303ea in rd_kafkap_str_new /node-rdkafka/deps/librdkafka/src/rdkafka_proto.h:315
    #3 0x7f9dab5303ea in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:353
    #4 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    #5 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    #6 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    #7 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    #8 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    #9 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    #10 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    #11 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    #12 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 20 byte(s) in 2 object(s) allocated from:
    #0 0x7f9dd63a7817 in __interceptor_strdup ../../../../src/libsanitizer/asan/asan_interceptors.cpp:452
    #1 0x7f9dab537302 in rd_strdup /node-rdkafka/deps/librdkafka/src/rd.h:157
    #2 0x7f9dab537302 in rd_kafka_anyconf_set_prop0 /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:1827
    #3 0x7f9dab537c63 in rd_kafka_defaultconf_set /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2273
    #4 0x7f9dab5394fd in rd_kafka_topic_conf_new /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2293
    #5 0x7f9dab539e9f in rd_kafka_topic_conf_dup /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2725
    #6 0x7f9dd1f4794f in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:89
    #7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    #8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    #9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    #10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    #11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    #12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    #13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

The main issue seems to be that Baton does not take ownership of pointers it receives, requiring callers to manually dispose of the data on an ad-hoc basis. So, introduce a new typed RAII wrapper class suitable for wrapping the results of a librdkafka operation, and convert CreateTopic() to return it instead.

As a potential followup, other methods that currently return a Baton could also be incrementally migrated to the new wrapper to reduce the amount of manual memory management required.

When requesting metadata for a single topic, Connection::GetMetadata()
calls Connection::CreateTopic() to resolve the provided topic name into
a Topic, but fails to deallocate it.

To reproduce, compile node-rdkafka and librdkafka with ASAN, then run
the following:

```js
const { KafkaConsumer } = require('.');

const consumer = new KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
}, {});

consumer.connect({ timeout: 2000 }, function (err) {
  if (err) {
    console.error('Error connecting to Kafka:', err);
    return;
  }

  consumer.getMetadata({ topic: 'test' }, function (metadataErr, metadata) {
    if (metadataErr) {
      console.error('Error fetching metadata:', metadataErr);
    } else {
      console.log(`Metadata: ${JSON.stringify(metadata, null, 2)}`);
    }

    consumer.disconnect();
  });
})
```

ASAN will report a leak from GetMetadata():
```
Indirect leak of 1048 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fa037 in __interceptor_calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154
    Blizzard#1 0x7f9dab530394 in rd_calloc /node-rdkafka/deps/librdkafka/src/rd.h:134
    Blizzard#2 0x7f9dab530394 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:349
    Blizzard#3 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    Blizzard#4 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    Blizzard#5 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#6 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#7 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#8 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#9 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#10 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#11 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fa1f8 in __interceptor_realloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:164
    Blizzard#1 0x7f9dab6bf9eb in rd_realloc /node-rdkafka/deps/librdkafka/src/rd.h:146
    Blizzard#2 0x7f9dab6bf9eb in rd_list_grow /node-rdkafka/deps/librdkafka/src/rdlist.c:49
    Blizzard#3 0x7f9dab6bfa9f in rd_list_init /node-rdkafka/deps/librdkafka/src/rdlist.c:57
    Blizzard#4 0x7f9dab530dd7 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:478
    Blizzard#5 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    Blizzard#6 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 32 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63fb647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99
    Blizzard#1 0x7f9dd1f47743 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:84
    Blizzard#2 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#3 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#4 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#5 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#6 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#7 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#8 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 23 byte(s) in 1 object(s) allocated from:
    #0 0x7f9dd63f9e8f in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:145
    Blizzard#1 0x7f9dab5303ea in rd_malloc /node-rdkafka/deps/librdkafka/src/rd.h:140
    Blizzard#2 0x7f9dab5303ea in rd_kafkap_str_new /node-rdkafka/deps/librdkafka/src/rdkafka_proto.h:315
    Blizzard#3 0x7f9dab5303ea in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:353
    Blizzard#4 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533
    Blizzard#5 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114
    Blizzard#6 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#7 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#8 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#9 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#10 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#11 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#12 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)

Indirect leak of 20 byte(s) in 2 object(s) allocated from:
    #0 0x7f9dd63a7817 in __interceptor_strdup ../../../../src/libsanitizer/asan/asan_interceptors.cpp:452
    Blizzard#1 0x7f9dab537302 in rd_strdup /node-rdkafka/deps/librdkafka/src/rd.h:157
    Blizzard#2 0x7f9dab537302 in rd_kafka_anyconf_set_prop0 /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:1827
    Blizzard#3 0x7f9dab537c63 in rd_kafka_defaultconf_set /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2273
    Blizzard#4 0x7f9dab5394fd in rd_kafka_topic_conf_new /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2293
    Blizzard#5 0x7f9dab539e9f in rd_kafka_topic_conf_dup /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2725
    Blizzard#6 0x7f9dd1f4794f in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:89
    Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115
    Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104
    Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198
    Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95
    Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356
    Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122
    Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6)
```

The main issue seems to be that `Baton` does not take ownership of
pointers it receives, requiring callers to manually dispose of the data
on an ad-hoc basis. So, introduce a new typed RAII wrapper class
suitable for wrapping the results of a librdkafka operation, and convert
CreateTopic() to return it instead.

As a potential followup, other methods that currently return a `Baton`
could also be incrementally migrated to the new wrapper to reduce the
amount of manual memory management required.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant