Advanced API

API for extension

Docs are under construction, but code is ready.

High-level pipelines

Consumer

Bases: AbstractDeserializingConsumer

Deserializing pipeline implementation of extended consumer.

Source code in wunderkafka/consumers/constructor.py
class HighLevelDeserializingConsumer(AbstractDeserializingConsumer):
    """Deserializing pipeline implementation of extended consumer."""

    def __init__(
        self,
        consumer: AbstractConsumer,
        headers_handler: Optional[HeaderParser],
        schema_registry: Optional[AbstractSchemaRegistry],
        deserializer: Optional[AbstractDeserializer] = None,
        value_deserializer: Optional[AbstractDeserializer] = None,
        key_deserializer: Optional[AbstractDeserializer] = None,
        *,
        stream_result: bool = False,
    ):
        """
        Init consumer with specific dependencies.

        :param consumer:            Consumer implementation to receive messages.
        :param headers_handler:     Callable to parse binary headers.
        :param schema_registry:     Schema registry client.
        :param deserializer:        Common message deserializer for value and key, if set.
        :param value_deserializer:  Message deserializer for value, if set.
        :param key_deserializer:    Message deserializer for the key, if set.
        :param stream_result:       If True, return a complex StreamResult object instead of just a model.
        """
        self.consumer = consumer
        self._header_parser = headers_handler
        self._registry = schema_registry
        self._deserializer = deserializer

        self._value_deserializer = choose_one_of(deserializer, value_deserializer, 'value')
        self._key_deserializer = choose_one_of(deserializer, key_deserializer, 'key')

        self._stream_result = stream_result

    def subscribe(  # noqa: D102,WPS211 # docstring inherited from superclass.
        self,
        topics: List[Union[str, TopicSubscription]],
        *,
        from_beginning: Optional[bool] = None,
        offset: Optional[int] = None,
        ts: Optional[int] = None,
        with_timedelta: Optional[datetime.timedelta] = None,
    ) -> None:
        self.consumer.subscribe(
            topics, from_beginning=from_beginning, offset=offset, ts=ts, with_timedelta=with_timedelta,
        )

    def commit(  # noqa: D102,WPS211 # docstring inherited from superclass.
        self,
        message: Optional[Message] = None,
        offsets: Optional[List[TopicPartition]] = None,
        asynchronous: bool = True,
    ) -> Optional[List[TopicPartition]]:
        if message is None and offsets is not None:
            return self.consumer.commit(offsets=offsets, asynchronous=asynchronous)
        if message is not None and offsets is None:
            return self.consumer.commit(message=message, asynchronous=asynchronous)
        # Default behavior
        return self.consumer.commit(message=message, offsets=offsets, asynchronous=asynchronous)

    def consume(
        self,
        timeout: float = 1.0,
        num_messages: int = 1000000,
        *,
        ignore_keys: bool = False,
        raise_on_error: bool = True,
        raise_on_lost: bool = True,
    ) -> List[T]:
        """
        Consume as many messages as we can for a given timeout and decode them.

        :param timeout:         The maximum time to block waiting for messages. Decoding time doesn't count.
        :param num_messages:    The maximum number of messages to receive from the broker.
                                Default is 1000000, which was the allowed maximum for librdkafka 1.2.
        :param ignore_keys:     If True, skip key decoding; the key will be set to None. Otherwise, decode the key as usual.
        :param raise_on_error:  If True, raise KafkaError from the confluent_kafka library to handle in client code.
        :param raise_on_lost:   If True, check on our own clock whether max.poll.interval.ms is exceeded. If so, raise
                                ConsumerException to be handled in client code.

        :raises KafkaError:     See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.KafkaError  # noqa: E501

        :return:                A list of Message objects with decoded value() and key() (possibly empty on timeout).
        """  # noqa: E501
        msgs = self.consumer.batch_poll(timeout, num_messages, raise_on_lost=raise_on_lost)
        return self._decoded(
            msgs,
            ignore_keys=ignore_keys,
            raise_on_error=raise_on_error,
        )

    def _decoded(self, msgs: List[Message], *, ignore_keys: bool, raise_on_error: bool) -> List[T]:
        results: List[StreamResult] = []
        for msg in msgs:
            kafka_error = msg.error()
            # Not sure if there is any need for an option to exclude errored message from the consumed ones,
            # so there is no `else`. In case when `.error()` returns KafkaError, but `raise_on_error` is set to False,
            # `.value()` called in client code will return raw bytes with string from KafkaError,
            # e.g.
            #   b'Application maximum poll interval (300000ms) exceeded by Nms'
            if kafka_error is not None:
                logger.error(kafka_error)
                if raise_on_error:
                    # Even though PyCharm stubs show that it is inherited from object, it is in fact a valid Exception
                    raise kafka_error

            topic = msg.topic()

            raw_key_value = msg.key()
            decode_key_ok = True
            if ignore_keys:
                # Yes, we lose information, but it is necessary to not get raw bytes
                # if `.key()` will be called in client code later.
                msg.set_key(None)
            else:
                try:
                    decoded_key = self._decode(topic, msg.key(), is_key=True)
                # KeyDeserializationError is inherited from SerializationError
                except SerializationError:
                    decode_key_ok = False
                    logger.error("Unable to decode key from bytes: {0}".format(raw_key_value))
                    if not ignore_keys:
                        raise
                else:
                    msg.set_key(decoded_key)

            try:
                decoded_value = self._decode(topic, msg.value())
            except (SerializationError, ValueError) as exc:
                logger.error("Unable to decode value from bytes: {0}".format(msg.value()))
                if not self._stream_result:
                    raise
                value_error = str(exc)
                if decode_key_ok:
                    error = PayloadError(description=value_error)
                else:
                    message = "Unable to decode key (topic: {0}, key payload: {1})".format(topic, raw_key_value)
                    error = PayloadError(description=message)
                results.append(StreamResult(payload=None, error=error, msg=msg))
            else:
                msg.set_value(decoded_value)
                if self._stream_result:
                    results.append(StreamResult(payload=decoded_value, error=None, msg=msg))

        to_return = results if self._stream_result else msgs
        return to_return

    # Todo (tribunsky.kir): arguable: make different composition (headers, SR & deserializer united cache)
    def _decode(self, topic: str, blob: Optional[bytes], *, is_key: bool = False) -> Any:
        if blob is None:
            return None

        deserializer = self._get_deserializer(is_key)
        if deserializer.schemaless:
            return deserializer.deserialize('', blob)
        # The header is parsed separately for the sake of customization, e.g., when we don't have SR and put the schema directly in a message
        assert self._header_parser is not None
        assert self._registry is not None
        parsed_header = self._header_parser(blob)
        schema_meta = SchemaMeta(
            topic=topic,
            is_key=is_key,
            header=parsed_header,
        )
        schema_text = self._registry.get_schema_text(schema_meta)
        schema = SchemaDescription(text=schema_text)
        # performance tradeoff: a message may be long, and we don't want to:
        # - copy the whole tail
        # - have implicit offset as if we read buffer when extracting header
        # so dealing with implicit offset and the whole binary string
        return deserializer.deserialize(schema.text, blob, seek_pos=parsed_header.size)

    def _get_deserializer(self, is_key: bool) -> AbstractDeserializer:
        return self._key_deserializer if is_key else self._value_deserializer
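
A minimal wiring sketch (not part of the library docs): the names raw_consumer, header_parser, registry_client and avro_deserializer below are placeholders for whatever AbstractConsumer, HeaderParser, AbstractSchemaRegistry and AbstractDeserializer implementations your setup provides.

from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer

# All four dependencies are placeholders; swap in the concrete implementations you use.
consumer = HighLevelDeserializingConsumer(
    consumer=raw_consumer,            # AbstractConsumer that actually polls the broker
    headers_handler=header_parser,    # HeaderParser for the binary header prefix of each message
    schema_registry=registry_client,  # AbstractSchemaRegistry used to resolve schema text
    deserializer=avro_deserializer,   # AbstractDeserializer used for both value and key
)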

__init__(consumer, headers_handler, schema_registry, deserializer=None, value_deserializer=None, key_deserializer=None, *, stream_result=False)

Init consumer with specific dependencies.

Parameters:

    consumer (AbstractConsumer, required)
        Consumer implementation to receive messages.
    headers_handler (Optional[HeaderParser], required)
        Callable to parse binary headers.
    schema_registry (Optional[AbstractSchemaRegistry], required)
        Schema registry client.
    deserializer (Optional[AbstractDeserializer], default: None)
        Common message deserializer for value and key, if set.
    value_deserializer (Optional[AbstractDeserializer], default: None)
        Message deserializer for value, if set.
    key_deserializer (Optional[AbstractDeserializer], default: None)
        Message deserializer for the key, if set.
    stream_result (bool, default: False)
        If True, return a complex StreamResult object instead of just a model.
Source code in wunderkafka/consumers/constructor.py
def __init__(
    self,
    consumer: AbstractConsumer,
    headers_handler: Optional[HeaderParser],
    schema_registry: Optional[AbstractSchemaRegistry],
    deserializer: Optional[AbstractDeserializer] = None,
    value_deserializer: Optional[AbstractDeserializer] = None,
    key_deserializer: Optional[AbstractDeserializer] = None,
    *,
    stream_result: bool = False,
):
    """
    Init consumer with specific dependencies.

    :param consumer:            Consumer implementation to receive messages.
    :param headers_handler:     Callable to parse binary headers.
    :param schema_registry:     Schema registry client.
    :param deserializer:        Common message deserializer for value and key, if set.
    :param value_deserializer:  Message deserializer for value, if set.
    :param key_deserializer:    Message deserializer for the key, if set.
    :param stream_result:       If True, return a complex StreamResult object instead of just a model.
    """
    self.consumer = consumer
    self._header_parser = headers_handler
    self._registry = schema_registry
    self._deserializer = deserializer

    self._value_deserializer = choose_one_of(deserializer, value_deserializer, 'value')
    self._key_deserializer = choose_one_of(deserializer, key_deserializer, 'key')

    self._stream_result = stream_result
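
With stream_result=True, consume() (documented below) returns StreamResult objects instead of bare messages; this is a handling sketch based on the payload, error and msg fields set in _decoded() above, and it assumes a consumer built with stream_result=True:

for result in consumer.consume(timeout=1.0):
    if result.error is not None:
        # Decoding failed; the original Message is still available as result.msg.
        print(result.error.description)
    else:
        print(result.payload)  # the decoded value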

consume(timeout=1.0, num_messages=1000000, *, ignore_keys=False, raise_on_error=True, raise_on_lost=True)

Consume as many messages as we can for a given timeout and decode them.

Parameters:

    timeout (float, default: 1.0)
        The maximum time to block waiting for messages. Decoding time doesn't count.
    num_messages (int, default: 1000000)
        The maximum number of messages to receive from the broker. Default is 1000000, which was the allowed maximum for librdkafka 1.2.
    ignore_keys (bool, default: False)
        If True, skip key decoding; the key will be set to None. Otherwise, decode the key as usual.
    raise_on_error (bool, default: True)
        If True, raise KafkaError from the confluent_kafka library to handle in client code.
    raise_on_lost (bool, default: True)
        If True, check on our own clock whether max.poll.interval.ms is exceeded. If so, raise ConsumerException to be handled in client code.

Returns:

    List[T]
        A list of Message objects with decoded value() and key() (possibly empty on timeout).

Raises:

    KafkaError
        See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.KafkaError

Source code in wunderkafka/consumers/constructor.py
def consume(
    self,
    timeout: float = 1.0,
    num_messages: int = 1000000,
    *,
    ignore_keys: bool = False,
    raise_on_error: bool = True,
    raise_on_lost: bool = True,
) -> List[T]:
    """
    Consume as many messages as we can for a given timeout and decode them.

    :param timeout:         The maximum time to block waiting for messages. Decoding time doesn't count.
    :param num_messages:    The maximum number of messages to receive from the broker.
                            Default is 1000000, which was the allowed maximum for librdkafka 1.2.
    :param ignore_keys:     If True, skip key decoding; the key will be set to None. Otherwise, decode the key as usual.
    :param raise_on_error:  If True, raise KafkaError from the confluent_kafka library to handle in client code.
    :param raise_on_lost:   If True, check on our own clock whether max.poll.interval.ms is exceeded. If so, raise
                            ConsumerException to be handled in client code.

    :raises KafkaError:     See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.KafkaError  # noqa: E501

    :return:                A list of Message objects with decoded value() and key() (possibly empty on timeout).
    """  # noqa: E501
    msgs = self.consumer.batch_poll(timeout, num_messages, raise_on_lost=raise_on_lost)
    return self._decoded(
        msgs,
        ignore_keys=ignore_keys,
        raise_on_error=raise_on_error,
    )
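
A typical polling loop, sketched under the assumption that the consumer above was built with stream_result=False (the default), so consume() returns Message objects with already decoded value() and key(); the topic name is illustrative:

consumer.subscribe(['my_topic'], from_beginning=True)
while True:
    for msg in consumer.consume(timeout=1.0, ignore_keys=True):
        print(msg.value())               # decoded payload; key() is None because ignore_keys=True
    consumer.commit(asynchronous=False)  # commit the consumed offsets synchronously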

Producer

This module contains the high-level pipeline to produce messages with a nested producer.

It is intended to be easily testable thanks to its composition of dependencies.

All moving parts should be interchangeable in terms of schema, header, and serialization handling (for further extension).

HighLevelSerializingProducer

Bases: AbstractSerializingProducer

Serializing pipeline implementation of extended producer.

Source code in wunderkafka/producers/constructor.py
class HighLevelSerializingProducer(AbstractSerializingProducer):
    """Serializing pipeline implementation of extended producer."""

    def __init__(  # noqa: WPS211  # ToDo (tribunsky.kir): rethink building of producer.
        #                                                  Maybe single Callable (like in kafka-python) much better.
        self,
        producer: AbstractProducer,
        # Some serializers don't need SR at all, e.g. StringSerializer.
        schema_registry: Optional[AbstractSchemaRegistry],
        # As some serializers don't emit a magic byte, we do not need to handle the first bytes of a message at all.
        header_packer: Optional[HeaderPacker],
        serializer: Optional[AbstractSerializer] = None,
        store: Optional[AbstractDescriptionStore] = None,
        # ToDo: switch mapping to something like consumer's TopicSubscription?
        mapping: Optional[Dict[TopicName, MessageDescription]] = None,
        value_serializer: Optional[AbstractSerializer] = None,
        key_serializer: Optional[AbstractSerializer] = None,
        *,
        protocol_id: int = 1,
        lazy: bool = False,
    ) -> None:
        """
        Init producer with specific dependencies and prepare it to work against specified topic(s).

        :param producer:            Producer implementation to send messages.
        :param schema_registry:     Schema registry client.
        :param header_packer:       Callable to form binary headers.
        :param serializer:          Common message serializer for the key and value.
                                    If a specific value_serializer/key_serializer is defined, it will be used instead.
        :param store:               Specific store to provide schema text extraction from schema description.
        :param mapping:             Per-topic definition of value and/or key schema descriptions.
        :param value_serializer:    Message serializer for value, if set.
        :param key_serializer:      Message serializer for the key, if set.
        :param protocol_id:         Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)
        :param lazy:                If True, defer schema registry publication; otherwise the schema will be
                                    registered before the first message is sent.
        """
        self._mapping = mapping or {}

        # ToDo (tribunsky.kir): look like wrong place. Maybe it's better to highlight an entity
        #                       of Schema which may be checked or not. Then input mapping may be eliminated (or not).
        self._checked: Dict[Tuple[str, str, bool], SRMeta] = {}

        self._store = store
        self._sr = schema_registry
        self._serializer = serializer
        self._producer = producer
        self._header_packer = header_packer
        self._protocol_id = protocol_id

        chosen_value_serializer = value_serializer if value_serializer else serializer
        if chosen_value_serializer is None:
            msg = 'Value serializer is not specified, should be passed via value_serializer or serializer at least.'
            raise ValueError(msg)
        self._value_serializer = chosen_value_serializer

        chosen_key_serializer = key_serializer if key_serializer else serializer
        if chosen_key_serializer is None:
            msg = 'Key serializer is not specified, should be passed via key_serializer or serializer at least.'
            raise ValueError(msg)
        self._key_serializer = chosen_key_serializer

        for topic, description in self._mapping.items():
            if isinstance(description, (tuple, list)):
                msg_value, msg_key = description
                self.set_target_topic(topic, msg_value, msg_key, lazy=lazy)
            else:
                self.set_target_topic(topic, description, lazy=lazy)

    def flush(self, timeout: Optional[float] = None) -> int:  # noqa: D102 # docstring inherited from superclass.
        if timeout is None:
            return self._producer.flush()
        return self._producer.flush(timeout)

    def set_target_topic(  # noqa: D102 # docstring inherited from superclass.
        self,
        topic: str,
        value: Any,  # noqa: WPS110 # Domain. inherited from superclass.
        key: Any = None,
        *,
        lazy: bool = False,
    ) -> None:
        value_store = self._get_store(self._value_serializer)
        key_store = self._get_store(self._key_serializer)
        value_store.add(topic, value, None)
        if key is not None:
            # FixMe (tribunsky.kir): make key and value stores independent,
            #                        because now we cannot put None instead of value,
            #                        even though we need store only for the key.
            key_store.add(topic, value, key)
        if not lazy:
            value_descr = value_store.get(topic)
            self._check_schema(topic, value_descr)
            if key is not None:
                key_descr = key_store.get(topic, is_key=True)
                assert key_descr is not None
                if not key_descr.empty:
                    self._check_schema(topic, key_descr, is_key=True)

    def send_message(  # noqa: D102,WPS211 # inherited from superclass.
        self,
        topic: str,
        value: MsgValue = None,  # noqa: WPS110 # Domain. inherited from superclass.
        key: MsgKey = None,
        partition: Optional[int] = None,
        on_delivery: Optional[DeliveryCallback] = error_callback,
        *args: Any,
        blocking: bool = False,
        protocol_id: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        if protocol_id is None:
            protocol_id = self._protocol_id
        encoded_value = self._encode(topic, value, protocol_id)
        encoded_key = self._encode(topic, key, protocol_id, is_key=True)

        self._producer.send_message(
            topic,
            encoded_value,
            encoded_key,
            partition,
            on_delivery,
            blocking=blocking,
            *args,
            **kwargs,
        )

    def _check_schema(
        self,
        topic: str,
        schema: Optional[SchemaDescription],
        *,
        is_key: bool = False,
        force: bool = False,
    ) -> Optional[SRMeta]:
        """
        Ensure that we have schema's ids necessary to create message's header.

        :param topic:   Target topic against which we are working for this call.
        :param schema:  Schema container to be registered. Should contain text.
        :param is_key:  If True, schema will be requested as a message key, message value otherwise.
        :param force:   If True, do not reuse cached results, always request Schema Registry.

        :raises ValueError: If received no schema from DescriptionStore.

        :return:        Container with schema's ids.
        """
        if self._sr is None:
            logger.warning('Schema registry is not passed, skipping schema check for {0}'.format(topic))
            return None
        if schema is None:
            raise ValueError("Couldn't check schema from store.")
        uid = (topic, schema.text, is_key)

        if force is False:
            meta = self._checked.get(uid)
            if meta is not None:
                return meta

        assert schema.type is not None
        meta = self._sr.register_schema(topic, schema.text, schema.type, is_key=is_key)
        self._checked[uid] = meta
        return meta

    # ToDo (tribunsky.kir): Maybe, this method should be public for more simple extension.
    #                       'Template' pattern never works anyway.
    def _encode(self, topic: str, obj: Any, protocol_id: int, is_key: bool = False) -> Optional[bytes]:  # noqa: WPS110
        if obj is None:
            return None

        serializer = self._get_serializer(is_key)
        store = self._get_store(serializer)

        schema = store.get(topic, is_key=is_key)
        if schema is None:
            logger.warning('Missing schema for {0} (key: {1})'.format(topic, is_key))
            return None
        if schema.empty:
            return serializer.serialize(schema.text, obj, None, topic, is_key=is_key)
        else:
            available_meta = self._check_schema(topic, schema, is_key=is_key)
            # ToDo (tribunsky.kir): `_check_schema()` for now returns Optional because it is used when setting
            #                       the producer per topic and should not push the schema for schemaless serializers.
            assert available_meta is not None
            # ToDo (tribunsky.kir): looks like the header handler should also be placed per-payload or per-topic,
            #                       because some serializers don't use it (e.g. the confluent string serializer)
            assert self._header_packer is not None
            # ToDo (tribunsky.kir): check if old client uses
            #                       '{"schema": "{\"type\": \"string\"}"}'
            #                       https://docs.confluent.io/platform/current/schema-registry/develop/using.html#common-sr-api-usage-examples  # noqa: E501
            #                       Looks like the new one doesn't publish string schema at all
            #                       (no type in library for that)
            header = self._header_packer(protocol_id, available_meta)
            return serializer.serialize(schema.text, obj, header, topic, is_key=is_key)

    def _get_store(self, serializer: AbstractSerializer) -> AbstractDescriptionStore:
        serializers_store = getattr(serializer, 'store', None)
        if serializers_store is not None:
            return serializers_store
        assert self._store is not None
        return self._store

    def _get_serializer(self, is_key: bool) -> AbstractSerializer:
        return self._key_serializer if is_key else self._value_serializer
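
A usage sketch assuming a HighLevelSerializingProducer instance is already constructed (see __init__ below) and that event_schema and event are placeholders for whatever schema description and payload object your serializer and store understand:

# Register the value schema for the topic up front (pass lazy=True to defer
# Schema Registry publication until the first send).
producer.set_target_topic('my_topic', event_schema)

# Serialize and send; the key stays None here, so only the value is encoded.
producer.send_message('my_topic', value=event, blocking=False)

# Wait for outstanding deliveries before shutting down.
producer.flush()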

__init__(producer, schema_registry, header_packer, serializer=None, store=None, mapping=None, value_serializer=None, key_serializer=None, *, protocol_id=1, lazy=False)

Init producer with specific dependencies and prepare it to work against specified topic(s).

Parameters:

    producer (AbstractProducer, required)
        Producer implementation to send messages.
    schema_registry (Optional[AbstractSchemaRegistry], required)
        Schema registry client.
    header_packer (Optional[HeaderPacker], required)
        Callable to form binary headers.
    serializer (Optional[AbstractSerializer], default: None)
        Common message serializer for the key and value. If a specific value_serializer/key_serializer is defined, it will be used instead.
    store (Optional[AbstractDescriptionStore], default: None)
        Specific store to provide schema text extraction from schema description.
    mapping (Optional[Dict[TopicName, MessageDescription]], default: None)
        Per-topic definition of value and/or key schema descriptions.
    value_serializer (Optional[AbstractSerializer], default: None)
        Message serializer for value, if set.
    key_serializer (Optional[AbstractSerializer], default: None)
        Message serializer for the key, if set.
    protocol_id (int, default: 1)
        Protocol id for the producer (1 - Cloudera, 0 - Confluent, etc.).
    lazy (bool, default: False)
        If True, defer schema registry publication; otherwise the schema will be registered before the first message is sent.
Source code in wunderkafka/producers/constructor.py
def __init__(  # noqa: WPS211  # ToDo (tribunsky.kir): rethink building of producer.
    #                                                  Maybe single Callable (like in kafka-python) much better.
    self,
    producer: AbstractProducer,
    # Some serializers don't need SR at all, e.g. StringSerializer.
    schema_registry: Optional[AbstractSchemaRegistry],
    # As some serializers don't emit a magic byte, we do not need to handle the first bytes of a message at all.
    header_packer: Optional[HeaderPacker],
    serializer: Optional[AbstractSerializer] = None,
    store: Optional[AbstractDescriptionStore] = None,
    # ToDo: switch mapping to something like consumer's TopicSubscription?
    mapping: Optional[Dict[TopicName, MessageDescription]] = None,
    value_serializer: Optional[AbstractSerializer] = None,
    key_serializer: Optional[AbstractSerializer] = None,
    *,
    protocol_id: int = 1,
    lazy: bool = False,
) -> None:
    """
    Init producer with specific dependencies and prepare it to work against specified topic(s).

    :param producer:            Producer implementation to send messages.
    :param schema_registry:     Schema registry client.
    :param header_packer:       Callable to form binary headers.
    :param serializer:          Common message serializer for the key and value.
                                If a specific value_serializer/key_serializer is defined, it will be used instead.
    :param store:               Specific store to provide schema text extraction from schema description.
    :param mapping:             Per-topic definition of value and/or key schema descriptions.
    :param value_serializer:    Message serializer for value, if set.
    :param key_serializer:      Message serializer for the key, if set.
    :param protocol_id:         Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)
    :param lazy:                If True, defer schema registry publication; otherwise the schema will be
                                registered before the first message is sent.
    """
    self._mapping = mapping or {}

    # ToDo (tribunsky.kir): look like wrong place. Maybe it's better to highlight an entity
    #                       of Schema which may be checked or not. Then input mapping may be eliminated (or not).
    self._checked: Dict[Tuple[str, str, bool], SRMeta] = {}

    self._store = store
    self._sr = schema_registry
    self._serializer = serializer
    self._producer = producer
    self._header_packer = header_packer
    self._protocol_id = protocol_id

    chosen_value_serializer = value_serializer if value_serializer else serializer
    if chosen_value_serializer is None:
        msg = 'Value serializer is not specified, should be passed via value_serializer or serializer at least.'
        raise ValueError(msg)
    self._value_serializer = chosen_value_serializer

    chosen_key_serializer = key_serializer if key_serializer else serializer
    if chosen_key_serializer is None:
        msg = 'Key serializer is not specified, should be passed via key_serializer or serializer at least.'
        raise ValueError(msg)
    self._key_serializer = chosen_key_serializer

    for topic, description in self._mapping.items():
        if isinstance(description, (tuple, list)):
            msg_value, msg_key = description
            self.set_target_topic(topic, msg_value, msg_key, lazy=lazy)
        else:
            self.set_target_topic(topic, description, lazy=lazy)
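
The mapping argument accepts either a single description per topic or a (value, key) pair, mirroring the loop above; a construction sketch where the description objects and dependency instances are placeholders for whatever your setup provides:

from wunderkafka.producers.constructor import HighLevelSerializingProducer

mapping = {
    'events': event_schema,                         # value schema only
    'users': (user_value_schema, user_key_schema),  # (value, key) pair
}
producer = HighLevelSerializingProducer(
    producer=raw_producer,            # AbstractProducer implementation
    schema_registry=registry_client,  # AbstractSchemaRegistry client
    header_packer=header_packer,      # HeaderPacker building the binary header from protocol_id and SRMeta
    serializer=avro_serializer,       # AbstractSerializer used for both value and key
    store=description_store,          # AbstractDescriptionStore resolving schema text from the descriptions
    mapping=mapping,
)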