|
| 1 | +""" |
| 2 | +""" |
| 3 | + |
| 4 | +import copy |
| 5 | +from pathlib import Path |
| 6 | + |
| 7 | +import pandas as pd |
| 8 | +import pytest |
| 9 | +import test_utils |
| 10 | +from base import BaseTest |
| 11 | +from file_conflicts_pathtable import get_pathtable |
| 12 | + |
| 13 | + |
| 14 | +class BaseTransfer(BaseTest): |
| 15 | + |
| 16 | + # ---------------------------------------------------------------------------------- |
| 17 | + # Test File Transfer - All Options |
| 18 | + # ---------------------------------------------------------------------------------- |
| 19 | + |
| 20 | + @pytest.fixture( |
| 21 | + scope="class", |
| 22 | + ) |
| 23 | + def pathtable_and_project(self, tmpdir_factory): |
| 24 | + """ |
| 25 | + Create a new test project with a test project folder |
| 26 | + and file structure (see `get_pathtable()` for definition). |
| 27 | + """ |
| 28 | + tmp_path = tmpdir_factory.mktemp("test") |
| 29 | + |
| 30 | + base_path = tmp_path / "test with space" |
| 31 | + test_project_name = "test_file_conflicts" |
| 32 | + |
| 33 | + project, cwd = test_utils.setup_project_fixture( |
| 34 | + base_path, test_project_name |
| 35 | + ) |
| 36 | + |
| 37 | + pathtable = get_pathtable(project.cfg["local_path"]) |
| 38 | + |
| 39 | + self.create_all_pathtable_files(pathtable) |
| 40 | + |
| 41 | + yield [pathtable, project] |
| 42 | + |
| 43 | + test_utils.teardown_project(cwd, project) |
| 44 | + |
| 45 | + def get_expected_transferred_paths( |
| 46 | + self, pathtable, sub_names, ses_names, datatype |
| 47 | + ): |
| 48 | + """ |
| 49 | + Process the expected files that are transferred using the logic in |
| 50 | + `make_pathtable_search_filter()` to |
| 51 | + """ |
| 52 | + parsed_sub_names = self.parse_arguments(pathtable, sub_names, "sub") |
| 53 | + parsed_ses_names = self.parse_arguments(pathtable, ses_names, "ses") |
| 54 | + parsed_datatype = self.parse_arguments(pathtable, datatype, "datatype") |
| 55 | + |
| 56 | + # Filter pathtable to get files that were expected to be transferred |
| 57 | + ( |
| 58 | + sub_ses_dtype_arguments, |
| 59 | + extra_arguments, |
| 60 | + ) = self.make_pathtable_search_filter( |
| 61 | + parsed_sub_names, parsed_ses_names, parsed_datatype |
| 62 | + ) |
| 63 | + |
| 64 | + datatype_folders = self.query_table(pathtable, sub_ses_dtype_arguments) |
| 65 | + extra_folders = self.query_table(pathtable, extra_arguments) |
| 66 | + |
| 67 | + expected_paths = pd.concat([datatype_folders, extra_folders]) |
| 68 | + expected_paths = expected_paths.drop_duplicates(subset="path") |
| 69 | + |
| 70 | + expected_paths = self.remove_path_before_rawdata(expected_paths.path) |
| 71 | + |
| 72 | + return expected_paths |
| 73 | + |
| 74 | + def make_pathtable_search_filter(self, sub_names, ses_names, datatype): |
| 75 | + """ |
| 76 | + Create a string of arguments to pass to pd.query() that will |
| 77 | + create the table of only transferred sub, ses and datatype. |
| 78 | +
|
| 79 | + Two arguments must be created, one of all sub / ses / datatypes |
| 80 | + and the other of all non sub/ non ses / non datatype |
| 81 | + folders. These must be handled separately as they are |
| 82 | + mutually exclusive. |
| 83 | + """ |
| 84 | + sub_ses_dtype_arguments = [] |
| 85 | + extra_arguments = [] |
| 86 | + |
| 87 | + for sub in sub_names: |
| 88 | + if sub == "all_non_sub": |
| 89 | + extra_arguments += ["is_non_sub == True"] |
| 90 | + else: |
| 91 | + for ses in ses_names: |
| 92 | + if ses == "all_non_ses": |
| 93 | + extra_arguments += [ |
| 94 | + f"(parent_sub == '{sub}' & is_non_ses == True)" |
| 95 | + ] |
| 96 | + else: |
| 97 | + for dtype in datatype: |
| 98 | + if dtype == "all_non_datatype": |
| 99 | + extra_arguments += [ |
| 100 | + f"(parent_sub == '{sub}' & parent_ses == '{ses}' " |
| 101 | + f"& is_ses_level_non_datatype == True)" |
| 102 | + ] |
| 103 | + else: |
| 104 | + sub_ses_dtype_arguments += [ |
| 105 | + f"(parent_sub == '{sub}' & parent_ses == '{ses}' " |
| 106 | + f"& (parent_datatype == '{dtype}' " |
| 107 | + f"| parent_datatype == '{dtype}'))" |
| 108 | + ] |
| 109 | + |
| 110 | + return sub_ses_dtype_arguments, extra_arguments |
| 111 | + |
| 112 | + def remove_path_before_rawdata(self, list_of_paths): |
| 113 | + """ |
| 114 | + Remove the path to project files before the "rawdata" so |
| 115 | + they can be compared no matter where the project was stored |
| 116 | + (e.g. on a central server vs. local filesystem). |
| 117 | + """ |
| 118 | + cut_paths = [] |
| 119 | + for path_ in list_of_paths: |
| 120 | + parts = Path(path_).parts |
| 121 | + cut_paths.append(Path(*parts[parts.index("rawdata") :])) |
| 122 | + return cut_paths |
| 123 | + |
| 124 | + def query_table(self, pathtable, arguments): |
| 125 | + """ |
| 126 | + Search the table for arguments, return empty |
| 127 | + if arguments empty |
| 128 | + """ |
| 129 | + if any(arguments): |
| 130 | + folders = pathtable.query(" | ".join(arguments)) |
| 131 | + else: |
| 132 | + folders = pd.DataFrame() |
| 133 | + return folders |
| 134 | + |
| 135 | + def parse_arguments(self, pathtable, list_of_names, field): |
| 136 | + """ |
| 137 | + Replicate datashuttle name formatting by parsing |
| 138 | + "all" arguments and turning them into a list of all names, |
| 139 | + (subject or session), taken from the pathtable. |
| 140 | + """ |
| 141 | + if list_of_names in [["all"], [f"all_{field}"]]: |
| 142 | + entries = pathtable.query(f"parent_{field} != False")[ |
| 143 | + f"parent_{field}" |
| 144 | + ] |
| 145 | + entries = list(set(entries)) |
| 146 | + if list_of_names == ["all"]: |
| 147 | + entries += ( |
| 148 | + [f"all_non_{field}"] |
| 149 | + if field != "datatype" |
| 150 | + else ["all_non_datatype"] |
| 151 | + ) |
| 152 | + list_of_names = entries |
| 153 | + return list_of_names |
| 154 | + |
| 155 | + def create_all_pathtable_files(self, pathtable): |
| 156 | + """ |
| 157 | + Create the entire test project in the defined |
| 158 | + location (usually project's `local_path`). |
| 159 | + """ |
| 160 | + for i in range(pathtable.shape[0]): |
| 161 | + filepath = pathtable["base_folder"][i] / pathtable["path"][i] |
| 162 | + filepath.parents[0].mkdir(parents=True, exist_ok=True) |
| 163 | + test_utils.write_file(filepath, contents="test_entry") |
| 164 | + |
| 165 | + def central_from_local(self, path_): |
| 166 | + return Path(str(copy.copy(path_)).replace("local", "central")) |
0 commit comments