Paths

GetPaths

Get the paths from a list of sources to a list of sinks

Source code in wisp/paths.py
class GetPaths:
    """Get the paths from a list of sources to a list of sinks"""

    def __init__(
        self,
        corr_matrix: npt.NDArray[np.floating],
        srcs: Collection[int],
        snks: Collection[int],
        context: MutableMapping[str, Any],
        residue_keys: npt.ArrayLike,
    ):
        """Identify paths that link the source and the sink and order them by their
        lengths.

        Args:
            corr_matrix: a np.array, the calculated correlation matrix
            srcs: a list of ints, the indices of the sources for path finding
            snks: a list of ints, the indices of the sinks for path finding
            context: the user-specified command-line parameters, a UserInput object
            residue_keys: a list containing string representations of each residue
        """
        # populate graph nodes and weighted edges
        G = nx.Graph(incoming_graph_data=corr_matrix)

        # first calculate length of shortest path between any source and sink
        logger.info("Calculating paths...")
        logger.info(
            "Calculating the shortest path between any of the specified sources and any of the specified sinks...",
        )
        shortest_length, shortest_path = self.get_shortest_path_length(
            corr_matrix, srcs, snks, G
        )
        logger.info(
            f"The shortest path has length {str(shortest_length)}",
        )

        path = [shortest_length]
        path.extend(shortest_path)
        pths = [
            path
        ]  # need to create this initial path in case only one path is requested

        cutoff = shortest_length

        # Check for comb explosion
        log_n_paths = get_log_n_paths(G, cutoff)
        if log_n_paths > np.log(context["n_paths_max"]):
            logger.error(
                f"Estimated number of paths is greater than {context['n_paths_max']}"
            )
            logger.error("Please increase n_paths_max to proceed.")
            logger.error("Terminating calculation.")
            sys.exit(1)

        cutoff_yields_max_num_paths_below_target = 0
        cutoff_yields_min_num_paths_above_target = 1000000.0

        # first step, keep incrementing a little until you have more than the desired number of paths
        logger.info(
            "Identifying the cutoff required to produce "
            + str(context["n_paths"])
            + " paths...",
        )
        num_paths = 1
        while num_paths < context["n_paths"]:
            logger.info(f"Testing the cutoff {str(cutoff)}...")
            cutoff_in_array = np.array([cutoff], np.float64)
            pths = self.remove_redundant_paths(
                self.get_paths_between_multiple_endpoints(
                    cutoff_in_array, corr_matrix, srcs, snks, G, context
                )
            )
            num_paths = len(pths)

            logger.info(
                f"The cutoff {str(cutoff)} produces {num_paths} paths...",
            )

            if (
                num_paths < context["n_paths"]
                and cutoff > cutoff_yields_max_num_paths_below_target
            ):
                cutoff_yields_max_num_paths_below_target = cutoff
            if (
                num_paths > context["n_paths"]
                and cutoff < cutoff_yields_min_num_paths_above_target
            ):
                cutoff_yields_min_num_paths_above_target = cutoff

            # The original code adds 0.1 each time, but this may be too fast
            # for some systems and very slow for others, so increase by a
            # percentage of the minimum path length instead. Ideally this
            # could be an input parameter in the future.
            cutoff = cutoff + shortest_length * 0.1

        pths = self.remove_redundant_paths(pths)

        pths.sort()  # sort the paths by length

        if num_paths != context["n_paths"]:  # so further refinement is needed
            pths = pths[: context["n_paths"]]
            logger.info(
                "Keeping the first " + str(context["n_paths"]) + " of these paths...",
            )

        self.paths_description = "\n# Output identified paths\n"

        for index, path in enumerate(pths, start=1):
            self.paths_description += f"Path {index}:\n"
            self.paths_description += f"   Length: {path[0]}\n"
            self.paths_description += (
                "   Nodes: "
                + " - ".join([residue_keys[item] for item in path[1:]])
                + "\n"
            )

        if context["write_formatted_paths"]:
            formatted_paths_path = os.path.join(
                context["output_dir"], "simply_formatted_paths.txt"
            )
            with open(formatted_paths_path, "w", encoding="utf-8") as f:
                # write every path, one per line: length followed by node indices
                for path in pths:
                    f.write(" ".join([str(item) for item in path]) + "\n")

        self.paths = pths

    def remove_redundant_paths(self, pths):
        """Removes redundant paths

        Args:
            pths: a list of paths

        Returns:
            A list of paths with the redundant ones eliminated.
        """

        if len(pths) == 1:
            # no reason to check if there's only one
            return pths

        for indx1 in range(len(pths) - 1):
            path1 = pths[indx1]
            if path1 is not None:
                for indx2 in range(indx1 + 1, len(pths)):
                    path2 = pths[indx2]
                    if path2 is not None and len(path1) == len(
                        path2
                    ):  # paths are the same length
                        pth1 = path1[1:]  # slicing already copies the node lists
                        pth2 = path2[1:]

                        if pth1[0] < pth1[-1]:
                            pth1.reverse()
                        if pth2[0] < pth2[-1]:
                            pth2.reverse()

                        if pth1 == pth2:
                            pths[indx2] = None

        return [path for path in pths if path is not None]

    def get_shortest_path_length(
        self, corr_matrix, srcs, snks, G
    ):  # where sources and sinks are lists
        """Identify the length of the shortest path connecting any of the sources and any of the sinks

        Args:
            corr_matrix: a np.array, the calculated correlation matrix
            srcs: a list of ints, the indices of the sources for path finding
            snks: a list of ints, the indices of the sinks for path finding
            G: a nx.Graph object describing the connectivity of the different nodes

        Returns:
            a float, the length of the shortest path, and a list of ints corresponding
            to the nodes of the shortest path.
        """

        shortest_length = float("inf")  # sentinel: no path found yet
        shortest_path = []

        for source in srcs:
            for sink in snks:
                if source != sink:  # important to avoid this situation
                    short_path = nx.dijkstra_path(G, source, sink, weight="weight")
                    length = self.get_length_of_path(short_path, corr_matrix)
                    if length < shortest_length:
                        shortest_length = length
                        shortest_path = short_path
        return shortest_length, shortest_path

    def get_length_of_path(self, path, corr_matrix):
        """Calculate the length of a path

        Args:
            path: a list of ints, the indices of the path
            corr_matrix: a np.array, the calculated correlation matrix

        Returns:
            a float, the length of the path
        """

        length = 0.0
        for t in range(len(path) - 1):
            length = length + corr_matrix[path[t], path[t + 1]]
        return length

    def get_paths_between_multiple_endpoints(
        self, cutoff, corr_matrix, srcs, snks, G, context
    ):  # where sources and sinks are lists
        """Get paths between sinks and sources

        Args:
            cutoff: a np.array containing a single float, the cutoff specifying the maximum permissible path length
            corr_matrix: a np.array, the calculated correlation matrix
            srcs: a list of ints, the indices of the sources for path finding
            snks: a list of ints, the indices of the sinks for path finding
            G: a nx.Graph object describing the connectivity of the different nodes
            context: the user-specified command-line parameters, a UserInput object

        Returns:
            a list of paths, where each path is represented by a list. The first item in each path is the length
            of the path (float). The remaining items are the indices of the nodes in the path (int).
        """

        pths = []
        for source in srcs:
            for sink in snks:
                if source != sink:  # avoid this situation
                    pths.extend(
                        self.get_paths_fixed_endpoints(
                            cutoff, corr_matrix, source, sink, G, context
                        )
                    )
        return pths

    def get_paths_fixed_endpoints(self, cutoff, corr_matrix, source, sink, G, context):
        """Get paths between a single sink and a single source

        Args:
            cutoff: a np.array containing a single float, the cutoff specifying the
                maximum permissible path length
            corr_matrix: a np.array, the calculated correlation matrix
            source: the index of the source for path finding
            sink: the index of the sink for path finding
            G: a nx.Graph object describing the connectivity of the different nodes
            context: the user-specified command-line parameters, a UserInput object

        Returns:
            a list of paths, where each path is represented by a list. The first item
            in each path is the length of the path (float). The remaining items are
            the indices of the nodes in the path (int).
        """

        if source == sink:
            return []

        source_lengths, source_paths = nx.single_source_dijkstra(
            G, source, target=None, cutoff=None, weight="weight"
        )
        sink_lengths, sink_paths = nx.single_source_dijkstra(
            G, sink, target=None, cutoff=None, weight="weight"
        )

        so_l = list(source_lengths.values())
        so_p = list(source_paths.values())
        si_l = list(sink_lengths.values())
        si_p = list(sink_paths.values())

        check_list_1 = []
        check_list_2 = []
        for i in range(len(so_l)):
            check_list_1.append(so_p[i][-1])
            check_list_2.append(si_p[i][-1])

        node_list = []
        dijkstra_list = []
        upper_minimum_length = 0
        if not set(check_list_1).difference(check_list_2):
            for i, _ in enumerate(so_l):
                if so_l[i] + si_l[i] <= cutoff:
                    node_list.extend(so_p[i][:])
                    node_list.extend(si_p[i][:])
                    si_pReversed = si_p[i][:]
                    si_pReversed.reverse()
                    temp_path = so_p[i][:] + si_pReversed[1:]
                    temp_length = so_l[i] + si_l[i]
                    dijkstra_list.append(temp_path)
                    if (so_l[i] + si_l[i]) > upper_minimum_length:
                        upper_minimum_length = temp_length
        else:
            logger.critical("paths do not match up")

        unique_nodes = list(set(node_list))
        unique_nodes.sort()

        new_matrix = np.zeros((len(corr_matrix), len(corr_matrix)))

        # copy only the rows/columns of the retained nodes into the new matrix
        idx = np.ix_(unique_nodes, unique_nodes)
        new_matrix[idx] = corr_matrix[idx]

        corr_matrix = new_matrix
        G = nx.Graph(incoming_graph_data=corr_matrix, labels=unique_nodes)

        length = 0.0
        paths_growing_out_from_source = [[length, source]]
        full_paths_from_start_to_sink = []

        # This is essentially a list-based (iterative) replacement for a
        # recursive algorithm. To parallelize, take the first N branches and
        # send them off to separate processes; the remaining branches are
        # filled out there.

        find_paths_object = find_paths()
        if context["n_cores"] == 1:
            while paths_growing_out_from_source:
                find_paths_object.expand_growing_paths_one_step(
                    paths_growing_out_from_source,
                    full_paths_from_start_to_sink,
                    cutoff,
                    sink,
                    G,
                )
        else:
            # just get some of the initial paths on a single processor
            logger.info(
                "Starting serial portion of path-finding algorithm (will run for "
                + str(context["seconds_to_wait_before_parallelizing_path_finding"])
                + " seconds)...",
            )
            atime = time.time()
            while (
                paths_growing_out_from_source
                and time.time() - atime
                < context["seconds_to_wait_before_parallelizing_path_finding"]
            ):
                find_paths_object.expand_growing_paths_one_step(
                    paths_growing_out_from_source,
                    full_paths_from_start_to_sink,
                    cutoff,
                    sink,
                    G,
                )

            # ok, so having generated just a first few, divvy up those among multiple processors
            if paths_growing_out_from_source:  # in case you've already finished
                logger.info(
                    "Starting parallel portion of path-finding algorithm running on "
                    + str(context["n_cores"])
                    + " processors...",
                )
                paths_growing_out_from_source = [
                    (cutoff, sink, G, path) for path in paths_growing_out_from_source
                ]
                additional_full_paths_from_start_to_sink = multi_threading_find_paths(
                    paths_growing_out_from_source, context["n_cores"]
                )
                full_paths_from_start_to_sink.extend(
                    additional_full_paths_from_start_to_sink.results
                )
            else:
                logger.info(
                    "(All paths found during serial path finding; parallelization not required)",
                )

        full_paths_from_start_to_sink.sort()

        return full_paths_from_start_to_sink

__init__(corr_matrix, srcs, snks, context, residue_keys)

Identify paths that link the source and the sink and order them by their lengths.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| corr_matrix | NDArray[floating] | a np.array, the calculated correlation matrix | required |
| srcs | Collection[int] | a list of ints, the indices of the sources for path finding | required |
| snks | Collection[int] | a list of ints, the indices of the sinks for path finding | required |
| context | MutableMapping[str, Any] | the user-specified command-line parameters, a UserInput object | required |
| residue_keys | ArrayLike | a list containing string representations of each residue | required |
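
A minimal, hypothetical sketch of driving the constructor on a toy 4-node system is shown below. The `context` keys are exactly those read by the source above; the values, the toy matrix, and the residue labels are illustrative assumptions (a plain dict stands in for the parsed user input), not documented defaults.

```python
import numpy as np

from wisp.paths import GetPaths

# Toy symmetric "correlation" matrix: nonzero entries become weighted edges.
corr_matrix = np.array(
    [
        [0.0, 1.0, 4.0, 0.0],
        [1.0, 0.0, 1.5, 3.0],
        [4.0, 1.5, 0.0, 1.0],
        [0.0, 3.0, 1.0, 0.0],
    ]
)

# Illustrative settings; the keys match those referenced in GetPaths.__init__.
context = {
    "n_paths": 3,
    "n_paths_max": 100000,
    "n_cores": 1,
    "write_formatted_paths": False,
    "output_dir": ".",
    "seconds_to_wait_before_parallelizing_path_finding": 5.0,
}

# Hypothetical residue labels, one per node.
residue_keys = ["ALA1", "GLY2", "SER3", "THR4"]

result = GetPaths(
    corr_matrix, srcs=[0], snks=[3], context=context, residue_keys=residue_keys
)
print(result.paths_description)  # human-readable summary of the found paths
```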

get_length_of_path(path, corr_matrix)

Calculate the length of a path

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | | a list of ints, the indices of the path | required |
| corr_matrix | | a np.array, the calculated correlation matrix | required |

Returns:

a float, the length of the path
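
Equivalently, the loop sums the matrix entries of consecutive node pairs; a compact restatement for orientation, assuming `path` and `corr_matrix` as documented above:

```python
# same computation as the loop in get_length_of_path
length = sum(corr_matrix[a, b] for a, b in zip(path, path[1:]))
```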

get_paths_between_multiple_endpoints(cutoff, corr_matrix, srcs, snks, G, context)

Get paths between sinks and sources

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cutoff | | a np.array containing a single float, the cutoff specifying the maximum permissible path length | required |
| corr_matrix | | a np.array, the calculated correlation matrix | required |
| srcs | | a list of ints, the indices of the sources for path finding | required |
| snks | | a list of ints, the indices of the sinks for path finding | required |
| G | | a nx.Graph object describing the connectivity of the different nodes | required |
| context | | the user-specified command-line parameters, a UserInput object | required |

Returns:

a list of paths, where each path is represented by a list. The first item in each path is the length of the path (float). The remaining items are the indices of the nodes in the path (int).
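
For orientation, a hypothetical return value for one source and one sink might look like this (lengths and indices are made up):

```python
[
    [2.5, 0, 2, 3],  # length 2.5, visiting nodes 0 -> 2 -> 3
    [3.1, 0, 1, 3],  # length 3.1, visiting nodes 0 -> 1 -> 3
]
```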

get_paths_fixed_endpoints(cutoff, corr_matrix, source, sink, G, context)

Get paths between a single sink and a single source

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cutoff | | a np.array containing a single float, the cutoff specifying the maximum permissible path length | required |
| corr_matrix | | a np.array, the calculated correlation matrix | required |
| source | | the index of the source for path finding | required |
| sink | | the index of the sink for path finding | required |
| G | | a nx.Graph object describing the connectivity of the different nodes | required |
| context | | the user-specified command-line parameters, a UserInput object | required |

Returns:

a list of paths, where each path is represented by a list. The first item in each path is the length of the path (float). The remaining items are the indices of the nodes in the path (int).
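
The pruning step at the heart of this method keeps only nodes whose Dijkstra distance from the source plus Dijkstra distance from the sink fits within the cutoff, since every node on a qualifying path must satisfy that bound. A standalone sketch of the criterion (the function name is illustrative, not part of the module):

```python
import networkx as nx


def nodes_within_cutoff(G, source, sink, cutoff):
    """Return the nodes that can lie on a source-to-sink path of length <= cutoff."""
    d_src = nx.single_source_dijkstra_path_length(G, source, weight="weight")
    d_snk = nx.single_source_dijkstra_path_length(G, sink, weight="weight")
    inf = float("inf")
    return {v for v in G if d_src.get(v, inf) + d_snk.get(v, inf) <= cutoff}
```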

get_shortest_path_length(corr_matrix, srcs, snks, G)

Identify the length of the shortest path connecting any of the sources and any of the sinks

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| corr_matrix | | a np.array, the calculated correlation matrix | required |
| srcs | | a list of ints, the indices of the sources for path finding | required |
| snks | | a list of ints, the indices of the sinks for path finding | required |
| G | | a nx.Graph object describing the connectivity of the different nodes | required |

Returns:

a float, the length of the shortest path, and a list of ints corresponding to the nodes of the shortest path.
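
A minimal illustration of the underlying Dijkstra call on a toy weighted graph (the matrix here is an assumption for demonstration):

```python
import networkx as nx
import numpy as np

corr_matrix = np.array(
    [
        [0.0, 1.0, 4.0],
        [1.0, 0.0, 1.5],
        [4.0, 1.5, 0.0],
    ]
)
G = nx.Graph(corr_matrix)

# The two-hop route 0 -> 1 -> 2 (weight 2.5) beats the direct edge (weight 4.0).
print(nx.dijkstra_path(G, 0, 2, weight="weight"))  # [0, 1, 2]
```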

remove_redundant_paths(pths)

Removes redundant paths

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pths | | a list of paths | required |

Returns:

A list of paths with the redundant ones eliminated.
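
Two paths of equal length that visit the same nodes in opposite directions count as redundant, because each node sequence is put into a canonical direction before comparison. A hypothetical input and its expected outcome:

```python
pths = [
    [2.5, 0, 1, 3],
    [2.5, 3, 1, 0],  # the first path traversed in reverse
]
# remove_redundant_paths would keep only one of the two: [[2.5, 0, 1, 3]]
```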

find_paths

Path-finding data processing on a single processor

Source code in wisp/paths.py
class find_paths:  # other, more specific classes will inherit this one
    """Path-finding data processing on a single processor"""

    def __init__(self):
        # use an instance attribute so results are not shared across instances
        self.results = []

    def runit(self, running, mutex, results_queue, items):
        """Path-finding data processing on a single processor.

        Args:
            running: a mp.Value() object
            mutex: a mp.Lock() object
            results_queue: where the results will be stored [mp.Queue()]
            items: the data to be processed, in a list
        """

        for item in items:
            self.value_func(item, results_queue)
        mutex.acquire()
        running.value -= 1
        mutex.release()
        results_queue.put(self.results)

    def value_func(
        self, item, results_queue
    ):  # this is the function that changes through inheritance
        """Process a single path-finding "branch"

        Args:
            item: a tuple containing required information.
                The first is a numpy array containing a single float, the path-length cutoff
                The second is an index corresponding to the ultimate path sink
                The third is a nx.Graph object describing the connectivity of the different nodes
                The fourth is a list corresponding to a path. The first item is the length of the path (float).
                    The remaining items are the indices of the nodes in the path (int).
            results_queue: where the results will be stored [mp.Queue()]
        """

        cutoff = item[0]
        sink = item[1]
        G = item[2]

        paths_growing_out_from_source = [item[3]]
        full_paths_from_start_to_sink = []

        while paths_growing_out_from_source:
            self.expand_growing_paths_one_step(
                paths_growing_out_from_source,
                full_paths_from_start_to_sink,
                cutoff,
                sink,
                G,
            )

        # here save the results for later compilation
        self.results.append(full_paths_from_start_to_sink)

    def expand_growing_paths_one_step(
        self,
        paths_growing_out_from_source,
        full_paths_from_start_to_sink,
        cutoff,
        sink,
        G,
    ):
        """Expand the paths growing out from the source to the sink by one step
           (to the neighbors of the terminal node) of the expanding paths

        Args:
            paths_growing_out_from_source: a list of paths, where each path is
                represented by a list. The first item in each path is the length of
                the path (float). The remaining items are the indices of the nodes
                in the path (int).
            full_paths_from_start_to_sink: a growing list of identified paths that
                connect the source and the sink, where each path is formatted as above.
            cutoff: a numpy array containing a single element (float), the length
                cutoff. Paths with lengths greater than the cutoff will be ignored.
            sink: the index of the sink (int)
            G: a nx.Graph object describing the connectivity of the different nodes
        """

        for i, path_growing_out_from_source in enumerate(paths_growing_out_from_source):
            if path_growing_out_from_source[0] > cutoff:
                # If the path is already longer than the cutoff, there is no
                # use continuing to branch out, since subsequent branches will
                # only be longer.
                paths_growing_out_from_source.pop(i)
                break
            elif path_growing_out_from_source[-1] == sink:
                # so the sink has been reached
                full_paths_from_start_to_sink.append(path_growing_out_from_source)
                paths_growing_out_from_source.pop(i)
                break
            elif path_growing_out_from_source[-1] != sink:
                # sink not yet reached, but paths still short enough. So add
                # new paths, same as old, but with neighboring element
                # appended.
                node_neighbors = list(G.neighbors(path_growing_out_from_source[-1]))
                for j, node_neighbor in enumerate(node_neighbors):
                    if node_neighbor not in path_growing_out_from_source:
                        temp = path_growing_out_from_source[:]
                        temp.append(node_neighbor)
                        temp[0] = temp[0] + G.edges[temp[-2], temp[-1]]["weight"]
                        paths_growing_out_from_source.insert((i + j + 1), temp)
                paths_growing_out_from_source.pop(i)
                break
            else:
                logger.critical("SOMETHING IS WRONG")

expand_growing_paths_one_step(paths_growing_out_from_source, full_paths_from_start_to_sink, cutoff, sink, G)

Expand the paths growing out from the source toward the sink by one step, to the neighbors of the terminal node of each expanding path

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| paths_growing_out_from_source | | a list of paths, where each path is represented by a list. The first item in each path is the length of the path (float). The remaining items are the indices of the nodes in the path (int). | required |
| full_paths_from_start_to_sink | | a growing list of identified paths that connect the source and the sink, where each path is formatted as above. | required |
| cutoff | | a numpy array containing a single element (float), the length cutoff. Paths with lengths greater than the cutoff will be ignored. | required |
| sink | | the index of the sink (int) | required |
| G | | a nx.Graph object describing the connectivity of the different nodes | required |
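
A small driver, mirroring the serial branch of `GetPaths.get_paths_fixed_endpoints`, that repeatedly applies this one-step expansion until every branch has either reached the sink or exceeded the cutoff (the toy graph and cutoff are assumptions):

```python
import networkx as nx
import numpy as np

from wisp.paths import find_paths

G = nx.Graph(
    np.array(
        [
            [0.0, 1.0, 4.0],
            [1.0, 0.0, 1.5],
            [4.0, 1.5, 0.0],
        ]
    )
)

growing = [[0.0, 0]]  # each path is [length, node, node, ...]
complete = []         # finished paths ending at the sink
finder = find_paths()
while growing:
    finder.expand_growing_paths_one_step(growing, complete, 5.0, 2, G)

print(complete)  # [[4.0, 0, 2], [2.5, 0, 1, 2]] -- both within the cutoff
```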

runit(running, mutex, results_queue, items)

Path-finding data processing on a single processor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| running | | a mp.Value() object | required |
| mutex | | a mp.Lock() object | required |
| results_queue | | where the results will be stored [mp.Queue()] | required |
| items | | the data to be processed, in a list | required |

value_func(item, results_queue)

Process a single path-finding "branch"

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item | | a tuple containing required information. The first element is a numpy array containing a single float, the path-length cutoff; the second is an index corresponding to the ultimate path sink; the third is a nx.Graph object describing the connectivity of the different nodes; the fourth is a list corresponding to a path, whose first item is the length of the path (float) and whose remaining items are the indices of the nodes in the path (int). | required |
| results_queue | | where the results will be stored [mp.Queue()] | required |

multi_threading_find_paths

Launches path finding on multiple processors

Source code in wisp/paths.py
class multi_threading_find_paths:
    """Launches path finding on multiple processors"""

    def __init__(self, inputs: Collection, num_processors: int | None = None):
        """
        Args:
            inputs: the data to be processed, in a list
            num_processors: the number of processors to use to process this data.
        """

        self.results = []

        # First, we determine the number of available cores.
        if num_processors is None:
            num_processors = mp.cpu_count()
        # reduce the number of processors if too many have been specified
        if len(inputs) < num_processors:
            logger.debug("Number of cores is higher than number of inputs.")
            num_processors = len(inputs)
            if num_processors == 0:
                num_processors = 1
        logger.debug(f"Setting the number of cores to {num_processors}")

        # now, divide the inputs into the appropriate number of processors
        inputs_divided: MutableMapping[int, list] = {
            t: [] for t in range(num_processors)
        }
        for t in range(0, len(inputs), num_processors):
            for t2 in range(num_processors):
                index = t + t2
                if index < len(inputs):
                    inputs_divided[t2].append(inputs[index])

        # now, run each division on its own processor
        running = mp.Value("i", num_processors)
        mutex = mp.Lock()

        threads = [find_paths() for _ in range(num_processors)]

        results_queue = mp.Queue()  # to keep track of the results

        processes = []
        for i in range(num_processors):
            p = mp.Process(
                target=threads[i].runit,
                args=(running, mutex, results_queue, inputs_divided[i]),
            )
            p.start()
            processes.append(p)

        while running.value > 0:
            time.sleep(0.01)  # wait for everything to finish without busy-spinning

        # compile all results
        for _ in threads:
            chunk = results_queue.get()
            for chun in chunk:
                self.results.extend(chun)

__init__(inputs, num_processors=None)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | Collection | the data to be processed, in a list | required |
| num_processors | int \| None | the number of processors to use to process this data. | None |
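
The constructor deals inputs out to the workers round-robin; a standalone sketch of just that division scheme, with a hypothetical seven-item workload spread over three workers:

```python
inputs = list("ABCDEFG")
num_processors = 3

# same round-robin division as in multi_threading_find_paths.__init__
inputs_divided = {t: [] for t in range(num_processors)}
for t in range(0, len(inputs), num_processors):
    for t2 in range(num_processors):
        index = t + t2
        if index < len(inputs):
            inputs_divided[t2].append(inputs[index])

print(inputs_divided)  # {0: ['A', 'D', 'G'], 1: ['B', 'E'], 2: ['C', 'F']}
```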