Flux de Java 8: pourquoi le flux parallèle est-il plus lent?


Je joue avec les flux de Java 8 et je ne peux pas comprendre les résultats de performance que j'obtiens. J'ai un processeur 2 cœurs (Intel i73520M), Windows 8 x64 et Java 8 update 5 64 bits. Je fais une carte simple sur le flux / flux parallèle de chaînes et j'ai constaté que la version parallèle est un peu plus lente.

Function<Stream<String>, Long> timeOperation = (Stream<String> stream) -> {
  long time1 = System.nanoTime();
  final List<String> list = 
     stream
       .map(String::toLowerCase)
       .collect(Collectors.toList());
  long time2 = System.nanoTime();
  return time2 - time1;
};

Consumer<Stream<String>> printTime = stream ->
  System.out.println(timeOperation.apply(stream) / 1000000f);

String[] array = new String[1000000];
Arrays.fill(array, "AbabagalamagA");

printTime.accept(Arrays.stream(array));            // prints around 600
printTime.accept(Arrays.stream(array).parallel()); // prints around 900

La version parallèle ne devrait-elle pas être plus rapide, compte tenu du fait que j'ai 2 cœurs de processeur? Quelqu'un pourrait-il me donner un indice pourquoi la version parallèle est plus lente?

Author: axiopisty, 2014-04-19

4 answers

Il y a plusieurs questions qui se déroulent ici en parallèle, pour ainsi dire.

La première est que résoudre un problème en parallèle implique toujours d'effectuer plus de travail réel que de le faire séquentiellement. La surcharge est impliquée dans la division du travail entre plusieurs threads et la jointure ou la fusion des résultats. Des problèmes tels que la conversion de chaînes courtes en minuscules sont suffisamment petits pour qu'ils risquent d'être submergés par la surcharge de fractionnement parallèle.

Le deuxième problème est que l'analyse comparative Programme Java est très subtil, et il est très facile d'obtenir des résultats étranges. Deux problèmes courants sont la compilation JIT et l'élimination du code mort. Les benchmarks courts se terminent souvent avant ou pendant la compilation JIT, ils ne mesurent donc pas le débit de pointe, et en effet ils pourraient mesurer le JIT lui-même. Lorsque la compilation se produit est quelque peu non déterministe, cela peut donc entraîner des résultats extrêmement variables.

Pour les petits benchmarks synthétiques, la charge de travail calcule souvent les résultats qui sont jetés loin. Les compilateurs JIT sont assez bons pour détecter cela et éliminer le code qui ne produit pas de résultats qui sont utilisés n'importe où. Cela ne se produit probablement pas dans ce cas, mais si vous bricolez avec d'autres charges de travail synthétiques, cela peut certainement arriver. Bien sûr, si le JIT élimine la charge de travail du benchmark, il rend le benchmark inutile.

Je recommande fortement d'utiliser un cadre d'analyse comparative bien développé tel que JMH au lieu de rouler à la main l'un des vôtres. JMH a des installations pour aider à éviter les pièges courants de l'analyse comparative, y compris ceux-ci, et c'est assez facile à configurer et à exécuter. Voici votre benchmark converti pour utiliser JMH:

package com.stackoverflow.questions;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;

public class SO23170832 {
    @State(Scope.Benchmark)
    public static class BenchmarkState {
        static String[] array;
        static {
            array = new String[1000000];
            Arrays.fill(array, "AbabagalamagA");
        }
    }

    @GenerateMicroBenchmark
    @OutputTimeUnit(TimeUnit.SECONDS)
    public List<String> sequential(BenchmarkState state) {
        return
            Arrays.stream(state.array)
                  .map(x -> x.toLowerCase())
                  .collect(Collectors.toList());
    }

    @GenerateMicroBenchmark
    @OutputTimeUnit(TimeUnit.SECONDS)
    public List<String> parallel(BenchmarkState state) {
        return
            Arrays.stream(state.array)
                  .parallel()
                  .map(x -> x.toLowerCase())
                  .collect(Collectors.toList());
    }
}

J'ai exécuté cela en utilisant la commande:

java -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1

(Les options indiquent cinq itérations d'échauffement, cinq itérations de référence et une JVM fourchue.) Au cours de son exécution, JMH émet beaucoup de messages verbeux, que j'ai élucidés. Les résultats récapitulatifs sont les suivants.

Benchmark                       Mode   Samples         Mean   Mean error    Units
c.s.q.SO23170832.parallel      thrpt         5        4.600        5.995    ops/s
c.s.q.SO23170832.sequential    thrpt         5        1.500        1.727    ops/s

Notez que les résultats sont en ops par seconde, donc cela ressemble à le parallèle était environ trois fois plus rapide que l'ordre d'exécution. Mais ma machine n'a que deux cœurs. Hmmm. Et l'erreur moyenne par exécution est en fait plus grande que l'exécution moyenne! WAT? Quelque chose de louche se passe ici.

Cela nous amène à une troisième question. En regardant de plus près la charge de travail, nous pouvons voir qu'elle alloue un nouvel objet String pour chaque entrée, et qu'elle collecte également les résultats dans une liste, ce qui implique beaucoup de réallocation et de copie. Je suppose que cela aura un bonne quantité de collecte des ordures. Nous pouvons le voir en réexécutant le benchmark avec les messages GC activés:

java -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1

Cela donne des résultats comme:

[GC (Allocation Failure)  512K->432K(130560K), 0.0024130 secs]
[GC (Allocation Failure)  944K->520K(131072K), 0.0015740 secs]
[GC (Allocation Failure)  1544K->777K(131072K), 0.0032490 secs]
[GC (Allocation Failure)  1801K->1027K(132096K), 0.0023940 secs]
# Run progress: 0.00% complete, ETA 00:00:20
# VM invoker: /Users/src/jdk/jdk8-b132.jdk/Contents/Home/jre/bin/java
# VM options: -verbose:gc
# Fork: 1 of 1
[GC (Allocation Failure)  512K->424K(130560K), 0.0015460 secs]
[GC (Allocation Failure)  933K->552K(131072K), 0.0014050 secs]
[GC (Allocation Failure)  1576K->850K(131072K), 0.0023050 secs]
[GC (Allocation Failure)  3075K->1561K(132096K), 0.0045140 secs]
[GC (Allocation Failure)  1874K->1059K(132096K), 0.0062330 secs]
# Warmup: 5 iterations, 1 s each
# Measurement: 5 iterations, 1 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.stackoverflow.questions.SO23170832.parallel
# Warmup Iteration   1: [GC (Allocation Failure)  7014K->5445K(132096K), 0.0184680 secs]
[GC (Allocation Failure)  7493K->6346K(135168K), 0.0068380 secs]
[GC (Allocation Failure)  10442K->8663K(135168K), 0.0155600 secs]
[GC (Allocation Failure)  12759K->11051K(139776K), 0.0148190 secs]
[GC (Allocation Failure)  18219K->15067K(140800K), 0.0241780 secs]
[GC (Allocation Failure)  22167K->19214K(145920K), 0.0208510 secs]
[GC (Allocation Failure)  29454K->25065K(147456K), 0.0333080 secs]
[GC (Allocation Failure)  35305K->30729K(153600K), 0.0376610 secs]
[GC (Allocation Failure)  46089K->39406K(154624K), 0.0406060 secs]
[GC (Allocation Failure)  54766K->48299K(164352K), 0.0550140 secs]
[GC (Allocation Failure)  71851K->62725K(165376K), 0.0612780 secs]
[GC (Allocation Failure)  86277K->74864K(184320K), 0.0649210 secs]
[GC (Allocation Failure)  111216K->94203K(185856K), 0.0875710 secs]
[GC (Allocation Failure)  130555K->114932K(199680K), 0.1030540 secs]
[GC (Allocation Failure)  162548K->141952K(203264K), 0.1315720 secs]
[Full GC (Ergonomics)  141952K->59696K(159232K), 0.5150890 secs]
[GC (Allocation Failure)  105613K->85547K(184832K), 0.0738530 secs]
1.183 ops/s

Remarque: les lignes commençant par # sont des lignes de sortie JMH normales. Tout le reste sont des messages GC. Ce n'est que la première des cinq itérations d'échauffement, qui précède cinq itérations de référence. Les messages du GC ont continué dans la même veine pendant le reste des itérations. Je pense qu'il est sûr de dire que les performances mesurées est dominé par les frais généraux du GC et que les résultats rapportés ne doivent pas être crus.

À ce stade, on ne sait pas quoi faire. Il s'agit d'une charge de travail purement synthétique. Il implique clairement très peu de temps CPU faire le travail réel par rapport à l'allocation et la copie. Il est difficile de dire ce que vous essayez vraiment de mesurer ici. Une approche serait de proposer une charge de travail différente qui est en quelque sorte plus "réelle"."Une autre approche serait de modifier les paramètres du tas et du GC pour éviter le GC pendant l'indice de référence exécuter.

 153
Author: Stuart Marks, 2015-07-15 16:47:33

Lorsque vous effectuez des benchmarks, vous devez faire attention à la compilation JIT, et que les comportements de synchronisation peuvent changer, en fonction de la quantité de chemins de code compilés JIT. Si j'ajoute une phase d'échauffement à votre programme de test, la version parallèle est un peu plus rapide que la version séquentielle. Voici les résultats:

Warmup...
Benchmark...
Run 0:  sequential 0.12s  -  parallel 0.11s
Run 1:  sequential 0.13s  -  parallel 0.08s
Run 2:  sequential 0.15s  -  parallel 0.08s
Run 3:  sequential 0.12s  -  parallel 0.11s
Run 4:  sequential 0.13s  -  parallel 0.08s

Le fragment de code suivant contient le code source complet que j'ai utilisé pour ce test.

public static void main(String... args) {
    String[] array = new String[1000000];
    Arrays.fill(array, "AbabagalamagA");
    System.out.println("Warmup...");
    for (int i = 0; i < 100; ++i) {
        sequential(array);
        parallel(array);
    }
    System.out.println("Benchmark...");
    for (int i = 0; i < 5; ++i) {
        System.out.printf("Run %d:  sequential %s  -  parallel %s\n",
            i,
            test(() -> sequential(array)),
            test(() -> parallel(array)));
    }
}
private static void sequential(String[] array) {
    Arrays.stream(array).map(String::toLowerCase).collect(Collectors.toList());
}
private static void parallel(String[] array) {
    Arrays.stream(array).parallel().map(String::toLowerCase).collect(Collectors.toList());
}
private static String test(Runnable runnable) {
    long start = System.currentTimeMillis();
    runnable.run();
    long elapsed = System.currentTimeMillis() - start;
    return String.format("%4.2fs", elapsed / 1000.0);
}
 17
Author: nosid, 2018-10-03 19:04:04

L'utilisation de plusieurs threads pour traiter vos données entraîne des coûts de configuration initiaux, par exemple l'initialisation du pool de threads. Ces coûts peuvent l'emporter sur le gain de l'utilisation de ces threads, surtout si le temps d'exécution est déjà assez faible. De plus, en cas de conflit, par exemple d'autres threads en cours d'exécution, des processus en arrière-plan, etc., les performances du traitement parallèle peuvent encore diminuer.

Ce problème n'est pas nouveau pour le traitement parallèle. Cet article donne quelques détails à la lumière de Java 8 parallel() et quelques autres choses à considérer: https://dzone.com/articles/think-twice-using-java-8

 10
Author: joe776, 2021-01-20 10:17:41

L'implémentation du flux en Java est par défaut séquentielle à moins qu'elle ne soit explicitement mentionnée en parallèle. Lorsqu'un flux s'exécute en parallèle, le runtime Java partitionne le flux en plusieurs sous-flux. Les opérations d'agrégation parcourent et traitent ces sous-flux en parallèle, puis combinent les résultats. Ainsi, les flux parallèles peuvent être utilisés si les développeurs ont des implications en termes de performances avec les flux séquentiels. Veuillez vérifier la comparaison des performances : https://github.com/prathamket/Java-8/blob/master/Performance_Implications.java Vous aurez l'idée globale de la performance.

 3
Author: Prathamesh Ketgale, 2019-08-27 17:52:23