Advanced API
API for extension
Docs are under construction, but code is ready.
High-level pipelines
Consumer
Bases: AbstractDeserializingConsumer
Deserializing pipeline implementation of extended consumer.
Source code in wunderkafka/consumers/constructor.py
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
|
__init__(consumer, headers_handler, schema_registry, deserializer=None, value_deserializer=None, key_deserializer=None, *, stream_result=False)
Init consumer with specific dependencies.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
consumer
|
AbstractConsumer
|
Consumer implementation to receive messages. |
required |
headers_handler
|
Optional[HeaderParser]
|
Callable to parse binary headers. |
required |
schema_registry
|
Optional[AbstractSchemaRegistry]
|
Schema registry client. |
required |
deserializer
|
Optional[AbstractDeserializer]
|
Common message deserializer for value and key, if set. |
None
|
value_deserializer
|
Optional[AbstractDeserializer]
|
Message deserializer for value, if set. |
None
|
key_deserializer
|
Optional[AbstractDeserializer]
|
Message deserializer for the key, if set. |
None
|
stream_result
|
bool
|
If True, return a complex StreamResult object instead of just a model. |
False
|
Source code in wunderkafka/consumers/constructor.py
consume(timeout=1.0, num_messages=1000000, *, ignore_keys=False, raise_on_error=True, raise_on_lost=True)
Consume as many messages as we can for a given timeout and decode them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout
|
float
|
The maximum time to block waiting for messages. Decoding time doesn't count. |
1.0
|
num_messages
|
int
|
The maximum number of messages to receive from broker. Default is 1000000 that was the allowed maximum for librdkafka 1.2. |
1000000
|
ignore_keys
|
bool
|
If True, skip key decoding, key will be set to None. Otherwise, decode key as usual. |
False
|
raise_on_error
|
bool
|
If True, raise KafkaError form confluent_kafka library to handle in client code. |
True
|
raise_on_lost
|
bool
|
If True, check on own clocks if max.poll.interval.ms is exceeded. If so, raises ConsumerException to be handled in client code. |
True
|
Returns:
Type | Description |
---|---|
List[T]
|
A list of Message objects with decoded value() and key() (possibly empty on timeout). |
Raises:
Type | Description |
---|---|
KafkaError
|
See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.KafkaError # noqa: E501 |
Source code in wunderkafka/consumers/constructor.py
Producer
This module contains the high-level pipeline to produce messages with a nested producer.
It is intended to be testable enough due to the composition of dependencies.
All moving parts should be interchangeable in terms of schema, header and serialization handling (for further overriding^W extending).
HighLevelSerializingProducer
Bases: AbstractSerializingProducer
Serializing pipeline implementation of extended producer.
Source code in wunderkafka/producers/constructor.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
|
__init__(producer, schema_registry, header_packer, serializer=None, store=None, mapping=None, value_serializer=None, key_serializer=None, *, protocol_id=1, lazy=False)
Init producer with specific dependencies and prepare it to work against specified topic(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
producer
|
AbstractProducer
|
Producer implementation to send messages. |
required |
schema_registry
|
Optional[AbstractSchemaRegistry]
|
Schema registry client. |
required |
header_packer
|
Optional[HeaderPacker]
|
Callable to form binary headers. |
required |
serializer
|
Optional[AbstractSerializer]
|
Common message serializer for the key and value. If specific value_deserializer/key_deserializer defined, it will be used instead. |
None
|
store
|
Optional[AbstractDescriptionStore]
|
Specific store to provide schema text extraction from schema description. |
None
|
mapping
|
Optional[Dict[TopicName, MessageDescription]]
|
Per-topic definition of value and/or key schema descriptions. |
None
|
value_serializer
|
Optional[AbstractSerializer]
|
Message serializer for value, if set. |
None
|
key_serializer
|
Optional[AbstractSerializer]
|
Message serializer for the key, if set. |
None
|
protocol_id
|
int
|
Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.) |
1
|
lazy
|
bool
|
If True, defer schema registry publication, otherwise schema will be registered before the first message sending. |
False
|