
This week, I am mainly working on parallelizing the enumeration of \(x_{h'r't'}\), i.e. the loop for u, v in nx.DiGraph(M).subgraph(subgraph).edges().

Target Code

for subgraph in subgraphs:
    count = 0
    sib_sum = 0
    sib_sum_h = 0
    sib_sum_t = 0
    start_uv = timeit.default_timer()
    for u,v in nx.DiGraph(M).subgraph(subgraph).edges():
        w, w1, w2 = heur(u, v, M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname)
        count += 1
        sib_sum += w
        sib_sum_h += w1
        sib_sum_t += w2
    end_uv = timeit.default_timer()
    print(f'have done subgraph: {id(subgraph)} in {end_uv - start_uv}')

    sib_sum = sib_sum/count
    sib_sum_h = sib_sum_h/count
    sib_sum_t = sib_sum_t/count
    model_ReliK_score.append(sib_sum)
    model_ReliK_score_h.append(sib_sum_h)
    model_ReliK_score_t.append(sib_sum_t)
    tracker += 1
    if tracker % 10 == 0: print(f'have done {tracker} of {len(subgraphs)} in {embedding}')

😭First Try: ThreadPoolExecutor

for subgraph in subgraphs:
    count = 0
    sib_sum = 0
    sib_sum_h = 0
    sib_sum_t = 0

    edges = list(nx.DiGraph(M).subgraph(subgraph).edges())

    start_uv = timeit.default_timer()
    results = None
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(lambda x: heur(x[0], x[1], M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname), edges))
    for w, w1, w2 in results:
        count += 1
        sib_sum += w
        sib_sum_h += w1
        sib_sum_t += w2
    end_uv = timeit.default_timer()
    print(f'have done subgraph: {id(subgraph)} in {end_uv - start_uv}')
    sib_sum = sib_sum/count
    sib_sum_h = sib_sum_h/count
    sib_sum_t = sib_sum_t/count
    model_ReliK_score.append(sib_sum)
    model_ReliK_score_h.append(sib_sum_h)
    model_ReliK_score_t.append(sib_sum_t)
    tracker += 1
    if tracker % 10 == 0: print(f'have done {tracker} of {len(subgraphs)} in {embedding}')

On Countries, each subgraph takes around 3000 ms, while on CodexSmall it takes around 120 s per subgraph.

Result😩

This approach is clearly a pessimization, presumably because heur is CPU-bound: Python's GIL prevents the threads from running in parallel, so the thread pool only adds overhead.

❌ABORTED.
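
If the GIL is indeed the culprit, the usual workaround for CPU-bound work is to switch from threads to processes, which is essentially what the third try below ends up doing. A minimal sketch with ProcessPoolExecutor, assuming heur and all of its arguments are picklable; heur_on_edge and worker are my own illustrative helpers, not code from the project:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

# Top-level helper (lambdas cannot be pickled): unpacks one edge and calls heur.
def heur_on_edge(edge, M, models, entity_to_id_map, relation_to_id_map,
                 all_triples_set, full_graph, sample, datasetname):
    u, v = edge
    return heur(u, v, M, models, entity_to_id_map, relation_to_id_map,
                all_triples_set, full_graph, sample, datasetname)

# Bind every argument except the edge itself.
worker = partial(heur_on_edge, M=M, models=models,
                 entity_to_id_map=entity_to_id_map,
                 relation_to_id_map=relation_to_id_map,
                 all_triples_set=all_triples_set, full_graph=full_graph,
                 sample=sample, datasetname=datasetname)

with ProcessPoolExecutor() as executor:          # processes instead of threads
    results = list(executor.map(worker, edges))  # sidesteps the GIL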

😭Second Try: Parallelization on GPU

Next, I tried to parallelize it with PyTorch + CUDA streams.

for subgraph in subgraphs:
    count = 0
    sib_sum = 0
    sib_sum_h = 0
    sib_sum_t = 0
    start_uv = timeit.default_timer()

    edges = list(nx.DiGraph(M).subgraph(subgraph).edges())

    len_edges = len(edges)

    #print(len(edges))
    num_parallel = 100

    stream = [torch.cuda.Stream() for _ in range(num_parallel)]
    #stream = torch.cuda.Stream()

    results = [None] * num_parallel

    block_size = (len(edges)) // num_parallel

    # Parallelize the heur computation using PyTorch
    #with profiler.profile(use_cuda=True, record_shapes=True) as prof:
    for i in range(num_parallel):
        with torch.cuda.stream(stream[i]):
            edge_block = edges[i * block_size:(i + 1) * block_size] if i < num_parallel - 1 else edges[i * block_size:]
            # heur itself still runs on the CPU; only the tensor construction/transfer is issued on this stream
            results[i] = torch.stack([torch.tensor(heur(u, v, M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname), device='cuda') for u, v in edge_block])

    # Wait for the computation to finish
    torch.cuda.synchronize()

    #prof.export_chrome_trace("trace.json")

    for i in range(num_parallel):
        sib_sum += torch.sum(results[i][:, 0])
        sib_sum_h += torch.sum(results[i][:, 1])
        sib_sum_t += torch.sum(results[i][:, 2])
    count = len_edges

    end_uv = timeit.default_timer()
    print(f'have done subgraph: {id(subgraph)} in {end_uv - start_uv}')

    sib_sum = sib_sum/count
    sib_sum_h = sib_sum_h/count
    sib_sum_t = sib_sum_t/count
    model_ReliK_score.append(sib_sum.item())
    model_ReliK_score_h.append(sib_sum_h.item())
    model_ReliK_score_t.append(sib_sum_t.item())
    tracker += 1
    if tracker % 10 == 0: print(f'have done subgraph: {tracker} of {len(subgraphs)} in {embedding}')

Result😢

However, there was no speedup in the experiment for several different values of num_parallel. I suspect the work is still executed sequentially: heur runs on the CPU, so the streams have nothing substantial to overlap. Maybe I have not thoroughly understood the code.
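
For reference, CUDA streams can only overlap work that is launched on the GPU itself; host-side Python such as the heur calls stays sequential. A minimal toy sketch of the pattern streams are meant for (a standalone example with made-up tensor sizes, not project code):

import torch

num_parallel = 4
streams = [torch.cuda.Stream() for _ in range(num_parallel)]
inputs = [torch.randn(1024, 1024, device='cuda') for _ in range(num_parallel)]
outputs = [None] * num_parallel

for i in range(num_parallel):
    with torch.cuda.stream(streams[i]):
        # matmul is a GPU kernel, so launches on different streams can overlap
        outputs[i] = inputs[i] @ inputs[i]

torch.cuda.synchronize()  # wait for all streams before using the outputs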

😃Third Try: multiprocessing

The idea is straightforward.

I have num_processors processors available, so I partition the edge list list(nx.DiGraph(M).subgraph(subgraph).edges()) into num_processors chunks.

Each process is responsible for its own chunk.

for subgraph in subgraphs:
    count = 0
    sib_sum = 0
    sib_sum_h = 0
    sib_sum_t = 0
    start_uv = timeit.default_timer()

    edges = list(nx.DiGraph(M).subgraph(subgraph).edges())
    count = len(edges)
    num_processors = 10

    chunk_size = len(edges) // num_processors

    edge_chunks = [edges[i * chunk_size:(i + 1) * chunk_size] if i < num_processors - 1 else edges[i * chunk_size:] for i in range(num_processors)]


    manager = mp.Manager()
    results = manager.list()

    processes = []
    for i,edge_chunk in enumerate(edge_chunks):
        p = mp.Process(target=process_edges_partition, args=(edge_chunk, heur, M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname, results))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    sib_sum = sum([r[0] for r in results])
    sib_sum_h = sum([r[1] for r in results])
    sib_sum_t = sum([r[2] for r in results])
    #print(sib_sum, sib_sum_h, sib_sum_t)

    end_uv = timeit.default_timer()
    print(f'have done subgraph: {id(subgraph)} in {end_uv - start_uv}')

    sib_sum = sib_sum/count
    sib_sum_h = sib_sum_h/count
    sib_sum_t = sib_sum_t/count
    model_ReliK_score.append(sib_sum)
    model_ReliK_score_h.append(sib_sum_h)
    model_ReliK_score_t.append(sib_sum_t)
    tracker += 1
    if tracker % 10 == 0: print(f'have done {tracker} of {len(subgraphs)} in {embedding}')

I define the worker method process_edges_partition outside, at module level; it processes one chunk and appends its partial sums to the shared results list.

def process_edges_partition(edge_partition, heur, M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname, results):
    #count = 0
    sib_sum = 0
    sib_sum_h = 0
    sib_sum_t = 0

    # Process each edge in the partition
    for u, v in edge_partition:
        w, w1, w2 = heur(u, v, M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname)
        #count += 1
        sib_sum += w
        sib_sum_h += w1
        sib_sum_t += w2

    results.append((sib_sum, sib_sum_h, sib_sum_t))
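
For comparison, the same partition-and-reduce pattern can be written more compactly with multiprocessing.Pool. A minimal sketch under the assumption that heur and all of its arguments are picklable (process_chunk and worker are my own illustrative names, not the code used in the experiment):

import multiprocessing as mp
from functools import partial

def process_chunk(edge_chunk, M, models, entity_to_id_map, relation_to_id_map,
                  all_triples_set, full_graph, sample, datasetname):
    # Return the partial (sib_sum, sib_sum_h, sib_sum_t) for one chunk.
    sib_sum = sib_sum_h = sib_sum_t = 0
    for u, v in edge_chunk:
        w, w1, w2 = heur(u, v, M, models, entity_to_id_map, relation_to_id_map,
                         all_triples_set, full_graph, sample, datasetname)
        sib_sum += w
        sib_sum_h += w1
        sib_sum_t += w2
    return sib_sum, sib_sum_h, sib_sum_t

worker = partial(process_chunk, M=M, models=models,
                 entity_to_id_map=entity_to_id_map,
                 relation_to_id_map=relation_to_id_map,
                 all_triples_set=all_triples_set, full_graph=full_graph,
                 sample=sample, datasetname=datasetname)

with mp.Pool(processes=num_processors) as pool:
    partials = pool.map(worker, edge_chunks)   # one (w, w1, w2) tuple per chunk

sib_sum = sum(p[0] for p in partials)
sib_sum_h = sum(p[1] for p in partials)
sib_sum_t = sum(p[2] for p in partials)

The Pool variant returns the partial sums directly from pool.map, so no Manager list is needed.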

Experiment🤩

I ran the experiment on the Countries and CodexSmall datasets.

The goal is to speed up the computation of the ReliK scores.

Setting

Set num_processors = 10;

Using TransE embeddings.

Dataset - Countries

python experiment_controller.py -d Countries -e TransE -t ReliK

The raw implementation takes around 470 seconds.

The multiprocessing version takes around 123 seconds.

Speedup: Around 3.8x

Dataset - CodexSmall

python experiment_controller.py -d CodexSmall -e TransE -t ReliK

The raw implementation takes around 12900 seconds = 215 minutes ≈ 3.58 hours.

The multiprocessing version takes around 1610 seconds ≈ 26.8 minutes 📖👍

Speedup: Around 8.0x

Staying up debugging until 2:30 a.m. finally paid off.😎