Comment implémenter la priorisation des tâches à l'aide d'un ExecutorService dans Java 5?


J'implémente un mécanisme de mise en commun de threads dans lequel je voudrais exécuter des tâches de priorités variables. J'aimerais avoir un bon mécanisme par lequel je peux soumettre une tâche hautement prioritaire au service et la programmer avant d'autres tâches. La priorité de la tâche est une propriété intrinsèque de la tâche elle-même (que j'exprime cette tâche comme un Callable ou un Runnable n'est pas important pour moi).

Maintenant, superficiellement, il semble que je pourrais utiliser un PriorityBlockingQueue comme file d'attente de tâches dans mon ThreadPoolExecutor, mais cela la file d'attente contient des objets Runnable, qui peuvent ou non être les tâches Runnable que je lui ai soumises. De plus, si j'ai soumis des tâches Callable, on ne sait pas comment cela serait jamais mappé.

Y a-t-il un moyen de le faire? Je préfère vraiment ne pas rouler le mien pour cela, car je suis beaucoup plus susceptible de me tromper de cette façon.

(Un aparté; oui, je suis conscient de la possibilité de famine pour les emplois moins prioritaires dans quelque chose comme ça. Points supplémentaires (?!) pour des solutions qui ont une garantie raisonnable de équité)

Author: Chris R, 2009-04-30

6 answers

, À première vue, il semblerait, vous pouvez définir une interface pour vos tâches qui s'étend Runnable ou Callable<T> et Comparable. Ensuite, enveloppez un ThreadPoolExecutor avec un PriorityBlockingQueue comme file d'attente et n'acceptez que les tâches qui implémentent votre interface.

En tenant compte de votre commentaire, il semble qu'une option consiste à étendre ThreadPoolExecutor et à remplacer les méthodes submit(). Référez-vous à AbstractExecutorService pour voir à quoi ressemblent ceux par défaut; tout ce qu'ils font est d'envelopper le Runnable ou Callable dans un FutureTask et execute(). Je ferais probablement cela en écrivant un classe wrapper qui implémente ExecutorService et délègue à un ThreadPoolExecutor interne anonyme. Enveloppez-les dans quelque chose qui a votre priorité, afin que votre Comparator puisse y arriver.

 8
Author: Adam Jaskiewicz, 2009-04-30 15:35:33

J'ai résolu ce problème de manière raisonnable, et je le décrirai ci-dessous pour référence future à moi-même et à toute autre personne qui rencontre ce problème avec les bibliothèques simultanées Java.

Utiliser un PriorityBlockingQueue comme moyen de conserver des tâches pour une exécution ultérieure est en effet un mouvement dans la bonne direction. Le problème est que le PriorityBlockingQueue doit être instancié de manière générique pour contenir des instances Runnable, et il est impossible d'appeler compareTo (ou similaire) sur un Runnable interface.

Sur la résolution du problème. Lors de la création de l'exécuteur, il doit recevoir un PriorityBlockingQueue. La file d'attente devrait en outre recevoir un comparateur personnalisé pour effectuer un tri correct sur place:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Maintenant, un coup d'oeil à CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Tout semble assez simple jusqu'à ce point. Cela devient un peu collant ici. Notre prochain problème est de traiter la création de futuretâches de l'exécuteur. Dans l'exécuteur, nous devons remplacer newTaskFor comme suit:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

c est la tâche Callable que nous essayons d'exécuter. Maintenant, jetons un coup d'oeil à CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Notez la méthode getTask. Nous allons l'utiliser plus tard pour récupérer la tâche originale de ce CustomFutureTask que nous avons créé.

Et enfin, modifions la tâche d'origine que nous essayions d'exécuter:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Vous pouvez voir que nous implémentons Comparable dans la tâche de déléguer au Comparator réel pour MyType.

Et là vous l'avez, priorisation personnalisée pour un exécuteur utilisant les bibliothèques Java! Il faut un peu de flexion, mais c'est le plus propre que j'ai pu trouver. J'espère que cela est utile à quelqu'un!

 16
Author: Mike, 2017-03-20 09:49:34

Vous pouvez utiliser ces classes auxiliaires:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

ET

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

ET cette méthode d'assistance:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

ET, puis l'utiliser comme ceci:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
 4
Author: Stanislav Vitvitskyy, 2013-05-20 14:54:32

Je vais essayer d'expliquer ce problème avec un code fonctionnel. Mais avant de plonger dans le code, je voudrais expliquer à propos de PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue est une implémentation de BlockingQueue. Il accepte les tâches avec leur priorité et soumet la tâche avec la priorité la plus élevée pour l'exécution en premier. Si deux tâches ont la même priorité, nous devons fournir une logique personnalisée pour décider quelle tâche va premier.

Maintenant, entrons immédiatement dans le code.

Driver class : Cette classe crée un exécuteur qui accepte les tâches et les soumet ensuite à l'exécution. Ici, nous créons deux tâches une avec une faible priorité et l'autre avec une priorité élevée. Ici, nous disons à l'exécuteur d'exécuter un maximum de 1 threads et d'utiliser PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

MyTask class : MyTask implémente Runnable et accepte priority comme argument dans le constructeur. Lorsque cette tâche s'exécute, il imprime un message, puis met le thread en veille pendant 1 seconde.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask class : Puisque nous utilisons PriorityBlocingQueue pour contenir nos tâches, nos tâches doivent être enveloppées dans FutureTask et notre implémentation de FutureTask doit implémenter une interface comparable. L'interface comparable compare la priorité de 2 tâches différentes et soumet la tâche avec la priorité la plus élevée pour l'exécution.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Classe de Priorité : sans commentaire Classe de priorité.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Maintenant, lorsque nous exécutons cet exemple, nous obtenons la sortie suivante

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Même si nous avons soumis la priorité BASSE en premier, mais la tâche HAUTE priorité plus tard, mais puisque nous utilisons un PriorityBlockingQueue, toute tâche avec une priorité plus élevée s'exécutera en premier.

 3
Author: thedarkpassenger, 2016-01-24 10:28:40

Ma solution préserve l'ordre de soumission des tâches pour les mêmes priorités. C'est une amélioration de cette réponse

L'ordre d'exécution des tâches {[8] } est basé sur:

  1. Priorité
  2. Soumettre l'ordre (dans la même priorité)

Testeur de classe:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Résultat:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

La première tâche est A1 car il n'y avait pas de priorité plus élevée dans la file d'attente lorsqu'elle a été insérée. B les tâches sont 1 priorité donc exécutées plus tôt, A les tâches sont 0 priorité donc exécutées plus tard, mais l'ordre d'exécution est le suivant ordre de soumission: B1, B2, B3,... A2, A3, A4 ...

La solution:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}
 1
Author: Daniel Hári, 2017-05-23 11:46:22

Serait-il possible d'avoir un ThreadPoolExecutor, pour chaque niveau de priorité? Un ThreadPoolExecutor peut être instancié avec une ThreadFactory et vous pouvez avoir votre propre implémentation d'un ThreadFactory pour définir les différents niveaux de priorité.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
 0
Author: willcodejavaforfood, 2016-09-02 14:31:15