stream = [torch.cuda.Stream() for _ in range(num_parallel)]  # stream = torch.cuda.Stream()
results = [None] * num_parallel
block_size = len(edges) // num_parallel

# Parallelize the heur computation using PyTorch
# with profiler.profile(use_cuda=True, record_shapes=True) as prof:
for i in range(num_parallel):
    with torch.cuda.stream(stream[i]):
        edge_block = edges[i * block_size:(i + 1) * block_size] if i < num_parallel - 1 else edges[i * block_size:]
        results[i] = torch.stack([torch.tensor(heur(u, v, M, models, entity_to_id_map, relation_to_id_map, all_triples_set, full_graph, sample, datasetname), device='cuda') for u, v in edge_block])

# Wait for the computation to finish
torch.cuda.synchronize()
# prof.export_chrome_trace("trace.json")

for i in range(num_parallel):
    sib_sum += torch.sum(results[i][:, 0])
    sib_sum_h += torch.sum(results[i][:, 1])
    sib_sum_t += torch.sum(results[i][:, 2])

count = len_edges
sib_sum = sib_sum / count
sib_sum_h = sib_sum_h / count
sib_sum_t = sib_sum_t / count
model_ReliK_score.append(sib_sum.item())
model_ReliK_score_h.append(sib_sum_h.item())
model_ReliK_score_t.append(sib_sum_t.item())

tracker += 1
if tracker % 10 == 0:
    print(f'have done subgraph: {tracker} of {len(subgraphs)} in {embedding}')
Result 😢
However, there was no speedup in the experiment for several different values of num_parallel. I suspect the work is still executed sequentially: the torch.cuda.stream context only changes which CUDA stream subsequent kernels are enqueued on, while the Python loop that calls heur and builds the tensors still runs one block at a time on the host. Maybe I have not thoroughly understood the code.
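For contrast, here is a minimal sketch (with illustrative shapes and names, not taken from my code above) of the situation streams are designed for: the loop body consists only of asynchronous kernel launches, so kernels on different streams can actually overlap on the device. In my loop, each iteration does Python-side work (heur plus tensor construction) inside the stream context, and that work is serialized by the interpreter no matter which stream is active.

import torch

num_parallel = 4
streams = [torch.cuda.Stream() for _ in range(num_parallel)]
xs = [torch.randn(4096, 4096, device='cuda') for _ in range(num_parallel)]
outs = [None] * num_parallel

for i in range(num_parallel):
    with torch.cuda.stream(streams[i]):
        # Kernel launches return immediately, so matmuls enqueued on
        # different streams may run concurrently on the GPU.
        outs[i] = xs[i] @ xs[i]

torch.cuda.synchronize()  # wait for all streams before reading outs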
😃 Third Try: multiprocessing
The idea is straightforward.
I have num_processors processors, so I partition the edge list, list(nx.DiGraph(M).subgraph(subgraph).edges()), into num_processors parts.
Each process is responsible for its corresponding part.
import multiprocessing as mp

chunk_size = len(edges) // num_processors  # size of each partition (definition not shown in the original snippet)
edge_chunks = [edges[i * chunk_size:(i + 1) * chunk_size] if i < num_processors - 1 else edges[i * chunk_size:]
               for i in range(num_processors)]

manager = mp.Manager()
results = manager.list()  # shared list the workers append their partial sums to

processes = []
for i, edge_chunk in enumerate(edge_chunks):
    p = mp.Process(target=process_edges_partition,
                   args=(edge_chunk, heur, M, models, entity_to_id_map, relation_to_id_map,
                         all_triples_set, full_graph, sample, datasetname, results))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

sib_sum = sum([r[0] for r in results])
sib_sum_h = sum([r[1] for r in results])
sib_sum_t = sum([r[2] for r in results])
# print(sib_sum, sib_sum_h, sib_sum_t)
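The worker process_edges_partition is not shown above; here is a minimal sketch of what it might look like, assuming heur returns a (sib, sib_h, sib_t) triple per edge (matching the three columns summed in the stream version) and that each worker appends its partial sums to the shared manager list:

def process_edges_partition(edge_chunk, heur, M, models, entity_to_id_map,
                            relation_to_id_map, all_triples_set, full_graph,
                            sample, datasetname, results):
    # Accumulate the three heuristic components over this chunk of edges.
    sib, sib_h, sib_t = 0.0, 0.0, 0.0
    for u, v in edge_chunk:
        s, s_h, s_t = heur(u, v, M, models, entity_to_id_map, relation_to_id_map,
                           all_triples_set, full_graph, sample, datasetname)
        sib += s
        sib_h += s_h
        sib_t += s_t
    # Append one (sum, sum_h, sum_t) triple per chunk to the shared list.
    results.append((sib, sib_h, sib_t))

One caveat: if models holds CUDA tensors, the default fork start method does not play well with an already-initialized CUDA context; mp.set_start_method('spawn') (with picklable arguments) is usually required when child processes touch the GPU.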