Source code for outlierdenstream.outlierdenstream

# -*- coding: utf-8 -*-

"""Main module."""

# If True, use logical step timestamps; if False, use wall-clock time.time().
simulation = True

import sys
import copy
import math
import time
import numpy as np
import pandas as pd

from sklearn.cluster import DBSCAN

# from sample import Sample
# from cluster import Cluster
# from microCluster import MicroCluster
    
class Sample():
    """
    Each record of the stream has to be wrapped in a `Sample` object.

    :param value: the `value` (feature vector) of the current sample.
    :param timestamp: the real (original) `timestamp` of the current sample,
        stored in :attr:`realTimestamp`; the logical :attr:`timestamp` starts
        at 0 and is assigned later via :meth:`setTimestamp`.
    """

    def __init__(self, value, timestamp: int):
        self.value = value
        self.timestamp = 0
        self.realTimestamp = timestamp

    def getValue(self):
        """
        :return: :attr:`value`
        """
        return self.value

    def setTimestamp(self, timestamp: int):
        """
        :set: :attr:`timestamp`
        """
        self.timestamp = timestamp

    def setRealTimestamp(self, timestamp):
        self.realTimestamp = timestamp

    def setMicroClusterNumber(self, microClusterNumber: int):
        """
        Assigns to the sample the number of the micro-cluster it was merged into.

        :set: :attr:`microClusterNumber`
        """
        self.microClusterNumber = microClusterNumber
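# A minimal usage sketch of `Sample` (values are illustrative):
#
#     s = Sample(value=np.array([0.1, 0.2, 0.3]), timestamp=1528289000)
#     s.getValue()        # -> array([0.1, 0.2, 0.3])
#     s.setTimestamp(1)   # logical step timestamp, assigned by the algorithm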
def computeReductionFactor(lamb, steps):
    return math.pow(2, -lamb * steps)
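# For example (illustrative lamb), the one-step fading factor is:
#
#     computeReductionFactor(0.03, 1)   # 2 ** -0.03 ~= 0.979
#
# i.e. with lamb = 0.03 every weight shrinks by about 2.1% per step.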
class MicroCluster():
    """
    Micro-cluster class.

    :param currenttimestamp: the `timestamp` at which the micro-cluster is created.
    :param lamb: the `lamb` parameter used as decay factor.
    :param clusterNumber: the `number` of the micro-cluster.
    """

    def __init__(self, currenttimestamp, lamb, clusterNumber):
        self.dimensions = None
        self.creationTimeStamp = currenttimestamp
        self.lamb = lamb
        self.reductionFactor = computeReductionFactor(self.lamb, 1)
        self.clusterNumber = clusterNumber
    def insertSample(self, sample, timestamp=0):
        """
        Adds a sample to the micro-cluster and updates its statistics with
        :meth:`updateRealTimeWeight` and :meth:`updateRealTimeLSandSS`.

        :param sample: the `sample` object.
        :param timestamp: deprecated, not needed anymore. Will be removed in
            the next versions.
        """
        if self.dimensions is None:
            if isinstance(sample.value, list):
                self.dimensions = len(sample.value)
            elif isinstance(sample.value, float):
                self.dimensions = 1
            elif isinstance(sample.value, np.ndarray):
                self.dimensions = len(sample.value)
            else:
                print('Received {}'.format(sample.value))
                print('Type {}'.format(type(sample.value)))
                sys.exit('Error instance sample.value type')

            ### incremental parameters, initialized on the first insert ###
            self.N = 0
            self.weight = 0
            self.LS = np.zeros(self.dimensions)
            self.SS = np.zeros(self.dimensions)
            self.center = np.zeros(self.dimensions)
            self.radius = 0

        self.N += 1
        self.updateRealTimeWeight()
        self.updateRealTimeLSandSS(sample)
    def updateRealTimeWeight(self):
        """
        Fades the weight of the micro-cluster by the reduction factor and
        increases it by 1 for the newly merged sample.
        """
        self.weight *= self.reductionFactor
        self.weight += 1
    def updateRealTimeLSandSS(self, sample):
        """
        Updates the `Weighted Linear Sum` (WLS), the `Weighted Squared Sum`
        (WSS), the `center` and the `radius` of the micro-cluster when a new
        sample is merged.

        :param sample: the `sample` to merge into the micro-cluster.
        """
        sample = np.array(sample.value)

        # fade the sufficient statistics, then add the new sample
        self.LS = np.multiply(self.LS, self.reductionFactor)
        self.SS = np.multiply(self.SS, self.reductionFactor)

        self.LS = self.LS + sample
        self.SS = self.SS + np.power(sample, 2)

        self.center = np.divide(self.LS, float(self.weight))

        LSd = np.power(self.center, 2)
        SSd = np.divide(self.SS, float(self.weight))

        # radius: largest per-dimension weighted standard deviation;
        # negative variances from numerical noise become NaN and are
        # skipped by np.nanmax
        maxRad = np.nanmax(np.sqrt(SSd.astype(float) - LSd.astype(float)))
        # maxRad = np.nanmax(np.lib.scimath.sqrt(SSd - LSd))

        self.radius = maxRad
    def noNewSamples(self):
        """
        Fades the `Weighted Linear Sum` (WLS), the `Weighted Squared Sum`
        (WSS) and the weight of the micro-cluster when no new sample is
        merged into it.
        """
        self.LS = np.multiply(self.LS, self.reductionFactor)
        self.SS = np.multiply(self.SS, self.reductionFactor)
        self.weight *= self.reductionFactor
    def getCenter(self):
        """
        :return: the `center` of the micro-cluster.
        """
        return self.center

    def getRadius(self):
        """
        :return: the `radius` of the micro-cluster.
        """
        return self.radius
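# A minimal sketch of the micro-cluster bookkeeping (values are illustrative):
#
#     mc = MicroCluster(currenttimestamp=0, lamb=0.03, clusterNumber=1)
#     mc.insertSample(Sample(np.array([2.0, 0.0]), 0))
#     mc.insertSample(Sample(np.array([4.0, 0.0]), 1))
#     mc.getCenter()   # weighted mean, ~[3.01, 0.0] (the older sample weighs less)
#     mc.getRadius()   # max per-dimension weighted deviation, ~1.0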
class Cluster():
    """
    Cluster class. Holds the list of micro-clusters and their count.
    """

    def __init__(self):
        self.clusters = []
        self.N = 0

    def insert(self, mc):
        """
        Inserts a micro-cluster into the cluster and increases the
        micro-cluster counter by 1.

        :param mc: the `micro-cluster` to append to the list of
            micro-clusters that make up the cluster.
        """
        self.clusters.append(mc)
        self.N += 1

    def show(self):
        print('Number of Clusters: ' + str(len(self.clusters)))
        print('-----')
        for cluster in self.clusters:
            print('Cluster #' + str(self.clusters.index(cluster)))
            print('Samples: ' + str(cluster.N))
            print('Weight: ' + str(cluster.weight))
            print('Creation Time: ' + str(cluster.creationTimeStamp))
            # MicroCluster does not maintain lastEditTimeStamp, so guard it
            print('LastEdit Time: ' + str(getattr(cluster, 'lastEditTimeStamp', 'n/a')))
class OutlierDenStream():
    """
    OutlierDenStream class.

    :param lamb: the `lambda` parameter - fading factor.
    :param epsilon: the `epsilon` parameter - maximum micro-cluster radius.
        Either a number or one of the strings ``'auto'``, ``'max'``,
        ``'mean'``, ``'median'``, ``'radiusfactor'`` (computed from the
        initial buffer).
    :param minPts: the `minPts` parameter of DBSCAN.
    :param beta: the `beta` parameter - potential factor.
    :param mu: the `mu` parameter - weight threshold. Either a number or
        ``'auto'``.
    :param numberInitialSamples: number of samples to use as initial buffer.
    :param startingBuffer: initial `buffer` on which to apply DBSCAN or to
        merge into a single micro-cluster.
    :param tp: period (in steps) at which to apply the pruning strategy and
        remove old micro-clusters.
    :param radiusFactor: multiplier of the median initial radius when
        ``epsilon='radiusfactor'``.
    """

    def __init__(self, lamb, epsilon=1, minPts=1, beta=1, mu=1,
                 numberInitialSamples=None, startingBuffer=None, tp=60,
                 radiusFactor=1):

        self.lamb = lamb
        self.minPts = minPts
        self.beta = beta
        self.numberInitialSamples = numberInitialSamples
        self.buffer = startingBuffer
        self.tp = tp
        self.radiusFactor = radiusFactor
        self.exportVariables = False

        ### Check input type: epsilon ###
        if isinstance(epsilon, (int, float)):
            self.epsilon = epsilon
        elif isinstance(epsilon, str):
            self.epsilon = epsilon
        else:
            sys.exit('Error in parameter: epsilon')

        ### Check input type: mu ###
        if isinstance(mu, (int, float)):
            self.mu = mu
        elif isinstance(mu, str) and mu == 'auto':
            self.mu = 'auto'
        else:
            sys.exit('Error in parameter: mu')

        ### Running parameters ###
        self.initialized = False

        ### Real timestamp or steps ###
        if simulation:
            self.currentTimestamp = 0
        else:
            self.currentTimestamp = time.time()
    def resetLearningImpl(self):
        """
        Initializes two empty `Cluster` objects as the p-micro-cluster and
        o-micro-cluster lists. If `mu` is ``'auto'`` computes its value, and
        if `tp` is ``'auto'`` derives it from `lamb`, `beta` and `mu`.
        """
        if simulation:
            self.currentTimestamp = 0
        else:
            self.currentTimestamp = time.time()

        self.initialized = False
        self.pMicroCluster = Cluster()
        self.oMicroCluster = Cluster()

        if isinstance(self.mu, str) and self.mu == 'auto':
            # asymptotic total weight of a stream faded by 2**(-lamb) per step
            self.mu = (1 / (1 - math.pow(2, -self.lamb)))

        if isinstance(self.tp, str) and self.tp == 'auto':
            # requires beta * mu > 1
            self.tp = round((1 / self.lamb) * np.log(self.beta * self.mu / (self.beta * self.mu - 1)))

        self.th_beta_mu = self.beta * self.mu
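    # Worked example of the 'auto' values above (lamb and beta are illustrative):
    #
    #     lamb, beta = 0.03, 0.5
    #     mu = 1 / (1 - 2 ** -lamb)                                      # ~48.6
    #     tp = round((1 / lamb) * np.log(beta * mu / (beta * mu - 1)))   # ~1
    #
    # Both formulas require beta * mu > 1. Note that the DenStream paper states
    # Tp with a base-2 logarithm, while np.log here is the natural logarithm.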
    def initialDBScanSciLearn(self):
        """
        Initial clustering of the buffered samples with DBSCAN, used by
        :meth:`runOnNewSample` once `numberInitialSamples` samples have
        been collected. Each DBSCAN cluster becomes one p-micro-cluster.
        """
        # NOTE: eps is hardcoded for this initial clustering
        data = pd.DataFrame([s.value for s in self.buffer])
        db = DBSCAN(eps=8, min_samples=self.minPts, algorithm='brute').fit(data)
        clusters = db.labels_

        for clusterId in np.unique(clusters):
            if clusterId != -1:  # skip DBSCAN noise
                cl = data[clusters == clusterId]

                mc = MicroCluster(self.currentTimestamp, self.lamb,
                                  self.pMicroCluster.N + 1)
                for sampleNumber in range(len(cl)):
                    sample = Sample(cl.iloc[sampleNumber].tolist(), sampleNumber)
                    mc.insertSample(sample, self.currentTimestamp)

                self.pMicroCluster.insert(mc)
    def initWithoutDBScan(self):
        """
        Produces one micro-cluster by merging all the samples of the initial
        buffer. If `epsilon` is a string, computes `epsilon` as the maximum,
        mean or median radius observed while merging these initial samples.
        """
        mc = MicroCluster(1, self.lamb, self.pMicroCluster.N + 1)

        maxEpsilon = 0
        epsilons = []

        for sampleNumber in range(0, len(self.buffer)):
            sample = Sample(self.buffer[sampleNumber], sampleNumber)
            sample.setTimestamp(sampleNumber + 1)
            mc.insertSample(sample, self.currentTimestamp)

            epsilons.append(mc.radius)
            if mc.radius > maxEpsilon:
                maxEpsilon = mc.radius

        self.pMicroCluster.insert(mc)

        if isinstance(self.epsilon, str):
            # skip the first radii, which are unstable while the
            # micro-cluster statistics warm up
            samplesToSkip = 10
            maxEps = np.max(epsilons[samplesToSkip:])
            meanEps = np.mean(epsilons[samplesToSkip:])
            medianEps = np.median(epsilons[samplesToSkip:])

            self.epsilons = epsilons

            # note: 'auto'/'max' use the maximum over all radii
            # (maxEpsilon), not the warmed-up maxEps
            if self.epsilon == 'auto' or self.epsilon == 'max':
                self.epsilon = maxEpsilon
            if self.epsilon == 'mean':
                self.epsilon = meanEps
            if self.epsilon == 'median':
                self.epsilon = medianEps
            if self.epsilon == 'radiusfactor':
                self.epsilon = self.radiusFactor * np.median(epsilons)
    def initDBScan(self):
        """
        Initialization with DBSCAN: each DBSCAN cluster found in the buffer
        becomes one p-micro-cluster.
        """
        db = DBSCAN(eps=0.05, min_samples=2)
        db.fit(self.buffer)

        # shift labels so that noise (-1) becomes 0 and clusters start at 1
        labels = pd.DataFrame(db.labels_ + 1)
        for x in range(1, labels[0].max() + 1):
            samples = self.buffer[labels[labels[0] == x].index]

            mc = MicroCluster(1, self.lamb, self.pMicroCluster.N + 1)
            for sampleNumber in range(0, len(samples)):
                sample = Sample(samples[sampleNumber], sampleNumber)
                sample.setTimestamp(sampleNumber + 1)
                mc.insertSample(sample, self.currentTimestamp)

            self.pMicroCluster.insert(mc)
    def nearestCluster(self, sample, timestamp, kind):
        minDist = 0.0
        minCluster = None

        if kind == 'cluster':
            clusterList = self.pMicroCluster.clusters
        elif kind == 'outlier':
            clusterList = self.oMicroCluster.clusters
        else:
            sys.exit("Error in nearestCluster: kind must be 'cluster' or 'outlier'")

        for cluster in clusterList:
            if minCluster is None:
                minCluster = cluster
                minDist = np.linalg.norm(sample.value - cluster.center)

            # distance to the cluster surface: center distance minus radius
            dist = np.linalg.norm(sample.value - cluster.center)
            dist -= cluster.radius

            if dist < minDist:
                minDist = dist
                minCluster = cluster

        if kind == 'cluster':
            self.dist_nearest_pmc = minDist
            self.dist_nearest_omc = 0
        if kind == 'outlier':
            self.dist_nearest_omc = minDist
            self.dist_nearest_pmc = 0

        return minCluster

    def updateAll(self, mc):
        """
        Fades every micro-cluster except `mc`, the one that just absorbed
        the new sample.
        """
        for cluster in self.pMicroCluster.clusters:
            if cluster != mc:
                cluster.noNewSamples()

        for cluster in self.oMicroCluster.clusters:
            if cluster != mc:
                cluster.noNewSamples()
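    # Sketch of the proximity score used above (numbers are illustrative):
    # the radius is subtracted, so a wide micro-cluster can win over a
    # closer but tighter one.
    #
    #     center, radius = np.array([0.0, 0.0]), 2.0
    #     x = np.array([3.0, 0.0])
    #     np.linalg.norm(x - center) - radius   # -> 1.0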
    def runInitialization(self):
        """
        Initializes the variables of the main algorithm with the methods
        :meth:`resetLearningImpl` and :meth:`initWithoutDBScan`.
        """
        self.resetLearningImpl()
        self.initWithoutDBScan()
        self.initialized = True

    def runDBSCanInitialization(self):
        """
        Initializes the variables of the main algorithm with the methods
        :meth:`resetLearningImpl` and :meth:`initDBScan`.
        """
        self.resetLearningImpl()
        self.initDBScan()
        self.initialized = True
    def runOnNewSample(self, sample):
        """
        Performs the basic DenStream procedure on each new sample:

        * try to merge the sample into the closest core-micro-cluster, or
        * try to merge the sample into the closest outlier-micro-cluster, or
        * generate a new outlier-micro-cluster from the sample.

        :param sample: the new available `sample` of the stream.
        :return: ``False`` if the sample is merged into an existing
            core-micro-cluster, otherwise ``True``, meaning an "anomalous"
            sample.
        """
        if simulation:
            self.currentTimestamp += 1
            sample.setTimestamp(self.currentTimestamp)
        else:
            self.currentTimestamp = time.time()

        ### INITIALIZATION PHASE ###
        if not self.initialized:
            self.buffer.append(sample)
            if len(self.buffer) >= self.numberInitialSamples:
                self.resetLearningImpl()
                self.initialDBScanSciLearn()
                self.initialized = True

        ### MERGING PHASE ###
        else:
            merged = False
            TrueOutlier = True
            returnOutlier = True

            # 1) try the closest p-micro-cluster: merge into a deep copy and
            #    keep the copy only if the radius stays within epsilon
            if len(self.pMicroCluster.clusters) != 0:
                closestMicroCluster = self.nearestCluster(sample, self.currentTimestamp, kind='cluster')
                backupClosestCluster = copy.deepcopy(closestMicroCluster)
                backupClosestCluster.insertSample(sample, self.currentTimestamp)

                if backupClosestCluster.radius <= self.epsilon:
                    self.pMicroCluster.clusters.pop(self.pMicroCluster.clusters.index(closestMicroCluster))
                    self.pMicroCluster.insert(backupClosestCluster)
                    sample.setMicroClusterNumber(backupClosestCluster.clusterNumber)
                    merged = True
                    TrueOutlier = False
                    returnOutlier = False
                    self.updateAll(backupClosestCluster)

            # 2) otherwise try the closest o-micro-cluster; promote it to the
            #    p-micro-cluster list if its weight exceeds beta * mu
            if not merged and len(self.oMicroCluster.clusters) != 0:
                closestMicroCluster = self.nearestCluster(sample, self.currentTimestamp, kind='outlier')
                backupClosestCluster = copy.deepcopy(closestMicroCluster)
                backupClosestCluster.insertSample(sample, self.currentTimestamp)

                if backupClosestCluster.radius <= self.epsilon:
                    merged = True
                    self.oMicroCluster.clusters.pop(self.oMicroCluster.clusters.index(closestMicroCluster))

                    if backupClosestCluster.weight > self.th_beta_mu:
                        backupClosestCluster.clusterNumber = self.pMicroCluster.N + 1
                        self.pMicroCluster.insert(backupClosestCluster)
                    else:
                        self.oMicroCluster.insert(backupClosestCluster)

                    sample.setMicroClusterNumber(backupClosestCluster.clusterNumber)
                    self.updateAll(backupClosestCluster)

            # 3) otherwise start a new o-micro-cluster from this sample
            if not merged:
                newOutlierMicroCluster = MicroCluster(self.currentTimestamp, self.lamb, 0)
                newOutlierMicroCluster.insertSample(sample, self.currentTimestamp)

                # optional check: a sample close to an existing p-micro-cluster
                # is not a true outlier; while it stays disabled, TrueOutlier
                # remains True and the else branch below is never taken
                # for clusterTest in self.pMicroCluster.clusters:
                #     if np.linalg.norm(clusterTest.center - newOutlierMicroCluster.center) < 2 * self.epsilon:
                #         TrueOutlier = False

                if TrueOutlier:
                    newOutlierMicroCluster.clusterNumber = self.oMicroCluster.N + 1
                    self.oMicroCluster.insert(newOutlierMicroCluster)
                    sample.setMicroClusterNumber(newOutlierMicroCluster.clusterNumber)
                    self.updateAll(newOutlierMicroCluster)
                else:
                    newOutlierMicroCluster.clusterNumber = self.pMicroCluster.N + 1
                    self.pMicroCluster.insert(newOutlierMicroCluster)
                    sample.setMicroClusterNumber(newOutlierMicroCluster.clusterNumber)
                    self.updateAll(newOutlierMicroCluster)
                    returnOutlier = False

            ### PRUNING PHASE, every tp steps ###
            if self.currentTimestamp % self.tp == 0:
                # iterate over copies: removing from a list while iterating
                # over it skips elements
                for cluster in list(self.pMicroCluster.clusters):
                    if cluster.weight < self.th_beta_mu:
                        self.pMicroCluster.clusters.remove(cluster)

                for cluster in list(self.oMicroCluster.clusters):
                    # xsi: the minimum weight an o-micro-cluster of this age
                    # must have reached (the xi limit of DenStream)
                    creationTimestamp = cluster.creationTimeStamp
                    xs1 = math.pow(2, -self.lamb * (self.currentTimestamp - creationTimestamp + self.tp)) - 1
                    xs2 = math.pow(2, -self.lamb * self.tp) - 1
                    xsi = xs1 / xs2
                    if cluster.weight < xsi:
                        self.oMicroCluster.clusters.remove(cluster)

            if self.exportVariables:
                record = {
                    'pMicroClusters': len(self.pMicroCluster.clusters),
                    'oMicroClusters': len(self.oMicroCluster.clusters),
                    'pmc': copy.deepcopy(self.pMicroCluster.clusters),
                    'omc': copy.deepcopy(self.oMicroCluster.clusters),
                    'dist_nearest_pmc': self.dist_nearest_pmc,
                    'dist_nearest_omc': self.dist_nearest_omc,
                    'result': returnOutlier,
                    'sample': sample
                }
                return record
            else:
                return returnOutlier
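# A minimal end-to-end sketch (parameter values and data are illustrative):
#
#     buffer = np.random.rand(100, 3)        # initial buffer: 100 samples, 3 features
#     den = OutlierDenStream(lamb=0.03, epsilon='auto', beta=0.5, mu='auto',
#                            startingBuffer=buffer, tp=12)
#     den.runInitialization()                # or den.runDBSCanInitialization()
#
#     for row in np.random.rand(1000, 3):    # the stream
#         outlier = den.runOnNewSample(Sample(row, timestamp=0))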