"""Build a two-hop follow proximity map and pickle it to ``prox_map.pkl``."""

from collections import UserString, defaultdict
import logging
import pickle
from typing import Dict, Optional, Set

import click

from config import CONFIG
from indexer import FollowIndexer
import indexer

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def _build_prox_map(graph: Dict[str, Set[str]]) -> Dict[str, Dict[str, Set[str]]]:
    """Compute the one-hop and two-hop neighbourhoods for every key in *graph*.

    Args:
        graph: Adjacency mapping from a follower id to the set of ids it
            follows.

    Returns:
        A dict keyed like *graph*. Each value has two entries: ``"hop1"`` is
        the key's own follow set (the same set object held by *graph*), and
        ``"hop2"`` is the union of the follow sets of those ids, minus the
        hop-1 ids and minus the key itself.
    """
    prox_map: Dict[str, Dict[str, Set[str]]] = {}
    for did, first in graph.items():
        second: Set[str] = set()
        for subject in first:
            # Use .get so ids that never appear as a follower contribute
            # nothing and no new key is inserted while iterating the graph.
            second.update(graph.get(subject, ()))
        prox_map[did] = {
            "hop1": first,
            "hop2": second - first - {did},
        }
    return prox_map


# Call the decorator explicitly: the bare ``@click.command`` form is only
# accepted by newer click releases, while ``@click.command()`` works on all.
@click.command()
@click.option(
    "--ch-host",
)
@click.option(
    "--ch-port",
    type=int,
)
@click.option(
    "--ch-user",
)
@click.option(
    "--ch-pass",
)
def main(
    ch_host: Optional[str],
    ch_port: Optional[int],
    ch_user: Optional[str],
    ch_pass: Optional[str],
):
    """Stream follows, build the proximity map and write it to ``prox_map.pkl``.

    Each ClickHouse option falls back to the matching ``CONFIG`` attribute
    when it is omitted (or given a falsy value such as an empty string).

    Args:
        ch_host: ClickHouse host override.
        ch_port: ClickHouse port override.
        ch_user: ClickHouse user override.
        ch_pass: ClickHouse password override.
    """
    logger.info("Building follow graph...")

    # Named ``follow_indexer`` so the imported ``indexer`` module is not
    # shadowed inside this function.
    follow_indexer = FollowIndexer(
        clickhouse_host=ch_host or CONFIG.clickhouse_host,
        clickhouse_port=ch_port or CONFIG.clickhouse_port,
        clickhouse_user=ch_user or CONFIG.clickhouse_user,
        clickhouse_pass=ch_pass or CONFIG.clickhouse_pass,
        batch_size=1000,
    )

    graph: Dict[str, Set[str]] = defaultdict(set)

    def build_graph(did: str, subject: str):
        """Record that *did* follows *subject* in the adjacency mapping."""
        graph[did].add(subject)

    # NOTE(review): presumably invokes the callback once per follow record
    # as (follower, followed) -- confirm against FollowIndexer.
    follow_indexer.stream_follows(build_graph)

    prox_map = _build_prox_map(graph)

    with open("prox_map.pkl", "wb") as f:
        pickle.dump(prox_map, f)

    logger.info(
        f"Finished building proximity map, saved to prox_map.pkl. {len(prox_map):,} users in map."
    )


if __name__ == "__main__":
    main()