How to correctly multiply a matrix by another matrix in multiple threads?


My task is this. I need to multiply a 200x248 matrix by a 248x333 matrix in parallel on 8 threads. On the internet, I found a simple example of multiplying two 4x4 matrices on 4 threads, but I don't quite understand the logic of splitting this task between threads. Why does each flow have different cycle boundaries and how do they form at all? Why is there already 3 cycles in each thread, and not 2? Can you explain the algorithm to me so that I can use its analogy to multiply huge matrices by 8 streams?

Here is a part of this code (there is also data input from a file and output of the result to another file, a graphical interface and other things, but this is not so important in this matter).

Initializing static fields:

public static int[][] a;
public static int[][] b;
public static int[][] c;

Somewhere in main, threads are created and started:

            c = new int[a.length][b[0].length];

            Thread1 thread1 = new Thread1();
            Thread2 thread2 = new Thread2();
            Thread3 thread3 = new Thread3();
            Thread4 thread4 = new Thread4();

            thread1.start();
            thread2.start();
            thread3.start();
            thread4.start();

            try {
                thread1.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                thread2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                thread3.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                thread4.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

Four-thread code:

public static class Thread1 extends Thread {

    @Override
    public void run() {
        int m = a.length;
        int n = b[0].length;
        int k = (a.length) / 4;

        for (int i = 0; i <= k; i++) {
            for (int j = 0; j < n; j++) {
                for (int l = 0; l < b.length; l++) {
                    c[i][j] = c[i][j] + a[i][l] * b[l][j];
                }
            }
        }
    }
}

public static class Thread2 extends Thread {

    @Override
    public void run() {
        int m = a.length;
        int n = b[0].length;
        int k = (a.length) / 2 + 1;
        int s = ((a.length) / 4) + 1;

        for (int i = s; i < k; i++) {
            for (int j = 0; j < n; j++) {
                for (int l = 0; l < b.length; l++) {
                    c[i][j] = c[i][j] + a[i][l] * b[l][j];
                }
            }
        }
    }
}

public static class Thread3 extends Thread {

    @Override
    public void run() {
        int m = a.length;
        int n = b[0].length;
        int k = ((3 * (a.length)) / 4) + 1;
        int s = (a.length) / 2 + 1;

        for (int i = s; i < k; i++) {
            for (int j = 0; j < n; j++) {
                for (int l = 0; l < b.length; l++) {
                    c[i][j] = c[i][j] + a[i][l] * b[l][j];
                }
            }
        }
    }
}

public static class Thread4 extends Thread {

    @Override
    public void run() {
        int m = a.length;
        int n = b[0].length;
        int k = ((3 * (a.length)) / 4) + 1;


        for (int i = k; i < m; i++) {
            for (int j = 0; j < n; j++) {
          for (int l = 0; l < b.length; l++) {
                        c[i][j] = c[i][j] + a[i][l] * b[l][j];
                    }
                }
            }
        }
    }
Author: kostyabakay, 2015-12-11

2 answers

A complete test program that computes the product of matrices in multiple threads. In contrast to the option suggested in the other answer, it distributes calculations between threads more "fairly". The thread does not necessarily calculate the whole row of the new matrix, the calculations can start and end on any cell of the matrix.

import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

/** Поток-вычислитель группы ячеек матрицы. */
class MultiplierThread extends Thread
{
    /** Первая (левая) матрица. */
    private final int[][] firstMatrix;
    /** Вторая (правая) матрица. */
    private final int[][] secondMatrix;
    /** Результирующая матрица. */
    private final int[][] resultMatrix;
    /** Начальный индекс. */
    private final int firstIndex;
    /** Конечный индекс. */
    private final int lastIndex;
    /** Число членов суммы при вычислении значения ячейки. */
    private final int sumLength;

    /**
     * @param firstMatrix  Первая (левая) матрица.
     * @param secondMatrix Вторая (правая) матрица.
     * @param resultMatrix Результирующая матрица.
     * @param firstIndex   Начальный индекс (ячейка с этим индексом вычисляется).
     * @param lastIndex    Конечный индекс (ячейка с этим индексом не вычисляется).
     */
    public MultiplierThread(final int[][] firstMatrix,
                            final int[][] secondMatrix,
                            final int[][] resultMatrix,
                            final int firstIndex,
                            final int lastIndex)
    {
        this.firstMatrix  = firstMatrix;
        this.secondMatrix = secondMatrix;
        this.resultMatrix = resultMatrix;
        this.firstIndex   = firstIndex;
        this.lastIndex    = lastIndex;

        sumLength = secondMatrix.length;
    }

    /**Вычисление значения в одной ячейке.
     *
     * @param row Номер строки ячейки.
     * @param col Номер столбца ячейки.
     */
    private void calcValue(final int row, final int col)
    {
        int sum = 0;
        for (int i = 0; i < sumLength; ++i)
            sum += firstMatrix[row][i] * secondMatrix[i][col];
        resultMatrix[row][col] = sum;
    }

    /** Рабочая функция потока. */
    @Override
    public void run()
    {
        System.out.println("Thread " + getName() + " started. Calculating cells from " + firstIndex + " to " + lastIndex + "...");

        final int colCount = secondMatrix[0].length;  // Число столбцов результирующей матрицы.
        for (int index = firstIndex; index < lastIndex; ++index)
            calcValue(index / colCount, index % colCount);

        System.out.println("Thread " + getName() + " finished.");
    }
}

class Main
{
    /** Заполнение матрицы случайными числами.
     *
     * @param matrix Заполняемая матрица.
     */
    private static void randomMatrix(final int[][] matrix)
    {
        final Random random = new Random();  // Генератор случайных чисел.

        for (int row = 0; row < matrix.length; ++row)           // Цикл по строкам матрицы.
            for (int col = 0; col < matrix[row].length; ++col)  // Цикл по столбцам матрицы.
                matrix[row][col] = random.nextInt(100);         // Случайное число от 0 до 100.
    }

    //

    /** Вывод матрицы в файл.
     * Производится выравнивание значений для лучшего восприятия.
     *
     * @param fileWriter Объект, представляющий собой файл для записи.
     * @param matrix Выводимая матрица.
     * @throws IOException
     */
    private static void printMatrix(final FileWriter fileWriter,
                                    final int[][] matrix) throws IOException
    {
        boolean hasNegative = false;  // Признак наличия в матрице отрицательных чисел.
        int     maxValue    = 0;      // Максимальное по модулю число в матрице.

        // Вычисляем максимальное по модулю число в матрице и проверяем на наличие отрицательных чисел.
        for (final int[] row : matrix) {  // Цикл по строкам матрицы.
            for (final int element : row) {  // Цикл по столбцам матрицы.
                int temp = element;
                if (element < 0) {
                    hasNegative = true;
                    temp = -temp;
                }
                if (temp > maxValue)
                    maxValue = temp;
            }
        }

        // Вычисление длины позиции под число.
        int len = Integer.toString(maxValue).length() + 1;  // Одно знакоместо под разделитель (пробел).
        if (hasNegative)
            ++len;  // Если есть отрицательные, добавляем знакоместо под минус.

        // Построение строки формата.
        final String formatString = "%" + len + "d";

        // Вывод элементов матрицы в файл.
        for (final int[] row : matrix) {  // Цикл по строкам матрицы.
            for (final int element : row)  // Цикл по столбцам матрицы.
                fileWriter.write(String.format(formatString, element));

            fileWriter.write("\n");  // Разделяем строки матрицы переводом строки.
        }
    }

    /**
     * Вывод трёх матриц в файл. Файл будет перезаписан.
     *
     * @param fileName     Имя файла для вывода.
     * @param firstMatrix  Первая матрица.
     * @param secondMatrix Вторая матрица.
     * @param resultMatrix Результирующая матрица.
     */
    private static void printAllMatrix(final String fileName,
                                       final int[][] firstMatrix,
                                       final int[][] secondMatrix,
                                       final int[][] resultMatrix)
    {
        try (final FileWriter fileWriter = new FileWriter(fileName, false)) {
            fileWriter.write("First matrix:\n");
            printMatrix(fileWriter, firstMatrix);

            fileWriter.write("\nSecond matrix:\n");
            printMatrix(fileWriter, secondMatrix);

            fileWriter.write("\nResult matrix:\n");
            printMatrix(fileWriter, resultMatrix);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Однопоточное умножение матриц.
     *
     * @param firstMatrix  Первая матрица.
     * @param secondMatrix Вторая матрица.
     * @return Результирующая матрица.
     */
    private static int[][] multiplyMatrix(final int[][] firstMatrix,
                                          final int[][] secondMatrix)
    {
        final int rowCount = firstMatrix.length;             // Число строк результирующей матрицы.
        final int colCount = secondMatrix[0].length;         // Число столбцов результирующей матрицы.
        final int sumLength = secondMatrix.length;           // Число членов суммы при вычислении значения ячейки.
        final int[][] result = new int[rowCount][colCount];  // Результирующая матрица.

        for (int row = 0; row < rowCount; ++row) {  // Цикл по строкам матрицы.
            for (int col = 0; col < colCount; ++col) {  // Цикл по столбцам матрицы.
                int sum = 0;
                for (int i = 0; i < sumLength; ++i)
                    sum += firstMatrix[row][i] * secondMatrix[i][col];
                result[row][col] = sum;
            }
        }

        return result;
    }

    /** Многопоточное умножение матриц.
     *
     * @param firstMatrix  Первая (левая) матрица.
     * @param secondMatrix Вторая (правая) матрица.
     * @param threadCount Число потоков.
     * @return Результирующая матрица.
     */
    private static int[][] multiplyMatrixMT(final int[][] firstMatrix,
                                            final int[][] secondMatrix,
                                            int threadCount)
    {
        assert threadCount > 0;

        final int rowCount = firstMatrix.length;             // Число строк результирующей матрицы.
        final int colCount = secondMatrix[0].length;         // Число столбцов результирующей матрицы.
        final int[][] result = new int[rowCount][colCount];  // Результирующая матрица.

        final int cellsForThread = (rowCount * colCount) / threadCount;  // Число вычисляемых ячеек на поток.
        int firstIndex = 0;  // Индекс первой вычисляемой ячейки.
        final MultiplierThread[] multiplierThreads = new MultiplierThread[threadCount];  // Массив потоков.

        // Создание и запуск потоков.
        for (int threadIndex = threadCount - 1; threadIndex >= 0; --threadIndex) {
            int lastIndex = firstIndex + cellsForThread;  // Индекс последней вычисляемой ячейки.
            if (threadIndex == 0) {
                /* Один из потоков должен будет вычислить не только свой блок ячеек,
                   но и остаток, если число ячеек не делится нацело на число потоков. */
                lastIndex = rowCount * colCount;
            }
            multiplierThreads[threadIndex] = new MultiplierThread(firstMatrix, secondMatrix, result, firstIndex, lastIndex);
            multiplierThreads[threadIndex].start();
            firstIndex = lastIndex;
        }

        // Ожидание завершения потоков.
        try {
            for (final MultiplierThread multiplierThread : multiplierThreads)
                multiplierThread.join();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }

        return result;
    }

    /** Число строк первой матрицы. */
    final static int FIRST_MATRIX_ROWS  = 1000;
    /** Число столбцов первой матрицы. */
    final static int FIRST_MATRIX_COLS  = 1000;
    /** Число строк второй матрицы (должно совпадать с числом столбцов первой матрицы). */
    final static int SECOND_MATRIX_ROWS = FIRST_MATRIX_COLS;
    /** Число столбцов второй матрицы. */
    final static int SECOND_MATRIX_COLS = 1000;

    public static void main(String[] args)
    {
        final int[][] firstMatrix  = new int[FIRST_MATRIX_ROWS][FIRST_MATRIX_COLS];    // Первая (левая) матрица.
        final int[][] secondMatrix = new int[SECOND_MATRIX_ROWS][SECOND_MATRIX_COLS];  // Вторая (правая) матрица.

        randomMatrix(firstMatrix);
        randomMatrix(secondMatrix);

        final int[][] resultMatrixMT = multiplyMatrixMT(firstMatrix, secondMatrix, Runtime.getRuntime().availableProcessors());

        // Проверка многопоточных вычислений с помощью однопоточных.
        final int[][] resultMatrix = multiplyMatrix(firstMatrix, secondMatrix);

        for (int row = 0; row < FIRST_MATRIX_ROWS; ++row) {
            for (int col = 0; col < SECOND_MATRIX_COLS; ++col) {
                if (resultMatrixMT[row][col] != resultMatrix[row][col]) {
                    System.out.println("Error in multithreaded calculation!");
                    return;
                }
            }
        }

        printAllMatrix("Matrix.txt", firstMatrix, secondMatrix, resultMatrixMT);
    }
}

P.S. Among the features-formatted output of matrices to a file and automatic determination of the size of the matrices in the calculation functions. As a bonus - single-threaded calculation and control of a multithreaded result using a single-threaded one.

 7
Author: , 2015-12-12 09:41:20

Here I threw the working code:

    public static class CalcThread extends Thread {
        private int startRow, endRow;
        private int[][] a, b, result;
        private int n;

        public CalcThread(int[][] a, int[][] b, int[][] result, int startRow, int endRow) {
            this.a = a;
            this.b = b;
            this.result = result;
            this.startRow = startRow;
            this.endRow = endRow;
            this.n = b.length;
        }

        @Override
        public void run() {
            System.out.println("Считаю со строки " + startRow + " до строки " + endRow + " включительно");
            for (int row = startRow; row <= endRow ; row++) {
                for (int col = 0; col < result[row].length; col++) {
                    result[row][col] = calcSingleValue(row, col);
                }
            }
        }

        private int calcSingleValue(int row, int col) {
            int c = 0;
            for (int i = 0; i < n; i++) {
                c += a[row][i] * b[i][col];
            }
            return c;
        }

    }

    public static int[][] multiply(int[][] a, int[][] b, int threadsCount) {
        //проверки
        if (a == null || a.length == 0 || a[0] == null || a[0].length == 0) {
            throw new IllegalArgumentException("a");
        }
        if (b == null || b.length == 0 || b[0] == null || b[0].length == 0) {
            throw new IllegalArgumentException("b");
        }
        if (a[0].length != b.length) {
            throw new IllegalArgumentException("матрицы не согласованы");
        }
        //определяем размеры результирующей матрицы
        int m = a.length;
        int q = b[0].length;
        int[][] result = new int[m][q];
        //если количество потоков больше чем количество строк - уменьшим кол-во потоков
        if (threadsCount > m) {
            threadsCount = m;
        }
        //посчитаем сколько строк результирующей матрицы будет считать каждый поток
        int count = m / threadsCount;
        int additional = m % threadsCount; //если не делится на threadsCount, то добавим к первому потоку
        //создаем и запускаем потоки
        Thread[] threads = new Thread[threadsCount];
        int start = 0;
        for (int i = 0; i < threadsCount; i++) {
            int cnt = ((i == 0) ? count + additional : count);
            threads[i] = new CalcThread(a, b, result, start, start + cnt - 1);
            start += cnt;
            threads[i].start();
        }
        //ждем завершения
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] a = {{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}};
        int[][] b = {{2, 1, 2, 1, 2, 1, 2}, {1, 2, 1, 2, 1, 2, 1}, {2, 1, 2, 1, 2, 1, 2}, {1, 2, 1, 2, 1, 2, 1}};
        int[][] c = multiply(a, b, 8);

        for (int[] ints : c) {
            for (int anInt : ints) {
                System.out.print(anInt + " ");
            }
            System.out.println();
        }
}
 5
Author: Russtam, 2015-12-11 16:09:40