diff --git a/extract_logs.py b/extract_logs.py index 96d2bf892de78df34119466ffac49738ba9795fb..2e5b06fe30047b5a391d9ff5b6cd4685924d1bf9 100644 --- a/extract_logs.py +++ b/extract_logs.py @@ -3,17 +3,37 @@ import re from urllib.parse import parse_qs, urlencode import pandas as pd import concurrent.futures -import threading +import pyarrow as pa +import pyarrow.parquet as pq def clean_query(query_string): + """ + Bereinigt einen Query-String, indem bestimmte Parameter entfernt werden. + + Args: + query_string (str): Der zu bereinigende Query-String. + + Returns: + str: Der bereinigte Query-String. + """ params = parse_qs(query_string) + # Entferne spezifische Parameter, die für den Lasttest nicht relevant sind for param in ["shard", "shard.url", "isShard", "NOW", "wt"]: params.pop(param, None) return urlencode(params, doseq=True) def process_file(filename): + """ + Verarbeitet eine einzelne Log-Datei und extrahiert die Solr-Abfrage-URLs. + + Args: + filename (str): Der Pfad zur zu verarbeitenden Log-Datei. + + Returns: + pandas.DataFrame: Ein DataFrame mit den extrahierten URLs. + """ urls = [] with open(filename, "r") as file: for line in file: @@ -23,39 +43,54 @@ def process_file(filename): query_string = params_match.group(1) clean_query_string = clean_query(query_string) urls.append(f"/select?{clean_query_string}") - return urls + return pd.DataFrame(urls, columns=["url"]) def extract_urls_from_logs(): + """ + Extrahiert URLs aus allen Solr-Log-Dateien im 'logs/' Verzeichnis und + speichert sie in einer Parquet-Datei mit Snappy-Kompression. + + Returns: + int: Die Gesamtanzahl der extrahierten URLs. + """ log_dir = "logs/" log_pattern = re.compile(r"solr\.log\.\d+") - all_urls = [] + output_file = "solr_queries.parquet" + + # Lösche die vorhandene Parquet-Datei, falls sie existiert + if os.path.exists(output_file): + os.remove(output_file) + print(f"Vorhandene Datei {output_file} wurde gelöscht.") files_to_process = [ os.path.join(log_dir, f) for f in os.listdir(log_dir) if log_pattern.match(f) ] - with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: + # Erstelle einen PyArrow Table-Writer mit Snappy-Kompression + schema = pa.schema([("url", pa.string())]) + writer = pq.ParquetWriter(output_file, schema, compression="snappy") + + total_urls = 0 + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to_file = { executor.submit(process_file, file): file for file in files_to_process } for future in concurrent.futures.as_completed(future_to_file): file = future_to_file[future] try: - urls = future.result() - all_urls.extend(urls) + df = future.result() + total_urls += len(df) + table = pa.Table.from_pandas(df) + writer.write_table(table) except Exception as exc: print(f"{file} generated an exception: {exc}") - return all_urls - - -def save_to_parquet(urls): - df = pd.DataFrame(urls, columns=["url"]) - df.to_parquet("solr_queries.parquet", engine="pyarrow") + writer.close() + return total_urls if __name__ == "__main__": - urls = extract_urls_from_logs() - save_to_parquet(urls) - print(f"{len(urls)} URLs wurden extrahiert und in das Parquet-File gespeichert.") + # Führe die Extraktion aus und gib die Gesamtanzahl der extrahierten URLs aus + total_urls = extract_urls_from_logs() + print(f"{total_urls} URLs wurden extrahiert und in das Parquet-File gespeichert.")