Lab 6 — Distributed Machine Learning

Question 1 — Generate Observations

var True_regressor float64 = 2
var True_intercept float64 = 5
var number_data int = 1000
x_values_node_1 := make([]float64, number_data)
y_values_node_1 := make([]float64, number_data)
x_values_node_2 := make([]float64, number_data)
y_values_node_2 := make([]float64, number_data)
x_values_node_3 := make([]float64, number_data)
y_values_node_3 := make([]float64, number_data)
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)
for i := 0; i < number_data; i++ {
  var rnd = r1.NormFloat64()
  x_values_node_1[i] = r1.Float64()
  y_values_node_1[i] = True_regressor*x_values_node_1[i] + True_intercept + r1.NormFloat64()
  x_values_node_2[i] = r1.Float64()
  y_values_node_2[i] = True_regressor*x_values_node_2[i] + True_intercept + r1.NormFloat64()
  x_values_node_3[i] = r1.Float64()
  y_values_node_3[i] = True_regressor*x_values_node_3[i] + True_intercept + r1.NormFloat64()
}
s1 := rand.NewSource(time.Now().UnixNano()) sert à initialiser la seed du générateur de nombres aléatoires avec une valeur basée sur le temps actuel en nanosecondes. Cela garantit que chaque exécution du programme produit une séquence différente de nombres aléatoires. Ensuite, r1 := rand.New(s1) crée une nouvelle instance du générateur de nombres aléatoires en utilisant cette seed. Ainsi, toutes les valeurs générées par r1 seront pseudo-aléatoires et dépendront de la seed initiale.

Question 2 — Gradient Descent

Ce problème d'optimisation se résout en regardant où sont les minimas de la fonction FF.
Fa=2Kk=1Kxik(yikaxikb),Fb=2Kk=1K(yikaxikb). \begin{aligned} \frac{\partial F}{\partial a} &= -\frac{2}{K} \sum_{k=1}^{K} x_i^k \bigl(y_i^k - a x_i^k - b\bigr), \\[6pt] \frac{\partial F}{\partial b} &= -\frac{2}{K} \sum_{k=1}^{K} \bigl(y_i^k - a x_i^k - b\bigr). \end{aligned}
Pour implémenter cela, l'idée serait d'itérer sur un certain nombre d'itérations, en calculant à chaque fois les gradients par rapport à aa et bb. Ensuite, on met à jour aa et bb en soustrayant une fraction (le taux d'apprentissage) des gradients calculés. On continue ce processus jusqu'à ce que les changements dans aa et bb soient inférieurs à une certaine tolérance, ou jusqu'à atteindre un nombre maximum d'itérations.
Inputs: x[1..K], y[1..K], learning rate η, max iterations N, tolerance ε

Initialize: 
    a ← 0
    b ← 0

Repeat for iter = 1 to N:
    grad_a ← 0
    grad_b ← 0

    For k = 1 to K:
        error ← y[k] − (a * x[k] + b)
        grad_a ← grad_a − 2 * x[k] * error
        grad_b ← grad_b − 2 * error
    End For

    grad_a ← grad_a / K
    grad_b ← grad_b / K

    a_new ← a − η * grad_a
    b_new ← b − η * grad_b

    If |a_new − a| < ε AND |b_new − b| < ε then
        a ← a_new
        b ← b_new
        Stop
    End If

    a ← a_new
    b ← b_new

End Repeat

Return a, b

Question 3 — Gradient Descent Implementation

func step(b_current float64, a_current float64, x_values []float64, y_values []float64, learning_rate float64) (float64, float64) {
    var b_gradient float64 = 0
    var a_gradient float64 = 0
    length := len(y_values)
    
    for i := 0; i < length; i++ {
        two_over_n := float64(2) / float64(length)
        b_gradient += -two_over_n * (y_values[i] - (a_current*x_values[i] + b_current))
        a_gradient += -two_over_n * x_values[i] * (y_values[i] - (a_current*x_values[i] + b_current))
    }

    new_b := b_current - learning_rate*b_gradient
    new_a := a_current - learning_rate*a_gradient
    return new_b, new_a
}

Question 4 — Regression Implementation

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func Regression(x_values []float64, y_values []float64, learning_rate float64, epochs int) (float64, float64) {
    if len(x_values) == 0 || len(y_values) == 0 {
        panic("Input arrays must not be empty.")
    }

    // Initialization
    var b_current float64 = 0
    var a_current float64 = 0

    // Iterations of the gradient step
    for i := 0; i < epochs; i++ {
        b_current, a_current = step(b_current, a_current, x_values, y_values, learning_rate)
    }

    return b_current, a_current
}

func main() {
    // Paramètres des données
    number_data := 100
    true_a := 2.0
    true_b := 5.0

    // Génération des données
    x_values := make([]float64, number_data)
    y_values := make([]float64, number_data)

    s := rand.NewSource(time.Now().UnixNano())
    r := rand.New(s)

    for i := 0; i < number_data; i++ {
        x_values[i] = r.Float64()
        y_values[i] = true_a*x_values[i] + true_b + r.NormFloat64() // bruit normal
    }

    // Paramètres du gradient descent
    learning_rate := 0.1
    epochs := 1000

    // Appel de la régression
    est_b, est_a := Regression(x_values, y_values, learning_rate, epochs)

    fmt.Printf("Estimation : a = %.4f, b = %.4f\n", est_a, est_b)
    fmt.Printf("Valeurs réelles : a = %.4f, b = %.4f\n", true_a, true_b)
}

Question 5 — Distributed Set-up

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// Structure pour les paramètres de la régression
type Linear_regression_Parameters struct {
    Intercept  float64
    Regressor  float64
}

// Step function: mise à jour locale
func step(b_current float64, a_current float64, x_values []float64, y_values []float64, learning_rate float64) (float64, float64) {
    b_gradient := 0.0
    a_gradient := 0.0
    length := len(y_values)

    for i := 0; i < length; i++ {
        two_over_n := float64(2) / float64(length)
        b_gradient += -two_over_n * (y_values[i] - (a_current*x_values[i] + b_current))
        a_gradient += -two_over_n * x_values[i] * (y_values[i] - (a_current*x_values[i] + b_current))
    }

    new_b := b_current - learning_rate*b_gradient
    new_a := a_current - learning_rate*a_gradient
    return new_b, new_a
}

// Regression distribuée pour un nœud
func Regression(x_values []float64, y_values []float64, learning_rate float64, c chan Linear_regression_Parameters, epochs int) {
    b_current := 0.0
    a_current := 0.0

    for i := 0; i < epochs; i++ {
        // Étape locale
        b_local, a_local := step(b_current, a_current, x_values, y_values, learning_rate)

        // Pull depuis le channel global
        global_params := <-c

        // Mise à jour en moyenne avec les paramètres globaux
        b_current = 0.5*b_local + 0.5*global_params.Intercept
        a_current = 0.5*a_local + 0.5*global_params.Regressor

        // Push des nouveaux paramètres dans le channel
        c <- Linear_regression_Parameters{Intercept: b_current, Regressor: a_current}
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    c := make(chan Linear_regression_Parameters, 1)

    // Paramètres des données
    True_regressor := 2.0
    True_intercept := 5.0
    number_data := 1000

    x_values_node_1 := make([]float64, number_data)
    y_values_node_1 := make([]float64, number_data)
    x_values_node_2 := make([]float64, number_data)
    y_values_node_2 := make([]float64, number_data)
    x_values_node_3 := make([]float64, number_data)
    y_values_node_3 := make([]float64, number_data)

    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    for i := 0; i < number_data; i++ {
        x_values_node_1[i] = r.Float64()
        y_values_node_1[i] = True_regressor*x_values_node_1[i] + True_intercept + r.NormFloat64()
        x_values_node_2[i] = r.Float64()
        y_values_node_2[i] = True_regressor*x_values_node_2[i] + True_intercept + r.NormFloat64()
        x_values_node_3[i] = r.Float64()
        y_values_node_3[i] = True_regressor*x_values_node_3[i] + True_intercept + r.NormFloat64()
    }

    // Initialisation du channel avec des paramètres neutres
    c <- Linear_regression_Parameters{Intercept: 0, Regressor: 0}

    epochs := 1000
    learning_rate := 0.1

    // Lancement des 3 nœuds en parallèle
    go Regression(x_values_node_1, y_values_node_1, learning_rate, c, epochs)
    go Regression(x_values_node_2, y_values_node_2, learning_rate, c, epochs)
    go Regression(x_values_node_3, y_values_node_3, learning_rate, c, epochs)

    // Attendre la convergence
    time.Sleep(2 * time.Second)

    // Récupérer les paramètres finaux
    final_params := <-c
    fmt.Printf("Intercept: %f\n", final_params.Intercept)
    fmt.Printf("Coefficient: %f\n", final_params.Regressor)
}

Question 6 — Approximation Stochastique

On considère pour chaque nœud ii le problème de régression linéaire suivant :
Fi(a,b)=1Kk=1K(yikaxikb)2F_i(a,b) = \frac{1}{K} \sum_{k=1}^{K} (y_i^k - a x_i^k - b)^2
Les gradients de cette fonction de coût sont donnés par :
Fia=2Kk=1Kxik(yikaxikb),Fib=2Kk=1K(yikaxikb).\begin{aligned} \frac{\partial F_i}{\partial a} &= -\frac{2}{K} \sum_{k=1}^{K} x_i^k \bigl(y_i^k - a x_i^k - b\bigr), \\[2mm] \frac{\partial F_i}{\partial b} &= -\frac{2}{K} \sum_{k=1}^{K} \bigl(y_i^k - a x_i^k - b\bigr). \end{aligned}
Dans l'algorithme distribué, chaque nœud effectue à chaque itération une étape locale :
(blocal,alocal)=step(bcurrent,acurrent,{xik},{yik},η)(b_{\text{local}}, a_{\text{local}}) = \text{step}(b_{\text{current}}, a_{\text{current}}, \{x_i^k\}, \{y_i^k\}, \eta)
Puis il récupère les paramètres globaux depuis le canal et met à jour ses paramètres en moyenne :
(bcurrent,acurrent)=12(blocal,alocal)+12(bglobal,aglobal)(b_{\text{current}}, a_{\text{current}}) = \frac{1}{2} (b_{\text{local}}, a_{\text{local}}) + \frac{1}{2} (b_{\text{global}}, a_{\text{global}})
Si chaque nœud ne regarde qu'un sous-ensemble de ses données à chaque itération, le gradient calculé devient un gradient bruité :
gi(θ(t),ξt)=Fi(θ(t);ξt),θ=(b,a)g_i(\theta^{(t)}, \xi_t) = \nabla F_i(\theta^{(t)}; \xi_t), \quad \theta = (b,a)
La mise à jour globale peut alors s'écrire comme une approximation stochastique :
θ(t+1)=θ(t)γtgi(θ(t),ξt)+12(θglobalθ(t))\theta^{(t+1)} = \theta^{(t)} - \gamma_t g_i(\theta^{(t)}, \xi_t) + \frac{1}{2}(\theta_{\text{global}} - \theta^{(t)})
  • γtgi(θ(t),ξt)-\gamma_t g_i(\theta^{(t)}, \xi_t) : gradient bruité local
  • 12(θglobalθ(t))\frac{1}{2}(\theta_{\text{global}} - \theta^{(t)}) : terme de consensus pour stabiliser et synchroniser les nœuds
  • La approximation stochastique explique pourquoi les paramètres oscillent légèrement autour de la solution globale (le gradient local est bruité par l'échantillonnage des données).
  • Le terme de consensus réduit ces oscillations et accélère la convergence.
  • Avec suffisamment d'itérations et de nœuds, l'algorithme converge vers une solution proche de la régression linéaire classique sur toutes les données.
L'algorithme distribué est une approximation stochastique avec un terme de consensus, ce qui explique sa stabilité et la vitesse de convergence observée.