Ottenere errori nel programma java per scrivere su Kinesis Firehose stream


Sto cercando di scrivere solo alcuni dati da un'API (google stocks/finance API) al mio flusso AWS Firehose. Ho già scaricato e installato il plugin AWS su Eclipse, configurato il mio flusso Firehose su AWS e tutto sembra essere configurato correttamente. Sto incontrando alcuni problemi, però. La riga seguente sembra essere deprecata...Ho provato diverse varianti dall'SDK di Amazon, ma non riesco a ottenere il codice corretto.

AmazonKinesisFirehoseClient firehoseClient = nuovo AmazonKinesisFirehoseClient (credenziali);

Successivamente, sto ricevendo alcuni errori con il seguente. L'errore specifico è " Il metodo setRecord(Record) non è definito per il tipo PutRecordRequest" anche se l'ho preso direttamente dal riferimento API di Amazon.

Richiesta.setRecord (record);

FirehoseClient.putRecord (richiesta);

Anche ottenere un errore sulla seconda riga sopra: " Il metodo putRecord(com.amazonaws.servizio.kinesisfirehose.modello.PutRecordRequest) nel tipo AmazonKinesisFirehoseClient non è applicabile per gli argomenti (com.amazonaws.servizio.Kinesis.modello.PutRecordRequest)"

package com.amazonaws.samples;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;

import org.apache.http.client.CredentialsProvider;

import com.amazonaws.*;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;

public class FirehoseExample {

    public static void main(String[] args) {
        AWSCredentials credentials = null;

        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        }

        catch (Exception e) {
            throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                    + "Please make sure that your credentials file is at the correct "
                    + "location (/Users/elybenari/.aws/credentials), and is in valid format.", e);
        }

        AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName("project-stream");

        Record record = new Record();

        for (int i = 0; i < 20*60; i++){
            try {
                URL url = new URL("https://www.google.com/finance/info?q=NASDAQ:AMZN");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder response = new StringBuilder();
                String line;

                while ((line = reader.readLine()) != null) {
                    response.append(line);  
                }
                reader.close();

                System.out.println(response.toString().replace("\n", "").replaceAll(" ", ""));
                System.out.println("****\n");

                ByteBuffer buffer = ByteBuffer.wrap(response.toString().replace("\n", "").replaceAll(" ", "").getBytes());
                record.setData(buff);

                request.setRecord(record);

                firehoseClient.putRecord(request);

                Thread.sleep(2000);


            }
            catch(Exception e){
                e.printStackTrace();
            }
        }   

    }




    }
Author: John Rotenstein, 2017-04-05

1 answers

Il problema è che hai incluso alcune classi da Kinesis, non Kinesis Firehose, pacchetto Java. Ad esempio, hai usato:

import com.amazonaws.services.kinesis.model.PutRecordRequest;

Mentre, avresti dovuto usare:

import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;

Kinesis, Kinesis Firehose e Kinesis Analytics sono servizi diversi, anche se ricadono sotto un unico ombrello di servizi di streaming su AWS. Di conseguenza, hanno diversi spazi dei nomi dei pacchetti nell'SDK Java. Se si parte dalla documentazione ufficiale qui, si raggiunge il corretto il riferimento Java SDK qui .

EDIT: Per rispondere all'altra domanda: sì, quanto segue è deprecato:

AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);

Dovresti invece usare quanto segue:

 AmazonKinesisFirehoseClient firehoseClient = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCredentials)).build();

Fare riferimento alla documentazione ufficiale qui su come inizializzare correttamente AmazonKinesisFirehoseClient.

 1
Author: ketan vijayvargiya, 2017-04-05 19:03:14