"""Source code for MVBLS."""

import numpy as np
from numpy import random
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, MultiOutputMixin
from abc import ABCMeta
from sklearn.preprocessing import StandardScaler, LabelBinarizer
from sklearn.linear_model import Lasso, Ridge, RidgeClassifier
from scipy.spatial.distance import cdist
from scipy.linalg import orth
from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted, _check_sample_weight
from sklearn.utils import column_or_1d
import torch
import joblib


class SemiRidge:
    """Ridge regression with an optional graph-Laplacian penalty.

    The first ``len(y)`` rows of the ``X`` given to :meth:`fit` are treated
    as labeled samples. Any further rows are unlabeled: they take part in
    centering and in the Laplacian term, but not in the data-fit term.

    Parameters
    ----------
    reg_alpha : float, default=0.0
        Multiplier of the identity matrix added to the normal equations.
    reg_laplacian : float, default=0.0
        Multiplier of the Laplacian term.
    k_neighbors : int, default=3
        Number of nearest neighbours connected to each sample in the graph.
    sigma : float, default=1.0
        Width of the exponential kernel used for edge weights; must be > 0.

    Attributes
    ----------
    coef_ : ndarray
        Fitted weights, set by :meth:`fit`.
    intercept_ : float or ndarray
        Fitted offset, set by :meth:`fit`.
    """

    def __init__(self, reg_alpha=0.0, reg_laplacian=0.0, k_neighbors=3, sigma=1.0):
        self.reg_alpha = reg_alpha
        self.reg_laplacian = reg_laplacian
        self.k_neighbors = k_neighbors
        self.sigma = sigma

    @staticmethod
    def _generate_laplacian_matrix(X, k_neighbors, sigma=1.0):
        """Build the unnormalised Laplacian ``D - W`` of a symmetric kNN graph.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Samples; pairwise Euclidean distances are computed between rows.
        k_neighbors : int
            Neighbours linked per sample. ``<= 0`` yields an all-zero matrix.
            Values larger than ``n_samples - 1`` are clipped.
        sigma : float, default=1.0
            Kernel width; edge weight is ``exp(-distance / (2 * sigma ** 2))``.

        Returns
        -------
        L : ndarray of shape (n_samples, n_samples)

        Raises
        ------
        ValueError
            If ``sigma`` is not strictly positive.
        """
        if not sigma > 0.0:
            raise ValueError("sigma must be strictly positive, got %r" % (sigma,))
        S = cdist(X, X, metric='euclidean')
        N = len(S)
        if k_neighbors <= 0:
            return np.zeros((N, N))

        # Rank each sample first in its own row so that it is always the
        # entry that gets dropped, even when exact duplicates tie at 0.
        ranking = S.copy()
        np.fill_diagonal(ranking, -1.0)
        idx_near = np.argsort(ranking, axis=1, kind='stable')[:, 1:k_neighbors + 1]

        rows = np.repeat(np.arange(N), idx_near.shape[1])
        cols = idx_near.ravel()
        edge_weights = np.exp(-S[rows, cols] / (2 * sigma ** 2))

        # Symmetrise: an edge exists if either endpoint selected the other.
        W = np.zeros((N, N))
        W[rows, cols] = edge_weights
        W[cols, rows] = edge_weights
        return np.diag(W.sum(axis=1)) - W

    def fit(self, X, y, sample_weight=None):
        """Fit the model.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            All samples; the first ``len(y)`` rows are the labeled ones.
        y : array-like of shape (n_labeled,) or (n_labeled, n_targets)
            Targets of the labeled rows.
        sample_weight : array-like of shape (n_samples,), default=None
            Per-sample weights covering every row of ``X``. ``None`` means
            uniform weights.

        Returns
        -------
        self : SemiRidge
        """
        X = np.asarray(X)
        # ``astype`` copies, so the caller's array is never modified; integer
        # input is promoted to float so the in-place centering below works.
        X = X.astype(np.result_type(X.dtype, np.float32))
        y = np.array(y, dtype=X.dtype)
        num_labeled = len(y)

        if sample_weight is None:
            sample_weight = np.ones(len(X))
        else:
            sample_weight = np.asarray(sample_weight)
        labeled_weight = sample_weight[:num_labeled]

        X_offset = np.average(X, axis=0, weights=sample_weight)
        X -= X_offset
        y_offset = np.average(y, axis=0, weights=labeled_weight)
        y -= y_offset

        X_labeled = X[:num_labeled]
        number_balance = labeled_weight.sum() / sample_weight.sum()
        L = self._generate_laplacian_matrix(X, min(self.k_neighbors, len(X) - 1), self.sigma)

        # Scaling columns by the weights equals multiplying by diag(weights)
        # without materialising the dense diagonal matrices.
        weighted_labeled_T = X_labeled.T * labeled_weight
        data_term = weighted_labeled_T @ X_labeled
        ridge_term = self.reg_alpha * np.eye(X.shape[1])
        laplacian_term = self.reg_laplacian * number_balance * X.T @ (L * sample_weight) @ X

        # Solve the linear system directly instead of forming the inverse.
        self.coef_ = np.linalg.solve(data_term + ridge_term + laplacian_term,
                                     weighted_labeled_T @ y)
        self.intercept_ = y_offset - np.dot(X_offset, self.coef_)
        return self

    def predict(self, X):
        """Return ``X @ coef_ + intercept_``."""
        return np.dot(X, self.coef_) + self.intercept_


class NodeGenerator:
    """Generator of feature nodes (Z) and enhancement nodes (H).

    Parameters
    ----------
    active_function : {'relu', 'tanh', 'sigmoid', 'linear'}, default='relu'
        Activation applied to the enhancement nodes.
    n_nodes_H : int, default=10
        Number of enhancement nodes.
    n_nodes_Z : int, default=10
        Number of feature nodes in each group.
    n_groups_Z : int, default=10
        Number of feature-node groups.
    reg_lambda : float, default=0.0
        ``alpha`` passed to the Lasso model used for the feature nodes.
    """

    def __init__(self, active_function='relu', n_nodes_H=10, n_nodes_Z=10, n_groups_Z=10, reg_lambda=0.0):
        self.n_nodes_H = n_nodes_H
        self.n_nodes_Z = n_nodes_Z
        self.n_groups_Z = n_groups_Z
        self.reg_lambda = reg_lambda
        self.active_function = active_function

    @staticmethod
    def _sigmoid(data):
        """Logistic function ``1 / (1 + exp(-data))``."""
        return 1.0 / (1 + np.exp(-data))

    @staticmethod
    def _linear(data):
        """Identity activation."""
        return data

    @staticmethod
    def _tanh(data):
        """Hyperbolic tangent.

        ``np.tanh`` is used because the explicit exponential formula
        evaluates to ``inf / inf = nan`` for large ``|data|``.
        """
        return np.tanh(data)

    @staticmethod
    def _relu(data):
        """Rectified linear unit ``max(data, 0)``."""
        return np.maximum(data, 0)

    @staticmethod
    def _generator(fea_dim, node_size):
        """Draw a random weight matrix and bias row, uniform in [-1, 1).

        Uses NumPy's global random state, so the draw order (W, then b)
        determines reproducibility under a fixed seed.
        """
        W = 2 * random.random(size=(fea_dim, node_size)) - 1
        b = 2 * random.random(size=(1, node_size)) - 1
        return W, b

    def _generate_h(self, data, sample_weight=None):
        """Fit the enhancement layer on ``data`` and return its output.

        Stores ``nonlinear_``, ``W_H_``, ``b_H_`` and ``scaler_H_`` on the
        instance for later use by :meth:`_transform_h`.

        Raises
        ------
        KeyError
            If ``active_function`` is not one of the supported names.
        """
        self.nonlinear_ = {
            'linear': self._linear,
            'sigmoid': self._sigmoid,
            'tanh': self._tanh,
            'relu': self._relu
        }[self.active_function]
        self.scaler_H_ = StandardScaler()
        fea_dim = data.shape[1]
        W_H, self.b_H_ = self._generator(fea_dim, self.n_nodes_H)
        # Orthonormalise along whichever axis is the shorter one.
        if fea_dim >= self.n_nodes_H:
            self.W_H_ = orth(W_H)
        else:
            self.W_H_ = orth(W_H.T).T
        data_ = np.dot(data, self.W_H_) + self.b_H_
        self.scaler_H_.fit(data_, sample_weight=sample_weight)
        return self.nonlinear_(self.scaler_H_.transform(data_))

    def _fit_z_groups(self, X, sample_weight=None):
        """Fit ``n_groups_Z`` feature-node groups on ``X``.

        For each group a random projection of ``X`` is drawn, a Lasso model
        is fitted from that projection back to ``X``, and the Lasso
        coefficients are used as the mapping from ``X`` to the group's
        nodes, which are then standardised.

        Returns
        -------
        weights : list of ndarray
            One mapping matrix per group.
        scalers : list of StandardScaler
            One fitted scaler per group.
        Z : ndarray or None
            Horizontally stacked group outputs; ``None`` when there are
            no groups.
        """
        fea_dim = X.shape[1]
        weights, scalers, blocks = [], [], []
        for _ in range(self.n_groups_Z):
            W_Z, b_Z = self._generator(fea_dim, self.n_nodes_Z)
            data_ = np.dot(X, W_Z) + b_Z
            model = Lasso(alpha=self.reg_lambda, max_iter=1000)
            if sample_weight is not None:
                # Pass copies so the caller's weights can never be altered.
                model.fit(data_, X, sample_weight=sample_weight.copy())
            else:
                model.fit(data_, X)
            # With a single target column the coefficients come back 1-D.
            coef = model.coef_ if fea_dim > 1 else model.coef_.reshape(1, -1)
            Z_ = np.dot(X, coef)
            scaler = StandardScaler()
            if sample_weight is not None:
                scaler.fit(Z_, sample_weight=sample_weight.copy())
            else:
                scaler.fit(Z_)
            weights.append(coef)
            scalers.append(scaler)
            blocks.append(scaler.transform(Z_))
        Z = np.hstack(blocks) if blocks else None
        return weights, scalers, Z

    def _generate_z(self, X, sample_weight=None, view=None):
        """Fit the feature nodes on ``X`` and return their output.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
        sample_weight : ndarray of shape (n_samples,), default=None
        view : hashable, default=None
            When given, the fitted mappings and scalers are stored under
            this key in the dicts ``W_Z_`` / ``scaler_Z_``; otherwise
            ``W_Z_`` / ``scaler_Z_`` are plain lists.

        Returns
        -------
        Z : ndarray or None
        """
        weights, scalers, Z = self._fit_z_groups(X, sample_weight)
        if view is None:
            self.W_Z_ = weights
            self.scaler_Z_ = scalers
            return Z
        # Start fresh dicts on the first multi-view call, or when a previous
        # single-view fit left lists behind.
        if not (isinstance(getattr(self, 'W_Z_', None), dict)
                and isinstance(getattr(self, 'scaler_Z_', None), dict)):
            self.W_Z_ = {}
            self.scaler_Z_ = {}
        self.W_Z_[view] = weights
        self.scaler_Z_[view] = scalers
        return Z

    def _transform_h(self, X=None):
        """Apply the fitted enhancement layer to ``X``."""
        return self.nonlinear_(self.scaler_H_.transform(np.dot(X, self.W_H_) + self.b_H_))

    def _transform_z(self, X=None, view=None):
        """Apply the fitted feature nodes (of ``view``, if given) to ``X``."""
        if view is not None:
            weights, scalers = self.W_Z_[view], self.scaler_Z_[view]
        else:
            weights, scalers = self.W_Z_, self.scaler_Z_
        blocks = [scalers[i].transform(np.dot(X, weights[i]))
                  for i in range(self.n_groups_Z)]
        return np.hstack(blocks) if blocks else None


class MVBLS(NodeGenerator, BaseEstimator, metaclass=ABCMeta):
    """Base class of the MVBLS estimators.

    Feature nodes are generated per view (or for the single input when
    ``view_list`` is ``None``), enhancement nodes are generated on top of
    them, and a ``Ridge`` model is fitted on the concatenation of both.
    """

    def __init__(self, n_nodes_H=1000, active_function='relu', n_nodes_Z=10, n_groups_Z=10, reg_alpha=0.1, reg_lambda=0.1, view_list=None,
                 random_state=0):
        NodeGenerator.__init__(
            self,
            active_function=active_function,
            n_nodes_H=n_nodes_H,
            n_nodes_Z=n_nodes_Z,
            n_groups_Z=n_groups_Z,
            reg_lambda=reg_lambda,
        )
        self.reg_alpha = reg_alpha
        self.view_list = view_list
        self.random_state = random_state

    def save_model(self, file):
        """Dump the fitted estimator with joblib.

        Parameters
        ----------
        file: str
            Controls the filename.
        """
        check_is_fitted(self, ['estimator_'])
        joblib.dump(self, filename=file)

    def _decision_function(self, X):
        """Return the raw output of the fitted linear model for ``X``.

        ``X`` must be a dict keyed by view name when ``view_list`` is set,
        and a single array otherwise.
        """
        check_is_fitted(self, ['estimator_'])
        if self.view_list is None:
            Z = self._transform_z(check_array(X), view=None)
        else:
            assert isinstance(X, dict)
            Z = None
            for view in self.view_list:
                Z_view = self._transform_z(check_array(X[view]), view)
                # Append this view's feature nodes as further columns.
                Z = Z_view if Z is None else np.c_[Z, Z_view]
        H = self._transform_h(Z)
        return self.estimator_.predict(np.c_[Z, H])

    def fit(self, X, y, sample_weight=None):
        """Build a broad learning system model from the training set (X, y).

            Parameters
            ----------
            X : {ndarray, sparse matrix} of shape (n_samples, n_features) or dict
                Training data; a dict keyed by view name when ``view_list``
                is set.

            y : ndarray of shape (n_samples,) or (n_samples, n_targets)
                Target values

            sample_weight : float or ndarray of shape (n_samples,), default=None
                Individual weights for each sample.

            Returns
            -------
            self : returns an instance of self.
        """
        # The node generators draw from NumPy's global random state.
        np.random.seed(self.random_state)
        self.estimator_ = Ridge(alpha=self.reg_alpha)

        # Feature nodes Z.
        if self.view_list is None:
            X, y = check_X_y(X, y, dtype=[np.float64, np.float32], multi_output=True, y_numeric=True)
            sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype)
            Z = self._generate_z(X, sample_weight=sample_weight, view=None)
        else:
            assert isinstance(X, dict)
            Z = None
            for view in self.view_list:
                X_view, y = check_X_y(X[view], y, dtype=[np.float64, np.float32], multi_output=True, y_numeric=True)
                sample_weight = _check_sample_weight(sample_weight, X_view, dtype=X_view.dtype)
                Z_view = self._generate_z(X_view, sample_weight=sample_weight, view=view)
                Z = Z_view if Z is None else np.c_[Z, Z_view]

        # Enhancement nodes H, then the linear read-out on [Z, H].
        H = self._generate_h(Z, sample_weight)
        self.estimator_.fit(np.c_[Z, H], y, sample_weight=sample_weight)
        return self


class MVBLSRegressor(MultiOutputMixin, RegressorMixin, MVBLS):
    """
    MVBLS Regressor. Construct a broad learning system model.

    Parameters
    ----------
    n_nodes_H: int, default=1000
        Controls the number of enhancement nodes.
    active_function: {str, ('relu', 'tanh', 'sigmoid' or 'linear')}, default='relu'
        Controls the active function of enhancement nodes.
    n_nodes_Z: int, default=10
        Controls the number of feature nodes in each group.
    n_groups_Z: int, default=10
        Controls the number of feature node groups.
    reg_alpha: float, default=0.1
        Regularization strength; must be a positive float. Regularization
        improves the conditioning of the problem and reduces the variance of
        the estimates. Larger values specify stronger regularization.
    reg_lambda: float, default=0.1
        Constant that multiplies the L1 term. ``reg_lambda = 0`` is
        equivalent to an ordinary least square.
    view_list: list, default=None
        List of view names.
    random_state: int, default=0
        Controls the randomness of the estimator.
    """

    def predict(self, X):
        """
        Return the predicted value for each sample.

        Parameters
        ----------
        X : array_like or sparse matrix, shape (n_samples, n_features), or dict
            Samples; a dict keyed by view name when ``view_list`` is set.

        Returns
        -------
        C : array, shape (n_samples,)
            Returns predicted values.
        """
        return self._decision_function(X)
class MVBLSClassifier(ClassifierMixin, MVBLS):
    """
    MVBLS classifier. Construct a broad learning system model.

    Parameters
    ----------
    n_nodes_H: int, default=1000
        Controls the number of enhancement nodes.
    active_function: {str, ('relu', 'tanh', 'sigmoid' or 'linear')}, default='relu'
        Controls the active function of enhancement nodes.
    n_nodes_Z: int, default=10
        Controls the number of feature nodes in each group.
    n_groups_Z: int, default=10
        Controls the number of feature node groups.
    reg_alpha: float, default=0.1
        Regularization strength; must be a positive float. Regularization
        improves the conditioning of the problem and reduces the variance of
        the estimates. Larger values specify stronger regularization.
    reg_lambda: float, default=0.1
        Constant that multiplies the L1 term. ``reg_lambda = 0`` is
        equivalent to an ordinary least square.
    view_list: list, default=None
        List of view names.
    random_state: int, default=0
        Controls the randomness of the estimator.
    """

    def predict(self, X):
        """
        Predict class labels for samples in X.

        Parameters
        ----------
        X : array_like or sparse matrix, shape (n_samples, n_features), or dict
            Samples; a dict keyed by view name when ``view_list`` is set.

        Returns
        -------
        C : array, shape [n_samples]
            Predicted class label per sample.
        """
        scores = self._decision_function(X)
        # Binary problems yield a single score column; flatten it. The ndim
        # guard keeps an already 1-D score array from raising IndexError.
        if scores.ndim == 2 and scores.shape[1] == 1:
            scores = scores.ravel()
        if scores.ndim == 1:
            indices = (scores > 0).astype(int)
        else:
            indices = scores.argmax(axis=1)
        return self.classes_[indices]

    def fit(self, X, y, sample_weight=None):
        """
        Build a broad learning system model from the training set (X, y).

        Parameters
        ----------
        X : {ndarray, sparse matrix} of shape (n_samples, n_features) or dict
            Training data.
        y : ndarray of shape (n_samples,)
            Target values.
        sample_weight : float or ndarray of shape (n_samples,), default=None
            Individual weights for each sample. If given a float, every
            sample will have the same weight.

        Returns
        -------
        self : object
            Instance of the estimator.

        Raises
        ------
        ValueError
            If ``y`` is a multi-label target.
        """
        # Labels are encoded as -1 / +1 targets for the underlying regressor.
        self._label_binarizer = LabelBinarizer(pos_label=1, neg_label=-1)
        Y = self._label_binarizer.fit_transform(y)
        if not self._label_binarizer.y_type_.startswith('multilabel'):
            # Called for its validation/warning on column-vector ``y``.
            _ = column_or_1d(y, warn=True)
        else:
            # we don't (yet) support multi-label classification in Ridge
            raise ValueError(
                "%s doesn't support multi-label classification" % (
                    self.__class__.__name__))
        super().fit(X, Y, sample_weight)
        return self

    @property
    def classes_(self):
        """
        Classes labels
        """
        return self._label_binarizer.classes_
class SemiMVBLS(MVBLS):
    """Base class of the semi-supervised MVBLS estimators.

    Works like ``MVBLS`` but fits a ``SemiRidge`` read-out, and optionally
    appends ``unlabeled_data`` to the training samples when generating the
    feature and enhancement nodes.

    The inherited hyper-parameters are listed explicitly (keyword-only)
    rather than collected through ``**kwargs`` so that scikit-learn's
    ``get_params`` / ``set_params`` / ``clone`` can see them.
    """

    def __init__(self, reg_laplacian=1.0, k_neighbors=5, sigma=1.0, unlabeled_data=None, *,
                 n_nodes_H=1000, active_function='relu', n_nodes_Z=10, n_groups_Z=10,
                 reg_alpha=0.1, reg_lambda=0.1, view_list=None, random_state=0):
        MVBLS.__init__(self, n_nodes_H=n_nodes_H, active_function=active_function,
                       n_nodes_Z=n_nodes_Z, n_groups_Z=n_groups_Z, reg_alpha=reg_alpha,
                       reg_lambda=reg_lambda, view_list=view_list, random_state=random_state)
        self.reg_laplacian = reg_laplacian
        self.k_neighbors = k_neighbors
        self.sigma = sigma
        self.unlabeled_data = unlabeled_data

    def save_model(self, file):
        """Dump the fitted estimator with joblib, without the unlabeled data.

        Parameters
        ----------
        file: str
            Controls the filename.
        """
        check_is_fitted(self, ['estimator_'])
        # Leave the unlabeled data out of the dump, but put it back on the
        # live object afterwards so that saving has no lasting side effect.
        held_back = self.unlabeled_data
        self.unlabeled_data = None
        try:
            joblib.dump(self, filename=file)
        finally:
            self.unlabeled_data = held_back

    def fit(self, X, y, sample_weight=None):
        """Build a broad learning system model from the training set (X, y).

        Parameters
        ----------
        X : {ndarray, sparse matrix} of shape (n_samples, n_features) or dict
            Training data; a dict keyed by view name when ``view_list``
            is set.
        y : ndarray of shape (n_samples,) or (n_samples, n_targets)
            Target values
        sample_weight : float or ndarray of shape (n_samples,), default=None
            Weights of the labeled samples. Unlabeled samples get weight 1.

        Returns
        -------
        self : returns an instance of self.
        """
        has_unlabeled = self.unlabeled_data is not None
        # The node generators draw from NumPy's global random state.
        np.random.seed(self.random_state)
        self.estimator_ = SemiRidge(reg_alpha=self.reg_alpha, reg_laplacian=self.reg_laplacian,
                                    k_neighbors=self.k_neighbors, sigma=self.sigma)
        # generate Z
        if self.view_list is not None:
            assert isinstance(X, dict)
            if has_unlabeled:
                # Only multi-view mode needs a dict; in single-view mode the
                # unlabeled data is a plain array.
                assert isinstance(self.unlabeled_data, dict)
            Z = None
            for view in self.view_list:
                X_, y = check_X_y(X[view], y, dtype=[np.float64, np.float32], multi_output=True, y_numeric=True)
                first_view = Z is None
                if first_view:
                    sample_weight = _check_sample_weight(sample_weight, X_, dtype=X_.dtype)
                if has_unlabeled:
                    uX_ = check_array(self.unlabeled_data[view], dtype=[np.float64, np.float32])
                    if first_view:
                        # Extend the weights once; every view shares them.
                        uW_ = np.ones(len(uX_))
                        sample_weight = np.r_[sample_weight, uW_]
                    X_ = np.r_[X_, uX_]
                Z_ = self._generate_z(X_, sample_weight=sample_weight, view=view)
                Z = Z_ if first_view else np.c_[Z, Z_]
        else:
            X, y = check_X_y(X, y, dtype=[np.float64, np.float32], multi_output=True, y_numeric=True)
            sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype)
            if has_unlabeled:
                uX_ = check_array(self.unlabeled_data, dtype=[np.float64, np.float32])
                uW_ = np.ones(len(uX_))
                sample_weight = np.r_[sample_weight, uW_]
                X = np.r_[X, uX_]
            Z = self._generate_z(X, sample_weight=sample_weight, view=None)
        # generate H
        H = self._generate_h(Z, sample_weight=sample_weight)
        # ``y`` covers only the labeled rows, which come first in [Z, H].
        self.estimator_.fit(np.c_[Z, H], y, sample_weight)
        return self
class SemiMVBLSClassifier(ClassifierMixin, SemiMVBLS):
    """Semi-supervised MVBLS classifier. Construct a broad learning system model.

    Parameters
    ----------
    reg_laplacian: float, default=1.0
        Constant that multiplies the laplacian term. Defaults to 1.0.
        ``reg_laplacian = 0`` is equivalent to a Ridge regression.
    k_neighbors: int, default=5
        Number of neighbors to use when constructing the affinity matrix.
    sigma: float, default=1.0
        Kernel coefficient for RBF.
    unlabeled_data: {ndarray, sparse matrix} of shape (n_samples, n_features) or {dict}
        Unlabeled training data.
    n_nodes_H: int, default=1000
        Controls the number of enhancement nodes.
    active_function: {str, ('relu', 'tanh', 'sigmoid' or 'linear')}, default='relu'
        Controls the active function of enhancement nodes.
    n_nodes_Z: int, default=10
        Controls the number of feature nodes in each group.
    n_groups_Z: int, default=10
        Controls the number of feature node groups.
    reg_alpha: float, default=0.1
        Regularization strength; must be a positive float. Regularization
        improves the conditioning of the problem and reduces the variance of
        the estimates. Larger values specify stronger regularization.
    reg_lambda: float, default=0.1
        Constant that multiplies the L1 term. ``reg_lambda = 0`` is
        equivalent to an ordinary least square.
    view_list: list, default=None
        List of view names.
    random_state: int, default=0
        Controls the randomness of the estimator.
    """

    def predict(self, X):
        """
        Predict class labels for samples in X.

        Parameters
        ----------
        X : array_like or sparse matrix, shape (n_samples, n_features), or dict
            Samples; a dict keyed by view name when ``view_list`` is set.

        Returns
        -------
        C : array, shape [n_samples]
            Predicted class label per sample.
        """
        scores = self._decision_function(X)
        # Binary problems yield a single score column; flatten it. The ndim
        # guard keeps an already 1-D score array from raising IndexError.
        if scores.ndim == 2 and scores.shape[1] == 1:
            scores = scores.ravel()
        if scores.ndim == 1:
            indices = (scores > 0).astype(int)
        else:
            indices = scores.argmax(axis=1)
        return self.classes_[indices]

    def fit(self, X, y, sample_weight=None):
        """Build a broad learning system model from the training set (X, y).

        Parameters
        ----------
        X : {ndarray, sparse matrix} of shape (n_samples, n_features) or dict
            Training data.
        y : ndarray of shape (n_samples,)
            Target values.
        sample_weight : float or ndarray of shape (n_samples,), default=None
            Individual weights for each sample. If given a float, every
            sample will have the same weight.

        Returns
        -------
        self : object
            Instance of the estimator.

        Raises
        ------
        ValueError
            If ``y`` is a multi-label target.
        """
        # Labels are encoded as -1 / +1 targets for the underlying regressor.
        self._label_binarizer = LabelBinarizer(pos_label=1, neg_label=-1)
        Y = self._label_binarizer.fit_transform(y)
        if not self._label_binarizer.y_type_.startswith('multilabel'):
            # Called for its validation/warning on column-vector ``y``.
            _ = column_or_1d(y, warn=True)
        else:
            raise ValueError("%s doesn't support multi-label classification" % (self.__class__.__name__))
        super().fit(X, Y, sample_weight=sample_weight)
        return self

    @property
    def classes_(self):
        """
        Classes labels
        """
        return self._label_binarizer.classes_
class SemiMVBLSRegressor(MultiOutputMixin, RegressorMixin, SemiMVBLS):
    """Semi-supervised MVBLS regressor. Construct a broad learning system model.

    Parameters
    ----------
    reg_laplacian: float, default=1.0
        Constant that multiplies the laplacian term. Defaults to 1.0.
        ``reg_laplacian = 0`` is equivalent to a Ridge regression.
    k_neighbors: int, default=5
        Number of neighbors to use when constructing the affinity matrix.
    sigma: float, default=1.0
        Kernel coefficient for RBF.
    unlabeled_data: {ndarray, sparse matrix} of shape (n_samples, n_features) or {dict}
        Unlabeled training data.
    n_nodes_H: int, default=1000
        Controls the number of enhancement nodes.
    active_function: {str, ('relu', 'tanh', 'sigmoid' or 'linear')}, default='relu'
        Controls the active function of enhancement nodes.
    n_nodes_Z: int, default=10
        Controls the number of feature nodes in each group.
    n_groups_Z: int, default=10
        Controls the number of feature node groups.
    reg_alpha: float, default=0.1
        Regularization strength; must be a positive float. Regularization
        improves the conditioning of the problem and reduces the variance of
        the estimates. Larger values specify stronger regularization.
    reg_lambda: float, default=0.1
        Constant that multiplies the L1 term. ``reg_lambda = 0`` is
        equivalent to an ordinary least square.
    view_list: list, default=None
        List of view names.
    random_state: int, default=0
        Controls the randomness of the estimator.
    """

    def predict(self, X):
        """
        Return the predicted value for each sample.

        Parameters
        ----------
        X : array_like or sparse matrix, shape (n_samples, n_features), or dict
            Samples; a dict keyed by view name when ``view_list`` is set.

        Returns
        -------
        C : array, shape (n_samples,)
            Returns predicted values.
        """
        return self._decision_function(X)
if __name__ == "__main__":
    # Check that the estimators comply with scikit-learn's estimator API.
    # ------------------------------------
    check_estimator(MVBLSRegressor())
    check_estimator(MVBLSClassifier())
    check_estimator(SemiMVBLSRegressor())
    check_estimator(SemiMVBLSClassifier())