NAME
    Net::Kafka::Producer::Avro - Apache Kafka message producer based on
    librdkafka, Avro serialization and Confluent Schema Registry validation.

SYNOPSIS
        use Net::Kafka::Producer::Avro;
        use Confluent::SchemaRegistry;
        use AnyEvent;
        use JSON;

        my $producer = Net::Kafka::Producer::Avro->new(
            'bootstrap.servers' => 'localhost:9092',
            'schema-registry'   => Confluent::SchemaRegistry->new(), # defaults to http://localhost:8081
            'compression.codec' => 'gzip', # optional, one of: 'none' (default), 'gzip', 'snappy', 'lz4', 'zstd'
            'log_level'         => 0,      # suppress librdkafka internal logging
            'error_cb'          => sub {
                my ($self, $err, $msg) = @_;
                die "Connection error:\n\t- err: " . $err . "\n\t- msg: " . $msg . "\n";
            }
        );

        # create the headers object (only needed if the message carries headers)
        my $headers = Net::Kafka::Headers->new();
        $headers->add('my-header-1', 'foo');
        $headers->add('my-header-2', 'bar');

        my $condvar = AnyEvent->condvar;

        my $promise = $producer->produce(
            topic          => 'mytopic',
            partition      => 0,
            key            => 1000,
            key_schema     => to_json(
                {
                    name => 'id',
                    type => 'long'
                }
            ),
            payload        => {
                id => 1210120,
                f1 => 'text message'
            },
            payload_schema => to_json(
                {
                    type   => 'record',
                    name   => 'myrecord',
                    fields => [
                        {
                            name => 'id',
                            type => 'long'
                        },
                        {
                            name => 'f1',
                            type => 'string'
                        }
                    ]
                }
            ),
            headers        => $headers
        );

        # produce() returns a false value when the request itself fails
        die "Error requesting message production: " . $producer->get_error() . "\n"
            unless $promise;

        $promise->then(
            sub {
                my $delivery_report = shift;
                $condvar->send; # signal the condition variable
                print "Message delivered with offset " . $delivery_report->{offset};
            },
            sub {
                my $error = shift;
                $condvar->send; # signal the condition variable
                die "Unable to produce message: " . $error->{error} . ", code: " . $error->{code};
            }
        );

        $condvar->recv; # wait until the promise has been settled

        print "Message produced", "\n";

DESCRIPTION
    The main goal of "Net::Kafka::Producer::Avro" is to provide an
    object-oriented API for producing Avro-serialized messages in
    accordance with *Confluent SchemaRegistry*.
"Net::Kafka::Producer::Avro" inerhits from and extends Net::Kafka::Producer module. INSTALL Installation of "Net::Kafka::Producer::Avro" is a canonical: perl Makefile.PL make make test make install TESTING TROUBLESHOOTING Tests are focused on verifying Avro-formatted messages and theirs interactions with Confluent Schema Registry and are intended to extend the "Net::Kafka::Producer"'s test suite. It's expected that a local Apache Kafka and Schema Registry services are listening on "localhost:9092" and "http://localhost:8081". You can either set different endpoints by exporting the following environment variables: "KAFKA_HOST" "KAFKA_PORT" "CONFLUENT_SCHEMA_REGISTY_URL" For example: export KAFKA_HOST=my-kafka-host.my-domain.org export KAFKA_PORT=9092 export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org USAGE CONSTRUCTOR "new" Creates a message producer. new() method expects the same arguments set as the Net::Kafka::Producer parent constructor. In addition, takes in the following mandatory argument: "SchemaRegistry => $schema_registry" (mandatory) Is a Confluent::SchemaRegistry instance. METHODS The following methods are defined for the "Net::Kafka::Producer::Avro" class: "schema_registry"() Returns the Confluent::SchemaRegistry instance supplied to the construcor. "get_error"() Returns a string containing last error message. produce( %named_params ) Sends Avro-formatted key/message pairs. According to "Net::Kafka::Producer", returns a promise value if the message was successfully sent. 
        In order to handle the Avro format, the "Net::Kafka::Producer"
        produce() method has been extended with two more arguments,
        "key_schema" and "payload_schema":

            $producer->produce(
                topic          => $topic,          # scalar
                partition      => $partition,      # scalar
                key_schema     => $key_schema,     # (optional) scalar: JSON string of the Avro schema to use for the key
                key            => $key,            # (optional) scalar | hashref
                payload_schema => $payload_schema, # (optional) scalar: JSON string of the Avro schema to use for the payload
                payload        => $payload,        # scalar | hashref
                timestamp      => $timestamp,      # (optional) scalar: milliseconds since epoch
                headers        => $headers,        # (optional) Net::Kafka::Headers object
                # ...other params accepted by Net::Kafka::Producer's produce() method
            );

        Both the $key_schema and $payload_schema parameters are optional.
        When given, each must be a JSON string representing the Avro schema
        used to validate and serialize the key or the payload.

        These schemas are validated against the $schema_registry supplied
        to the "new" method and, if compliant, are added to the registry
        under the "$topic+'key'" or "$topic+'value'" Schema Registry
        subjects.

        If a schema isn't provided, the latest version registered in Schema
        Registry for the corresponding (topic + key/value) subject is used.

AUTHOR
    Alvaro Livraghi

CONTRIBUTE

BUGS
    Please use the GitHub project link above to report problems or contact
    the authors.

COPYRIGHT AND LICENSE
    Copyright 2026 by Alvaro Livraghi

    This program is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.