Source code for nachos.splitters.min_node_cut

from nachos.splitters.abstract_splitter import AbstractSplitter
from nachos.similarity_functions import build_similarity_functions as build_sims
from nachos.constraints import build_constraints
from import Dataset, InvertedIndex, Split, FactoredSplit
from nachos.similarity_functions.SimilarityFunctions import SimilarityFunctions
from nachos.constraints.Constraints import Constraints
from nachos.splitters import register

from typing import Optional, List, Tuple, Generator
from tqdm import tqdm
import random

@register("min_node_cut")
class MinNodeCut(AbstractSplitter):
    '''
        Summary:
            Splitter registered under the name "min_node_cut". It repeatedly
            draws random node cuts from the dataset's graph and keeps the
            cut whose score is lowest.
    '''

    @classmethod
    def build(cls, conf: dict):
        '''
            Summary:
                Construct a MinNodeCut splitter from a configuration
                dictionary.

            Inputs
            ---------------------
            :param conf: The configuration. It is passed as-is to the
                similarity-function and constraint builders, and must
                contain the keys 'max_iter' and 'seed' (a missing key
                raises KeyError).
            :type conf: dict

            Returns
            --------------------
            :return: The configured splitter
            :rtype: MinNodeCut
        '''
        return cls(
            build_sims(conf),
            build_constraints(conf),
            max_iter=conf['max_iter'],
            seed=conf['seed'],
        )

    def __init__(self,
        sim_fn: SimilarityFunctions,
        constraints: Constraints,
        max_iter: int = 200,
        seed: int = 0,
    ):
        '''
            Inputs
            ---------------------
            :param sim_fn: The similarity functions, forwarded to the parent
                class and later used to build the dataset graph
            :type sim_fn: SimilarityFunctions
            :param constraints: The constraints, forwarded to the parent
                class
            :type constraints: Constraints
            :param max_iter: The number of candidate node cuts drawn after
                the initial one
            :type max_iter: int
            :param seed: The random seed applied at the start of each call
            :type seed: int
        '''
        super().__init__(sim_fn, constraints)
        self.max_iter = max_iter
        self.seed = seed

    def __call__(self, d: Dataset) -> Tuple[FactoredSplit, List[float]]:
        '''
            Summary:
                Given a dataset, split according to a search over
                minimum-st node cuts, picking the s-source and t-target
                vertices that minimize the constraint cost function of the
                split. Concretely, one initial cut plus max_iter further
                cuts are drawn with d.draw_random_node_cut(), and the cut
                with the lowest score is kept.

            Inputs
            ---------------------
            :param d: The dataset to split
            :type d: Dataset

            Returns
            --------------------
            :return: The best split found and the history of best scores
                (the initial score followed by every strictly better score,
                in the order they were found)
            :rtype: Tuple[FactoredSplit, List[float]]

            Raises
            --------------------
            :raises ValueError: If the dataset graph is complete
        '''
        # Seed both the global RNG and the dataset's RNG so that repeated
        # calls with the same seed draw the same sequence of cuts.
        random.seed(self.seed)
        d.set_random_seed(self.seed)

        # First verify that the graph is not complete.
        # This could take some time.
        if d.graph is None:
            d.make_graph(self.sim_fn)
        if d.check_complete():
            raise ValueError("MinNodeCut cannot work on a complete graph")

        # Initialize the search with a single draw, reported as iteration 0.
        best_split = d.draw_random_node_cut()
        best_score = self.score(d, best_split)
        print(f"Iter 0: Best Score: {best_score:0.4f}")
        scores = [best_score]

        # Try up to max_iter further cuts. Iterations are numbered from 1
        # so that their log lines cannot be confused with the initial draw.
        for iter_num in tqdm(range(1, self.max_iter + 1)):
            split = d.draw_random_node_cut()
            score = self.score(d, split)
            # Only a strict improvement replaces the incumbent split.
            if score < best_score:
                best_score = score
                print(f"Iter {iter_num}: Best Score: {best_score:0.4f}")
                scores.append(best_score)
                best_split = split
        return (best_split, scores)