From 3a6f1bc4b9f8b3fd83fa70473df36c87543f974e Mon Sep 17 00:00:00 2001
From: Joel Fernandes
Date: Sat, 8 Jul 2017 13:38:55 -0700
Subject: [PATCH 1/3] trappy/utils: merge_dfs for primary and secondary DFs
 based on pivot

Forward propagate the secondary DF into the primary DF and return the
merged DF.

Implements: https://github.com/ARM-software/trappy/issues/250

Change-Id: I312d77302bbca8bb13bfa598785ebc0cc879fe34
Signed-off-by: Joel Fernandes
---
 trappy/utils.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 107 insertions(+)

diff --git a/trappy/utils.py b/trappy/utils.py
index eb73752..bab4895 100644
--- a/trappy/utils.py
+++ b/trappy/utils.py
@@ -13,6 +13,9 @@
 # limitations under the License.
 #
 
+import pandas as pd
+import numpy as np
+
 """Generic functions that can be used in multiple places in trappy
 """
 
@@ -102,3 +105,107 @@ def handle_duplicate_index(data,
             dup_index_left += 1
 
     return data.reindex(new_index)
+
+# Iterate quickly over all rows in a data frame and apply fn to each one
+def apply_callback(df, fn, *kwargs):
+    """
+    A generic API to apply a function to every row of a data frame and
+    optionally pass it extra arguments. This is faster than the
+    `DataFrame.apply` method.
+
+    :param df: DataFrame to apply fn onto
+    :type df: :mod:`pandas.DataFrame`
+
+    :param fn: Function that is applied to each row of the DataFrame
+    :type fn: function
+
+    :param kwargs: Optional arguments, passed to the callback as a tuple
+    :type kwargs: tuple
+    """
+    iters = df.itertuples()
+    event_tuple = next(iters)
+
+    # Column names beginning with an underscore are not preserved in the
+    # tuples due to constraints on namedtuple field names, so store a
+    # mapping from column name to column number for each trace event.
+    col_idxs = { name: idx for idx, name in enumerate(['Time'] + df.columns.tolist()) }
+
+    while True:
+        if not event_tuple:
+            break
+        event_dict = { col: event_tuple[idx] for col, idx in col_idxs.iteritems() }
+
+        if kwargs:
+            fn(event_dict, kwargs)
+        else:
+            fn(event_dict)
+
+        event_tuple = next(iters, None)
+
+def merge_dfs(pr_df, sec_df, pivot):
+    """
+    Merge information from a secondary DF into a primary DF, using the
+    latest secondary row (by time stamp) for each pivot value. The
+    returned DF has the same number of rows as the primary DF.
+
+    For example, say the following is a primary DF:
+                 cpu  util_avg  __line
+    Time
+    2943.184105    7       825       1
+    2943.184106    6       292       2
+    2943.184108    7       850       4
+    2943.184110    6       315       6
+
+    And the following is a secondary DF:
+                 cpu  cpu_scale_factor  __line
+    Time
+    2943.184105    7                 1       0
+    2943.184107    6                 2       3
+    2943.184109    7                 3       5
+
+    With pivot = 'cpu', the returned DF will look like:
+                 __line  cpu  cpu_scale_factor  util_avg
+    Time
+    2943.184105       1    7               1.0     825.0
+    2943.184106       2    6               NaN     292.0
+    2943.184108       4    7               1.0     850.0
+    2943.184110       6    6               2.0     315.0
+
+    :param pr_df: Primary data frame to merge into
+    :type pr_df: :mod:`pandas.DataFrame`
+
+    :param sec_df: Secondary data frame used as the source of data
+    :type sec_df: :mod:`pandas.DataFrame`
+
+    :param pivot: Column used to identify rows that belong together
+    :type pivot: str
+    """
+    # Keep track of the last secondary event seen for each pivot value
+    pivot_map = {}
+
+    # A list accumulating dicts with the merged data
+    merged_data = []
+    def df_fn(data):
+        # Store the latest secondary info
+        if data['Time'][0] == 'secondary':
+            pivot_map[data[pivot]] = data
+            # Get rid of the primary/secondary label
+            data['Time'] = data['Time'][1]
+            return
+        # Propagate the latest secondary info into this primary row
+        for key, value in data.iteritems():
+            if key == pivot:
+                continue
+            # Fast NaN check: NaN != NaN (faster than np.isnan + try/except)
+            if value != value and data[pivot] in pivot_map:
+                data[key] = pivot_map[data[pivot]][key]
+        # Get rid of the primary/secondary label
+        data['Time'] = data['Time'][1]
+        merged_data.append(data)
+
+    df = pd.concat([pr_df, sec_df], keys=['primary', 'secondary']).sort_values(by='__line')
+    apply_callback(df, df_fn)
+    merged_df = pd.DataFrame.from_dict(merged_data)
+    merged_df.set_index('Time', inplace=True)
+
+    return merged_df
--
GitLab
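A usage sketch for merge_dfs follows (illustrative, not part of the patch):
the two frames below are hand-built to mirror the docstring example above,
so the merged output should match the table shown there.

    import pandas as pd
    from trappy.utils import merge_dfs

    # Primary DF: per-cpu utilization samples.
    pr_df = pd.DataFrame(
        {'cpu': [7, 6, 7, 6], 'util_avg': [825, 292, 850, 315],
         '__line': [1, 2, 4, 6]},
        index=pd.Index([2943.184105, 2943.184106,
                        2943.184108, 2943.184110], name='Time'))

    # Secondary DF: per-cpu scale factors sampled at other times.
    sec_df = pd.DataFrame(
        {'cpu': [7, 6, 7], 'cpu_scale_factor': [1, 2, 3],
         '__line': [0, 3, 5]},
        index=pd.Index([2943.184105, 2943.184107, 2943.184109],
                       name='Time'))

    # Each primary row picks up the most recent secondary row for its
    # cpu; a primary row with no earlier secondary data keeps NaN.
    merged = merge_dfs(pr_df, sec_df, 'cpu')
    print(merged)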
From 8537f7234403ff7f2e987faef98501c366dabce6 Mon Sep 17 00:00:00 2001
From: Joel Fernandes
Date: Sun, 16 Jul 2017 17:01:30 -0700
Subject: [PATCH 2/3] trappy/utils: Add test case for merge_dfs

Signed-off-by: Joel Fernandes
---
 tests/test_base.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/tests/test_base.py b/tests/test_base.py
index a0a4920..0c8b865 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -19,6 +19,8 @@ import sys
 import unittest
 import utils_tests
 import trappy
+import tempfile
+import numpy as np
 from trappy.base import trace_parser_explode_array
 
 sys.path.append(os.path.join(utils_tests.TESTS_DIRECTORY, "..", "trappy"))
@@ -238,3 +240,27 @@ class TestBase(utils_tests.SetupDirectory):
         self.assertListEqual(df["my_field"].tolist(),
                              ["foo", "foo=bar", "foo=bar=baz", 1,
                               "1=2", "1=foo", "1foo=2"])
+
+    def test_merge_dfs(self):
+        trace_file = tempfile.mktemp(dir="/tmp", suffix=".txt")
+        lines = [
+            "          adbd-5709  [007]  2943.184105: sched_contrib_scale_f: cpu=7 cpu_scale_factor=1\n",
+            "          adbd-5709  [007]  2943.184105: sched_load_avg_cpu: cpu=7 util_avg=825\n",
+            "    ->transport-5713 [006]  2943.184106: sched_load_avg_cpu: cpu=6 util_avg=292\n",
+            "    ->transport-5713 [006]  2943.184107: sched_contrib_scale_f: cpu=6 cpu_scale_factor=2\n",
+            "          adbd-5709  [007]  2943.184108: sched_load_avg_cpu: cpu=7 util_avg=850\n",
+            "          adbd-5709  [007]  2943.184109: sched_contrib_scale_f: cpu=7 cpu_scale_factor=3\n",
+            "          adbd-5709  [007]  2943.184110: sched_load_avg_cpu: cpu=6 util_avg=315\n",
+        ]
+        with open(trace_file, 'w') as fh:
+            for line in lines:
+                fh.write(line)
+
+        trace = trappy.ftrace.FTrace(trace_file, events=['sched_contrib_scale_f', 'sched_load_avg_cpu'],
+                                     normalize_time=False)
+
+        df1 = trace.sched_load_avg_cpu.data_frame[['cpu', 'util_avg', '__line']]
+        df2 = trace.sched_contrib_scale_f.data_frame[['cpu', 'cpu_scale_factor', '__line']]
+        df3 = trappy.utils.merge_dfs(df1, df2, 'cpu')
+        cpu_scale_list = ["NaN" if np.isnan(x) else x for x in df3["cpu_scale_factor"].tolist()]
+        self.assertListEqual(cpu_scale_list, [1.0, "NaN", 1.0, 2.0])
--
GitLab
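A note on the assertion in the test above: NaN never compares equal to
itself, so assertListEqual on the raw float list would fail for any row
whose cpu_scale_factor is NaN. Mapping NaNs to a sentinel string first
makes the list comparison well-defined. A minimal sketch of the idiom:

    import numpy as np

    values = [1.0, float('nan'), 1.0, 2.0]
    # NaN != NaN, so substitute the string "NaN" before comparing.
    cleaned = ["NaN" if np.isnan(x) else x for x in values]
    assert cleaned == [1.0, "NaN", 1.0, 2.0]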
From 56a812f6ad2593b7553972866ec09eb7d17018a9 Mon Sep 17 00:00:00 2001
From: Joel Fernandes
Date: Sun, 16 Jul 2017 16:21:54 -0700
Subject: [PATCH 3/3] trappy/ftrace: allow apply_callbacks to take optional
 arguments

This allows passing arguments to callbacks, which is needed for the
residency analysis state machine in LISA. Also update the docstring.

Signed-off-by: Joel Fernandes
---
 trappy/ftrace.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/trappy/ftrace.py b/trappy/ftrace.py
index 226cd96..88e78cc 100644
--- a/trappy/ftrace.py
+++ b/trappy/ftrace.py
@@ -413,7 +413,7 @@ is part of the trace.
 
         return ret
 
-    def apply_callbacks(self, fn_map):
+    def apply_callbacks(self, fn_map, *kwarg):
         """
         Apply callback functions to trace events in chronological order.
 
@@ -429,6 +429,12 @@ is part of the trace.
                 "sched_switch": callback_fn1,
                 "sched_wakeup": callback_fn2
             })
+
+        :param fn_map: A dict of event to function mapping
+        :type fn_map: dict
+
+        :param kwarg: Optional arguments, passed to each callback as a tuple
+        :type kwarg: tuple
         """
         dfs = {event: getattr(self, event).data_frame for event in fn_map.keys()}
         events = [event for event in fn_map.keys() if not dfs[event].empty]
@@ -455,7 +461,12 @@ is part of the trace.
             event_dict = {
                 col: event_tuple[idx] for col, idx in col_idxs[event_name].iteritems()
             }
-            fn_map[event_name](event_dict)
+
+            if kwarg:
+                fn_map[event_name](event_dict, kwarg)
+            else:
+                fn_map[event_name](event_dict)
+
             event_row = next(iters[event_name], None)
             if event_row:
                 next_rows[event_name] = event_row
--
GitLab
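A usage sketch for the new optional-argument form (illustrative, not part
of the patch; the event name and accumulator are assumptions). Extra
positional arguments are forwarded to each callback as a tuple, so the
callback unpacks them itself:

    # 'trace' is an existing trappy.ftrace.FTrace instance.
    state = {'switch_count': 0}

    def count_switches(event, args):
        # args is the tuple of extra arguments that apply_callbacks
        # forwards when any are supplied.
        (acc,) = args
        acc['switch_count'] += 1

    trace.apply_callbacks({'sched_switch': count_switches}, state)
    print(state['switch_count'])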