Advanced API
API for extension
Docs are under construction, but code is ready.
High-level pipelines
Consumer
Bases: AbstractDeserializingConsumer
Deserializing pipeline implementation of extended consumer.
Source code in wunderkafka/consumers/constructor.py
__init__(consumer, headers_handler, schema_registry, deserializer=None, value_deserializer=None, key_deserializer=None, *, stream_result=False)
Init consumer with specific dependencies.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
consumer | AbstractConsumer | Consumer implementation to receive messages. | required |
headers_handler | Optional[HeaderParser] | Callable to parse binary headers. | required |
schema_registry | Optional[AbstractSchemaRegistry] | Schema registry client. | required |
deserializer | Optional[AbstractDeserializer] | Common message deserializer for value and key, if set. | None |
value_deserializer | Optional[AbstractDeserializer] | Message deserializer for value, if set. | None |
key_deserializer | Optional[AbstractDeserializer] | Message deserializer for the key, if set. | None |
stream_result | bool | If True, return a complex StreamResult object instead of just a model. | False |
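The relationship between `deserializer`, `value_deserializer` and `key_deserializer` can be sketched as follows. This is a minimal illustration, not the library's code: it assumes that a specific deserializer, when given, takes precedence over the common one (mirroring what the producer parameters say about serializers). The names `resolve_deserializers`, `json_deserializer` and `str_deserializer` are hypothetical, and a deserializer is modelled here as a plain callable rather than an `AbstractDeserializer`.

```python
import json
from typing import Any, Callable, Optional, Tuple

# Hypothetical stand-in: a "deserializer" is just a callable from bytes to an object.
Deserializer = Callable[[bytes], Any]


def json_deserializer(payload: bytes) -> Any:
    """Decode a UTF-8 JSON payload."""
    return json.loads(payload.decode("utf-8"))


def str_deserializer(payload: bytes) -> str:
    """Decode a UTF-8 string payload."""
    return payload.decode("utf-8")


def resolve_deserializers(
    deserializer: Optional[Deserializer] = None,
    value_deserializer: Optional[Deserializer] = None,
    key_deserializer: Optional[Deserializer] = None,
) -> Tuple[Optional[Deserializer], Optional[Deserializer]]:
    """Return (value, key) deserializers.

    Assumed rule: a specific deserializer wins over the common one.
    """
    value = value_deserializer if value_deserializer is not None else deserializer
    key = key_deserializer if key_deserializer is not None else deserializer
    return value, key


# The common deserializer handles values; keys get their own one.
value_de, key_de = resolve_deserializers(
    deserializer=json_deserializer,
    key_deserializer=str_deserializer,
)
decoded_value = value_de(b'{"id": 1}')
decoded_key = key_de(b"user-1")
```

With this rule, passing only `deserializer` would apply the same callable to both keys and values.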
consume(timeout=1.0, num_messages=1000000, *, ignore_keys=False, raise_on_error=True, raise_on_lost=True)
Consume as many messages as we can for a given timeout and decode them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | float | The maximum time to block waiting for messages. Decoding time doesn't count. | 1.0 |
num_messages | int | The maximum number of messages to receive from the broker. The default, 1000000, was the maximum allowed by librdkafka 1.2. | 1000000 |
ignore_keys | bool | If True, skip key decoding and set the key to None. Otherwise, decode the key as usual. | False |
raise_on_error | bool | If True, raise KafkaError from the confluent_kafka library to be handled in client code. | True |
raise_on_lost | bool | If True, check against the local clock whether max.poll.interval.ms has been exceeded. If so, raise ConsumerException to be handled in client code. | True |
Returns:
Type | Description |
---|---|
List[T] | A list of Message objects with decoded value() and key() (possibly empty on timeout). |
Raises:
Type | Description |
---|---|
KafkaError | See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.KafkaError |
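The `raise_on_lost` check can be pictured as a comparison, on a local monotonic clock, of the time elapsed between consecutive polls against `max.poll.interval.ms`. The sketch below shows that general technique under stated assumptions and is not the library's implementation: `PollWatchdog` and `ConsumerLost` are hypothetical names (the library itself is documented to raise `ConsumerException`), and the clock is injectable only to keep the example deterministic.

```python
import time
from typing import Callable, Optional


class ConsumerLost(Exception):
    """Hypothetical stand-in for the error raised when the poll interval is exceeded."""


class PollWatchdog:
    """Compares the time between polls with max.poll.interval.ms using a local clock."""

    def __init__(
        self,
        max_poll_interval_ms: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._limit_s = max_poll_interval_ms / 1000.0
        self._clock = clock
        self._last_poll: Optional[float] = None

    def check(self, raise_on_lost: bool = True) -> bool:
        """Call right before each poll; returns True if the interval was exceeded."""
        now = self._clock()
        exceeded = (
            self._last_poll is not None
            and (now - self._last_poll) > self._limit_s
        )
        self._last_poll = now
        if exceeded and raise_on_lost:
            raise ConsumerLost("max.poll.interval.ms exceeded between polls")
        return exceeded


# A fake clock makes the behaviour reproducible: polls at 0 s, 1 s and 400 s.
ticks = iter([0.0, 1.0, 400.0])
watchdog = PollWatchdog(max_poll_interval_ms=300_000, clock=lambda: next(ticks))
first = watchdog.check()                     # nothing to compare with yet
second = watchdog.check()                    # 1 s since the last poll
third = watchdog.check(raise_on_lost=False)  # 399 s since the last poll
```

In client code, catching the raised exception is the point at which a consumer would typically be recreated or the batch retried; how wunderkafka expects this to be handled is not covered here.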
Producer
This module contains the high-level pipeline to produce messages with a nested producer.
It is intended to be easy to test, since its dependencies are supplied through composition.
All moving parts should be interchangeable in terms of schema, header and serialization handling (for further overriding^W extending).
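The idea of interchangeable parts can be illustrated with a toy pipeline. Everything below is a hypothetical stand-in (`ToyPipeline`, `FakeProducer`, the callable signatures); it does not reproduce the wunderkafka classes, only the pattern of injecting the producer, serializer and header packer so that each can be replaced in a test.

```python
import json
from typing import Any, Callable, List, Tuple


class FakeProducer:
    """In-memory producer that records what would have been sent."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, bytes]] = []

    def send_message(self, topic: str, value: bytes) -> None:
        self.sent.append((topic, value))


class ToyPipeline:
    """Hypothetical pipeline: header plus serialized payload go to the injected producer."""

    def __init__(
        self,
        producer: FakeProducer,
        serializer: Callable[[Any], bytes],
        header_packer: Callable[[int], bytes],
    ) -> None:
        self._producer = producer
        self._serializer = serializer
        self._header_packer = header_packer

    def send(self, topic: str, value: Any, schema_id: int) -> None:
        # Each step is delegated to a dependency that can be swapped out.
        payload = self._header_packer(schema_id) + self._serializer(value)
        self._producer.send_message(topic, payload)


producer = FakeProducer()
pipeline = ToyPipeline(
    producer,
    serializer=lambda obj: json.dumps(obj, sort_keys=True).encode("utf-8"),
    header_packer=lambda schema_id: schema_id.to_bytes(4, "big"),
)
pipeline.send("events", {"id": 1}, schema_id=7)
```

Because no broker or schema registry is involved, a test can assert directly on `producer.sent`.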
HighLevelSerializingProducer
Bases: AbstractSerializingProducer
Serializing pipeline implementation of extended producer.
Source code in wunderkafka/producers/constructor.py
__init__(producer, schema_registry, header_packer, serializer=None, store=None, mapping=None, value_serializer=None, key_serializer=None, *, protocol_id=1, lazy=False)
Init producer with specific dependencies and prepare it to work against specified topic(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
producer | AbstractProducer | Producer implementation to send messages. | required |
schema_registry | Optional[AbstractSchemaRegistry] | Schema registry client. | required |
header_packer | Optional[HeaderPacker] | Callable to form binary headers. | required |
serializer | Optional[AbstractSerializer] | Common message serializer for the key and value. If a specific value_serializer or key_serializer is defined, it is used instead. | None |
store | Optional[AbstractDescriptionStore] | Specific store to provide schema text extraction from schema description. | None |
mapping | Optional[Dict[TopicName, MessageDescription]] | Per-topic definition of value and/or key schema descriptions. | None |
value_serializer | Optional[AbstractSerializer] | Message serializer for value, if set. | None |
key_serializer | Optional[AbstractSerializer] | Message serializer for the key, if set. | None |
protocol_id | int | Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.) | 1 |
lazy | bool | If True, defer schema registry publication; otherwise, the schema is registered before the first message is sent. | False |
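To make `protocol_id` and `header_packer` more concrete, here is a minimal sketch of header packing for the Confluent wire format (protocol id 0), in which the payload is prefixed with a zero magic byte and a 4-byte big-endian schema id. The function names and signatures are hypothetical and are not the library's `HeaderPacker` interface; other protocols, such as Cloudera's, use different layouts that are not shown here.

```python
import struct
from typing import Tuple

CONFLUENT_PROTOCOL_ID = 0  # magic byte of the Confluent wire format
HEADER_FORMAT = ">bI"      # 1 signed byte + 4-byte unsigned int, big-endian
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)


def pack_confluent_header(schema_id: int) -> bytes:
    """Build the 5-byte prefix: magic byte followed by the schema id."""
    return struct.pack(HEADER_FORMAT, CONFLUENT_PROTOCOL_ID, schema_id)


def unpack_confluent_header(message: bytes) -> Tuple[int, int, bytes]:
    """Split a framed message into (protocol id, schema id, payload)."""
    protocol_id, schema_id = struct.unpack(HEADER_FORMAT, message[:HEADER_SIZE])
    return protocol_id, schema_id, message[HEADER_SIZE:]


header = pack_confluent_header(42)
framed = header + b"payload"
parsed = unpack_confluent_header(framed)
```

A consumer-side header parser would perform the inverse step, as `unpack_confluent_header` does here, before handing the remaining bytes to a deserializer.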