Kafka
This package provides a client for Apache Kafka versions 0.11 and up. It is a work in progress, so expect breaking changes.
1 Client
(require kafka) | package: kafka-lib |
Clients transparently pool connections to brokers within a cluster. Connections are leased from the pool in order of least in-progress requests. Reconnections are handled transparently, and connection errors bubble up to the caller. Clients are thread-safe, but they may not be shared between consumers.
procedure
(make-client [#:id id
              #:bootstrap-host host
              #:bootstrap-port port
              #:sasl-mechanism&ctx sasl-ctx
              #:ssl-ctx ssl-ctx]) → client?
id : non-empty-string? = "racket-kafka"
host : string? = "127.0.0.1"
port : (integer-in 0 65535) = 9092
sasl-ctx : (or/c #f (list/c 'plain string?) (list/c symbol? sasl-ctx?)) = #f
ssl-ctx : (or/c #f ssl-client-context?) = #f
When a sasl-ctx is provided, it is used to authenticate the connection to the bootstrap host as well as any subsequent connections made to other nodes in the cluster.
When an ssl-ctx is provided, it is used to encrypt all connections.
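For instance, a client for a local single-node cluster needs no arguments, while a remote cluster might require SSL. A minimal sketch (the host name is a placeholder):

  (require kafka openssl)

  ;; Connect to a broker on 127.0.0.1:9092 using the defaults.
  (define local-client
    (make-client))

  ;; Connect to a remote cluster, encrypting all connections.
  (define remote-client
    (make-client
     #:bootstrap-host "kafka.example.com"
     #:ssl-ctx (ssl-make-client-context)))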
procedure
(disconnect-all c) → void?
c : client?
Closes all of the connections in the client's connection pool.
1.1 Errors
procedure
(exn:fail:kafka? v) → boolean?
v : any/c
procedure
(exn:fail:kafka:client? v) → boolean?
v : any/c
procedure
(exn:fail:kafka:server? v) → boolean?
v : any/c
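Operations that fail raise one of these exceptions, so callers can trap them with a standard handler. A sketch (assumes a connected client c):

  (with-handlers ([exn:fail:kafka?
                   (lambda (e)
                     (log-error "kafka operation failed: ~a" (exn-message e)))])
    (create-topics c (make-CreateTopic #:name "example-topic" #:partitions 1)))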
1.2 Topic Management
procedure
(create-topics c t ...+) → CreatedTopics?
c : client?
t : CreateTopic?
When given a set of topics, some of them may succeed, and some may fail. It’s up to the caller to inspect the error codes on the returned CreatedTopics.
procedure
(delete-topics c t ...+) → DeletedTopics?
c : client?
t : string?
struct
(struct CreateTopic (name partitions))
name : string?
partitions : exact-positive-integer?
procedure
(make-CreateTopic #:name name
                  #:partitions partitions
                  [#:replication-factor factor
                   #:assignments assignments
                   #:configs configs]) → CreateTopic?
name : string?
partitions : exact-positive-integer?
factor : (or/c -1 exact-positive-integer?) = -1
assignments : (hash/c exact-nonnegative-integer?
                      (listof exact-nonnegative-integer?)) = (hasheqv)
configs : (hash/c string? string?) = (hash)
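For example, the following sketch creates a two-partition topic and checks each result for errors, using the struct accessors implied by the definitions in this section (assumes a connected client c):

  (define created
    (create-topics
     c
     (make-CreateTopic
      #:name "example-topic"
      #:partitions 2)))
  (for ([t (in-list (CreatedTopics-topics created))])
    (unless (zero? (CreatedTopic-error-code t))
      (error 'create-topics "~a: ~a"
             (CreatedTopic-name t)
             (CreatedTopic-error-message t))))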
struct
(struct CreatedTopics (topics))
topics : (listof CreatedTopic?)
struct
(struct CreatedTopic (error-code error-message name))
error-code : exact-nonnegative-integer?
error-message : (or/c #f string?)
name : string?
struct
(struct DeletedTopics (throttle-time-ms topics tags))
throttle-time-ms : exact-nonnegative-integer?
topics : (listof DeletedTopic?)
tags : (or/c #f tags/c)
struct
(struct DeletedTopic (error-code error-message name uuid tags))
error-code : error-code/c
error-message : (or/c #f string?)
name : string?
uuid : (or/c #f bytes?)
tags : (or/c #f tags/c)
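Deletion follows the same pattern as creation: some topics may be deleted while others fail, so inspect each DeletedTopic. A sketch, continuing the example above:

  (define deleted
    (delete-topics c "example-topic"))
  (for ([t (in-list (DeletedTopics-topics deleted))])
    (when (DeletedTopic-error-message t)
      (log-warning "failed to delete ~a: ~a"
                   (DeletedTopic-name t)
                   (DeletedTopic-error-message t))))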
1.3 Record Results
Record results represent the results of publishing individual records.
struct
(struct RecordResult (topic partition))
topic : string?
partition : ProduceResponsePartition?
struct
(struct ProduceResponsePartition (id error-code offset))
id : exact-nonnegative-integer?
error-code : exact-nonnegative-integer?
offset : exact-nonnegative-integer?
1.4 Contracts
2 Consumer
(require kafka/consumer) | package: kafka-lib |
Consumers form consumer groups to subscribe to topics and retrieve records. As the name implies, consumer groups group consumers together so that topic partitions may be spread out across the members of the group.
Consumers are not thread-safe.
procedure
(make-consumer client
               group-id
               topic ...+
               [#:reset-strategy strategy
                #:session-timeout-ms session-timeout-ms]) → consumer?
client : client?
group-id : string?
topic : string?
strategy : (or/c 'earliest 'latest) = 'earliest
session-timeout-ms : exact-nonnegative-integer? = 30000
The #:reset-strategy argument controls the consumer's initial offsets for newly-assigned partitions. When it is 'earliest, the consumer receives records starting from the beginning of each partition. When it is 'latest, it receives only records published after it subscribes to each topic.
procedure
(consume-evt c [timeout-ms])
 → (evt/c
    (or/c (values 'rebalance (hash/c string? (hash/c integer? integer?)))
          (values 'records (vectorof record?))))
c : consumer?
timeout-ms : exact-nonnegative-integer? = 1000
When a consumer leaves or joins the consumer group, the event will synchronize to a 'rebalance result. In that case, the consumer will automatically re-join the group and discard any un-committed offsets. The associated data is a hash from topic names to hashes of partition ids to offsets. When a rebalance happens, you must take care not to commit any old offsets (i.e. you must issue a new consume-evt before making any calls to consumer-commit).
When either the timeout passes or new records become available on the broker, the event will synchronize to a 'records result whose associated data will be a vector of records.
More result types may be added in the future.
The timeout-ms argument controls how long the broker may wait before returning a response. If no records arrive between the time this function is called and the time the timeout passes, the event synchronizes to an empty vector of records.
procedure
(consumer-commit c) → void?
c : consumer?
Call this function after you have successfully processed a batch of records received from consume-evt. If you forget to call this function, or if the consumer crashes in between calling consume-evt and calling this function, another consumer in the group will eventually receive that same batch again.
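Putting these together, a typical consumer loop synchronizes on consume-evt, processes any records, and then commits. A sketch (assumes a connected client c and an existing topic named example-topic):

  (require kafka kafka/consumer)

  (define consumer
    (make-consumer c "example-group" "example-topic"))
  (let loop ()
    (define-values (type data)
      (sync (consume-evt consumer)))
    (case type
      [(records)
       (for ([r (in-vector data)])
         (printf "~s => ~s~n" (record-key r) (record-value r)))
       (consumer-commit consumer)]
      [(rebalance)
       ;; Discard any state tied to old offsets; do not commit here.
       (void)])
    (loop))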
procedure
(consumer-stop c) → void?
c : consumer?
2.1 Records
Records represent individual key-value pairs on a topic.
procedure
(record-key r) → (or/c 'nil bytes?)
r : record?
procedure
(record-value r) → bytes?
r : record?
2.2 Limitations
Consumers have several limitations at the moment, some of which will be addressed in future versions.
2.2.1 Compression
The only supported compression type at the moment is 'gzip. Fetching a batch of records that is compressed using any other method will raise an error.
2.2.2 Group Assignment
Only brokers that implement client-side assignment are supported (Apache Kafka versions 0.11 and up). At the moment, only the range and round-robin group assignment strategies are implemented.
2.2.3 Error Detection
Batches retrieved from the broker contain a CRC code for error detection, but the library does not validate these at the moment.
3 Producer
(require kafka/producer) | package: kafka-lib |
Producers publish data on one or more topics. Producers batch data internally by topic & partition, and they are thread-safe.
procedure
(make-producer c
               [#:acks acks
                #:compression compression
                #:flush-interval interval
                #:max-batch-bytes max-bytes
                #:max-batch-size max-size]) → producer?
c : client?
acks : (or/c 'none 'leader 'full) = 'leader
compression : (or/c 'none 'gzip) = 'gzip
interval : exact-positive-integer? = 60000
max-bytes : exact-positive-integer? = (* 100 1024 1024)
max-size : exact-positive-integer? = 1000
Data is batched internally by topic & partition. Within each batch, the data is compressed according to the #:compression method.
The producer automatically flushes its data every #:flush-interval milliseconds, whenever the total size of all its batches exceeds #:max-batch-bytes, or whenever the total number of records contained in all of its batches exceeds #:max-batch-size, whichever occurs first.
During a flush, calling produce on a producer blocks until the flush completes.
procedure
(produce p topic k v [#:partition partition]) → evt?
p : producer?
topic : string?
k : bytes?
v : bytes?
partition : exact-nonnegative-integer? = 0
Typically, you would call this function in a loop to produce a set of data, collect the resulting events, and then sync them to ensure the records have been written to the log.
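A sketch of that pattern (assumes a connected client c, and that each call to produce returns an event that synchronizes to a RecordResult, per the Record Results section above):

  (require kafka/producer)

  (define p (make-producer c))
  (define results
    (for/list ([i (in-range 10)])
      (produce p "example-topic"
               (string->bytes/utf-8 (format "key-~a" i))
               #"value")))
  (producer-flush p)
  (for ([evt (in-list results)])
    (define res (sync evt))
    (define part (RecordResult-partition res))
    (printf "wrote partition ~a at offset ~a~n"
            (ProduceResponsePartition-id part)
            (ProduceResponsePartition-offset part)))
  (producer-stop p)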
procedure
(producer-flush p) → void?
p : producer?
Flushes the producer's pending batches, blocking until the flush completes.
procedure
(producer-stop p) → void?
p : producer?
3.1 Limitations
3.1.1 Compression
Kafka supports snappy, lz4, and zstd compression in addition to gzip, but this library only supports gzip at the moment.