from nachos.splitters.abstract_splitter import AbstractSplitter
from nachos.similarity_functions import build_similarity_functions as build_sims
from nachos.constraints import build_constraints
from nachos.data.Data import Dataset, InvertedIndex, Split, FactoredSplit
from nachos.similarity_functions.SimilarityFunctions import SimilarityFunctions
from nachos.constraints.Constraints import Constraints
from nachos.splitters import register
from typing import Optional, List, Tuple, Generator
from tqdm import tqdm
import random
@register("min_node_cut")
class MinNodeCut(AbstractSplitter):
    '''
        Summary:
            Splitter registered under the name "min_node_cut". It repeatedly
            draws candidate node cuts from the dataset graph and keeps the
            one with the lowest score.
    '''

    @classmethod
    def build(cls, conf: dict):
        '''
            Summary:
                Alternate constructor that builds the splitter from a
                configuration dictionary.

            Inputs
            ---------------------
            :param conf: The configuration. It is forwarded unchanged to
                build_sims and build_constraints, and must additionally
                contain the keys 'max_iter' and 'seed'.
            :type conf: dict

            Returns
            --------------------
            :return: The configured splitter
            :rtype: MinNodeCut

            Raises
            --------------------
            :raises KeyError: if 'max_iter' or 'seed' is missing from conf
        '''
        return cls(
            build_sims(conf),
            build_constraints(conf),
            max_iter=conf['max_iter'],
            seed=conf['seed'],
        )

    def __init__(self,
        sim_fn: SimilarityFunctions,
        constraints: Constraints,
        max_iter: int = 200,
        seed: int = 0,
    ):
        '''
            Inputs
            ---------------------
            :param sim_fn: The similarity functions, passed to the parent
                class and used here to build the dataset graph when needed
            :type sim_fn: SimilarityFunctions
            :param constraints: The constraints, passed to the parent class
            :type constraints: Constraints
            :param max_iter: The number of candidate cuts drawn in addition
                to the initial one
            :type max_iter: int
            :param seed: The seed applied to the random module and to the
                dataset at the start of every call
            :type seed: int
        '''
        super().__init__(sim_fn, constraints)
        self.max_iter = max_iter
        self.seed = seed

    def __call__(self, d: Dataset) -> Tuple[FactoredSplit, List[float]]:
        '''
            Summary:
                Given a dataset, split according to a search over minimum-st
                node cuts, picking the s-source and t-target vertices that
                minimize the constraint cost function of the split.

                Concretely, one cut is drawn with d.draw_random_node_cut()
                as the starting point, then max_iter further cuts are drawn
                and a cut replaces the current best only when its score is
                strictly lower.

            Inputs
            ---------------------
            :param d: The dataset to split. Its graph is built with
                self.sim_fn if it has not been built already.
            :type d: Dataset

            Returns
            --------------------
            :return: The best split found and the history of best scores.
                The first score belongs to the initial cut; one more entry
                is appended each time a strictly better cut is found, so
                the list does NOT hold one entry per iteration.
            :rtype: Tuple[FactoredSplit, List[float]]

            Raises
            --------------------
            :raises ValueError: if d.check_complete() reports that the
                graph is complete
        '''
        # Seed both sources of randomness used by the search: the global
        # random module and the dataset. Once each is enough.
        random.seed(self.seed)
        d.set_random_seed(self.seed)

        # First verify that the graph is not complete.
        # This could take some time.
        if d.graph is None:
            d.make_graph(self.sim_fn)

        if d.check_complete():
            # NOTE(review): the text says "Random Splitting" although this is
            # the min_node_cut splitter. It is left untouched because callers
            # may match on the message -- confirm before rewording.
            raise ValueError("Random Splitting cannot work on a complete graph")

        # The initial draw is reported as iteration 0.
        best_split = d.draw_random_node_cut()
        best_score = self.score(d, best_split)
        print(f"Iter 0: Best Score: {best_score:0.4f}")
        scores = [best_score]

        # Try up to max_iter further cuts. They are numbered from 1 so that
        # the log never shows a second "Iter 0" line.
        for iter_num in tqdm(range(1, self.max_iter + 1)):
            split = d.draw_random_node_cut()
            score = self.score(d, split)
            if score < best_score:
                best_score = score
                best_split = split
                scores.append(best_score)
                print(f"Iter {iter_num}: Best Score: {best_score:0.4f}")

        return (best_split, scores)