Class SimpleConfigBuilder

  • All Implemented Interfaces:
    ConfigBuilder

    public class SimpleConfigBuilder
    extends ConfigBuilderImpl
    Simple implementation of ConfigBuilder for use with KafkaConnection.

    The simple builder exposes the configuration marked as high importance in both the Apache Kafka Producer Config Documentation and the Apache Kafka Consumer Config Documentation. Because it caters for both scenarios, you may see warnings about unused configuration (e.g. you have configured acks on a connection that is used for both producers and consumers). These can be safely ignored or filtered from the logging (filter the classes org.apache.kafka.clients.consumer.ConsumerConfig and org.apache.kafka.clients.producer.ProducerConfig).

    In the adapter configuration file this class is aliased as kafka-simple-config-builder, which is the preferred alternative to the fully qualified classname when building your configuration.

    • Constructor Detail

      • SimpleConfigBuilder

        public SimpleConfigBuilder()
      • SimpleConfigBuilder

        public SimpleConfigBuilder(java.lang.String bootstrapServers)
    • Method Detail

      • build

        public java.util.Map<java.lang.String,java.lang.Object> build()
                                                                     throws CoreException
        Description copied from interface: ConfigBuilder
        Build a configuration suitable for both a consumer and producer.
        Returns:
        the configuration
        Throws:
        CoreException - wrapping any underlying exception.
      • build

        public java.util.Map<java.lang.String,java.lang.Object> build(ConfigBuilder.KeyFilter filter)
                                                                     throws CoreException
        Description copied from interface: ConfigBuilder
        Build a configuration with a filter applied to the config names.
        Parameters:
        filter - the keys to keep.
        Returns:
        the configuration
        Throws:
        CoreException - wrapping any underlying exception.
        See Also:
        ConfigDefinition
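
        The effect of filtering config names can be sketched with plain collections. This is a minimal illustration only: KeyFilterSketch, PRODUCER_ONLY and keep are hypothetical names, and the use of java.util.function.Predicate to model the filter is an assumption, since the ConfigBuilder.KeyFilter contract is not shown on this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical stand-in for illustration; not part of the documented API. */
final class KeyFilterSketch {

  // Producer-side property names mentioned on this page.
  static final Set<String> PRODUCER_ONLY =
      Set.of("acks", "retries", "buffer.memory", "compression.type");

  /** Returns a copy of config holding only the entries whose key passes the filter. */
  static Map<String, Object> keep(Map<String, Object> config, Predicate<String> filter) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : config.entrySet()) {
      if (filter.test(entry.getKey())) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // A combined producer/consumer configuration with assumed example values.
    Map<String, Object> combined = new LinkedHashMap<>();
    combined.put("bootstrap.servers", "localhost:9092");
    combined.put("group.id", "my-group");
    combined.put("acks", "all");
    combined.put("retries", 0);

    // Keep only the keys a consumer would use, which avoids "unused configuration" warnings.
    Map<String, Object> consumerView = keep(combined, key -> !PRODUCER_ONLY.contains(key));
    System.out.println(consumerView);
  }
}
```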
      • getBootstrapServers

        @NonNull
        public @NonNull java.lang.String getBootstrapServers()
        Get the bootstrap.servers property.

        A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

      • setBootstrapServers

        public void setBootstrapServers(@NonNull
                                        @NonNull java.lang.String bootstrapServers)
        Set the bootstrap.servers property.

        A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

        Parameters:
        bootstrapServers - the bootstrap servers
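
        The host1:port1,host2:port2 form can be checked before it is handed to the builder. The following is a minimal sketch using only the JDK; BootstrapServersSketch and parse are hypothetical names, and nothing here reflects validation that the builder itself performs.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the documented API. */
final class BootstrapServersSketch {

  /** Splits "host1:port1,host2:port2" into unresolved addresses, rejecting malformed entries. */
  static List<InetSocketAddress> parse(String bootstrapServers) {
    List<InetSocketAddress> result = new ArrayList<>();
    for (String entry : bootstrapServers.split(",")) {
      String trimmed = entry.trim();
      // Use the last colon so that bracketed IPv6 hosts such as [::1]:9092 still split correctly.
      int colon = trimmed.lastIndexOf(':');
      if (colon <= 0 || colon == trimmed.length() - 1) {
        throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
      }
      String host = trimmed.substring(0, colon);
      // parseInt throws NumberFormatException (an IllegalArgumentException) for a non-numeric port.
      int port = Integer.parseInt(trimmed.substring(colon + 1));
      // createUnresolved performs no DNS lookup and rejects ports outside 0..65535.
      result.add(InetSocketAddress.createUnresolved(host, port));
    }
    return result;
  }

  public static void main(String[] args) {
    // Two entries, so the initial connection can still succeed if one server is down.
    for (InetSocketAddress address : parse("host1:9092, host2:9093")) {
      System.out.println(address.getHostString() + " -> " + address.getPort());
    }
  }
}
```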
      • getGroupId

        public java.lang.String getGroupId()
        Get the group.id property.

        A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

      • setGroupId

        public void setGroupId(java.lang.String groupId)
        Set the group.id property.

        A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

        Parameters:
        groupId - the groupId to set
      • getBufferMemory

        public java.lang.Long getBufferMemory()
        Get the buffer.memory property.

        The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full (which defaults to false, so an exception will be thrown).

        This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

      • setBufferMemory

        public void setBufferMemory(java.lang.Long bufferMemory)
        Set the buffer.memory property.

        The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full (which defaults to false, so an exception will be thrown).

        This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

        Parameters:
        bufferMemory - the buffer memory
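
        The "throw when full" behaviour described above can be pictured with a toy byte budget. This is a simplified model under stated assumptions, not the Kafka producer's implementation: BufferMemorySketch and its methods are hypothetical, and only the non-blocking case is shown.

```java
/** Hypothetical toy model for illustration; not how the Kafka producer is implemented. */
final class BufferMemorySketch {

  private final long capacityBytes;
  private long usedBytes;

  BufferMemorySketch(long capacityBytes) {
    this.capacityBytes = capacityBytes;
  }

  /** Reserves space for a record waiting to be sent, or throws if the budget would be exceeded. */
  void append(byte[] record) {
    if (usedBytes + record.length > capacityBytes) {
      throw new IllegalStateException(
          "buffer full: " + usedBytes + " of " + capacityBytes + " bytes in use");
    }
    usedBytes += record.length;
  }

  /** Releases the space once the record has been delivered to the server. */
  void delivered(byte[] record) {
    usedBytes -= record.length;
  }

  long usedBytes() {
    return usedBytes;
  }

  public static void main(String[] args) {
    BufferMemorySketch buffer = new BufferMemorySketch(10);
    byte[] first = new byte[6];
    buffer.append(first);
    try {
      // Sending faster than delivery: the second record does not fit in the remaining 4 bytes.
      buffer.append(new byte[6]);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    buffer.delivered(first);
    buffer.append(new byte[6]); // fits again once the first record has been delivered
  }
}
```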
      • getCompressionType

        public ConfigBuilder.CompressionType getCompressionType()
        Get the compression.type property.

        The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is applied to full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

      • setCompressionType

        public void setCompressionType(ConfigBuilder.CompressionType compressionType)
        Set the compression.type property.

        The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is applied to full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

        Parameters:
        compressionType - the compression type
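
        The claim that more batching means better compression can be checked with the JDK's own gzip support. This is a rough sketch, assuming newline-joined strings as a stand-in for a batch; BatchCompressionSketch and its methods are hypothetical, and Kafka's actual batch format is different.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

/** Hypothetical comparison for illustration; Kafka's real batch format differs. */
final class BatchCompressionSketch {

  /** Returns the gzip-compressed size of the given bytes. */
  static int gzipSize(byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.size();
  }

  /** Total size when every record is compressed on its own (no batching). */
  static int sizeCompressedIndividually(List<String> records) {
    int total = 0;
    for (String record : records) {
      total += gzipSize(record.getBytes(StandardCharsets.UTF_8));
    }
    return total;
  }

  /** Size when all records are joined and compressed together as one batch. */
  static int sizeCompressedAsBatch(List<String> records) {
    return gzipSize(String.join("\n", records).getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    List<String> records = new ArrayList<>();
    for (int i = 0; i < 50; i++) {
      records.add("{\"orderId\":" + i + ",\"status\":\"SHIPPED\"}");
    }
    System.out.println("individually: " + sizeCompressedIndividually(records) + " bytes");
    System.out.println("as one batch: " + sizeCompressedAsBatch(records) + " bytes");
  }
}
```

        Similar records share structure, so the compressor can only exploit that redundancy when it sees them together; compressing each record alone also pays the gzip header and trailer cost every time.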
      • getAcks

        public ConfigBuilder.Acks getAcks()
        Get the acks property.

        This specifies the number of acknowledgments the producer requires the leader to have received before considering a request complete.

      • setAcks

        public void setAcks(ConfigBuilder.Acks acks)
        Set the acks property.

        This specifies the number of acknowledgments the producer requires the leader to have received before considering a request complete.

        Parameters:
        acks - the number of acks; defaults to Acks.all, the strongest available guarantee, if not specified.
      • getRetries

        public java.lang.Integer getRetries()
        Get the retries property.

        Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.

      • setRetries

        public void setRetries(java.lang.Integer retries)
        Set the retries property.

        Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.

        Parameters:
        retries - the number of retries, default is 0 if not specified.
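
        The reordering scenario described above can be traced with a small simulation. This is a toy model under stated assumptions, not producer code: RetryOrderingSketch and simulate are hypothetical, record "A" is assumed to fail once with a transient error, and record "B" is assumed to succeed on its first attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation for illustration; not Kafka producer code. */
final class RetryOrderingSketch {

  /**
   * Simulates sending records "A" then "B" to a single partition, where the first attempt
   * at "A" fails with a transient error. Returns the resulting order in the partition log.
   */
  static List<String> simulate(int retries) {
    List<String> partitionLog = new ArrayList<>();
    List<String> pendingRetry = new ArrayList<>();

    // First attempt at "A" fails; it is queued for a resend only if retries are allowed.
    if (retries > 0) {
      pendingRetry.add("A");
    }

    // "B" was already in flight and succeeds before the retry of "A" is sent.
    partitionLog.add("B");

    // The retried "A" now lands after "B".
    partitionLog.addAll(pendingRetry);
    return partitionLog;
  }

  public static void main(String[] args) {
    System.out.println("retries=1 -> " + simulate(1));
    System.out.println("retries=0 -> " + simulate(0));
  }
}
```

        With retries enabled the log holds both records but in swapped order; with retries set to 0 the order is never disturbed, at the cost of "A" not being written at all.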