Come posso produrre messaggi con l'API Kafka 8.2 in Java?


Sto cercando di lavorare con l'API kafka in java. Sto usando la seguente dipendenza maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

Ho problemi a connettermi a un server kafka remoto. Ho cambiato il server di kafka.l'attributo della porta del file delle proprietà deve essere la porta 8080. Posso avviare sia lo zookeeper che il server kafka senza problemi. Posso anche utilizzare le applicazioni console producer e consumer fornite con il download di kafka. (Versione Scala 2.10)

Sto usando il seguente codice client per creare un telecomando KafkaProducer

Properties propsProducer = new Properties();

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);

Una volta creato il produttore, posso eseguire la seguente riga e ottenere informazioni argomento valido restituito, concesso strTopic è un nome di argomento esistente.

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);

Quando provo a inviare un messaggio, faccio quanto segue:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();

La chiamata a send() blocca indefinitamente e quando termino manualmente il processo, vedo che l'ERRORE di chiusura del socket a causa di un errore sul server kafka(IOException, Reset della connessione tramite Peer) errore.

Inoltre, non vale nulla che il host.name, advertised.host.name, e pubblicizzato.le proprietà della porta sono ancora commentate sul ' server.file delle proprietà. Oh, e se cambio la riga:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
Da

A

propsProducer.put("bootstrap.servers", "127.0.0.1:8080");

Ed eseguirlo sullo stesso server in cui è installato il server kafka, funziona ma sto cercando di lavorarci da remoto.

Apprezzare qualsiasi aiuto e se posso chiarire a tutti fatemelo sapere.

Author: Patrick Hovsepian, 2015-03-30

1 answers

Dopo molti scavi, ho deciso di implementare l'esempio trovato qui: Kafka Producer Example. Ho accorciato il codice e non ho implementato una classe partizionatore. Ho aggiornato il mio pom con la dipendenza elencata e stavo ancora avendo lo stesso problema. In definitiva, ho apportato alcune modifiche alla configurazione e tutto ha funzionato.

L'ultimo pezzo del puzzle era la definizione del server Kafka in /etc/host sia del server che delle macchine client. Ho aggiunto quanto segue a entrambi file.

172.xx.xx.xxx     serverHost1

Di nuovo, le x sono solo maschere. Quindi, ho impostato il advertised.host.name nel server.file delle proprietà su serverHost1. NOTA: ho ottenuto quell'IP dopo aver eseguito un ifconfig sul computer server.

Ho cambiato la riga

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080");
Da

A

propsProducer.put("metadata.broker.list", "serverHost1:8080");

All'API di Kafka non piaceva il fatto che stavo definendo un IP come una stringa. Invece stava cercando l'IP dall'interno del file etc / hosts anche se la documentazione dice:

" Nome host a cui il broker farà pubblicità produttori e consumatori. Se non impostato, utilizza il valore per "host.name" se configurato. In caso contrario, utilizzerà il valore restituito da java.net.InetAddress. getCanonicalHostName (). "

Che restituirà solo l'IP, nella forma stringa, che stavo usando in precedenza se non definito in etc/hosts della macchina client, altrimenti restituisce il nome associato all'IP (serverHost1 nel mio caso). Inoltre, non ho mai impostato il valore di host.name neanche.

 3
Author: Patrick Hovsepian, 2015-04-02 15:24:47