Quickstart
Talk is cheap. Show me the code.
Consumer
```python
import datetime

from wunderkafka import BytesConsumer, ConsumerConfig, SecurityProtocol

if __name__ == '__main__':
    # The config is still pretty long: it is essential complexity.
    # But in a minimal setup you only need to specify `group_id`
    # and `bootstrap_servers`.
    config = ConsumerConfig(
        enable_auto_commit=False,
        group_id='my_group',
        bootstrap_servers='kafka-broker-01.my_domain.com:9093',
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
    )
    consumer = BytesConsumer(config)
    # Topic subscription by different timelines is now a one-liner without
    # much boilerplate.
    consumer.subscribe(['my_topic'], with_timedelta=datetime.timedelta(hours=10))

    while True:
        msgs = consumer.batch_poll()
        print(
            'Consumed: {0}, errors: {1}'.format(
                len(msgs), len([msg for msg in msgs if msg.error()])
            )
        )
```
What you can (and can't) see here:
- as you probably already guessed, we may need all that serious clumsy security stuff
- by the way, the config is powered by pydantic, so you can see all configuration parameters for the supported librdkafka version right in your IDE/text editor. No more searching with `Ctrl + F` through `CONFIGURATION.md` and dictionaries.
- you may subscribe to multiple topics via a single timestamp or timedelta (a per-topic definition is also supported)
- by default, you don't need to close the consumer manually (though you can still do it): `atexit` is used.

More differences are on the way.
AvroConsumer
```python
from wunderkafka import AvroConsumer, ConsumerConfig, SRConfig, SecurityProtocol

BROKERS_ADDRESSES = 'kafka-broker-01.my_domain.com'
SCHEMA_REGISTRY_URL = 'https://schema-registry.my_domain.com'

if __name__ == '__main__':
    config = ConsumerConfig(
        enable_auto_commit=False,
        group_id='my_group',
        bootstrap_servers=BROKERS_ADDRESSES,
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
        sr=SRConfig(url=SCHEMA_REGISTRY_URL, sasl_username='my_user@my_realm.com'),
    )
    c = AvroConsumer(config)
    c.subscribe(['my_topic'], from_beginning=True)

    while True:
        msgs = c.consume(timeout=10.0, ignore_keys=True)
        print(len(msgs))
```
- keys may be ignored, which is totally optional, but may be useful.
- the main advantage here is that messages may be consumed in a batch even with Avro deserialization, unlike the original API. It saves time. Really.
Multiple subscriptions example:
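A minimal sketch (reusing a config like the one from the first example; `my_other_topic` is just an illustrative name) showing how one call with a single timedelta covers several topics at once:

```python
import datetime

from wunderkafka import BytesConsumer, ConsumerConfig

# Minimal config, as in the first example (security options omitted).
config = ConsumerConfig(
    group_id='my_group',
    bootstrap_servers='kafka-broker-01.my_domain.com:9093',
)
consumer = BytesConsumer(config)
# Both topics are rewound by the same timedelta in a single call.
consumer.subscribe(
    ['my_topic', 'my_other_topic'],
    with_timedelta=datetime.timedelta(hours=10),
)
```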
Let's move on.
AvroProducer
Let's skip the raw producer, as we can see all the benefits in AvroProducer as well.
```python
from wunderkafka import AvroProducer, ProducerConfig, SRConfig, SecurityProtocol

BROKERS_ADDRESSES = 'kafka-broker-01.my_domain.com'
SCHEMA_REGISTRY_URL = 'https://schema-registry.my_domain.com'

value_schema = """
{
    "namespace": "my.test",
    "name": "value",
    "type": "record",
    "fields": [
        {
            "name": "name",
            "type": "string"
        }
    ]
}
"""

key_schema = """
{
    "namespace": "my.test",
    "name": "key",
    "type": "record",
    "fields": [
        {
            "name": "name",
            "type": "string"
        }
    ]
}
"""

if __name__ == '__main__':
    config = ProducerConfig(
        bootstrap_servers=BROKERS_ADDRESSES,
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
        sr=SRConfig(url=SCHEMA_REGISTRY_URL, sasl_username='my_user@my_realm.com'),
    )
    topic = 'test_test_test'
    producer = AvroProducer({topic: (value_schema, key_schema)}, config)
    producer.send_message(topic, {"name": "Value"}, {"name": "Key"}, blocking=True)
```
- instead of producing a message, we think in terms of sending a message. No big deal, as the original `produce()` is still under the hood, but we automatically use `poll()` for asynchronous communication and `flush()` to await until the message is actually sent. This behaviour is hidden behind `blocking`, which is `False` by default (see the sketch after this list).
- by the way, `atexit` is also used here: the producer will try to `flush()`. Nothing is guaranteed if something sudden happens to the process, but a manual close is in danger in that case too.
- less boilerplate with text schemas. You may also load them simply from files (via a specific "store"), but wait a minute: you may not want to use them at all.
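Continuing the example above, a minimal sketch of the default non-blocking mode; it assumes the producer exposes the underlying `flush()`, as the `atexit` note suggests:

```python
# blocking=False is the default: each send_message() enqueues the message
# and lets delivery happen in the background via poll().
for i in range(10):
    producer.send_message(
        topic,
        {'name': 'value #{0}'.format(i)},
        {'name': 'key #{0}'.format(i)},
    )
# Optional: the atexit hook will try to flush anyway, but an explicit
# flush() makes sure everything queued so far is actually delivered.
producer.flush()
```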
AvroModelProducer
What if you don't believe in everything-as-code and want something more dynamic? Let's consider the next few lines:
```python
from pydantic import BaseModel, Field

from wunderkafka import AvroModelProducer, ProducerConfig, SRConfig, SecurityProtocol
from wunderkafka.time import now

BROKERS_ADDRESSES = 'kafka-broker-01.my_domain.com'
SCHEMA_REGISTRY_URL = 'https://schema-registry.my_domain.com'


class SomeEvent(BaseModel):
    name: str = 'test'
    ts: int = Field(default_factory=now)

    class Meta:
        name = 'my.test'


if __name__ == '__main__':
    config = ProducerConfig(
        bootstrap_servers=BROKERS_ADDRESSES,
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
        sr=SRConfig(url=SCHEMA_REGISTRY_URL, sasl_username='my_user@my_realm.com'),
    )
    topic = 'test_test_test'
    # No key, that's just an example
    producer = AvroModelProducer({topic: SomeEvent}, config)
    producer.send_message(topic, SomeEvent(), blocking=True)
```
- just like the previous example, but the schema is derived from the model itself. `dataclasses` are also supported, thanks to dataclasses-avroschema (see the sketch below)!
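For illustration, a minimal sketch of the dataclass flavour; `SomeDataclassEvent` is a hypothetical name, and the `Meta` convention is assumed to mirror the pydantic example above:

```python
from dataclasses import dataclass, field

from wunderkafka import AvroModelProducer
from wunderkafka.time import now


# A hypothetical dataclass counterpart of SomeEvent from the example above.
@dataclass
class SomeDataclassEvent:
    name: str = 'test'
    ts: int = field(default_factory=now)

    class Meta:
        name = 'my.test'


# Reuses `config` and `topic` from the example above.
producer = AvroModelProducer({topic: SomeDataclassEvent}, config)
producer.send_message(topic, SomeDataclassEvent(), blocking=True)
```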
Conclusion
This is a simple API for "daily" usage.
You can still use the original rich API of confluent-kafka if needed, but from now on you have a fast track.