Comment stocker les valeurs de Trident / Storm dans une liste (en utilisant l'API Java)


J'essaie de créer quelques tests unitaires pour vérifier que certaines parties de ma topologie Trident font ce qu'elles sont censées faire.

J'aimerais pouvoir récupérer toutes les valeurs résultantes après avoir exécuté la topologie et les mettre dans une liste afin que je puisse les"voir" et vérifier les conditions sur elles.

   FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
   TridentTopology topology = new TridentTopology();
   topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    // Soo... how do I retrieve the "aggregated_foos" from here?

J'exécute la topologie en tant que TrackedTopology (j'ai obtenu le code de une autre question S. O., merci @brianghig de l'avoir demandé et @Thomas Kielbus pour la réponse)

Voici comment je "lance" la topologie et comment j'y insère des valeurs d'échantillon:

TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());

feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));

Quand je fais cela, je peux voir dans les messages de journal que la topologie fonctionne correctement et que les valeurs sont calculées correctement, mais j'aimerais "pêcher" les résultats dans un List (ou n'importe quelle structure, à ce stade) afin que je puisse réellement mettre un Asserts dans mes tests.

J'ai essayé [un s * * ton] de différentes approches, mais aucune d'entre elles travail.

La dernière idée était d'ajouter un boulon après l'agrégation afin qu'il "persiste" mes valeurs dans une liste:

Ci-dessous, vous verrez la classe qui essaie de parcourir tous les tuples émis par le aggregate et les mettrait dans une liste que j'avais précédemment initialisée:

class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
    final List<AggregatedFoo> results;

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
        this.results = results;
    }

    @Override
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            results.add((AggregatedFoo) tuple.getValue(0));
        }
    }
}

Alors maintenant le code ressemblerait à:

// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    .partitionPersist(new FieldFetcherFactory(),
                        new Fields("aggregated_foos"),
                        new FieldFetcherStateUpdater(results));

     LOGGER.info("Done. Checkpoint results={}", results);

Mais rien... Les journaux montrent Done. Checkpoint results=[] (liste vide)

Est-il un moyen d'obtenir cela? J'imagine que ça doit être faisable, mais je ne l'ai pas fait été en mesure de trouver un moyen...

Tout indice ou lien vers des pages ou quelque chose de similaire sera apprécié. D'avance, merci.

Author: Community, 2016-01-20

1 answers

, Vous devez utiliser un variable membre statique result. Si vous avez plusieurs tâches parallèles en cours d'exécution (c'est-à-dire parallelism_hint > 1), vous devez également synchronize l'accès en écriture à result.

Dans votre cas, result sera vide, car Storm en interne, crée une nouvelle instance de votre bolt (y compris une nouvelle instance de ArrayList). L'utilisation d'une variable statique garantit que vous avez accès au bon objet (car il n'y en aura qu'un sur toutes les instances de votre bolt).

 0
Author: Matthias J. Sax, 2016-01-20 13:20:34