Advanced Examples

As you've just seen in the quickstart, we couldn't eliminate all the boilerplate needed to start a consumer or producer with ease.

So let's see what we can do about it.

Redefining configs

So yes, you may still find that you need something like this in your code base:

import os
from functools import partial

from pydantic import field_validator, Field
from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of the 12-factor app, you may want to configure via environment variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v: str) -> str:
        # And to reuse the 'native' Kerberos environment variables
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or you want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # A consumer which does not commit messages automatically
    enable_auto_commit: bool = False
    # And which knows nothing about previous commits after a restart, due to a new group.id
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12-factor style
    bootstrap_servers: str = Field(alias='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v: str) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allowing it to be set manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())

And all this effort is needed to end up with something like:

import datetime

from my_module import MyConsumer


if __name__ == '__main__':
    consumer = MyConsumer()
    consumer.subscribe(['my_topic'], with_timedelta=datetime.timedelta(minutes=2))
    while True:
        msgs = consumer.consume()
        ...

Isn't that simple?
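
The same trick works on the producer side. Below is a minimal sketch; AvroModelProducer and ProducerConfig are assumptions about your wunderkafka version's exports, so check them before copy-pasting:

from functools import partial

# NOTE: the AvroModelProducer/ProducerConfig names are assumed here;
# verify the exports of your wunderkafka version.
from wunderkafka import SRConfig, ProducerConfig, SecurityProtocol, AvroModelProducer

from my_module import OverridenSRConfig


# A hypothetical producer-side analogue of OverridenConfig above.
class OverridenProducerConfig(ProducerConfig):
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sr: SRConfig = OverridenSRConfig()


MyProducer = partial(AvroModelProducer, config=OverridenProducerConfig())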

Yet Another Framework?

Wunderkafka is written with pluggability in mind (and, of course, some assumptions about what may be needed will turn out to be wrong), so its core is a little too general. It is intended to provide a relatively simple way to rebuild your own pipeline with minimum effort.

from typing import Optional

from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer

# The config defined in the first example above
from my_module import OverridenConfig


def MyAvroConsumer(
    config: Optional[OverridenConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConfig()
    config, watchdog = check_watchdog(config)
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )

Looks Java-ish? I know, forgive me for that.

If you like to use inheritance, there are nice examples in wunderkafka.factories to start with. Or check advanced_API (currently to-be-done).
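
For instance, the factory function above can be reshaped into a subclass; here is a minimal sketch reusing the same building blocks (whether wunderkafka.factories wires things exactly this way may differ between versions):

from typing import Optional

from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer

from my_module import OverridenConfig


class MyAvroConsumer(HighLevelDeserializingConsumer):
    # The same wiring as the MyAvroConsumer() factory above, just via inheritance.
    def __init__(self, config: Optional[OverridenConfig] = None) -> None:
        config = config or OverridenConfig()
        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
            headers_handler=ConfluentClouderaHeadersHandler().parse,
            deserializer=FastAvroDeserializer(),
        )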

As you can see, it is pretty straightforward, and you can write your own containers for schema handling or (de)serialization while still running on top of performant librdkafka.

It's also simple enough to redefine every part of the (de)serializing pipeline with a specific implementation. For example, if there is a need to keep a message's schema in the message itself, it is possible to define a stub instead of the schema registry client and to write your own header (un)packer.
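
To make that concrete, here is a rough sketch of such a stub. Everything below (the wire format, class names, and method signatures) is a hypothetical placeholder, not wunderkafka's actual protocol, so align it with the abstractions that HighLevelDeserializingConsumer really expects before wiring it in:

import json
from typing import Tuple


class EmbeddedSchemaHeadersHandler:
    # Hypothetical wire format: a 4-byte big-endian schema length,
    # then the Avro schema JSON, then the payload itself.
    def parse(self, blob: bytes) -> Tuple[bytes, bytes]:
        size = int.from_bytes(blob[:4], 'big')
        return blob[4:4 + size], blob[4 + size:]


class SchemaRegistryStub:
    # No network round-trips: the 'registry' just echoes back the schema
    # which was unpacked from the message itself.
    def get_schema_text(self, schema: bytes) -> str:  # hypothetical method name
        return json.dumps(json.loads(schema))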