diff --git a/examples/README.md b/examples/README.md
index 32a248a1d..f79e20d9f 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,6 +14,7 @@ The scripts in this directory provide various examples of using Confluent's Pyth
 * [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
 * [get_watermark_offsets.py](get_watermark_offsets.py): Consumer method for listing committed offsets and consumer lag for group and topics.
 * [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).
+* [seek_specific_offset_partition.py](seek_specific_offset_partition.py): Demonstrates using the consumer's seek method to fetch messages at specific offsets from a specific partition.
 
 Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):
 
diff --git a/examples/seek_specific_offset_partition.py b/examples/seek_specific_offset_partition.py
new file mode 100644
index 000000000..e95bac185
--- /dev/null
+++ b/examples/seek_specific_offset_partition.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# A simple example demonstrating use of the seek method to consume messages
+# from a specified start offset up to a specified end offset of a partition
+# in a topic.
+# Conventionally, using the seek method with a plain while loop results in
+# some messages being skipped. The approach proposed here, reading a single
+# message first and then reading the remaining messages, avoids that issue;
+# a sketch of the conventional pattern follows below.
+#
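+# A minimal sketch (illustrative only; `consumer`, `topic`, `partition` and the
+# offsets are assumed to be defined) of the conventional pattern referred to
+# above, which tracks progress with a local counter instead of each message's
+# actual offset. Kafka logs may contain offset gaps (e.g. after compaction),
+# so the counter can drift from the real position and messages get skipped:
+#
+#     consumer.seek(TopicPartition(topic, partition, min_offset))
+#     offset = min_offset
+#     while offset < max_offset:
+#         msg = consumer.poll(1.0)
+#         offset += 1  # counter-based progress; diverges from msg.offset()
+#
+# The functions below instead take the offset of the first polled message as
+# the real starting point and compare msg.offset() against the end offset.
+#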
+
+
+import argparse
+import logging
+
+from confluent_kafka import Consumer, TopicPartition
+from confluent_kafka.error import KafkaException
+
+
+# TODO: limit the maximum number of offsets to extract at a time
+
+
+def fetch_partition_list(kafka_consumer: Consumer, topic: str):
+    """
+    Parameters
+    ----------
+    kafka_consumer : Consumer
+        Consumer instance, e.g. kafka_consumer = Consumer({'bootstrap.servers': "servers"})
+    topic : str
+        Name of the topic on the Kafka server to subscribe to, e.g. "Food".
+
+    Returns
+    -------
+    partition_list : list
+        List of partitions in the topic.
+    topic_partition : list
+        List of TopicPartition objects (to iterate or multi-thread over).
+    partition_offset : list
+        List of [partition, (minimum offset, maximum offset)] entries, one per partition.
+
+    Operation
+    ---------
+    c1 = kafka_consumer.list_topics()                    # Cluster metadata
+    c2 = kafka_consumer.list_topics().topics             # Dict of "topic_name": topic_metadata pairs
+    c3 = kafka_consumer.list_topics().topics.get(topic)  # Metadata of the specific topic
+    c4 = kafka_consumer.list_topics().topics.get(topic).partitions  # Dict of partition: partition_metadata pairs
+    c5 = list(kafka_consumer.list_topics().topics.get(topic).partitions.keys())  # All partitions of the topic
+    """
+    try:
+        kafka_consumer.subscribe([topic])
+    except KafkaException as exc:
+        logging.error("Unable to subscribe to topic {}.\n Exception: {}".format(str(topic), str(exc)))
+        return [], [], []
+
+    try:
+        partition_list = list(kafka_consumer.list_topics().topics.get(topic).partitions.keys())
+        topic_partition = [TopicPartition(topic, partition) for partition in partition_list]
+        partition_offset = [[ele1, kafka_consumer.get_watermark_offsets(ele2)]
+                            for ele1, ele2 in zip(partition_list, topic_partition)]
+    except KafkaException as exc:
+        logging.error("Unable to extract offset list from topic {}.\n Exception: {}".format(str(topic), str(exc)))
+        return [], [], []
+    return partition_list, topic_partition, partition_offset
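+
+
+# A minimal usage sketch for fetch_partition_list (broker address, group id and
+# topic name are placeholders; the shown return values are illustrative):
+#
+#     consumer = Consumer({'bootstrap.servers': 'localhost:9092',
+#                          'group.id': 'example_seek_offset'})
+#     partitions, topic_partitions, offsets = fetch_partition_list(consumer, 'topic_z')
+#     # partitions       -> e.g. [0, 1]
+#     # topic_partitions -> [TopicPartition('topic_z', 0), TopicPartition('topic_z', 1)]
+#     # offsets          -> e.g. [[0, (0, 42)], [1, (0, 17)]]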
+
+
+def fetch_message_partition(consumer_conf, topic, partition, min_offset, max_offset):
+    """
+    Returns a list of messages and a list of [offset, partition] pairs for the
+    min to max offset range of a given partition and topic.
+
+    Parameters
+    ----------
+    consumer_conf : dict
+        Kafka client configuration used to create the consumer.
+    topic : str
+        Topic of the Kafka server from which messages are to be consumed.
+    partition : int
+        Partition of the topic from which data is to be fetched.
+    min_offset : int
+        Start offset from which data is to be fetched (if available),
+        otherwise the lowest available offset of the partition is used.
+    max_offset : int
+        End offset up to which data is to be fetched (if available),
+        otherwise the highest available offset of the partition is used.
+
+    Returns
+    -------
+    total_messages : list
+        Messages from the min offset to the max offset of the partition.
+    total_partition_offset : list of lists
+        [offset, partition] for the message at the corresponding index of total_messages.
+
+    Operation
+    ---------
+    1) Find the min/max available offsets of the partition of the topic.
+    2) Clamp the requested min/max offsets to the available min/max offsets.
+    3) Seek the consumer to the required min offset.
+    4) Read a single message from the consumer and store the message and [offset, partition].
+    5) Read the remaining (max - min) messages from the consumer and store the
+       messages and their [offset, partition] pairs.
+    """
+    total_messages = []
+    total_partition_offset = []
+    consumer = Consumer(consumer_conf)
+    available_min_offset, available_max_offset = consumer.get_watermark_offsets(TopicPartition(topic, partition))
+
+    # Validate the requested offsets against the available range of the partition
+    if min_offset > max_offset:
+        logging.info("Provided minimum offset: {} greater than provided maximum offset: {} in partition: {} topic: {}".format(
+            str(min_offset), str(max_offset), str(partition), str(topic)))
+        return [], []
+    if min_offset < available_min_offset:
+        logging.info("Minimum offset: {} less than available minimum offset: {} in partition: {} topic: {}".format(
+            str(min_offset), str(available_min_offset), str(partition), str(topic)))
+        min_offset = available_min_offset
+    if max_offset > available_max_offset:
+        logging.info("Maximum offset: {} greater than available maximum offset: {} in partition: {} topic: {}".format(
+            str(max_offset), str(available_max_offset), str(partition), str(topic)))
+        max_offset = available_max_offset
+
+    # Seek the consumer so that reading starts from the min offset
+    try:
+        partition1 = TopicPartition(topic, partition, min_offset)
+        consumer.assign([partition1])
+        consumer.seek(partition1)
+        start_offset = min_offset
+    except Exception as exc:
+        logging.error("Unable to seek consumer to topic: {} partition: {} offset: {}.\nException: {}".format(
+            str(topic), str(partition), str(min_offset), str(exc)))
+        consumer.close()
+        return [], []
+
+    try:
+        # Read the first message on its own and take its offset as the real start
+        message = None
+        while message is None:
+            message = consumer.poll(1.0)
+        start_offset = message.offset()
+        total_messages.append(str(message.value()))
+        total_partition_offset.append([message.offset(), message.partition()])
+        # Read the remaining messages until the max offset is reached
+        while start_offset < max_offset:
+            message = consumer.poll(1.0)
+            if message is None:
+                continue
+            start_offset = message.offset()
+            total_messages.append(str(message.value()))
+            total_partition_offset.append([message.offset(), message.partition()])
+    except Exception as exc:
+        logging.error("Unable to read messages from topic: {} partition: {}.\nException: {}".format(
+            str(topic), str(partition), str(exc)))
+
+    consumer.close()
+    return total_messages, total_partition_offset
+
+
+def main(args):
+    topic = args.topic
+    partition = int(args.partition)
+    min_offset = int(args.start_offset)
+    max_offset = int(args.end_offset)
+
+    consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
+                     'group.id': args.group}
+    consumer = Consumer(consumer_conf)
+
+    partition_list, topic_partition, partition_offset = fetch_partition_list(consumer, topic)
+    if partition in partition_list:
+        total_messages, total_partition_offset = fetch_message_partition(
+            consumer_conf, topic, partition, min_offset, max_offset)
+        if len(total_messages) > 0:
+            total_messages_output = [[ele1, ele2[0], ele2[1]] for ele1, ele2
+                                     in zip(total_messages, total_partition_offset)]
+            # Save the messages along with their offset and partition as a txt file
+            with open("file.txt", "w") as output:
+                output.write(str(total_messages_output))
+    else:
+        logging.error("Partition {} not in consumer.".format(str(partition)))
+
+    consumer.close()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="Seek to specific offsets of a partition example")
+    parser.add_argument('-b', dest="bootstrap_servers", required=True,
+                        help="Bootstrap broker(s) (host[:port])")
+    parser.add_argument('-g', dest="group", default="example_seek_offset",
+                        help="Consumer group")
+    parser.add_argument('-t', dest="topic", default="topic_z",
+                        help="Topic name")
+    parser.add_argument('-p', dest="partition", default=0,
+                        help="Partition of topic to fetch data from")
+    parser.add_argument('-sof', dest="start_offset", required=True,
+                        help="Start offset for partition p")
+    parser.add_argument('-eof', dest="end_offset", required=True,
+                        help="End offset for partition p")
+
+    main(parser.parse_args())
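+
+# Example invocation (hypothetical broker, group and topic names; offsets outside
+# the partition's watermark range are clamped to it):
+#
+#     $ python seek_specific_offset_partition.py -b localhost:9092 \
+#           -g example_seek_offset -t topic_z -p 0 -sof 10 -eof 20
+#
+# The messages between offsets 10 and 20 of partition 0 are written, together
+# with their [offset, partition] pairs, to file.txt.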