
Quickstart

Talk is cheap. Show me the code.

—Linus Torvalds

Consumer

import datetime

from wunderkafka import BytesConsumer, ConsumerConfig, SecurityProtocol

if __name__ == '__main__':
    # The config is still pretty long: that's essential complexity.
    # But in a minimal setup you only need to specify `group_id`
    # and `bootstrap_servers`.
    config = ConsumerConfig(
        enable_auto_commit=False,
        group_id='my_group',
        bootstrap_servers='kafka-broker-01.my_domain.com:9093',
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
    )

    consumer = BytesConsumer(config)
    # Subscribing to a topic from a point in time is now a one-liner
    # without much boilerplate.
    consumer.subscribe(['my_topic'], with_timedelta=datetime.timedelta(hours=10))

    while True:
        msgs = consumer.batch_poll()
        print(
            'Consumed: {0}, errors: {1}'.format(
                len(msgs), len([msg for msg in msgs if msg.error()])
            )
        )
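
Since enable_auto_commit is disabled above, offsets have to be committed explicitly at some point. A minimal sketch, assuming BytesConsumer keeps the underlying confluent-kafka commit() method:

msgs = consumer.batch_poll()
# ... process messages ...
# Synchronous commit: a crash before this line means re-reading,
# not losing, the uncommitted messages.
consumer.commit(asynchronous=False)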

What you can (and can't) see here:

  • as you probably already guessed, we may need all that clumsy (but serious) security stuff
  • by the way, the config is powered by pydantic, so you can see all configuration parameters for the supported librdkafka version right in your IDE/text editor. No more searching with Ctrl + F through CONFIGURATION.md and dictionaries (see the sketch after this list).
  • you may subscribe to multiple topics with a single timestamp or timedelta (a per-topic definition is also supported)
  • by default, you don't need to close the consumer manually (though you still can): atexit is used.
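
Because the config is a pydantic model, typos and invalid values fail fast instead of being silently handed to librdkafka. A minimal sketch (the bogus value is, of course, made up):

from pydantic import ValidationError

from wunderkafka import ConsumerConfig

try:
    ConsumerConfig(
        group_id='my_group',
        bootstrap_servers='kafka-broker-01.my_domain.com:9093',
        enable_auto_commit='maybe',  # not a valid boolean: raises ValidationError
    )
except ValidationError as exc:
    print(exc)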

More differences are on the way

AvroConsumer

from wunderkafka import AvroConsumer, ConsumerConfig, SRConfig, SecurityProtocol

BROKERS_ADDRESSES = 'kafka-broker-01.my_domain.com:9093'
SCHEMA_REGISTRY_URL = 'https://schema-registry.my_domain.com'

if __name__ == '__main__':
    config = ConsumerConfig(
        enable_auto_commit=False,
        group_id='my_group',
        bootstrap_servers=BROKERS_ADDRESSES,
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
        sr=SRConfig(url=SCHEMA_REGISTRY_URL, sasl_username='my_user@my_realm.com'),
    )

    c = AvroConsumer(config)
    c.subscribe(['my_topic'], from_beginning=True)

    while True:
        msgs = c.consume(timeout=10.0, ignore_keys=True)
        print(len(msgs))

  • keys may be ignored; it's totally optional, but can be useful.
  • the main advantage here is that messages can be consumed in batches even with Avro deserialization, unlike the original API. It saves time. Really.

Multiple subscriptions example:
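
A minimal sketch, reusing the config from the example above (a single timedelta applies to every topic in the list; per-topic definitions are supported too, but omitted here):

import datetime

c = AvroConsumer(config)
# One timedelta is applied to all listed topics at once.
c.subscribe(['my_topic', 'my_other_topic'], with_timedelta=datetime.timedelta(hours=10))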

Let's move on.

AvroProducer

Let's skip the raw producer, as all the benefits can be seen in AvroProducer as well.

from wunderkafka import AvroProducer, ProducerConfig, SRConfig, SecurityProtocol

BROKERS_ADDRESSES = 'kafka-broker-01.my_domain.com:9093'
SCHEMA_REGISTRY_URL = 'https://schema-registry.my_domain.com'

value_schema = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""
key_schema = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

if __name__ == '__main__':
    config = ProducerConfig(
        bootstrap_servers=BROKERS_ADDRESSES,
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
        sr=SRConfig(url=SCHEMA_REGISTRY_URL, sasl_username='my_user@my_realm.com'),
    )

    topic = 'test_test_test'
    producer = AvroProducer({topic: (value_schema, key_schema)}, config)
    producer.send_message(topic, {"name": "Value"}, {"name": "Key"}, blocking=True)

  • instead of producing a message, we think in terms of sending a message. No big deal, as the original produce() is still under the hood, but we automatically use poll() for asynchronous communication and flush() to make sure the message is actually sent. This behaviour is controlled by blocking, which is False by default (see the sketch after this list).
  • by the way, atexit is also used here: the producer will try to flush(). Nothing is guaranteed if something sudden happens to the process, but a manual close is in the same danger in that case.
  • less boilerplate with text schemas. You may also load them from files (via a specific "store"), but wait a minute: you may not want to use them at all.
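
To illustrate the non-blocking default from the first bullet, a minimal sketch (it assumes the producer exposes the underlying confluent-kafka flush(), as the atexit note above implies):

# Fire-and-forget: send_message() returns immediately with blocking=False
# (the default)...
producer.send_message(topic, {"name": "Value"}, {"name": "Key"})
# ...so flush manually when delivery must be confirmed before moving on.
producer.flush()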

AvroModelProducer

What if you don't believe in everything-as-code and want something more dynamic? Let's look at the next few lines:

from pydantic import BaseModel, Field
from wunderkafka import AvroModelProducer, ProducerConfig, SRConfig, SecurityProtocol
from wunderkafka.time import now

BROKERS_ADDRESSES = 'kafka-broker-01.my_domain.com:9093'
SCHEMA_REGISTRY_URL = 'https://schema-registry.my_domain.com'


class SomeEvent(BaseModel):
    name: str = 'test'
    ts: int = Field(default_factory=now)

    class Meta:
        name = 'my.test'


if __name__ == '__main__':
    config = ProducerConfig(
        bootstrap_servers=BROKERS_ADDRESSES,
        security_protocol=SecurityProtocol.sasl_ssl,
        sasl_kerberos_kinit_cmd='kinit my_user@my_realm.com -k -t my_user.keytab',
        sr=SRConfig(url=SCHEMA_REGISTRY_URL, sasl_username='my_user@my_realm.com'),
    )

    topic = 'test_test_test'

    # No key, that's just an example
    producer = AvroModelProducer({topic: SomeEvent}, config)
    producer.send_message(topic, SomeEvent(), blocking=True)

  • just like the previous example, but the schema is derived from the model itself. dataclasses are also supported, thanks to dataclasses-avroschema (see the sketch below)!
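
A minimal sketch of the dataclass flavour, reusing the topic and config from above. The class name is hypothetical, and it assumes a plain dataclass with the same Meta convention is accepted; depending on installed versions, it may need to inherit from dataclasses_avroschema.AvroModel:

from dataclasses import dataclass, field

from wunderkafka import AvroModelProducer
from wunderkafka.time import now


@dataclass
class SomeDataclassEvent:
    name: str = 'test'
    ts: int = field(default_factory=now)

    class Meta:
        name = 'my.test'


# Hypothetical model name; the schema is derived from the dataclass fields.
producer = AvroModelProducer({topic: SomeDataclassEvent}, config)
producer.send_message(topic, SomeDataclassEvent(), blocking=True)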

Conclusion

This is a simple API for everyday usage.

You can still use the original, rich API of confluent-kafka when needed, but now you have a fast track for the common cases.