API

Raw Bytes

BytesConsumer

Bases: AbstractConsumer

Consumer implementation of extended interface for raw messages.

Source code in wunderkafka/consumers/bytes.py
class BytesConsumer(AbstractConsumer):
    """Consumer implementation of extended interface for raw messages."""

    # FixMe (tribunsky.kir): add watchdog page reference
    def __init__(self, config: ConsumerConfig, sasl_watchdog: Optional[Watchdog] = None) -> None:
        """
        Init consumer.

        :param config:          Pydantic BaseSettings model with librdkafka consumer's configuration.
        :param sasl_watchdog:   Callable to handle the global state of kerberos auth (see Watchdog).
        """
        try:
            super().__init__(config.dict())
        except KafkaException as exc:
            config = challenge_krb_arg(exc, config)
            super().__init__(config.dict())
        self.subscription_offsets: Optional[Dict[str, HowToSubscribe]] = None

        self._config = config
        self._last_poll_ts = time.perf_counter()
        self._sasl_watchdog = sasl_watchdog
        # ToDo (tribunsky-kir): make it configurable
        atexit.register(self.close)

    def __str__(self) -> str:
        """
        Get human-readable representation of consumer.

        :return:    string with the consumer's group id.
        """
        return '{0}:{1}'.format(self.__class__.__name__, self._config.group_id)

    def batch_poll(  # noqa: D102 # inherited from superclass.
        self,
        timeout: float = 1.0,
        num_messages: int = 1000000,
        *,
        raise_on_lost: bool = False,
    ) -> List[Message]:
        if self._sasl_watchdog is not None:
            self._sasl_watchdog()

        # ToDo (tribunsky.kir): maybe it'd be better to use an on_lost callback within subscribe()
        dt = int((time.perf_counter() - self._last_poll_ts) * 1000)
        if dt > self._config.max_poll_interval_ms:
            msg = 'Exceeded max.poll.interval.ms ({0}): {1}'.format(self._config.max_poll_interval_ms, dt)

            if raise_on_lost:
                # ToDo (tribunsky.kir): resubscribe by ts?
                raise ConsumerException(msg)
            logger.warning(msg)

        msgs = self.consume(num_messages=num_messages, timeout=timeout)
        self._last_poll_ts = time.perf_counter()
        return msgs

    # ToDo (tribunsky.kir): do not override original API and wrap it in superclass
    def subscribe(  # noqa: D102,WPS211  # inherited from superclass.
        self,
        topics: List[Union[str, TopicSubscription]],
        *,
        from_beginning: Optional[bool] = None,
        offset: Optional[int] = None,
        ts: Optional[int] = None,
        with_timedelta: Optional[datetime.timedelta] = None,
    ) -> None:
        """
        Subscribe to a list of topics, or a list of specific TopicSubscriptions.

        This method overrides the original `subscribe()` method of `confluent-kafka.Consumer` and allows
        subscribing to a topic at a specific offset or timestamp.

        .. warning::
            Currently this method doesn't allow passing callbacks and uses its own to reset partitions.
        """  # noqa: E501
        subscriptions = {}
        for tpc in topics:
            if isinstance(tpc, str):
                tpc = TopicSubscription(
                    topic=tpc, from_beginning=from_beginning, offset=offset, ts=ts, with_timedelta=with_timedelta,
                )
            subscriptions[tpc.topic] = tpc.how

        # At least one topic requested a specific subscription (offset/timestamp)
        if any(subscriptions.values()):
            self.subscription_offsets = subscriptions
            # ToDo (tribunsky.kir): avoid mutation of self.subscription_offsets and remove it as a field
            super().subscribe(topics=list(self.subscription_offsets), on_assign=reset_partitions)
        else:
            super().subscribe(topics=list(subscriptions))
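
A minimal usage sketch (not part of the generated reference): it assumes ConsumerConfig is importable from the package root and that librdkafka options map to snake_case fields (e.g. bootstrap_servers); broker address, group id and topic name are placeholders.

from wunderkafka import ConsumerConfig
from wunderkafka.consumers.bytes import BytesConsumer

config = ConsumerConfig(
    bootstrap_servers='localhost:9092',  # placeholder broker address
    group_id='demo-group',               # placeholder consumer group
)
consumer = BytesConsumer(config)
consumer.subscribe(['demo-topic'], from_beginning=True)

# batch_poll() wraps consume() and warns (or raises, with raise_on_lost=True)
# when the time since the previous poll exceeds max.poll.interval.ms.
for message in consumer.batch_poll(timeout=1.0, num_messages=100):
    print(message.key(), message.value())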

__init__(config, sasl_watchdog=None)

Init consumer.

Parameters:

    config (ConsumerConfig, required): Pydantic BaseSettings model with librdkafka consumer's configuration.
    sasl_watchdog (Optional[Watchdog], default None): Callable to handle the global state of kerberos auth (see Watchdog).
Source code in wunderkafka/consumers/bytes.py
def __init__(self, config: ConsumerConfig, sasl_watchdog: Optional[Watchdog] = None) -> None:
    """
    Init consumer.

    :param config:          Pydantic BaseSettings model with librdkafka consumer's configuration.
    :param sasl_watchdog:   Callable to handle the global state of kerberos auth (see Watchdog).
    """
    try:
        super().__init__(config.dict())
    except KafkaException as exc:
        config = challenge_krb_arg(exc, config)
        super().__init__(config.dict())
    self.subscription_offsets: Optional[Dict[str, HowToSubscribe]] = None

    self._config = config
    self._last_poll_ts = time.perf_counter()
    self._sasl_watchdog = sasl_watchdog
    # ToDo (tribunsky-kir): make it configurable
    atexit.register(self.close)

__str__()

Get human-readable representation of consumer.

Returns:

    str: string with the consumer's group id.

Source code in wunderkafka/consumers/bytes.py
def __str__(self) -> str:
    """
    Get human-readable representation of consumer.

    :return:    string with the consumer's group id.
    """
    return '{0}:{1}'.format(self.__class__.__name__, self._config.group_id)

subscribe(topics, *, from_beginning=None, offset=None, ts=None, with_timedelta=None)

Subscribe to a list of topics, or a list of specific TopicSubscriptions.

This method overrides the original subscribe() method of confluent-kafka.Consumer and allows subscribing to a topic at a specific offset or timestamp.

Warning: Currently this method doesn't allow passing callbacks and uses its own to reset partitions.

Source code in wunderkafka/consumers/bytes.py
def subscribe(  # noqa: D102,WPS211  # inherited from superclass.
    self,
    topics: List[Union[str, TopicSubscription]],
    *,
    from_beginning: Optional[bool] = None,
    offset: Optional[int] = None,
    ts: Optional[int] = None,
    with_timedelta: Optional[datetime.timedelta] = None,
) -> None:
    """
    Subscribe to a list of topics, or a list of specific TopicSubscriptions.

    This method overrides the original `subscribe()` method of `confluent-kafka.Consumer` and allows
    subscribing to a topic at a specific offset or timestamp.

    .. warning::
        Currently this method doesn't allow passing callbacks and uses its own to reset partitions.
    """  # noqa: E501
    subscriptions = {}
    for tpc in topics:
        if isinstance(tpc, str):
            tpc = TopicSubscription(
                topic=tpc, from_beginning=from_beginning, offset=offset, ts=ts, with_timedelta=with_timedelta,
            )
        subscriptions[tpc.topic] = tpc.how

    # At least one topic requested a specific subscription (offset/timestamp)
    if any(subscriptions.values()):
        self.subscription_offsets = subscriptions
        # ToDo (tribunsky.kir): avoid mutation of self.subscription_offsets and remove it as a field
        super().subscribe(topics=list(self.subscription_offsets), on_assign=reset_partitions)
    else:
        super().subscribe(topics=list(subscriptions))
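
Continuing the sketch above, the keyword-only arguments (or per-topic TopicSubscription objects) select where to start reading. TopicSubscription is assumed to be importable from the package root; topic names are placeholders.

import datetime

from wunderkafka import TopicSubscription

# Plain topic names inherit the keyword-only arguments:
consumer.subscribe(['topic-a'], from_beginning=True)

# Specific offset for one topic, default behaviour for another:
consumer.subscribe(['topic-a', TopicSubscription(topic='topic-b', offset=42)])

# Everything produced during the last 10 minutes:
consumer.subscribe(['topic-a'], with_timedelta=datetime.timedelta(minutes=10))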

BytesProducer

Bases: AbstractProducer

Producer implementation of extended interface for raw messages.

Source code in wunderkafka/producers/bytes.py
class BytesProducer(AbstractProducer):
    """Producer implementation of extended interface for raw messages."""

    # FixMe (tribunsky.kir): add watchdog page reference
    def __init__(self, config: ProducerConfig, sasl_watchdog: Optional[Watchdog] = None) -> None:
        """
        Init producer.

        :param config:          Pydantic model with librdkafka producer's configuration.
        :param sasl_watchdog:   Callable to handle global state of kerberos auth (see Watchdog).
        """
        try:
            super().__init__(config.dict())
        except KafkaException as exc:
            config = challenge_krb_arg(exc, config)
            super().__init__(config.dict())

        self._config = config
        self._sasl_watchdog = sasl_watchdog
        atexit.register(self.flush)

    # ToDo (tribunsky.kir): make inherited from RDConfig models immutable.
    #                       Currently it explodes because of mutation in watchdog.
    #                       Do we need re-initiation of consumer/producer in runtime?
    @property
    def config(self) -> ProducerConfig:
        """
        Get the producer's config.

        :return:        Pydantic model with librdkafka producer's configuration.
        """
        return self._config

    def send_message(  # noqa: D102,WPS211  # inherited from superclass.
        self,
        topic: str,
        value: Optional[Union[str, bytes]] = None,  # noqa: WPS110  # Domain. inherited from superclass.
        key: Optional[Union[str, bytes]] = None,
        partition: Optional[int] = None,
        on_delivery: Optional[DeliveryCallback] = error_callback,
        *args: Any,
        blocking: bool = False,
        **kwargs: Any,
    ) -> None:
        if self._sasl_watchdog is not None:
            self._sasl_watchdog()
        if partition is not None:
            self.produce(topic, value, key=key, partition=partition, on_delivery=on_delivery, **kwargs)
        else:
            self.produce(topic, value, key=key, on_delivery=on_delivery, **kwargs)
        if blocking:
            self.flush()
        else:
            self.poll(0)
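
A minimal usage sketch under the same assumptions as above (ProducerConfig importable from the package root; placeholder broker address and topic):

from wunderkafka import ProducerConfig
from wunderkafka.producers.bytes import BytesProducer

producer = BytesProducer(ProducerConfig(bootstrap_servers='localhost:9092'))

# Fire-and-forget: produce() followed by poll(0).
producer.send_message('demo-topic', b'payload', key=b'some-key')

# Blocking send: flush() is called before returning.
producer.send_message('demo-topic', b'payload', blocking=True)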

config property

Get the producer's config.

Returns:

    ProducerConfig: Pydantic model with librdkafka producer's configuration.

__init__(config, sasl_watchdog=None)

Init producer.

Parameters:

    config (ProducerConfig, required): Pydantic model with librdkafka producer's configuration.
    sasl_watchdog (Optional[Watchdog], default None): Callable to handle global state of kerberos auth (see Watchdog).
Source code in wunderkafka/producers/bytes.py
def __init__(self, config: ProducerConfig, sasl_watchdog: Optional[Watchdog] = None) -> None:
    """
    Init producer.

    :param config:          Pydantic model with librdkafka producer's configuration.
    :param sasl_watchdog:   Callable to handle global state of kerberos auth (see Watchdog).
    """
    try:
        super().__init__(config.dict())
    except KafkaException as exc:
        config = challenge_krb_arg(exc, config)
        super().__init__(config.dict())

    self._config = config
    self._sasl_watchdog = sasl_watchdog
    atexit.register(self.flush)

Schemaless

SchemaLessJSONStringConsumer

Bases: HighLevelDeserializingConsumer

Kafka Consumer client to get JSON-serialized messages without any schema.

Source code in wunderkafka/factories/schemaless.py
class SchemaLessJSONStringConsumer(HighLevelDeserializingConsumer):
    """Kafka Consumer client to get JSON-serialized messages without any schema."""

    def __init__(self, config: ConsumerConfig) -> None:
        """
        Init consumer from pre-defined blocks.

        :param config:      Configuration for:

                                - Librdkafka consumer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
        """

        self._default_timeout: int = 60

        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=None,
            headers_handler=None,
            value_deserializer=SchemaLessJSONDeserializer(),
            key_deserializer=StringDeserializer(),
            stream_result=False,
        )

__init__(config)

Init consumer from pre-defined blocks.

Parameters:

    config (ConsumerConfig, required): Configuration for:

        - Librdkafka consumer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
Source code in wunderkafka/factories/schemaless.py
def __init__(self, config: ConsumerConfig) -> None:
    """
    Init consumer from pre-defined blocks.

    :param config:      Configuration for:

                            - Librdkafka consumer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
    """

    self._default_timeout: int = 60

    config, watchdog = check_watchdog(config)
    super().__init__(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=None,
        headers_handler=None,
        value_deserializer=SchemaLessJSONDeserializer(),
        key_deserializer=StringDeserializer(),
        stream_result=False,
    )
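
A hedged usage sketch (placeholder addresses; the polling API is assumed to mirror BytesConsumer, which the factory wraps):

from wunderkafka import ConsumerConfig
from wunderkafka.factories.schemaless import SchemaLessJSONStringConsumer

config = ConsumerConfig(
    bootstrap_servers='localhost:9092',  # placeholder
    group_id='demo-group',
)
consumer = SchemaLessJSONStringConsumer(config)
consumer.subscribe(['json-topic'], from_beginning=True)

# Deserialized JSON values and string keys; the message API is assumed
# to match confluent-kafka's Message.
for message in consumer.batch_poll(timeout=1.0):
    print(message.value())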

SchemaLessJSONStringProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to serialize and send any value as JSON without any schema.

Source code in wunderkafka/factories/schemaless.py
class SchemaLessJSONStringProducer(HighLevelSerializingProducer):
    """Kafka Producer client to serialize and send any value as JSON without any schema."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value schema to be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
        """
        config, watchdog = check_watchdog(config)
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=None,
            header_packer=None,
            value_serializer=SchemaLessJSONSerializer(),
            key_serializer=StringSerializer(),
            mapping=mapping,
        )

__init__(mapping, config)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
Source code in wunderkafka/factories/schemaless.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value schema to be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
    """
    config, watchdog = check_watchdog(config)
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=None,
        header_packer=None,
        value_serializer=SchemaLessJSONSerializer(),
        key_serializer=StringSerializer(),
        mapping=mapping,
    )
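
A hedged usage sketch: since no schema is involved, the mapping may simply be None (placeholder broker address and topic; the serializer is assumed to accept plain dicts):

from wunderkafka import ProducerConfig
from wunderkafka.factories.schemaless import SchemaLessJSONStringProducer

producer = SchemaLessJSONStringProducer(
    mapping=None,
    config=ProducerConfig(bootstrap_servers='localhost:9092'),  # placeholder
)
producer.send_message('json-topic', {'answer': 42}, key='some-key', blocking=True)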

SchemaLessJSONModelStringProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to serialize and send any instance of pydantic model as JSON without any schema.

Source code in wunderkafka/factories/schemaless.py
class SchemaLessJSONModelStringProducer(HighLevelSerializingProducer):
    """Kafka Producer client to serialize and send any instance of pydantic model as JSON without any schema."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value schema to be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
        """

        config, watchdog = check_watchdog(config)
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=None,
            header_packer=None,
            value_serializer=SchemaLessJSONModelSerializer(),
            key_serializer=StringSerializer(),
            mapping=mapping,
        )

__init__(mapping, config)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
Source code in wunderkafka/factories/schemaless.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value schema to be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.
    """

    config, watchdog = check_watchdog(config)
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=None,
        header_packer=None,
        value_serializer=SchemaLessJSONModelSerializer(),
        key_serializer=StringSerializer(),
        mapping=mapping,
    )
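
The model flavour differs only in the value serializer; a hedged sketch with a pydantic model (placeholders as above):

from pydantic import BaseModel

from wunderkafka import ProducerConfig
from wunderkafka.factories.schemaless import SchemaLessJSONModelStringProducer

class Event(BaseModel):
    name: str
    ts: int

producer = SchemaLessJSONModelStringProducer(
    mapping=None,
    config=ProducerConfig(bootstrap_servers='localhost:9092'),  # placeholder
)
producer.send_message('events', Event(name='signup', ts=1700000000), key='user-1')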

Avro

AvroConsumer

Bases: HighLevelDeserializingConsumer

Kafka Consumer client to get AVRO-serialized messages from Confluent/Cloudera installation.

Source code in wunderkafka/factories/avro.py
class AvroConsumer(HighLevelDeserializingConsumer):
    """Kafka Consumer client to get AVRO-serialized messages from Confluent/Cloudera installation."""

    def __init__(
        self,
        config: ConsumerConfig,
        *,
        sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
    ) -> None:
        """
        Init consumer from pre-defined blocks.

        :param config:      Configuration for:

                                - Librdkafka consumer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry

        :raises ValueError: If schema registry configuration is missing.
        """
        sr = config.sr
        self._default_timeout: int = 60
        if sr is None:
            raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
        if sr_client is None:
            sr_client = ClouderaSRClient

        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=sr_client(
                KerberizableHTTPClient(sr, requires_kerberos=config_requires_kerberos(config),
                                       cmd_kinit=config.sasl_kerberos_kinit_cmd),
                SimpleCache(),
            ),
            headers_handler=ConfluentClouderaHeadersHandler().parse,
            deserializer=FastAvroDeserializer(),
        )

__init__(config, *, sr_client=None)

Init consumer from pre-defined blocks.

Parameters:

    config (ConsumerConfig, required): Configuration for:

        - Librdkafka consumer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]], default None): Client for the schema registry.

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/avro.py
def __init__(
    self,
    config: ConsumerConfig,
    *,
    sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
) -> None:
    """
    Init consumer from pre-defined blocks.

    :param config:      Configuration for:

                            - Librdkafka consumer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry

    :raises ValueError: If schema registry configuration is missing.
    """
    sr = config.sr
    self._default_timeout: int = 60
    if sr is None:
        raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
    if sr_client is None:
        sr_client = ClouderaSRClient

    config, watchdog = check_watchdog(config)
    super().__init__(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=sr_client(
            KerberizableHTTPClient(sr, requires_kerberos=config_requires_kerberos(config),
                                   cmd_kinit=config.sasl_kerberos_kinit_cmd),
            SimpleCache(),
        ),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )
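
A hedged usage sketch: SRConfig is assumed to be importable from the package root and to take the registry endpoint as url; addresses and topic name are placeholders.

from wunderkafka import ConsumerConfig, SRConfig
from wunderkafka.factories.avro import AvroConsumer

config = ConsumerConfig(
    bootstrap_servers='localhost:9092',        # placeholder
    group_id='demo-group',
    sr=SRConfig(url='http://localhost:8081'),  # placeholder registry endpoint
)
consumer = AvroConsumer(config)  # raises ValueError if config.sr is None
consumer.subscribe(['avro-topic'], from_beginning=True)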

AvroProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to serialize and send dictionaries or built-in types as messages.

Source code in wunderkafka/factories/avro.py
class AvroProducer(HighLevelSerializingProducer):
    """Kafka Producer client to serialize and send dictionaries or built-in types as messages."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
        *,
        sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
        protocol_id: int = 1
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value schema to be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry
        :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

        :raises ValueError: If schema registry configuration is missing.
        """
        sr = config.sr
        if sr is None:
            raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
        if sr_client is None:
            sr_client = ClouderaSRClient
        self._default_timeout: int = 60

        config, watchdog = check_watchdog(config)
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=sr_client(
                KerberizableHTTPClient(sr, requires_kerberos=config_requires_kerberos(config),
                                       cmd_kinit=config.sasl_kerberos_kinit_cmd),
                SimpleCache(),
            ),
            header_packer=ConfluentClouderaHeadersHandler().pack,
            serializer=FastAvroSerializer(),
            store=SchemaTextRepo(schema_type=SchemaType.AVRO),
            mapping=mapping,
            protocol_id=protocol_id
        )

__init__(mapping, config, *, sr_client=None, protocol_id=1)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]], default None): Client for the schema registry.
    protocol_id (int, default 1): Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.).

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/avro.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
    *,
    sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
    protocol_id: int = 1
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value schema to be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry
    :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

    :raises ValueError: If schema registry configuration is missing.
    """
    sr = config.sr
    if sr is None:
        raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
    if sr_client is None:
        sr_client = ClouderaSRClient
    self._default_timeout: int = 60

    config, watchdog = check_watchdog(config)
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=sr_client(
            KerberizableHTTPClient(sr, requires_kerberos=config_requires_kerberos(config),
                                   cmd_kinit=config.sasl_kerberos_kinit_cmd),
            SimpleCache(),
        ),
        header_packer=ConfluentClouderaHeadersHandler().pack,
        serializer=FastAvroSerializer(),
        store=SchemaTextRepo(schema_type=SchemaType.AVRO),
        mapping=mapping,
        protocol_id=protocol_id
    )
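
A hedged sketch: the store is a SchemaTextRepo, so the mapping is assumed here to accept the AVRO schema as plain text for the message value (placeholders as above).

from wunderkafka import ProducerConfig, SRConfig
from wunderkafka.factories.avro import AvroProducer

value_schema = """
{
    "type": "record",
    "name": "Event",
    "fields": [{"name": "name", "type": "string"}]
}
"""

config = ProducerConfig(
    bootstrap_servers='localhost:9092',        # placeholder
    sr=SRConfig(url='http://localhost:8081'),  # placeholder
)
producer = AvroProducer({'events': value_schema}, config)
producer.send_message('events', {'name': 'signup'}, blocking=True)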

AvroModelProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to serialize and send models or dataclasses as messages.

Source code in wunderkafka/factories/avro.py
class AvroModelProducer(HighLevelSerializingProducer):
    """Kafka Producer client to serialize and send models or dataclasses as messages."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
        *,
        sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
        protocol_id: int = 1
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value model to derive schema which will
                            be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry
        :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

        :raises ValueError: If schema registry configuration is missing.
        """
        sr = config.sr
        if sr is None:
            raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))

        if sr_client is None:
            sr_client = ClouderaSRClient
        self._default_timeout: int = 60

        config, watchdog = check_watchdog(config)
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=sr_client(
                KerberizableHTTPClient(sr, requires_kerberos=config_requires_kerberos(config),
                                       cmd_kinit=config.sasl_kerberos_kinit_cmd),
                SimpleCache(),
            ),
            header_packer=ConfluentClouderaHeadersHandler().pack,
            serializer=AvroModelSerializer(),
            store=AvroModelRepo(),
            mapping=mapping,
            protocol_id=protocol_id
        )

__init__(mapping, config, *, sr_client=None, protocol_id=1)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]], default None): Client for the schema registry.
    protocol_id (int, default 1): Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.).

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/avro.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
    *,
    sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
    protocol_id: int = 1
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value model to derive schema which will
                        be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry
    :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

    :raises ValueError: If schema registry configuration is missing.
    """
    sr = config.sr
    if sr is None:
        raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))

    if sr_client is None:
        sr_client = ClouderaSRClient
    self._default_timeout: int = 60

    config, watchdog = check_watchdog(config)
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=sr_client(
            KerberizableHTTPClient(sr, requires_kerberos=config_requires_kerberos(config),
                                   cmd_kinit=config.sasl_kerberos_kinit_cmd),
            SimpleCache(),
        ),
        header_packer=ConfluentClouderaHeadersHandler().pack,
        serializer=AvroModelSerializer(),
        store=AvroModelRepo(),
        mapping=mapping,
        protocol_id=protocol_id
    )
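
In contrast to AvroProducer, the schema here is derived from the model itself (AvroModelRepo); a hedged sketch with a dataclass (placeholders as above):

from dataclasses import dataclass

from wunderkafka import ProducerConfig, SRConfig
from wunderkafka.factories.avro import AvroModelProducer

@dataclass
class Event:
    name: str

config = ProducerConfig(
    bootstrap_servers='localhost:9092',        # placeholder
    sr=SRConfig(url='http://localhost:8081'),  # placeholder
)
producer = AvroModelProducer({'events': Event}, config)
producer.send_message('events', Event(name='signup'), blocking=True)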

JSON

JSONConsumer

Bases: HighLevelDeserializingConsumer

Kafka Consumer client to get JSON-serialized messages from Confluent/Cloudera installation.

Source code in wunderkafka/factories/json.py
class JSONConsumer(HighLevelDeserializingConsumer):
    """Kafka Consumer client to get JSON-serialized messages from Confluent/Cloudera installation."""

    def __init__(
        self,
        config: ConsumerConfig,
        *,
        sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
    ) -> None:
        """
        Init consumer from pre-defined blocks.

        :param config:      Configuration for:

                                - Librdkafka consumer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry

        :raises ValueError: If schema registry configuration is missing.
        """
        sr = config.sr
        self._default_timeout: int = 60
        if sr is None:
            raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
        if sr_client is None:
            sr_client = ClouderaSRClient

        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=sr_client(
                KerberizableHTTPClient(
                    sr,
                    requires_kerberos=config_requires_kerberos(config),
                    cmd_kinit=config.sasl_kerberos_kinit_cmd,
                ),
                SimpleCache(),
            ),
            headers_handler=ConfluentClouderaHeadersHandler().parse,
            deserializer=JSONDeserializer(),
        )

__init__(config, *, sr_client=None)

Init consumer from pre-defined blocks.

Parameters:

    config (ConsumerConfig, required): Configuration for:

        - Librdkafka consumer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]], default None): Client for the schema registry.

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/json.py
def __init__(
    self,
    config: ConsumerConfig,
    *,
    sr_client: Optional[Union[Type[ClouderaSRClient], Type[ConfluentSRClient]]] = None,
) -> None:
    """
    Init consumer from pre-defined blocks.

    :param config:      Configuration for:

                            - Librdkafka consumer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry

    :raises ValueError: If schema registry configuration is missing.
    """
    sr = config.sr
    self._default_timeout: int = 60
    if sr is None:
        raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
    if sr_client is None:
        sr_client = ClouderaSRClient

    config, watchdog = check_watchdog(config)
    super().__init__(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=sr_client(
            KerberizableHTTPClient(
                sr,
                requires_kerberos=config_requires_kerberos(config),
                cmd_kinit=config.sasl_kerberos_kinit_cmd,
            ),
            SimpleCache(),
        ),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=JSONDeserializer(),
    )
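
Construction mirrors AvroConsumer; only the deserializer differs. A hedged sketch (placeholders as above):

from wunderkafka import ConsumerConfig, SRConfig
from wunderkafka.factories.json import JSONConsumer

config = ConsumerConfig(
    bootstrap_servers='localhost:9092',        # placeholder
    group_id='demo-group',
    sr=SRConfig(url='http://localhost:8081'),  # placeholder
)
consumer = JSONConsumer(config)
consumer.subscribe(['json-topic'])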

JSONProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to serialize and send dictionaries or built-in types as messages.

Source code in wunderkafka/factories/json.py
class JSONProducer(HighLevelSerializingProducer):
    """Kafka Producer client to serialize and send dictionaries or built-in types as messages."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
        *,
        sr_client: Optional[Type[ConfluentSRClient]] = None,
        protocol_id: int = 1
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value schema to be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry
        :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

        :raises ValueError: If schema registry configuration is missing.
        """
        sr = config.sr
        if sr is None:
            raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
        if sr_client is None:
            sr_client = ConfluentSRClient
        self._default_timeout: int = 60

        config, watchdog = check_watchdog(config)
        schema_registry = sr_client(
            KerberizableHTTPClient(
                sr, requires_kerberos=config_requires_kerberos(config), cmd_kinit=config.sasl_kerberos_kinit_cmd,
            ),
            SimpleCache(),
        )
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=schema_registry,
            header_packer=ConfluentClouderaHeadersHandler().pack,
            serializer=JSONSerializer(schema_registry.client),
            store=SchemaTextRepo(schema_type=SchemaType.JSON),
            mapping=mapping,
            protocol_id=protocol_id
        )

__init__(mapping, config, *, sr_client=None, protocol_id=1)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value schema to be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Type[ConfluentSRClient]], default None): Client for the schema registry.
    protocol_id (int, default 1): Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.).

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/json.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
    *,
    sr_client: Optional[Type[ConfluentSRClient]] = None,
    protocol_id: int = 1
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value schema to be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry
    :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

    :raises ValueError: If schema registry configuration is missing.
    """
    sr = config.sr
    if sr is None:
        raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))
    if sr_client is None:
        sr_client = ConfluentSRClient
    self._default_timeout: int = 60

    config, watchdog = check_watchdog(config)
    schema_registry = sr_client(
        KerberizableHTTPClient(
            sr, requires_kerberos=config_requires_kerberos(config), cmd_kinit=config.sasl_kerberos_kinit_cmd,
        ),
        SimpleCache(),
    )
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=schema_registry,
        header_packer=ConfluentClouderaHeadersHandler().pack,
        serializer=JSONSerializer(schema_registry.client),
        store=SchemaTextRepo(schema_type=SchemaType.JSON),
        mapping=mapping,
        protocol_id=protocol_id
    )

JSONModelProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to serialize and send models or dataclasses as messages.

Source code in wunderkafka/factories/json.py
class JSONModelProducer(HighLevelSerializingProducer):
    """Kafka Producer client to serialize and send models or dataclasses as messages."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
        *,
        sr_client: Optional[Type[ConfluentSRClient]] = None,
        protocol_id: int = 1
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value model to derive schema which will
                            be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry
        :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

        :raises ValueError: If schema registry configuration is missing.
        """
        sr = config.sr
        if sr is None:
            raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))

        if sr_client is None:
            sr_client = ConfluentSRClient
        self._default_timeout: int = 60

        config, watchdog = check_watchdog(config)
        schema_registry = sr_client(
            KerberizableHTTPClient(
                sr,
                requires_kerberos=config_requires_kerberos(config),
                cmd_kinit=config.sasl_kerberos_kinit_cmd,
            ),
            SimpleCache(),
        )
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=schema_registry,
            header_packer=ConfluentClouderaHeadersHandler().pack,
            serializer=JSONModelSerializer(schema_registry.client),
            store=JSONModelRepo(),
            mapping=mapping,
            protocol_id=protocol_id
        )

__init__(mapping, config, *, sr_client=None, protocol_id=1)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Type[ConfluentSRClient]], default None): Client for the schema registry.
    protocol_id (int, default 1): Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.).

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/json.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
    *,
    sr_client: Optional[Type[ConfluentSRClient]] = None,
    protocol_id: int = 1
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value model to derive schema which will
                        be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry
    :param protocol_id: Protocol id for producer (1 - Cloudera, 0 - Confluent, etc.)

    :raises ValueError: If schema registry configuration is missing.
    """
    sr = config.sr
    if sr is None:
        raise ValueError('Schema registry config is necessary for {0}'.format(self.__class__.__name__))

    if sr_client is None:
        sr_client = ConfluentSRClient
    self._default_timeout: int = 60

    config, watchdog = check_watchdog(config)
    schema_registry = sr_client(
        KerberizableHTTPClient(
            sr,
            requires_kerberos=config_requires_kerberos(config),
            cmd_kinit=config.sasl_kerberos_kinit_cmd,
        ),
        SimpleCache(),
    )
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=schema_registry,
        header_packer=ConfluentClouderaHeadersHandler().pack,
        serializer=JSONModelSerializer(schema_registry.client),
        store=JSONModelRepo(),
        mapping=mapping,
        protocol_id=protocol_id
    )

Mixed

These pre-configured consumers and producers are provided for convenience and as an explicit example of how to define your own factories with mixed (de)serializers.

Note

The naming follows the producer API, where the key follows the value. That's why the classes are called Value(de)Serializer + Key(de)Serializer.
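
For illustration, a sketch of a custom factory built from the same blocks the factories above use. The import paths for the building blocks are hypothetical (this page names only the factories modules explicitly); the composition itself mirrors SchemaLessJSONStringConsumer with the value deserializer swapped.

from wunderkafka import ConsumerConfig
from wunderkafka.consumers.bytes import BytesConsumer
# Hypothetical import paths for the building blocks:
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.factories import check_watchdog
from wunderkafka.serdes.string import StringDeserializer

class StringStringConsumer(HighLevelDeserializingConsumer):
    """Custom consumer: string-serialized keys AND values, no schema registry."""

    def __init__(self, config: ConsumerConfig) -> None:
        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=None,
            headers_handler=None,
            value_deserializer=StringDeserializer(),
            key_deserializer=StringDeserializer(),
            stream_result=False,
        )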

AVRO + String

AvroModelStringProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to send models as avro-serialized message values and string-serialized keys.

Source code in wunderkafka/factories/mixed.py
class AvroModelStringProducer(HighLevelSerializingProducer):
    """Kafka Producer client to send models as avro-serialized message values and string-serialized keys."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
        *,
        sr_client: Optional[Type[ConfluentSRClient]] = None,
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value model to derive schema which will
                            be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry

        :raises ValueError: If schema registry configuration is missing.
        """
        self._default_timeout: int = 60
        sr = config.sr
        if sr is None:
            raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))

        if sr_client is None:
            sr_client = ConfluentSRClient

        config, watchdog = check_watchdog(config)
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=sr_client(
                KerberizableHTTPClient(
                    sr,
                    requires_kerberos=config_requires_kerberos(config),
                    cmd_kinit=config.sasl_kerberos_kinit_cmd,
                ),
                SimpleCache()),
            header_packer=ConfluentClouderaHeadersHandler().pack,
            value_serializer=AvroModelSerializer(AvroModelRepo()),
            key_serializer=StringSerializer(),
            mapping=mapping,
            protocol_id=0,
        )

__init__(mapping, config, *, sr_client=None)

Init producer from pre-defined blocks.

Parameters:

    mapping (Optional[Dict[TopicName, MessageDescription]], required): Topic-to-Schemas mapping. Mapping's value should contain at least message's value model to derive schema which will be used for serialization.
    config (ProducerConfig, required): Configuration for:

        - Librdkafka producer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Type[ConfluentSRClient]], default None): Client for the schema registry.

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/mixed.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
    *,
    sr_client: Optional[Type[ConfluentSRClient]] = None,
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        Mapping's value should contain at least message's value model to derive schema which will
                        be used for serialization.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry

    :raises ValueError: If schema registry configuration is missing.
    """
    self._default_timeout: int = 60
    sr = config.sr
    if sr is None:
        raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))

    if sr_client is None:
        sr_client = ConfluentSRClient

    config, watchdog = check_watchdog(config)
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=sr_client(
            KerberizableHTTPClient(
                sr,
                requires_kerberos=config_requires_kerberos(config),
                cmd_kinit=config.sasl_kerberos_kinit_cmd,
            ),
            SimpleCache()),
        header_packer=ConfluentClouderaHeadersHandler().pack,
        value_serializer=AvroModelSerializer(AvroModelRepo()),
        key_serializer=StringSerializer(),
        mapping=mapping,
        protocol_id=0,
    )
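
A hedged usage sketch (placeholders as above); the value goes through AVRO with Confluent framing (protocol_id=0) while the key is sent as a plain string:

from dataclasses import dataclass

from wunderkafka import ProducerConfig, SRConfig
from wunderkafka.factories.mixed import AvroModelStringProducer

@dataclass
class Event:
    name: str

producer = AvroModelStringProducer(
    {'events': Event},
    ProducerConfig(
        bootstrap_servers='localhost:9092',        # placeholder
        sr=SRConfig(url='http://localhost:8081'),  # placeholder
    ),
)
producer.send_message('events', Event(name='signup'), key='user-1', blocking=True)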

AvroStringConsumer

Bases: HighLevelDeserializingConsumer

Kafka Consumer client to get messages with avro-serialized values and string-serialized keys.

Source code in wunderkafka/factories/mixed.py
class AvroStringConsumer(HighLevelDeserializingConsumer):
    """Kafka Consumer client to get messages with avro-serialized values and string-serialized keys."""

    def __init__(
        self,
        config: ConsumerConfig,
        *,
        sr_client: Optional[Type[ConfluentSRClient]] = None,
    ) -> None:
        """
        Init consumer from pre-defined blocks.

        :param config:      Configuration for:

                                - Librdkafka consumer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry

        :raises ValueError: If schema registry configuration is missing.
        """
        self._default_timeout: int = 60
        sr = config.sr
        if sr is None:
            raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))
        if sr_client is None:
            sr_client = ConfluentSRClient

        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, None),
            schema_registry=sr_client(
                KerberizableHTTPClient(sr),
                SimpleCache(),
            ),
            headers_handler=ConfluentClouderaHeadersHandler().parse,
            value_deserializer=FastAvroDeserializer(),
            key_deserializer=StringDeserializer(),
            stream_result=True,
        )

__init__(config, *, sr_client=None)

Init consumer from pre-defined blocks.

Parameters:

    config (ConsumerConfig, required): Configuration for:

        - Librdkafka consumer.
        - Schema registry client (conventional options for HTTP).

        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    sr_client (Optional[Type[ConfluentSRClient]], default None): Client for the schema registry.

Raises:

    ValueError: If schema registry configuration is missing.

Source code in wunderkafka/factories/mixed.py
def __init__(
    self,
    config: ConsumerConfig,
    *,
    sr_client: Optional[Type[ConfluentSRClient]] = None,
) -> None:
    """
    Init consumer from pre-defined blocks.

    :param config:      Configuration for:

                            - Librdkafka consumer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry

    :raises ValueError: If schema registry configuration is missing.
    """
    self._default_timeout: int = 60
    sr = config.sr
    if sr is None:
        raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))
    if sr_client is None:
        sr_client = ConfluentSRClient

    config, watchdog = check_watchdog(config)
    super().__init__(
        consumer=BytesConsumer(config, None),
        schema_registry=sr_client(
            KerberizableHTTPClient(sr),
            SimpleCache(),
        ),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        value_deserializer=FastAvroDeserializer(),
        key_deserializer=StringDeserializer(),
        stream_result=True,
    )
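
A hedged usage sketch (placeholders as above); note that the source above passes stream_result=True to the superclass:

from wunderkafka import ConsumerConfig, SRConfig
from wunderkafka.factories.mixed import AvroStringConsumer

config = ConsumerConfig(
    bootstrap_servers='localhost:9092',        # placeholder
    group_id='demo-group',
    sr=SRConfig(url='http://localhost:8081'),  # placeholder
)
consumer = AvroStringConsumer(config)
consumer.subscribe(['avro-topic'])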

JSON + String

JSONModelStringProducer

Bases: HighLevelSerializingProducer

Kafka Producer client to send models as JSON-serialized message values and string-serialized keys.

Source code in wunderkafka/factories/mixed.py
class JSONModelStringProducer(HighLevelSerializingProducer):
    """Kafka Producer client to send models as JSON-serialized message values and string-serialized keys."""

    def __init__(
        self,
        mapping: Optional[Dict[TopicName, MessageDescription]],
        config: ProducerConfig,
        *,
        sr_client: Optional[Type[ConfluentSRClient]] = None,
    ) -> None:
        """
        Init producer from pre-defined blocks.

        :param mapping:     Topic-to-Schemas mapping.
                            Mapping's value should contain at least message's value model to derive schema which will
                            be used for serialization.
        :param config:      Configuration for:

                                - Librdkafka producer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry

        :raises ValueError: If schema registry configuration is missing.
        """
        self._default_timeout: int = 60

        sr = config.sr
        if sr is None:
            raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))

        if sr_client is None:
            sr_client = ConfluentSRClient

        config, watchdog = check_watchdog(config)
        schema_registry = sr_client(
            KerberizableHTTPClient(
                sr,
                requires_kerberos=config_requires_kerberos(config),
                cmd_kinit=config.sasl_kerberos_kinit_cmd,
            ),
            SimpleCache(),
        )
        super().__init__(
            producer=BytesProducer(config, watchdog),
            schema_registry=schema_registry,
            header_packer=ConfluentClouderaHeadersHandler().pack,
            value_serializer=JSONModelSerializer(schema_registry.client),
            key_serializer=StringSerializer(),
            mapping=mapping,
            protocol_id=0,
        )

__init__(mapping, config, *, sr_client=None)

Init producer from pre-defined blocks.

Parameters:

Name Type Description Default
mapping Optional[Dict[TopicName, MessageDescription]]

Topic-to-Schemas mapping. The mapping's value should contain at least the message's value model, from which the schema used for serialization is derived.

required
config ProducerConfig

Configuration for:

  • Librdkafka producer.
  • Schema registry client (conventional options for HTTP).

Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

required
sr_client Optional[Type[ConfluentSRClient]]

Client for schema registry.

None

Raises:

Type Description
ValueError

If schema registry configuration is missing.

Source code in wunderkafka/factories/mixed.py
def __init__(
    self,
    mapping: Optional[Dict[TopicName, MessageDescription]],
    config: ProducerConfig,
    *,
    sr_client: Optional[Type[ConfluentSRClient]] = None,
) -> None:
    """
    Init producer from pre-defined blocks.

    :param mapping:     Topic-to-Schemas mapping.
                        The mapping's value should contain at least the message's value model, from which the
                        schema used for serialization is derived.
    :param config:      Configuration for:

                            - Librdkafka producer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry.

    :raises ValueError: If schema registry configuration is missing.
    """
    self._default_timeout: int = 60

    sr = config.sr
    if sr is None:
        raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))

    if sr_client is None:
        sr_client = ConfluentSRClient

    config, watchdog = check_watchdog(config)
    schema_registry = sr_client(
        KerberizableHTTPClient(
            sr,
            requires_kerberos=config_requires_kerberos(config),
            cmd_kinit=config.sasl_kerberos_kinit_cmd,
        ),
        SimpleCache(),
    )
    super().__init__(
        producer=BytesProducer(config, watchdog),
        schema_registry=schema_registry,
        header_packer=ConfluentClouderaHeadersHandler().pack,
        value_serializer=JSONModelSerializer(schema_registry.client),
        key_serializer=StringSerializer(),
        mapping=mapping,
        protocol_id=0,
    )
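
For orientation, a minimal usage sketch (not part of the library's documentation): the module path matches the source location above, but the model base class, the ProducerConfig/SRConfig field names, and the send_message() signature are assumptions to verify against your wunderkafka version.

from pydantic import BaseModel

from wunderkafka import ProducerConfig, SRConfig
from wunderkafka.factories.mixed import JSONModelStringProducer


class SignalEvent(BaseModel):  # hypothetical model; the serialization schema is derived from it
    source: str
    value: float


config = ProducerConfig(
    bootstrap_servers='localhost:9092',        # mirrors librdkafka's bootstrap.servers
    sr=SRConfig(url='http://localhost:8081'),  # required: the factory raises ValueError without sr
)
producer = JSONModelStringProducer(mapping={'signals': SignalEvent}, config=config)
# Assumed signature: topic, value model, string key.
producer.send_message('signals', SignalEvent(source='sensor-1', value=42.0), key='sensor-1')

Because the schema is derived from the model, no handwritten schema is needed for the mapped topic.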

JSONStringConsumer

Bases: HighLevelDeserializingConsumer

Kafka Consumer client to get messages with JSON-serialized values and string-serialized keys.

Source code in wunderkafka/factories/mixed.py
class JSONStringConsumer(HighLevelDeserializingConsumer):
    """Kafka Consumer client to get messages with JSON-serialized values and string-serialized keys."""

    def __init__(
        self,
        config: ConsumerConfig,
        *,
        sr_client: Optional[Type[ConfluentSRClient]] = None,
    ) -> None:
        """
        Init consumer from pre-defined blocks.

        :param config:      Configuration for:

                                - Librdkafka consumer.
                                - Schema registry client (conventional options for HTTP).

                            Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

        :param sr_client:   Client for schema registry.

        :raises ValueError: If schema registry configuration is missing.
        """
        self._default_timeout: int = 60
        sr = config.sr
        if sr is None:
            raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))
        if sr_client is None:
            sr_client = ConfluentSRClient

        config, watchdog = check_watchdog(config)
        super().__init__(
            consumer=BytesConsumer(config, watchdog),
            schema_registry=sr_client(
                KerberizableHTTPClient(sr),
                SimpleCache(),
            ),
            headers_handler=ConfluentClouderaHeadersHandler().parse,
            value_deserializer=JSONDeserializer(),
            key_deserializer=StringDeserializer(),
            stream_result=True,
        )

__init__(config, *, sr_client=None)

Init consumer from pre-defined blocks.

Parameters:

Name Type Description Default
config ConsumerConfig

Configuration for:

  • Librdkafka consumer.
  • Schema registry client (conventional options for HTTP).

Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

required
sr_client Optional[Type[ConfluentSRClient]]

Client for schema registry.

None

Raises:

Type Description
ValueError

If schema registry configuration is missing.

Source code in wunderkafka/factories/mixed.py
def __init__(
    self,
    config: ConsumerConfig,
    *,
    sr_client: Optional[Type[ConfluentSRClient]] = None,
) -> None:
    """
    Init consumer from pre-defined blocks.

    :param config:      Configuration for:

                            - Librdkafka consumer.
                            - Schema registry client (conventional options for HTTP).

                        Refer to the original CONFIGURATION.md (https://git.io/JmgCl) or the generated config.

    :param sr_client:   Client for schema registry.

    :raises ValueError: If schema registry configuration is missing.
    """
    self._default_timeout: int = 60
    sr = config.sr
    if sr is None:
        raise ValueError("Schema registry config is necessary for {0}".format(self.__class__.__name__))
    if sr_client is None:
        sr_client = ConfluentSRClient

    config, watchdog = check_watchdog(config)
    super().__init__(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=sr_client(
            KerberizableHTTPClient(sr),
            SimpleCache(),
        ),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        value_deserializer=JSONDeserializer(),
        key_deserializer=StringDeserializer(),
        stream_result=True,
    )
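
The consuming counterpart, under the same caveats: the sketch assumes the high-level consumer proxies the extended subscribe() and batch_poll() of the underlying BytesConsumer, and that value()/key() on the returned messages yield the deserialized payloads.

from wunderkafka import ConsumerConfig, SRConfig
from wunderkafka.factories.mixed import JSONStringConsumer

config = ConsumerConfig(
    group_id='demo',                           # mirrors librdkafka's group.id
    bootstrap_servers='localhost:9092',
    sr=SRConfig(url='http://localhost:8081'),
)
consumer = JSONStringConsumer(config)
consumer.subscribe(['signals'], from_beginning=True)
for msg in consumer.batch_poll(timeout=1.0):
    # JSONDeserializer decodes values, StringDeserializer decodes keys.
    print(msg.key(), msg.value())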

TopicSubscription

Bases: object

Allows custom definition of a subscription for a topic without the need to build a full TopicPartition list.

Source code in wunderkafka/consumers/subscription.py
class TopicSubscription(object):
    """Allows custom definition of subscription for topic without need to build full TopicPartition list."""

    def __init__(  # noqa: WPS211  # ToDo (tribunsky.kir): reconsider API of 'how'
        self,
        topic: str,
        *,
        from_beginning: Optional[bool] = None,
        offset: Optional[int] = None,
        ts: Optional[int] = None,
        with_timedelta: Optional[datetime.timedelta] = None,
    ) -> None:
        """
        Init topic subscription object.

        Only one method of subscription per topic is allowed at a time:

        - from beginning (depends on your retention policy)
        - from end (consume only the latest messages)
        - via specific offset
        - via specific timestamp
        - via specific timedelta (back from the current datetime)
        - no special option (the consumer will use the configured auto.offset.reset)

        :param topic:           Topic to subscribe to.
        :param from_beginning:  If True, subscribe to get the earliest available messages. If False, get the latest.
        :param offset:          Subscribe to a specific offset.
                                If the offset is not found, the consumer behaves per the built-in default.
        :param ts:              Subscribe to a specific timestamp (milliseconds).
                                If the timestamp is not found, the consumer behaves per the built-in default.
        :param with_timedelta:  Subscribe to a moment in the past: the given timedelta back from the current datetime.
                                The corresponding timestamp is calculated and the subscription happens via ts.
        """
        self.topic = topic
        self.how = choose_offset(
            from_beginning=from_beginning, offset=offset, ts=ts, with_timedelta=with_timedelta,
        )

__init__(topic, *, from_beginning=None, offset=None, ts=None, with_timedelta=None)

Init topic subscription object.

Only one method of subscription per topic is allowed at a time:

  • from beginning (depends on your retention policy)
  • from end (consume only the latest messages)
  • via specific offset
  • via specific timestamp
  • via specific timedelta (back from the current datetime)
  • no special option (the consumer will use the configured auto.offset.reset)

Parameters:

Name Type Description Default
topic str

Topic to subscribe to.

required
from_beginning Optional[bool]

If True, subscribe to get the earliest available messages. If False, get the latest.

None
offset Optional[int]

Subscribe to a specific offset. If the offset is not found, the consumer behaves per the built-in default.

None
ts Optional[int]

Subscribe to a specific timestamp (milliseconds). If the timestamp is not found, the consumer behaves per the built-in default.

None
with_timedelta Optional[timedelta]

Subscribe to a moment in the past: the given timedelta back from the current datetime. The corresponding timestamp is calculated and the subscription happens via ts.

None
Source code in wunderkafka/consumers/subscription.py
def __init__(  # noqa: WPS211  # ToDo (tribunsky.kir): reconsider API of 'how'
    self,
    topic: str,
    *,
    from_beginning: Optional[bool] = None,
    offset: Optional[int] = None,
    ts: Optional[int] = None,
    with_timedelta: Optional[datetime.timedelta] = None,
) -> None:
    """
    Init topic subscription object.

    Only one method of subscription per topic is allowed at a time:

    - from beginning (depends on your retention policy)
    - from end (consume only the latest messages)
    - via specific offset
    - via specific timestamp
    - via specific timedelta (back from the current datetime)
    - no special option (the consumer will use the configured auto.offset.reset)

    :param topic:           Topic to subscribe to.
    :param from_beginning:  If True, subscribe to get the earliest available messages. If False, get the latest.
    :param offset:          Subscribe to a specific offset.
                            If the offset is not found, the consumer behaves per the built-in default.
    :param ts:              Subscribe to a specific timestamp (milliseconds).
                            If the timestamp is not found, the consumer behaves per the built-in default.
    :param with_timedelta:  Subscribe to a moment in the past: the given timedelta back from the current datetime.
                            The corresponding timestamp is calculated and the subscription happens via ts.
    """
    self.topic = topic
    self.how = choose_offset(
        from_beginning=from_beginning, offset=offset, ts=ts, with_timedelta=with_timedelta,
    )
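
To make the subscription modes concrete, a sketch with placeholder topic names; `consumer` stands for any consumer exposing the extended subscribe(), such as the factories above.

import datetime

from wunderkafka.consumers.subscription import TopicSubscription

subscriptions = [
    TopicSubscription('audit', from_beginning=True),                           # earliest retained messages
    TopicSubscription('metrics', with_timedelta=datetime.timedelta(hours=1)),  # ~1 hour back from now
    # ts is in milliseconds, so multiply a POSIX timestamp (seconds) by 1000:
    TopicSubscription('events', ts=int(datetime.datetime(2024, 1, 1).timestamp() * 1000)),
    'logs',  # plain string: the configured auto.offset.reset applies
]
consumer.subscribe(subscriptions)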