API
Raw Bytes
BytesConsumer
Bases: AbstractConsumer
Consumer implementation of extended interface for raw messages.
Source code in wunderkafka/consumers/bytes.py
__init__(config, sasl_watchdog=None)
Init consumer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ConsumerConfig
|
Pydantic BaseSettings model with librdkafka consumer's configuration. |
required |
sasl_watchdog
|
Optional[Watchdog]
|
Callable to handle the global state of kerberos auth (see Watchdog). |
None
|
Source code in wunderkafka/consumers/bytes.py
__str__()
Get human-readable representation of consumer.
Returns:
Type | Description |
---|---|
str
|
string with consumer gid. |
subscribe(topics, *, from_beginning=None, offset=None, ts=None, with_timedelta=None)
Subscribe to a list of topics, or a list of specific TopicSubscriptions.
This method overrides original subscribe()
method of confluent-kafka.Consumer
and allows to subscribe
to topic via specific offset or timestamp.
.. warning:: Currently this method doesn't allow to pass callbacks and uses it's own to reset partitions.
Source code in wunderkafka/consumers/bytes.py
BytesProducer
Bases: AbstractProducer
Producer implementation of extended interface for raw messages.
Source code in wunderkafka/producers/bytes.py
config
property
Get the producer's config.
Returns:
Type | Description |
---|---|
ProducerConfig
|
Pydantic model with librdkafka producer's configuration. |
__init__(config, sasl_watchdog=None)
Init producer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ProducerConfig
|
Pydantic model with librdkafka producer's configuration. |
required |
sasl_watchdog
|
Optional[Watchdog]
|
Callable to handle global state of kerberos auth (see Watchdog). |
None
|
Source code in wunderkafka/producers/bytes.py
Schemaless
SchemaLessJSONStringConsumer
Bases: HighLevelDeserializingConsumer
Kafka Consumer client to get JSON-serialized messages without any schema.
Source code in wunderkafka/factories/schemaless.py
__init__(config)
Init consumer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ConsumerConfig
|
Configuration for: - Librdkafka consumer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
Source code in wunderkafka/factories/schemaless.py
SchemaLessJSONStringProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to serialize and send any value as JSON without any schema.
Source code in wunderkafka/factories/schemaless.py
__init__(mapping, config)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
Source code in wunderkafka/factories/schemaless.py
SchemaLessJSONModelStringProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to serialize and send any instance of pydantic model as JSON without any schema.
Source code in wunderkafka/factories/schemaless.py
__init__(mapping, config)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
Source code in wunderkafka/factories/schemaless.py
Avro
AvroConsumer
Bases: HighLevelDeserializingConsumer
Kafka Consumer client to get AVRO-serialized messages from Confluent/Cloudera installation.
Source code in wunderkafka/factories/avro.py
__init__(config, *, sr_client=None)
Init consumer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ConsumerConfig
|
Configuration for: - Librdkafka consumer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]]
|
Client for schema registry |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/avro.py
AvroProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to serialize and send dictionaries or built-in types as messages.
Source code in wunderkafka/factories/avro.py
__init__(mapping, config, *, sr_client=None, protocol_id=1)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]]
|
Client for schema registry |
None
|
protocol_id
|
int
|
Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.) |
1
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/avro.py
AvroModelProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to serialize and send models or dataclasses as messages.
Source code in wunderkafka/factories/avro.py
__init__(mapping, config, *, sr_client=None, protocol_id=1)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]]
|
Client for schema registry |
None
|
protocol_id
|
int
|
Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.) |
1
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/avro.py
JSON
JSONConsumer
Bases: HighLevelDeserializingConsumer
Kafka Consumer client to get JSON-serialized messages from Confluent/Cloudera installation.
Source code in wunderkafka/factories/json.py
__init__(config, *, sr_client=None)
Init consumer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ConsumerConfig
|
Configuration for: - Librdkafka consumer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]]
|
Client for schema registry |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/json.py
JSONProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to serialize and send dictionaries or built-in types as messages.
Source code in wunderkafka/factories/json.py
__init__(mapping, config, *, sr_client=None, protocol_id=1)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Type[ConfluentSRClient]]
|
Client for schema registry |
None
|
protocol_id
|
int
|
Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.) |
1
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/json.py
JSONModelProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to serialize and send models or dataclasses as messages.
Source code in wunderkafka/factories/json.py
__init__(mapping, config, *, sr_client=None, protocol_id=1)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Type[ConfluentSRClient]]
|
Client for schema registry |
None
|
protocol_id
|
int
|
Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.) |
1
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/json.py
Mixed
These pre-configured consumers and producers are provided for convenience and as explicit example of how to define own factories with mixed (de)serializers.
Note
The naming follow producer API, where the key follows the value. That\'s why the classes are called: Value(de)Serializer + Key(de)Serializer.
AVRO + String
AvroModelStringProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to send models as avro-serialized message values and string-serialized keys.
Source code in wunderkafka/factories/mixed.py
__init__(mapping, config, *, sr_client=None)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Type[ConfluentSRClient]]
|
Client for schema registry |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/mixed.py
AvroStringConsumer
Bases: HighLevelDeserializingConsumer
Kafka Consumer client to get messages with avro-serialized values and string-serialized keys.
Source code in wunderkafka/factories/mixed.py
__init__(config, *, sr_client=None)
Init consumer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ConsumerConfig
|
Configuration for: - Librdkafka consumer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Type[ConfluentSRClient]]
|
Client for schema registry |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/mixed.py
JSON + String
JSONModelStringProducer
Bases: HighLevelSerializingProducer
Kafka Producer client to send models as JSON-serialized message values and string-serialized keys.
Source code in wunderkafka/factories/mixed.py
__init__(mapping, config, *, sr_client=None)
Init producer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization. |
required |
config
|
ProducerConfig
|
Configuration for: - Librdkafka producer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Type[ConfluentSRClient]]
|
Client for schema registry |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/mixed.py
JSONStringConsumer
Bases: HighLevelDeserializingConsumer
Kafka Consumer client to get messages with JSON-serialized values and string-serialized keys.
Source code in wunderkafka/factories/mixed.py
__init__(config, *, sr_client=None)
Init consumer from pre-defined blocks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
|
ConsumerConfig
|
Configuration for: - Librdkafka consumer. - Schema registry client (conventional options for HTTP). Refer original CONFIGURATION.md (https://git.io/JmgCl) or generated config. |
required |
sr_client
|
Optional[Type[ConfluentSRClient]]
|
Client for schema registry |
None
|
Raises:
Type | Description |
---|---|
ValueError
|
If schema registry configuration is missing. |
Source code in wunderkafka/factories/mixed.py
TopicSubscription
Bases: object
Allows custom definition of subscription for topic without need to build full TopicPartition list.
Source code in wunderkafka/consumers/subscription.py
__init__(topic, *, from_beginning=None, offset=None, ts=None, with_timedelta=None)
Init topic subscription object.
Only one method of subscription per topic is allowed at time:
- from beginning (depends on your retention policy)
- from end (consume only latest messages)
- via specific offset
- via specific timestamp
- via specific timedelta (from current datetime)
- no special option (consumer will use "default" value of auto.offset.reset)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic
|
str
|
Topic to subscribe. |
required |
from_beginning
|
Optional[bool]
|
If True, subscribe to get earliest available messages. If False, get latest messages. |
None
|
offset
|
Optional[int]
|
Subscribe to specific offset. If offset not found, will behave with built-in default. |
None
|
ts
|
Optional[int]
|
Subscribe to specific timestamp (milliseconds). If timestamp not found, will behave with built-in default. |
None
|
with_timedelta
|
Optional[timedelta]
|
Subscribe to some moment in the past, from current datetime for a given timedelta. Will calculate specific timestamp and subscribe via ts. |
None
|