Skip to content
Snippets Groups Projects
Commit 1bc2f09d authored by George Marchment
Browse files

Add support RO-Crate

parent 20cd8255
No related branches found
No related tags found
No related merge requests found
import re
import json
from .code_ import Code
from .outils import get_next_param
from .executor import Executor
......@@ -40,12 +41,21 @@ class Call(Executor):
def get_first_element_called(self):
    """Return the element stored in ``self.first_element_called``."""
    return self.first_element_called
def get_elements_called(self, tab_input=None, first_call=True):
    """Collect every element invoked by this call, nested calls included.

    Args:
        tab_input: Elements already gathered by an enclosing call. The list
            is copied and never mutated. ``None`` means "start empty".
        first_call: ``False`` when invoked recursively for a nested call
            parameter. Kept for interface compatibility; it does not change
            the result.

    Returns:
        The called elements without duplicates, in first-seen order.
    """
    # A ``None`` default avoids sharing one mutable list between invocations.
    collected = [] if tab_input is None else list(tab_input)
    collected.append(self.first_element_called)
    for para in self.parameters:
        if para.get_type() == "Call":
            # The keyword has to be ``tab_input``: the parameter is no longer
            # named ``tab``, so passing ``tab=`` raises a TypeError.
            collected = para.get_elements_called(tab_input=collected, first_call=False)
    # dict.fromkeys removes duplicates like set() does, but keeps a stable order.
    return list(dict.fromkeys(collected))
def get_code_split_space(self, code):
......
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .bioflowinsighterror import BioFlowInsightError
import re
from .outils import get_dico_from_tab_from_id
from . import constant
......@@ -24,6 +25,16 @@ class Main_DSL2(Nextflow_Building_Blocks):
def is_initialised(self):
    """Return the value of the ``initialised`` flag."""
    return self.initialised
def get_all_called(self):
    """Gather the elements called by every executor of this workflow body.

    An executor of type ``"Call"`` contributes its own called elements; any
    other executor contributes those of its origins that are calls.

    Returns:
        A flat list; duplicates across executors are kept.
    """
    gathered = []
    for executor in self.get_executors():
        if executor.get_type() == "Call":
            calls = [executor]
        else:
            calls = [origin for origin in executor.get_origins() if origin.get_type() == "Call"]
        for call in calls:
            gathered.extend(call.get_elements_called())
    return gathered
def get_processes(self):
    """Return the origin's processes followed by those reported by the parent class."""
    from_origin = self.origin.get_processes()
    from_parent = super().get_processes()
    return from_origin + from_parent
......@@ -181,4 +192,25 @@ class Main_DSL2(Nextflow_Building_Blocks):
# c.get_structure(dico)
#
##return dico
def add_2_rocrate(self, dico, parent_key):
    """Register the main workflow and everything it calls in the RO-Crate graph.

    The entity id is ``"<parent_key>/MAIN"``. Nothing happens when an entity
    with that id is already present in ``dico["@graph"]``.

    Args:
        dico: RO-Crate dictionary; its ``"@graph"`` list is extended in place.
        parent_key: ``@id`` of the entity this main workflow is part of.
    """
    main_key = f"{parent_key}/MAIN"
    if get_dico_from_tab_from_id(dico, main_key) is not None:
        return
    dico_main = {
        "@id": main_key,
        "name": "Main Workflow",
        "@type": ["SoftwareSourceCode", "ComputationalWorkflow"],
        # TODO -> check if this remains true
        # "conformsTo": {"@id": "https://bioschemas.org/profiles/ComputationalWorkflow/0.5-DRAFT-2020_07_21"}
        # "dct:conformsTo": "https://bioschemas.org/profiles/ComputationalWorkflow/1.0-RELEASE/"
        "input": [],
        "output": [],
        "isPartOf": [{"@id": parent_key}],
        "hasPart": [],
    }
    for c in self.get_all_called():
        c.add_2_rocrate(dico, main_key)
        reference = {"@id": c.get_rocrate_key(dico)}
        # get_all_called() may list the same element several times (one per
        # executor); reference each part only once.
        if reference not in dico_main["hasPart"]:
            dico_main["hasPart"].append(reference)
    dico["@graph"].append(dico_main)
\ No newline at end of file
......@@ -74,6 +74,9 @@ class Nextflow_Building_Blocks:
def get_name_file(self):
    """Return the file name reported by ``self.origin``."""
    return self.origin.get_name_file()
def get_rocrate_key(self, dico):
    """Build the RO-Crate ``@id`` of this element.

    The key is the element's file address with the leading
    ``dico['temp_directory']`` prefix (and the following separator character)
    sliced off, then ``/`` and the element name.
    """
    address = self.get_file_address()
    prefix_length = len(dico['temp_directory']) + 1
    relative_path = address[prefix_length:]
    return f"{relative_path}/{self.get_name()}"
......
......@@ -13,7 +13,7 @@ from . import constant
warnings.filterwarnings("ignore")
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import extract_curly, get_curly_count, get_parenthese_count
from .outils import extract_curly, get_curly_count, get_parenthese_count, get_dico_from_tab_from_id
from .bioflowinsighterror import BioFlowInsightError
......@@ -41,6 +41,7 @@ class Nextflow_File(Nextflow_Building_Blocks):
self.graph = None
self.display_info = display_info
self.all_includes = []
self.added_2_rocrate = False
self.check_file_correctness()
self.set_DSL(DSL=DSL)
self.extract_metadata()
......@@ -124,6 +125,7 @@ class Nextflow_File(Nextflow_Building_Blocks):
self.already_added_structure = False
self.graph = None
self.all_includes = []
self.added_2_rocrate = False
def extract_metadata(self):
......@@ -658,5 +660,57 @@ class Nextflow_File(Nextflow_Building_Blocks):
#def get_metadata_graph_wo_operations(self):
# self.graph.get_metadata_graph_wo_operations()
def add_processes_2_rocrate(self, dico, file_dico, file_name):
    """Link every process of this file to the file entity and register it.

    Args:
        dico: RO-Crate dictionary handed on to each process.
        file_dico: Graph entry of this file; its ``"hasPart"`` list is extended.
        file_name: ``@id`` of this file, used as the processes' parent key.
    """
    for p in self.processes:
        # Reference the process with an {"@id": ...} object (as the include
        # links do) and with the same key the process registers itself under.
        file_dico["hasPart"].append({"@id": p.get_rocrate_key(dico)})
        p.add_2_rocrate(dico, file_name)
def add_includes_2_rocrate(self, dico, file_dico, file_name):
    """Link this file with each file it includes, then register those files.

    Args:
        dico: RO-Crate dictionary; the included files' entries are looked up
            in ``dico["@graph"]``.
        file_dico: Graph entry of this file; its ``"hasPart"`` list is extended.
        file_name: ``@id`` of this file.
    """
    for include in self.includes:
        included_file = include.get_file()
        included_key = included_file.get_file_address()[len(dico["temp_directory"])+1:]
        # Several includes may target the same file; link it only once.
        child_reference = {"@id": included_key}
        if child_reference not in file_dico["hasPart"]:
            file_dico["hasPart"].append(child_reference)
        # NOTE(review): assumes the included file already has a graph entry
        # with an "isPartOf" list; a missing entry still fails here.
        included_dico = get_dico_from_tab_from_id(dico, included_key)
        parent_reference = {"@id": file_name}
        if parent_reference not in included_dico["isPartOf"]:
            included_dico["isPartOf"].append(parent_reference)
        included_file.add_2_rocrate(dico)
def add_subworkflows_2_rocrate(self, dico, file_dico, file_name):
    """Link every subworkflow of this file to the file entity and register it.

    Args:
        dico: RO-Crate dictionary handed on to each subworkflow.
        file_dico: Graph entry of this file; its ``"hasPart"`` list is extended.
        file_name: ``@id`` of this file, used as the subworkflows' parent key.
    """
    for sub in self.subworkflows:
        # Store an {"@id": ...} reference, not the bare key string, so the
        # entry has the same shape as the other "hasPart" links.
        file_dico["hasPart"].append({"@id": sub.get_rocrate_key(dico)})
        sub.add_2_rocrate(dico, file_name)
def add_2_rocrate(self, dico):
    """Add this file's processes, includes, main and subworkflows to the crate.

    The work is done at most once per file. A first file may be DSL1
    (processes only, typed as a ComputationalWorkflow) or DSL2; any other
    file must be DSL2.

    Raises:
        Exception: for any other combination of file position and DSL.
    """
    if self.added_2_rocrate:
        return
    # Set the flag before recursing into the included files.
    self.added_2_rocrate = True
    file_name = self.get_file_address()[len(dico["temp_directory"])+1:]
    file_dico = get_dico_from_tab_from_id(dico, file_name)
    dsl = self.get_DSL()
    if self.first_file and dsl == "DSL1":
        file_dico["@type"].append("ComputationalWorkflow")
        self.add_processes_2_rocrate(dico, file_dico, file_name)
    elif dsl == "DSL2":
        self.add_processes_2_rocrate(dico, file_dico, file_name)
        self.add_includes_2_rocrate(dico, file_dico, file_name)
        if self.first_file:
            # Only the entry file carries a main workflow.
            self.main.add_2_rocrate(dico, file_name)
        self.add_subworkflows_2_rocrate(dico, file_dico, file_name)
        #TODO
    else:
        raise Exception("This shoudn't happen!")
......@@ -818,3 +818,16 @@ def extract_inside_parentheses(code, bit_of_code):
left = get_code_until_parenthese_count(code[:start], 1, left_2_right = False)
right = get_code_until_parenthese_count(code[end:], -1, left_2_right = True)
return (left[1:]+bit_of_code+right[:-1]).strip()
def get_dico_from_tab_from_id(dico, id):
    """Return the entry of an RO-Crate graph that has the given ``@id``.

    Args:
        dico: RO-Crate dictionary holding the entities under ``"@graph"``.
        id: The ``"@id"`` value to look for.

    Returns:
        The first matching entry, or ``None`` when there is no match.
    """
    matches = (entry for entry in dico["@graph"] if entry["@id"] == id)
    return next(matches, None)
def check_if_element_in_tab_rocrate(tab, id):
    """Tell whether any entry of ``tab`` has ``"@id"`` equal to ``id``."""
    return any(entry["@id"] == id for entry in tab)
\ No newline at end of file
......@@ -2,7 +2,7 @@ import re
from .code_ import Code
from .nextflow_building_blocks import Nextflow_Building_Blocks
from .outils import remove_jumps_inbetween_parentheses, sort_and_filter
from .outils import remove_jumps_inbetween_parentheses, sort_and_filter, get_dico_from_tab_from_id, check_if_element_in_tab_rocrate
from .bioflowinsighterror import BioFlowInsightError
from . import constant
......@@ -32,6 +32,9 @@ class Process(Nextflow_Building_Blocks):
def get_name(self):
    """Return the value of ``self.name``."""
    return self.name
def get_tools(self):
    """Return the value of ``self.tools``."""
    return self.tools
#def get_source(self):
# return [self]
......@@ -263,3 +266,58 @@ class Process(Nextflow_Building_Blocks):
self.initialise_name()
self.initialise_parts()
self.initialise_inputs_outputs()
def add_2_rocrate(self, dico, parent_key):
    """Register this process in the RO-Crate graph.

    On the first registration an entity is appended to ``dico["@graph"]``,
    together with an entity for every input, output and tool whose id is not
    in the graph yet. On later calls only a missing ``isPartOf`` link to
    ``parent_key`` is added.

    Args:
        dico: RO-Crate dictionary; its ``"@graph"`` list is extended in place.
        parent_key: ``@id`` of the entity this process is part of.
    """
    process_key = self.get_rocrate_key(dico)
    dico_process = get_dico_from_tab_from_id(dico, process_key)
    if dico_process is not None:
        # Already registered: only record the additional parent.
        if not check_if_element_in_tab_rocrate(dico_process["isPartOf"], parent_key):
            dico_process["isPartOf"].append({"@id": parent_key})
        return

    def reference_to(entity_id, properties):
        """Return an ``{"@id": ...}`` reference, creating the entity if its id is unknown."""
        entity = get_dico_from_tab_from_id(dico, entity_id)
        if entity is None:
            entity = {"@id": entity_id, **properties}
            dico["@graph"].append(entity)
        return {"@id": entity["@id"]}

    def formal_parameters(elements):
        """Return references to the FormalParameter entities of inputs or outputs."""
        references = []
        for element in elements:
            # Elements are either plain strings or objects exposing get_code().
            name = element if isinstance(element, str) else element.get_code()
            references.append(reference_to(name, {"@type": "FormalParameter"}))
        return references

    dico_process = {
        "@id": process_key,
        "name": "Process",
        "@type": ["SoftwareSourceCode", "Script"],
        "input": formal_parameters(self.get_inputs()),
        "output": formal_parameters(self.get_outputs()),
        "isPartOf": [{"@id": parent_key}],
        # TODO in later versions: add "url" and "identifier" to the tool entity
        "hasPart": [reference_to(tool, {"name": "Tool"}) for tool in self.get_tools()],
    }
    dico["@graph"].append(dico_process)
......@@ -8,6 +8,7 @@ class RO_Crate:
self.directory = '/'.join(workflow.get_file_address().split('/')[:-1])
self.files = []
self.dico = {}
self.dico["temp_directory"] = self.directory
def get_files(self):
self.files = glob.glob(f'{self.directory}/**/*.*', recursive=True)
......@@ -111,10 +112,14 @@ class RO_Crate:
dico["hasPart"] = []
self.dico["@graph"].append(dico)
def fill_from_workflow(self):
    """Let the workflow add its own entities to ``self.dico``."""
    workflow = self.workflow
    workflow.add_2_rocrate(self.dico)
def initialise(self):
    """Build the RO-Crate dictionary and write it to the workflow's output directory.

    The file is named ``ro-crate-metadata-<workflow name>.json``.
    """
    self.initialise_dico()
    for file in self.files:
        self.initialise_file(file)
    self.fill_from_workflow()
    # "temp_directory" is internal bookkeeping (a local absolute path used to
    # compute relative keys); keep it out of the published metadata.
    metadata = {key: value for key, value in self.dico.items() if key != "temp_directory"}
    output_path = f"{self.workflow.get_output_dir()}/ro-crate-metadata-{self.workflow.get_name()}.json"
    with open(output_path, 'w', encoding="utf-8") as output_file:
        json.dump(metadata, output_file, indent=2)
\ No newline at end of file
......@@ -3,7 +3,7 @@ from . import constant
from .code_ import Code
from .main_DSL2 import Main_DSL2
from .bioflowinsighterror import BioFlowInsightError
from .outils import remove_jumps_inbetween_parentheses
from .outils import remove_jumps_inbetween_parentheses, get_dico_from_tab_from_id, check_if_element_in_tab_rocrate
......@@ -14,9 +14,9 @@ class Subworkflow(Main_DSL2):
self.name = name.replace("'", "").replace('"', '')
self.alias = self.name
#These are the different parts of of a subworkflow -> work corresponds to the main
self.take = None
self.take = []
self.work = None
self.emit = None
self.emit = []
self.initialised = False
......@@ -116,7 +116,7 @@ class Subworkflow(Main_DSL2):
return None
def initialise_takes(self):
if(self.take!=None):
if(self.take!=[]):
code = remove_jumps_inbetween_parentheses(self.take.get_code()).split('\n')
tab = []
for i in range(len(code)):
......@@ -165,7 +165,7 @@ class Subworkflow(Main_DSL2):
def initialise_emit(self):
from .operation import Operation
if(self.emit!=None):
if(self.emit!=[]):
code = remove_jumps_inbetween_parentheses(self.emit.get_code()).split('\n')
tab = []
for i in range(len(code)):
......@@ -203,16 +203,12 @@ class Subworkflow(Main_DSL2):
return self.emit
def get_nb_emit(self):
    """Return the number of emitted elements; 0 when ``self.emit`` is ``None``."""
    # Identity test instead of ``== None``. The guard is kept so an unset
    # ``emit`` still counts as zero.
    if self.emit is None:
        return 0
    return len(self.emit)
def get_takes(self):
    """Return the value of ``self.take``."""
    return self.take
def get_nb_takes(self):
    """Return the number of taken elements; 0 when ``self.take`` is ``None``."""
    # Identity test instead of ``== None``. The guard is kept so an unset
    # ``take`` still counts as zero.
    if self.take is None:
        return 0
    return len(self.take)
def get_nb_inputs(self):
......@@ -230,15 +226,78 @@ class Subworkflow(Main_DSL2):
def get_structure(self, dico):
    """Add this subworkflow's structure to ``dico``.

    The parent class contributes first, then every take and every emit
    operation is passed through ``get_structure`` with ``to_remove=True``.
    """
    super().get_structure(dico)
    # Each collection is walked exactly once; ``or []`` also tolerates an
    # unset (None) take/emit.
    for ope in (self.get_takes() or []):
        #ope.set_operation_type("Branch")
        ope.get_structure(dico, to_remove = True)
    for ope in (self.get_emit() or []):
        #ope.set_operation_type("Branch")
        ope.get_structure(dico, to_remove = True)
def add_2_rocrate(self, dico, parent_key):
    """Register this subworkflow and everything it calls in the RO-Crate graph.

    On the first registration an entity is appended to ``dico["@graph"]``,
    together with an entity for every take and emit whose id is not in the
    graph yet, and every called element is registered as a part. On later
    calls only a missing ``isPartOf`` link to ``parent_key`` is added.

    Args:
        dico: RO-Crate dictionary; its ``"@graph"`` list is extended in place.
        parent_key: ``@id`` of the entity this subworkflow is part of.

    Raises:
        Exception: if the subworkflow appears among the elements it calls.
    """
    sub_key = self.get_rocrate_key(dico)
    dico_sub = get_dico_from_tab_from_id(dico, sub_key)
    if dico_sub is not None:
        # Already registered: only record the additional parent.
        if not check_if_element_in_tab_rocrate(dico_sub["isPartOf"], parent_key):
            dico_sub["isPartOf"].append({"@id": parent_key})
        return

    def formal_parameters(elements):
        """Return references to the FormalParameter entities, creating missing ones."""
        references = []
        for element in elements:
            # Elements are either plain strings or objects exposing get_code().
            name = element if isinstance(element, str) else element.get_code()
            entity = get_dico_from_tab_from_id(dico, name)
            if entity is None:
                entity = {"@id": name, "@type": "FormalParameter"}
                dico["@graph"].append(entity)
            references.append({"@id": entity["@id"]})
        return references

    dico_sub = {
        "@id": sub_key,
        "name": "Subworkflow",
        "@type": ["SoftwareSourceCode", "ComputationalWorkflow"],
        # TODO -> check if this remains true
        # "conformsTo": {"@id": "https://bioschemas.org/profiles/ComputationalWorkflow/0.5-DRAFT-2020_07_21"}
        # "dct:conformsTo": "https://bioschemas.org/profiles/ComputationalWorkflow/1.0-RELEASE/"
        "input": formal_parameters(self.get_takes()),
        "output": formal_parameters(self.get_emit()),
        "isPartOf": [{"@id": parent_key}],
        "hasPart": [],
    }
    # get_all_called() is inherited from Main_DSL2 and performs the same
    # executor/origin walk, without the leftover debug print.
    for c in self.get_all_called():
        if c == self:
            raise Exception("This shoudn't happen!")
        c.add_2_rocrate(dico, sub_key)
        reference = {"@id": c.get_rocrate_key(dico)}
        # The same element may be called several times; reference it once.
        if reference not in dico_sub["hasPart"]:
            dico_sub["hasPart"].append(reference)
    dico["@graph"].append(dico_sub)
......
......@@ -63,6 +63,9 @@ class Workflow:
def get_file_address(self):
    """Return the file address reported by ``self.nextflow_file``."""
    return self.nextflow_file.get_file_address()
def add_2_rocrate(self, dico):
    """Forward the RO-Crate dictionary to ``self.nextflow_file``."""
    entry_file = self.nextflow_file
    entry_file.add_2_rocrate(dico)
def initialise_rocrate(self):
    """Create the ``RO_Crate`` for this workflow, store it on ``self.rocrate`` and initialise it."""
    crate = RO_Crate(self)
    self.rocrate = crate
    crate.initialise()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment