# Advanced Examples
As you've just seen in the quickstart, we couldn't eliminate all the boilerplate needed to start a consumer/producer with ease. So let's see what we can do here.
## Redefining configs
So yes, you may still find that you need something like this in your code base:
```python
import os
from functools import partial

from pydantic import field_validator, Field

from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of 12-factor, you may want to configure via env variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v: str) -> str:
        # And to use 'native' kerberos envs
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or you want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # Consumer which does not commit messages automatically
    enable_auto_commit: bool = False
    # And knows nothing after restart due to a new group id.
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12-factor
    bootstrap_servers: str = Field(alias='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v: str) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allowing to set it manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())
```
And all this effort is needed to end up with something like:
```python
import datetime

from my_module import MyConsumer

if __name__ == '__main__':
    consumer = MyConsumer()
    consumer.subscribe(['my_topic'], with_timedelta=datetime.timedelta(minutes=2))
    while True:
        msgs = consumer.consume()
        ...
```
Isn't that simple?
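The same trick works for the producing side. A minimal sketch, assuming `AvroProducer` and `ProducerConfig` (both exported by wunderkafka) accept overrides the same way as their consumer counterparts; `OverridenProducerConfig` and its field choice here are illustrative:

```python
from functools import partial

from wunderkafka import AvroProducer, ProducerConfig, SecurityProtocol


# Illustrative override: reuse the same kind of settings on the producing side.
class OverridenProducerConfig(ProducerConfig):
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl


MyProducer = partial(AvroProducer, config=OverridenProducerConfig())
```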
## Yet Another Framework?
Wunderkafka is written with pluggability in mind (and, of course, its assumptions about what may be needed will be broken), so its core is a little too general. It is intended to provide a relatively simple way to rebuild your own pipeline with minimal effort.
```python
from typing import Optional

from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer

from my_module import OverridenConfig  # the config defined above


def MyAvroConsumer(
    config: Optional[OverridenConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConfig()
    config, watchdog = check_watchdog(config)
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )
```
Looks Java-ish? I know, forgive me for that.
If you like to use inheritance, there are nice examples in `wunderkafka.factories` to start with, or check the advanced API (currently to-be-done). For instance, the factory function above could become a subclass, as sketched below.
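A minimal sketch of the inheritance-based variant of `MyAvroConsumer` from above (the actual classes in `wunderkafka.factories` may wire things differently):

```python
from typing import Optional

from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer

from my_module import OverridenConfig  # the config defined above


class MyInheritedAvroConsumer(HighLevelDeserializingConsumer):
    def __init__(self, config: Optional[OverridenConfig] = None) -> None:
        config = config or OverridenConfig()
        config, watchdog = check_watchdog(config)
        # Same wiring as the factory function, just done once in __init__.
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
            headers_handler=ConfluentClouderaHeadersHandler().parse,
            deserializer=FastAvroDeserializer(),
        )
```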
As you can see, it is pretty straightforward, and you can write your own containers for schema handling or (de)serialization while still running on top of the performant librdkafka.
It's also simple enough to redefine every part of the (de)serializing pipeline with a specific implementation. For example, if there is a need to keep a message's schema in the message itself, it is possible to define a stub instead of the schema registry and write your own header (un)packer, as sketched below.
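A rough sketch of that idea (the class shapes here are illustrative and do not reproduce wunderkafka's actual protocols): the (un)packer stores the Avro schema, length-prefixed, in front of the payload, and the "registry" stub simply hands the parsed schema text back.

```python
from typing import Tuple


class SchemaInMessageHandler:
    """Illustrative (un)packer: the message carries its own schema, length-prefixed."""

    def pack(self, schema_text: str, payload: bytes) -> bytes:
        # Prefix the payload with a 4-byte big-endian length and the schema JSON.
        header = schema_text.encode('utf8')
        return len(header).to_bytes(4, 'big') + header + payload

    def parse(self, blob: bytes) -> Tuple[str, bytes]:
        # Split the message back into schema text and the raw Avro payload.
        size = int.from_bytes(blob[:4], 'big')
        schema_text = blob[4:4 + size].decode('utf8')
        return schema_text, blob[4 + size:]


class StubSRClient:
    """Illustrative stand-in for a schema registry: the parsed 'id' already is the schema."""

    def get_schema_text(self, schema_id: str) -> str:
        return schema_id
```

Wiring something like these into `HighLevelDeserializingConsumer` in place of `ClouderaSRClient` and `ConfluentClouderaHeadersHandler` would keep the rest of the pipeline untouched.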