import argparse
import json
import os

import xxhash
import logging
import networkx as nx
from tqdm import tqdm
import time
import datetime


# Node types considered valid when post-processing parsed graphs.
valid_node_type = ['file', 'process_memory', 'task', 'mmaped_file', 'path', 'socket', 'address', 'link']
# Populated in __main__ with the parsed command-line arguments; read globally
# by the parsing routines below.
CONSOLE_ARGUMENTS = None


def hashgen(l):
    """Generate a single hash value from a list. @l is a list of
    string values, which can be properties of a node/edge. This
    function returns a single hashed integer value."""
    hasher = xxhash.xxh64()
    for e in l:
        hasher.update(e)
    return hasher.intdigest()


def _record_typed_nodes(records, node_map, kind):
    """Record every unseen node in @records into @node_map (UID -> 'prov:type').

    @records is the dict under a CamFlow "activity" or "entity" key; @kind is
    that key's name, used only for the debug message. Nodes lacking a
    'prov:type' are skipped (and logged when verbose is set)."""
    for uid, attrs in records.items():
        if uid in node_map:  # only parse unseen nodes
            continue
        if "prov:type" not in attrs:
            # a node must have a type; record this issue if logging is turned on
            if CONSOLE_ARGUMENTS.verbose:
                logging.debug("skipping a problematic {} node with no 'prov:type': {}".format(kind, uid))
        else:
            node_map[uid] = attrs["prov:type"]


def parse_nodes(json_string, node_map):
    """Parse a CamFlow JSON string that may contain nodes ("activity" or "entity").

    Parsed nodes populate @node_map, which is a dictionary that maps the node's
    UID, assigned by CamFlow to uniquely identify a node object, to a value
    (in str) which represents the 'type' of the node."""
    try:
        # use "ignore" if non-decodeable exists in the @json_string
        json_object = json.loads(json_string)
    except Exception as e:
        print("Exception ({}) occurred when parsing a node in JSON:".format(e))
        print(json_string)
        exit(1)
    if "activity" in json_object:
        _record_typed_nodes(json_object["activity"], node_map, "activity")
    if "entity" in json_object:
        _record_typed_nodes(json_object["entity"], node_map, "entity")


def parse_all_nodes(filename, node_map):
    """Parse all nodes in CamFlow data. @filename is the file path of
    the CamFlow data to parse. @node_map receives the mappings of all
    CamFlow node UIDs to their 'prov:type' values."""
    description = '\x1b[6;30;42m[STATUS]\x1b[0m Parsing nodes in CamFlow data from {}'.format(filename)
    pb = tqdm(desc=description, mininterval=1.0, unit=" recs")
    with open(filename, 'r') as f:
        # each line in CamFlow data could contain multiple
        # provenance nodes, so we call the @parse_nodes routine per line.
        for line in f:
            pb.update()  # for progress tracking
            parse_nodes(line, node_map)
    pb.close()
# For each supported CamFlow relation: (JSON key / edge type,
# attribute holding the source UUID, attribute holding the destination UUID).
_EDGE_RELATIONS = [
    ("used", "prov:entity", "prov:activity"),
    ("wasGeneratedBy", "prov:activity", "prov:entity"),
    ("wasInformedBy", "prov:informant", "prov:informed"),
    ("wasDerivedFrom", "prov:usedEntity", "prov:generatedEntity"),
    ("wasAssociatedWith", "prov:agent", "prov:activity"),
]


def _parse_timestamp(timestamp_str):
    """Convert a CamFlow 'cf:date' string (e.g. '2020:01:02T03:04:05')
    to a Unix timestamp (float, local time)."""
    return time.mktime(datetime.datetime.strptime(timestamp_str, "%Y:%m:%dT%H:%M:%S").timetuple())


def _scan_smallest_timestamp(inputfile, node_map):
    """Scan @inputfile once and return the smallest 'cf:date' timestamp among
    all fully-formed edges whose endpoints are present in @node_map, or None
    if no such edge exists. Only needed when timestamp statistics are on."""
    smallest_timestamp = None
    description = '\x1b[6;30;42m[STATUS]\x1b[0m Scanning edges in CamFlow data from {}'.format(inputfile)
    pb = tqdm(desc=description, mininterval=1.0, unit=" recs")
    with open(inputfile, 'r') as f:
        for line in f:
            pb.update()
            json_object = json.loads(line)
            for relation, src_field, dst_field in _EDGE_RELATIONS:
                if relation not in json_object:
                    continue
                for uid, attrs in json_object[relation].items():
                    # only consider edges that carry every required attribute
                    if "prov:type" not in attrs or "cf:date" not in attrs:
                        continue
                    if src_field not in attrs or dst_field not in attrs:
                        continue
                    if attrs[src_field] not in node_map or attrs[dst_field] not in node_map:
                        continue
                    ts = _parse_timestamp(attrs["cf:date"])
                    if smallest_timestamp is None or ts < smallest_timestamp:
                        smallest_timestamp = ts
    pb.close()
    return smallest_timestamp


def parse_all_edges(inputfile, outputfile, node_map, noencode):
    """Parse all edges (including their timestamp) from CamFlow data file @inputfile
    to an @outputfile. Before this function is called, parse_all_nodes should be called
    to populate the @node_map for all nodes in the CamFlow file. If @noencode is set,
    we do not hash the nodes' original UUIDs generated by CamFlow to integers. This
    function returns the total number of valid edges parsed from the CamFlow dataset.

    The output edgelist has the following format for each line, if -s is not set:
    <source_node_id> \t <destination_node_id> \t <hashed_source_type>:<hashed_destination_type>:<hashed_edge_type>:<edge_logical_timestamp>
    If -s is set, each line would look like:
    <source_node_id> \t <destination_node_id> \t <hashed_source_type>:<hashed_destination_type>:<hashed_edge_type>:<edge_logical_timestamp>:<timestamp_stats>"""
    total_edges = 0
    # the smallest wall-clock timestamp over all edges is only needed
    # when statistical information is requested (-s)
    smallest_timestamp = None
    if CONSOLE_ARGUMENTS.stats:
        smallest_timestamp = _scan_smallest_timestamp(inputfile, node_map)

    # go through the CamFlow data (again) and output the edgelist to a file
    output = open(outputfile, "w+")
    description = '\x1b[6;30;42m[STATUS]\x1b[0m Parsing edges in CamFlow data from {}'.format(inputfile)
    pb = tqdm(desc=description, mininterval=1.0, unit=" recs")
    with open(inputfile, 'r') as f:
        for line in f:
            pb.update()
            json_object = json.loads(line)
            for edgetype, src_field, dst_field in _EDGE_RELATIONS:
                if edgetype not in json_object:
                    continue
                for uid, attrs in json_object[edgetype].items():
                    # an edge must have a type; if not, we skip it
                    # (and log the issue if verbose is set)
                    if "prov:type" not in attrs:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}) record without type: {}".format(edgetype, uid))
                        continue
                    # cf:id is used as logical timestamp to order edges
                    if "cf:id" not in attrs:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}) record without logical timestamp: {}".format(edgetype, uid))
                        continue
                    timestamp = attrs["cf:id"]
                    # an edge's source and destination references must exist
                    if src_field not in attrs:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}/{}) record without source UUID: {}".format(
                                edgetype, attrs["prov:type"], uid))
                        continue
                    if dst_field not in attrs:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}/{}) record without destination UUID: {}".format(
                                edgetype, attrs["prov:type"], uid))
                        continue
                    srcUUID = attrs[src_field]
                    dstUUID = attrs[dst_field]
                    # both endpoints must have been seen during node parsing
                    if srcUUID not in node_map:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}/{}) record with an unseen srcUUID: {}".format(
                                edgetype, attrs["prov:type"], uid))
                        continue
                    srcVal = node_map[srcUUID]
                    if dstUUID not in node_map:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}/{}) record with an unseen dstUUID: {}".format(
                                edgetype, attrs["prov:type"], uid))
                        continue
                    dstVal = node_map[dstUUID]
                    # a wall-clock timestamp is required even when stats are off
                    # (matches the original per-relation checks)
                    if "cf:date" not in attrs:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}) record without timestamp: {}".format(edgetype, uid))
                        continue
                    # we only compute @adjusted_ts if we record dataset stats
                    if CONSOLE_ARGUMENTS.stats:
                        adjusted_ts = _parse_timestamp(attrs["cf:date"]) - smallest_timestamp
                    # a jiffies timestamp is required even when --jiffies is off
                    if "cf:jiffies" not in attrs:
                        if CONSOLE_ARGUMENTS.verbose:
                            logging.debug("edge ({}) record without jiffies: {}".format(edgetype, uid))
                        continue
                    if CONSOLE_ARGUMENTS.jiffies:
                        jiffies = attrs["cf:jiffies"]
                    total_edges += 1
                    if noencode:
                        src_id, dst_id = srcUUID, dstUUID
                    else:
                        src_id, dst_id = hashgen([srcUUID]), hashgen([dstUUID])
                    if CONSOLE_ARGUMENTS.stats:
                        output.write("{}\t{}\t{}:{}:{}:{}:{}\n".format(
                            src_id, dst_id, srcVal, dstVal, edgetype, timestamp, adjusted_ts))
                    elif CONSOLE_ARGUMENTS.jiffies:
                        output.write("{}\t{}\t{}:{}:{}:{}:{}\n".format(
                            src_id, dst_id, srcVal, dstVal, edgetype, timestamp, jiffies))
                    else:
                        output.write("{}\t{}\t{}:{}:{}:{}\n".format(
                            src_id, dst_id, srcVal, dstVal, edgetype, timestamp))
    output.close()
    pb.close()
    return total_edges
def read_single_graph(file_name, threshold):
    """Read one edgelist file produced by parse_all_edges and return at most
    @threshold edges, each as a list
    [src, dst, src_type, dst_type, edge_type, edge_order],
    sorted by the edge's logical timestamp (cf:id)."""
    graph = []
    with open(file_name, 'r') as f:
        for line in f:
            try:
                edge = line.strip().split("\t")
                attributes = edge[2].strip().split(":")
                # attributes: src_type, dst_type, edge_type,
                # logical timestamp, [optional stats/jiffies field]
                graph.append([edge[0], edge[1],
                              attributes[0], attributes[1],
                              attributes[2], attributes[3]])
            except (IndexError, ValueError):
                # malformed line: report it and keep going
                print("{}".format(line))
    # sort numerically by logical timestamp; a plain string sort would
    # incorrectly order "10" before "9"
    graph.sort(key=lambda e: int(e[5]))
    return graph[:threshold]


def process_graph(name, threshold):
    """Load the edgelist file @name (capped at @threshold edges) and build a
    networkx DiGraph with 'type' attributes on nodes and edges. Returns
    (edge_count, graph). When the module-level flag `bidirection` is set, a
    reverse edge typed 'reverse_<type>' is added alongside each edge."""
    graph = read_single_graph(name, threshold)
    result_graph = nx.DiGraph()
    cnt = 0
    for edge in graph:
        cnt += 1
        src, dst, src_type, dst_type, edge_type = edge[:5]
        # NOTE(review): filtering against valid_node_type was disabled in the
        # original (`if True:`); all edges are kept regardless of node type.
        if not result_graph.has_node(src):
            result_graph.add_node(src, type=src_type)
        if not result_graph.has_node(dst):
            result_graph.add_node(dst, type=dst_type)
        if not result_graph.has_edge(src, dst):
            result_graph.add_edge(src, dst, type=edge_type)
            if bidirection:
                result_graph.add_edge(dst, src, type='reverse_{}'.format(edge_type))
    return cnt, result_graph


# Global vocabularies shared across format_graph calls: the *_list lists give
# each type a stable integer id (its index); the *_dict maps count occurrences.
node_type_list = []
edge_type_list = []
node_type_dict = {}
edge_type_dict = {}
def format_graph(g, name):
    """Relabel @g's nodes to consecutive integers, replace node/edge type
    strings with integer ids drawn from the global type vocabularies (updating
    the global occurrence counters), and dump the result as node-link JSON to
    '<name>.json'."""
    new_g = nx.DiGraph()
    # map original node ids to consecutive integers
    node_map = {}
    node_cnt = 0
    for n in g.nodes:
        node_map[n] = node_cnt
        new_g.add_node(node_cnt, type=g.nodes[n]['type'])
        node_cnt += 1
    for e in g.edges:
        new_g.add_edge(node_map[e[0]], node_map[e[1]], type=g.edges[e]['type'])
    # update the global type vocabularies and occurrence counters
    for n in new_g.nodes:
        node_type = new_g.nodes[n]['type']
        if node_type not in node_type_dict:
            node_type_list.append(node_type)
            node_type_dict[node_type] = 1
        else:
            node_type_dict[node_type] += 1
    for e in new_g.edges:
        edge_type = new_g.edges[e]['type']
        if edge_type not in edge_type_dict:
            edge_type_list.append(edge_type)
            edge_type_dict[edge_type] = 1
        else:
            edge_type_dict[edge_type] += 1
    # replace type strings with their stable integer ids
    for n in new_g.nodes:
        new_g.nodes[n]['type'] = node_type_list.index(new_g.nodes[n]['type'])
    for e in new_g.edges:
        new_g.edges[e]['type'] = edge_type_list.index(new_g.edges[e]['type'])
    with open('{}.json'.format(name), 'w', encoding='utf-8') as f:
        json.dump(nx.node_link_data(new_g), f)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Convert CamFlow JSON to Unicorn edgelist')
    args = parser.parse_args()
    # Fixed configuration (the original CLI flags are hard-coded here).
    args.stats = False
    args.verbose = False
    args.jiffies = False
    args.input = '../data/wget/raw/'
    args.output = '../data/wget/processed/'
    args.final_output = '../data/wget/final/'
    args.noencode = False
    # Paths only used when the corresponding flags above are enabled; they
    # must exist so enabling a flag does not raise AttributeError.
    args.log = 'wget_parser.log'
    args.stats_file = '../data/wget/stats'
    # create intermediate directories as needed; tolerate pre-existing ones
    os.makedirs(args.input, exist_ok=True)
    os.makedirs(args.output, exist_ok=True)
    os.makedirs(args.final_output, exist_ok=True)
    CONSOLE_ARGUMENTS = args

    if args.verbose:
        logging.basicConfig(filename=args.log, level=logging.DEBUG)
    cnt = 0
    for fname in os.listdir(args.input):
        cnt += 1
        node_map = dict()
        parse_all_nodes(args.input + '/{}'.format(fname), node_map)
        total_edges = parse_all_edges(args.input + '/{}'.format(fname),
                                      args.output + '/{}.log'.format(cnt),
                                      node_map, args.noencode)
        if args.stats:
            total_nodes = len(node_map)
            # close the stats file deterministically
            with open(args.stats_file + '/{}.log'.format(cnt), "a+") as stats:
                stats.write("{},{},{}\n".format(args.input + '/{}'.format(fname), total_nodes, total_edges))

    # read by process_graph: add reverse edges when True
    bidirection = False
    threshold = 10000000  # effectively unbounded
    in_dir = args.output
    out_dir = args.final_output

    line_cnt = 0
    for i in tqdm(range(cnt)):
        single_cnt, result_graph = process_graph('{}{}.log'.format(in_dir, i + 1), threshold)
        format_graph(result_graph, '{}{}'.format(out_dir, i))
        line_cnt += single_cnt

    print(line_cnt // 150)
    print(len(node_type_list))
    print(node_type_dict)
    print(len(edge_type_list))
    print(edge_type_dict)