# Consume

Required Frends version: 5.5+

Required .NET version: 6.0

Compatible Agents: Crossplatform

Source code: <https://github.com/FrendsPlatform/Frends.Kafka>

{% tabs %}
{% tab title="Parameter: Input" %}

| Name                                   | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| -------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Host : `String`                        | <p>Initial list of brokers, given as a comma-separated list of <code>host</code> or <code>host:port</code> entries.</p><p><br>Default: <code>-</code><br>Example: <code>localhost:1234</code></p> |
| Topic : `String`                       | <p>Name of the topic to consume from.</p><p><br>Default: <code>-</code><br>Example: <code>ExampleTopic</code></p> |
| SecurityProtocol : `SecurityProtocols` | <p>Protocol used to communicate with brokers.</p><p>Possible values:</p><ul><li><code>Plaintext</code></li><li><code>Ssl</code></li><li><code>SaslPlaintext</code></li><li><code>SaslSsl</code></li></ul><p><br>Default: <code>0</code><br>Example: <code>SecurityProtocols.Plaintext</code></p> |
| MessageCount : `Int32`                 | <p>Number of messages to consume before ending this task.</p><p>0 = unlimited: consume until timeout or task cancellation.</p><p><br>Default: <code>-</code><br>Example: <code>10</code></p> |
| Timeout : `Int32`                      | <p>Timeout for the consume operation, in milliseconds.</p><p>0 = unlimited. See the other timeout options in the Options tab.</p><p><br>Default: <code>0</code><br>Example: <code>60000</code></p> |
| Partition : `Int32`                    | <p>Kafka partition to consume from.</p><p>If set to -1, messages are consumed from all of the topic's partitions.</p><p><br>Default: <code>-</code><br>Example: <code>10</code></p> |
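
The `Host` format described above (a comma-separated list of `host` or `host:port` entries) can be illustrated with a minimal Python sketch. The helper `parse_brokers` is a hypothetical name used only for illustration; it is not part of the task, and it does not handle IPv6 literals.

```python
from typing import List, Optional, Tuple


def parse_brokers(host: str) -> List[Tuple[str, Optional[int]]]:
    """Split a CSV broker list into (hostname, port) pairs.

    Hypothetical helper for illustration only. The port is None when
    an entry omits it. IPv6 literals are not handled.
    """
    brokers: List[Tuple[str, Optional[int]]] = []
    for entry in host.split(","):
        entry = entry.strip()
        if not entry:
            continue  # skip empty items, e.g. after a trailing comma
        name, sep, port = entry.rpartition(":")
        if sep:
            # "host:port" form: everything before the last colon is the host
            brokers.append((name, int(port)))
        else:
            # bare "host" form: no port given
            brokers.append((entry, None))
    return brokers
```

For example, `parse_brokers("localhost:1234, broker2")` returns `[('localhost', 1234), ('broker2', None)]`.
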

{% endtab %}

{% tab title="Parameter: Socket" %}

| Name                                     | Description                                                                                                                                                                                                                                                                                                                                                                                                                           |
| ---------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| SocketKeepaliveEnable : `Boolean`        | <p>Enable TCP keep-alives (SO\_KEEPALIVE) on broker sockets.</p><p><br>Default: <code>False</code><br>Example: <code>false</code></p>                                                                                                                                                                                                                                                                                                 |
| SocketNagleDisable : `Boolean`           | <p>Disable the Nagle algorithm (TCP\_NODELAY) on broker sockets.</p><p><br>Default: <code>False</code><br>Example: <code>false</code></p>                                                                                                                                                                                                                                                                                             |
| SocketTimeoutMs : `Int32`                | <p>Default timeout for network requests.</p><p>For the consumer, fetch requests use <code>Options.FetchWaitMaxMs</code> + <code>Socket.SocketTimeoutMs</code> as their timeout.</p><p><br>Default: <code>60000</code><br>Example: <code>60000</code></p> |
| SocketConnectionSetupTimeoutMs : `Int32` | <p>Maximum time allowed for broker connection setup (TCP connection setup as well as SSL and SASL handshake).</p><p>If the connection to the broker is not fully functional after this time, the connection is closed and retried.</p><p><br>Default: <code>30000</code><br>Example: <code>30000</code></p> |
| SocketMaxFails : `Int32`                 | <p>Disconnect from the broker when this number of Produce failures (e.g., timed-out requests) is reached.</p><p>Disable with 0.</p><p>WARNING: It is highly recommended to leave this setting at its default value of 1 to prevent the client and broker from becoming desynchronized in case of request timeouts.</p><p>NOTE: The connection is automatically re-established.</p><p><br>Default: <code>1</code><br>Example: <code>1</code></p> |
| SocketReceiveBufferBytes : `Int32`       | <p>Broker socket receive buffer size.</p><p>0 = System default.</p><p><br>Default: <code>0</code><br>Example: <code>0</code></p>                                                                                                                                                                                                                                                                                                      |
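
The fetch-request timeout mentioned for `SocketTimeoutMs` is a simple sum, sketched below in Python. The function name is hypothetical, and the `FetchWaitMaxMs` value of 500 used in the usage note is an assumed figure for illustration only; the actual default is set in the Options tab.

```python
def fetch_request_timeout_ms(fetch_wait_max_ms: int, socket_timeout_ms: int = 60000) -> int:
    """Effective timeout of a consumer fetch request, in milliseconds.

    Hypothetical helper: adds Options.FetchWaitMaxMs to
    Socket.SocketTimeoutMs, as described in the table above.
    The default of 60000 mirrors the SocketTimeoutMs default.
    """
    return fetch_wait_max_ms + socket_timeout_ms
```

With an assumed `FetchWaitMaxMs` of 500 and the default `SocketTimeoutMs` of 60000, `fetch_request_timeout_ms(500)` gives 60500 ms.
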

{% endtab %}

{% tab title="Parameter: Sasl" %}

| Name                                             | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| ------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| UseSasl : `Boolean`                              | <p>Use SASL.</p><p><br>Default: <code>False</code><br>Example: <code>false</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| SaslMechanism : `SaslMechanisms`                 | <p>SASL mechanism to use for authentication.</p><p>Possible values:</p><ul><li><code>Gssapi</code></li><li><code>Plain</code></li><li><code>ScramSha256</code></li><li><code>ScramSha512</code></li><li><code>OAuthBearer</code></li></ul><p><br>Default: <code>1</code><br>Example: <code>SaslMechanisms.Plain</code></p> |
| SaslUsername : `String`                          | <p>SASL username for use with the PLAIN, ScramSha256 or ScramSha512 mechanism.</p><p><br>Default: <code>-</code><br>Example: <code>ExampleUser</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| 🗝SaslPassword : `String`                        | <p>SASL password for use with the PLAIN, ScramSha256 or ScramSha512 mechanism.</p><p><br>Default: <code>-</code><br>Example: <code>ExamplePassword</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| SaslOauthbearerMethod : `SaslOauthbearerMethods` | <p>Set to "default" or "oidc" to control which login method is used.</p><p>If set to "oidc", the following properties must also be specified:</p><p>Sasl.SaslOauthbearerClientId,</p><p>Sasl.SaslOauthbearerClientSecret,</p><p>Sasl.SaslOauthbearerTokenEndpointUrl.</p><p>Possible values:</p><ul><li><code>Default</code></li><li><code>Oidc</code></li></ul><p><br>Default: <code>0</code><br>Example: <code>SaslOauthbearerMethods.Default</code></p> |
| SaslOauthbearerClientId : `String`               | <p>Public identifier for the application.</p><p>Must be unique across all clients that the authorization server handles.</p><p><br>Default: <code>-</code><br>Example: <code>ExampleClient</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| 🗝SaslOauthbearerClientSecret : `String`         | <p>Client secret only known to the application and the authorization server.</p><p><br>Default: <code>-</code><br>Example: <code>ExampleSecret</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| SaslOauthbearerTokenEndpointUrl : `String`       | <p>OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve the token.</p><p><br>Default: <code>-</code><br>Example: <code>ExampleURL</code></p> |
| SaslOauthbearerExtensions : `String`             | <p>Allow additional information to be provided to the broker.</p><p>Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".</p><p><br>Default: <code>-</code><br>Example: <code>supportFeatureX=true,organizationId=sales-emea</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| SaslOauthbearerScope : `String`                  | <p>The client uses this to specify the scope of the access request to the broker.</p><p><br>Default: <code>-</code><br>Example: <code>ExampleScope</code></p> |
| SaslOauthbearerConfig : `String`                 | <p>SASL/OAUTHBEARER configuration.</p><p><br>Default: <code>-</code><br>Example: <code>principal=admin extension\_traceId=123</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| SaslKerberosKeytab : `String`                    | <p>Path to Kerberos keytab file.</p><p>Not supported on Windows.</p><p><br>Default: <code>-</code><br>Example: <code>c:\temp</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| SaslKerberosMinTimeBeforeRelogin : `Int32`       | <p>Minimum time in milliseconds between key refresh attempts.</p><p>Disable automatic key refresh by setting this property to 0.</p><p>Not supported on Windows.</p><p><br>Default: <code>-</code><br>Example: <code>60000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| SaslKerberosPrincipal : `String`                 | <p>This client's Kerberos principal name.</p><p>Not supported on Windows.</p><p><br>Default: <code>-</code><br>Example: <code>kafkaclient</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| SaslKerberosServiceName : `String`               | <p>Kerberos principal name that Kafka runs as, not including /hostname\@REALM.</p><p>Not supported on Windows.</p><p><br>Default: <code>-</code><br>Example: <code>kafka</code></p> |
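
Two of the rules above lend themselves to a short Python sketch: the properties required when `SaslOauthbearerMethod` is `Oidc`, and the `key=value` format of `SaslOauthbearerExtensions`. Both functions are hypothetical helpers written for illustration, not the task's own validation logic. The dictionary keys reuse the parameter names from the table.

```python
from typing import Dict, List

# Properties the table lists as mandatory for the "oidc" login method.
OIDC_REQUIRED = [
    "SaslOauthbearerClientId",
    "SaslOauthbearerClientSecret",
    "SaslOauthbearerTokenEndpointUrl",
]


def missing_oidc_fields(sasl: Dict[str, str]) -> List[str]:
    """Return the required OIDC properties that are absent or empty.

    Hypothetical check: returns an empty list when the method is not Oidc.
    """
    if sasl.get("SaslOauthbearerMethod") != "Oidc":
        return []
    return [name for name in OIDC_REQUIRED if not sasl.get(name)]


def parse_extensions(extensions: str) -> Dict[str, str]:
    """Parse a comma-separated list of key=value pairs into a dict."""
    pairs: Dict[str, str] = {}
    for item in extensions.split(","):
        item = item.strip()
        if not item:
            continue  # ignore empty items
        key, _, value = item.partition("=")
        pairs[key.strip()] = value.strip()
    return pairs
```

For example, `parse_extensions("supportFeatureX=true,organizationId=sales-emea")` returns `{'supportFeatureX': 'true', 'organizationId': 'sales-emea'}`.
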

{% endtab %}

{% tab title="Parameter: Ssl" %}

| Name                                                                       | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| -------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| UseSsl : `Boolean`                                                         | <p>Use SSL.</p><p><br>Default: <code>False</code><br>Example: <code>false</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| SslEndpointIdentificationAlgorithm : `SslEndpointIdentificationAlgorithms` | <p>Endpoint identification algorithm used to validate the broker hostname against the broker certificate.</p><p>https - Server (broker) hostname verification as specified in RFC 2818.</p><p>none - No endpoint verification.</p><p>OpenSSL >= 1.0.2 required.</p><p>Possible values:</p><ul><li><code>None</code></li><li><code>Https</code></li></ul><p><br>Default: <code>0</code><br>Example: <code>SslEndpointIdentificationAlgorithms.None</code></p> |
| EnableSslCertificateVerification : `Boolean`                               | <p>Enable OpenSSL's built-in broker (server) certificate verification.</p><p><br>Default: <code>True</code><br>Example: <code>true</code></p> |
| SslCertificateLocation : `String`                                          | <p>Path to client's public key (PEM) used for authentication.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| SslCaLocation : `String`                                                   | <p>File or directory path to CA certificate(s) for verifying the broker's key.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p> |
| SslKeyLocation : `String`                                                  | <p>Path to client's private key (PEM) used for authentication.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| SslKeystoreLocation : `String`                                             | <p>Path to client's keystore (PKCS#12) used for authentication.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| SslEngineLocation : `String`                                               | <p>Path to OpenSSL engine library.</p><p>OpenSSL >= 1.1.0 required.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| SslCrlLocation : `String`                                                  | <p>Path to CRL for verifying broker's certificate validity.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| 🗝SslCertificatePem : `String`                                             | <p>Client's public key string (PEM format) used for authentication.</p><p><br>Default: <code>-</code><br>Example: <code>-----BEGIN PRIVATE KEY-----MIIES42Cg6zn-----END PRIVATE KEY-----</code></p> |
| 🗝SslCaPem : `String`                                                      | <p>A certificate string (PEM format) for verifying the broker's key.</p><p><br>Default: <code>-</code><br>Example: <code>-----BEGIN PRIVATE KEY-----MIIES42Cg6zn-----END PRIVATE KEY-----</code></p> |
| 🗝SslKeyPem : `String`                                                     | <p>Client's private key string (PEM format) used for authentication.</p><p><br>Default: <code>-</code><br>Example: <code>-----BEGIN PRIVATE KEY-----MIIES42Cg6zn-----END PRIVATE KEY-----</code></p> |
| SslCaCertificateStores : `String`                                          | <p>Comma-separated list of Windows Certificate stores to load CA certificates from.</p><p>Certificates will be loaded in the same order as stores are specified.</p><p>If no certificates can be loaded from any of the specified stores an error is logged and the OpenSSL library's default CA location is used instead.</p><p><br>Default: <code>Root</code><br>Example: <code>Root</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| 🗝SslKeystorePassword : `String`                                           | <p>Client's keystore (PKCS#12) password.</p><p><br>Default: <code>-</code><br>Example: <code>ExamplePassword</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| 🗝SslKeyPassword : `String`                                                | <p>Private key passphrase, for use with <code>Ssl.SslKeyLocation</code>.</p><p><br>Default: <code>-</code><br>Example: <code>ExamplePassword</code></p> |
| SslCipherSuites : `String`                                                 | <p>A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.</p><p><br>Default: <code>-</code><br>Example: <code>foo</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| SslCurvesList : `String`                                                   | <p>The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use.</p><p><br>Default: <code>-</code><br>Example: <code>1</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| SslSigalgsList : `String`                                                  | <p>The client uses the TLS ClientHello signature\_algorithms extension to indicate to the server which signature/hash algorithm pairs may be used in digital signatures.</p><p><br>Default: <code>-</code><br>Example: <code>1</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
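
The parameter names in this tab closely resemble librdkafka's `ssl.*` configuration properties. As a rough illustration only, the Python sketch below maps a few of them to the matching librdkafka keys. The `SSL_KEY_MAP` table and the `to_librdkafka_ssl_config` helper are hypothetical, and the assumption that the task forwards values to the client in this way is not confirmed by this page.

```python
# Hypothetical mapping from task parameter names to librdkafka property keys.
# Assumption: values are passed through to the underlying Kafka client as-is.
SSL_KEY_MAP = {
    "SslEndpointIdentificationAlgorithm": "ssl.endpoint.identification.algorithm",
    "EnableSslCertificateVerification": "enable.ssl.certificate.verification",
    "SslCertificateLocation": "ssl.certificate.location",
    "SslCaLocation": "ssl.ca.location",
    "SslKeyLocation": "ssl.key.location",
    "SslKeyPassword": "ssl.key.password",
    "SslKeystoreLocation": "ssl.keystore.location",
    "SslKeystorePassword": "ssl.keystore.password",
}


def to_librdkafka_ssl_config(params):
    """Build a librdkafka-style config dict from task-style SSL parameters."""
    if not params.get("UseSsl", False):
        # Assumption: with UseSsl disabled, no SSL-related keys are emitted.
        return {}
    config = {}
    for name, key in SSL_KEY_MAP.items():
        value = params.get(name)
        if value in (None, ""):
            continue  # unset parameters (shown as "-" in the table) are skipped
        if isinstance(value, bool):
            value = str(value).lower()  # librdkafka expects "true" / "false"
        config[key] = value
    return config


example = to_librdkafka_ssl_config({
    "UseSsl": True,
    "SslEndpointIdentificationAlgorithm": "https",
    "SslCaLocation": "/path/to/your/file",
    "EnableSslCertificateVerification": True,
})
```

Only parameters that are actually set end up in the resulting dictionary, which mirrors how most of the defaults above are left empty.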

{% endtab %}

{% tab title="Parameter: SchemaRegistry" %}

| Name                                                  | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
| ----------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| UseSchemaRegistry : `Boolean`                         | <p>Use Avro schema registry.</p><p><br>Default: <code>False</code><br>Example: <code>false</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| SchemaRegistryUrl : `String`                          | <p>A comma-separated list of URLs for schema registry instances that are used to register or look up schemas.</p><p><br>Default: <code>-</code><br>Example: <code>http://localhost:8081</code></p> |
| BasicAuthCredentialsSource : `AuthCredentialsSources` | <p>Authentication credentials source.</p><p>Possible values:</p><ul><li><code>UserInfo</code>: Credentials are specified via the <code>BasicAuthUserInfo</code> property in the form username:password. If <code>BasicAuthUserInfo</code> is not set, authentication is disabled.</li><li><code>SaslInherit</code>: Credentials are specified via the <code>Sasl.SaslUsername</code> and <code>Sasl.SaslPassword</code> parameters.</li></ul><p><br>Default: <code>0</code><br>Example: <code>AuthCredentialsSource.UserInfo</code></p> |
| 🗝BasicAuthUserInfo : `String`                        | <p>Basic auth credentials in the form {username}:{password}.</p><p><br>Default: <code>-</code><br>Example: <code>foo:bar</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| MaxCachedSchemas : `Int32`                            | <p>Specifies the maximum number of schemas CachedSchemaRegistryClient should cache locally.</p><p><br>Default: <code>1000</code><br>Example: <code>1000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| RequestTimeoutMs : `Int32`                            | <p>Specifies the timeout for requests to Confluent Schema Registry.</p><p><br>Default: <code>30000</code><br>Example: <code>30000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| SslCaLocation : `String`                              | <p>File path to CA certificate(s) for verifying the Schema Registry's key.</p><p>System CA certs will be used if not specified.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| SslKeystoreLocation : `String`                        | <p>SSL keystore (PKCS#12) location.</p><p><br>Default: <code>-</code><br>Example: <code>/path/to/your/file</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| SslKeystorePassword : `String`                        | <p>SSL keystore (PKCS#12) password.</p><p><br>Default: <code>-</code><br>Example: <code>foo</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
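
To make the two string formats in this tab concrete, here is a minimal Python sketch that splits a comma-separated `SchemaRegistryUrl` value and turns a `username:password` string into an HTTP Basic `Authorization` header value. Both helper names are hypothetical, and the sketch only illustrates the formats; it is not how the task itself processes these parameters.

```python
import base64


def split_registry_urls(schema_registry_url):
    """Split a comma-separated SchemaRegistryUrl value into individual URLs."""
    return [u.strip() for u in schema_registry_url.split(",") if u.strip()]


def basic_auth_header(user_info):
    """Return an HTTP Basic Authorization header value for 'username:password'.

    Returns None for an empty value, echoing the note above that
    authentication is disabled when BasicAuthUserInfo is not set.
    """
    if not user_info:
        return None
    username, sep, password = user_info.partition(":")
    if not sep:
        raise ValueError("expected credentials in the form username:password")
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


urls = split_registry_urls("http://localhost:8081, http://localhost:8082")
header = basic_auth_header("foo:bar")
```

Splitting on the first colon only means a password may itself contain colons.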

{% endtab %}

{% tab title="Parameter: Options" %}

| Name                                         | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| -------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| EncodeMessageKey : `Boolean`                 | <p>Try to decode consumed message key from byte\[] to string.</p><p><br>Default: <code>True</code><br>Example: <code>true</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| Acks : `Ack`                                 | <p>The number of acknowledgements the leader broker must receive from ISR brokers before responding to the request.</p><p>Possible values:</p><ul><li><code>None</code></li><li><code>Leader</code></li><li><code>All</code></li></ul><p><br>Default: <code>0</code><br>Example: <code>Ack.None</code></p> |
| AutoCommitIntervalMs : `Int32`               | <p>The frequency in milliseconds that the consumer offsets are committed (written) to offset storage.</p><p>(0 = disable). This setting is used by the high-level consumer.</p><p><br>Default: <code>5000</code><br>Example: <code>5000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| AutoOffsetReset : `AutoOffsetResets`         | <p>Action to take when there is no initial offset in the offset store or the desired offset is out of range.</p><p>Possible values:</p><ul><li><code>Latest</code></li><li><code>Earliest</code></li><li><code>Error</code></li></ul><p><br>Default: <code>0</code><br>Example: <code>AutoOffsetResets.</code></p> |
| EnableAutoOffsetStore : `Boolean`            | <p>Automatically store offset of last message provided to application.</p><p>The offset store is an in-memory store of the next offset to (auto-)commit for each partition.</p><p><br>Default: <code>True</code><br>Example: <code>true</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| BrokerAddressFamily : `BrokerAddressFamilys` | <p>Allowed broker IP address families.</p><p>Possible values:</p><ul><li><code>Any</code></li><li><code>V4</code></li><li><code>V6</code></li></ul><p><br>Default: <code>0</code><br>Example: <code>BrokerAddressFamilys.Any</code></p> |
| ConnectionsMaxIdleMs : `Int32`               | <p>Close broker connections after the specified time of inactivity.</p><p>Disable with 0.</p><p>If this property is left at its default value, some heuristics are performed to determine a suitable default value. This is currently limited to identifying brokers on Azure.</p><p><br>Default: <code>0</code><br>Example: <code>0</code></p> |
| CheckCrcs : `Boolean`                        | <p>Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred.</p><p><br>Default: <code>False</code><br>Example: <code>false</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| EnableAutoCommit : `Boolean`                 | <p>Automatically and periodically commit offsets in the background.</p><p>Note: setting this to false does not prevent the consumer from fetching previously committed start offsets.</p><p><br>Default: <code>True</code><br>Example: <code>true</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| FetchErrorBackoffMs : `Int32`                | <p>How long to postpone the next fetch request for a topic+partition in case of a fetch error.</p><p><br>Default: <code>500</code><br>Example: <code>500</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| FetchMaxBytes : `Int32`                      | <p>Maximum amount of data the broker shall return for a Fetch request.</p><p>Messages are fetched in batches by the consumer. If the first message batch in the first non-empty partition of the Fetch request is larger than this value, the message batch will still be returned to ensure the consumer can make progress.</p><p>The maximum message batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (broker topic config).</p><p>FetchMaxBytes is automatically adjusted upwards to be at least Options.MessageMaxBytes.</p><p><br>Default: <code>52428800</code><br>Example: <code>52428800</code></p> |
| FetchMinBytes : `Int32`                      | <p>Minimum number of bytes the broker responds with.</p><p>If Options.FetchWaitMaxMs expires, the accumulated data will be sent to the client regardless of this setting.</p><p><br>Default: <code>1</code><br>Example: <code>1</code></p> |
| FetchWaitMaxMs : `Int32`                     | <p>Maximum time the broker may wait to fill the Fetch response with Options.FetchMinBytes of messages.</p><p><br>Default: <code>500</code><br>Example: <code>500</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| GroupId : `String`                           | <p>Client group id string.</p><p>All clients sharing the same Options.GroupId belong to the same group.</p><p><br>Default: <code>-</code><br>Example: <code>csharp-group-1</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| GroupInstanceId : `String`                   | <p>Enable static group membership.</p><p>Static group members are able to leave and rejoin a group within the configured Options.SessionTimeoutMs without prompting a group rebalance.</p><p>This should be used in combination with a larger Options.SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts).</p><p>Requires broker version >= 2.3.0.</p><p><br>Default: <code>-</code><br>Example: <code>csharp-group-1</code></p>                                                                                                                                                                                                                                                                                                       |
| HeartbeatIntervalMs : `Int32`                | <p>Group session keepalive heartbeat interval.</p><p><br>Default: <code>3000</code><br>Example: <code>3000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| IsolationLevel : `IsolationLevels`           | <p>Controls how to read messages written transactionally.</p><p>Possible values:</p><ul><li><code>ReadUncommitted</code></li><li><code>ReadCommitted</code></li></ul><p><br>Default: <code>1</code><br>Example: <code>IsolationLevels.ReadCommitted</code></p> |
| MessageMaxBytes : `Int32`                    | <p>Maximum Kafka protocol request message size.</p><p>Due to differing framing overhead between protocol versions, the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests. The broker will enforce the topic's Options.MessageMaxBytes limit (see Apache Kafka documentation).</p><p><br>Default: <code>1000000</code><br>Example: <code>1000000</code></p> |
| MaxInFlight : `Int32`                        | <p>Maximum number of in-flight requests per broker connection.</p><p>This is a generic property applied to all broker communication; however, it is primarily relevant to produce requests.</p><p>In particular, note that other mechanisms limit the number of outstanding consumer fetch requests per broker to one.</p><p><br>Default: <code>1000000</code><br>Example: <code>1000000</code></p> |
| MaxPollIntervalMs : `Int32`                  | <p>Maximum allowed time between calls to consume messages for high-level consumers.</p><p>If this interval is exceeded, the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.</p><p>Warning: Offset commits may not be possible at this point.</p><p>Options.MaxPollIntervalMs must be >= Options.SessionTimeoutMs.</p><p>The interval is checked two times per second.</p><p><br>Default: <code>300000</code><br>Example: <code>300000</code></p> |
| QueuedMaxMessagesKbytes : `Int32`            | <p>Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue.</p><p>If using the high-level consumer this setting applies to the single consumer queue, regardless of the number of partitions.</p><p>When using the legacy simple consumer or when separate partition queues are used this setting applies per partition.</p><p>This value may be overshot by Options.FetchMaxBytes.</p><p>This property has higher priority than Options.QueuedMinMessages.</p><p><br>Default: <code>65536</code><br>Example: <code>65536</code></p>                                                                                                                                                                                                                      |
| QueuedMinMessages : `Int32`                  | <p>Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.</p><p><br>Default: <code>100000</code><br>Example: <code>100000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| ReconnectBackoffMaxMs : `Int32`              | <p>The maximum time to wait before reconnecting to a broker after the connection has been closed.</p><p><br>Default: <code>10000</code><br>Example: <code>10000</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
| ReconnectBackoffMs : `Int32`                 | <p>The initial time to wait before reconnecting to a broker after the connection has been closed.</p><p>The time is increased exponentially until Options.ReconnectBackoffMaxMs is reached.</p><p>-25% to +50% jitter is applied to each reconnect backoff.</p><p>A value of 0 disables the backoff and reconnects immediately.</p><p><br>Default: <code>100</code><br>Example: <code>100</code></p>                                                                                                                                                                                                                                                                                                                                                                                     |
| SessionTimeoutMs : `Int32`                   | <p>Client group session and failure detection timeout.</p><p>The consumer sends periodic heartbeats (Options.HeartbeatIntervalMs) to indicate its liveness to the broker.</p><p>If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance.</p><p>Options.MaxPollIntervalMs must be >= Options.SessionTimeoutMs.</p><p><br>Default: <code>45000</code><br>Example: <code>45000</code></p> |
| Debug : `String`                             | <p>A comma-separated list of debug contexts to enable.</p><p>Detailed Consumer debugging: consumer,cgrp,topic,fetch.</p><p><br>Default: <code>-</code><br>Example: <code>broker</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
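
Several of the options above depend on each other. The following Python sketch is illustrative only and is not part of the Task: it restates three rules from the table (Options.MaxPollIntervalMs must be >= Options.SessionTimeoutMs, FetchMaxBytes is raised to at least Options.MessageMaxBytes, and the reconnect backoff grows from Options.ReconnectBackoffMs up to Options.ReconnectBackoffMaxMs with -25% to +50% jitter). The function names and the plain dictionary of option values are hypothetical. The table only says the backoff grows "exponentially", so doubling per attempt and capping the jittered value at the maximum are assumptions.

```python
from typing import Dict, List, Tuple


def check_options(options: Dict[str, int]) -> List[str]:
    """Return a list of problems found in a plain dict of option values."""
    problems = []
    # Rule stated in the MaxPollIntervalMs and SessionTimeoutMs rows.
    if options["MaxPollIntervalMs"] < options["SessionTimeoutMs"]:
        problems.append("MaxPollIntervalMs must be >= SessionTimeoutMs")
    return problems


def effective_fetch_max_bytes(options: Dict[str, int]) -> int:
    """FetchMaxBytes is adjusted upwards to be at least MessageMaxBytes."""
    return max(options["FetchMaxBytes"], options["MessageMaxBytes"])


def reconnect_backoff_bounds(options: Dict[str, int], attempt: int) -> Tuple[float, float]:
    """Lowest and highest wait (ms) before reconnect attempt `attempt` (0-based)."""
    base = options["ReconnectBackoffMs"]
    cap = options["ReconnectBackoffMaxMs"]
    if base == 0:
        # A value of 0 disables the backoff: reconnect immediately.
        return (0.0, 0.0)
    # ASSUMPTION: the wait doubles per attempt until it reaches the maximum.
    nominal = min(base * 2 ** attempt, cap)
    # -25% to +50% jitter; ASSUMPTION: the jittered value is capped at the maximum.
    low = float(min(nominal * 0.75, cap))
    high = float(min(nominal * 1.5, cap))
    return (low, high)


# Default values copied from the table above.
defaults = {
    "MaxPollIntervalMs": 300000,
    "SessionTimeoutMs": 45000,
    "FetchMaxBytes": 52428800,
    "MessageMaxBytes": 1000000,
    "ReconnectBackoffMs": 100,
    "ReconnectBackoffMaxMs": 10000,
}

print(check_options(defaults))                # [] (no problems with the defaults)
print(effective_fetch_max_bytes(defaults))    # 52428800
print(reconnect_backoff_bounds(defaults, 0))  # (75.0, 150.0)
```

With the defaults, the first reconnect wait falls between 75 ms and 150 ms. Under the stated assumptions, later waits level off at Options.ReconnectBackoffMaxMs.
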

{% endtab %}

{% tab title="Result: Result" %}

| Name                   | Description                                                                              |
| ---------------------- | ---------------------------------------------------------------------------------------- |
| Success : `Boolean`    | <p>True if messages have been consumed without errors.<br>Example: <code>true</code></p> |
| Data : `List<Message>` | <p>Result data.<br>Example: <code>Object { key, value }</code></p>                       |

{% endtab %}

{% tab title="Changelog" %}

## Changelog

### \[2.0.0] - 2024-05-15

#### Added

* Support for Confluent Schema Registry based Avro.
* New parameter Options.Debug.

#### Changed

* Confluent.Kafka updated from version 1.9.3 to 2.4.
* Input.Partition change: Consume from all of the topic's partitions if set to -1.
* New parameter: Options.EncodeMessageKey to choose whether this Task will try to decode the consumed key from byte\[] to string.
* Message class change: string Key, string Value replaced by dynamic Key, dynamic Value.
* Result.Messages renamed to Result.Data.
* Removed optional parameters: ApiVersionRequest, ApiVersionFallbackMs, ApiVersionRequestTimeoutMs, AllowAutoCreateTopics,

### \[1.1.0] - 2023-11-27

#### Added

* Added a partition as input parameter to the task.

### \[1.0.1] - 2023-04-17

#### Fixed

* Changed Task to set ssl.SslCaCertificateStores only if it's set as parameter.

### \[1.0.0] - 2022-10-19

#### Added

* Initial implementation

{% endtab %}
{% endtabs %}
