Select Git revision
extract_logs.py
extract_logs.py 3.12 KiB
import os
import re
from urllib.parse import parse_qs, urlencode
import pandas as pd
import concurrent.futures
import pyarrow as pa
import pyarrow.parquet as pq
def clean_query(query_string):
"""
Bereinigt einen Query-String, indem bestimmte Parameter entfernt werden.
Args:
query_string (str): Der zu bereinigende Query-String.
Returns:
str: Der bereinigte Query-String.
"""
params = parse_qs(query_string)
# Entferne spezifische Parameter, die für den Lasttest nicht relevant sind
for param in ["shard", "shard.url", "isShard", "NOW", "wt"]:
params.pop(param, None)
return urlencode(params, doseq=True)
def process_file(filename):
"""
Verarbeitet eine einzelne Log-Datei und extrahiert die Solr-Abfrage-URLs.
Args:
filename (str): Der Pfad zur zu verarbeitenden Log-Datei.
Returns:
pandas.DataFrame: Ein DataFrame mit den extrahierten URLs.
"""
urls = []
with open(filename, "r") as file:
for line in file:
if "o.a.s.c.S.Request" in line and "params=" in line:
params_match = re.search(r"params=\{(.+?)\}", line)
if params_match:
query_string = params_match.group(1)
clean_query_string = clean_query(query_string)
urls.append(f"/select?{clean_query_string}")
return pd.DataFrame(urls, columns=["url"])
def extract_urls_from_logs():
"""
Extrahiert URLs aus allen Solr-Log-Dateien im 'logs/' Verzeichnis und
speichert sie in einer Parquet-Datei mit Snappy-Kompression.
Returns:
int: Die Gesamtanzahl der extrahierten URLs.
"""
log_dir = "logs/"
log_pattern = re.compile(r"solr\.log\.\d+")
output_file = "solr_queries.parquet"
# Lösche die vorhandene Parquet-Datei, falls sie existiert
if os.path.exists(output_file):
os.remove(output_file)
print(f"Vorhandene Datei {output_file} wurde gelöscht.")
files_to_process = [
os.path.join(log_dir, f) for f in os.listdir(log_dir) if log_pattern.match(f)
]
# Erstelle einen PyArrow Table-Writer mit Snappy-Kompression
schema = pa.schema([("url", pa.string())])
writer = pq.ParquetWriter(output_file, schema, compression="snappy")
total_urls = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_file = {
executor.submit(process_file, file): file for file in files_to_process
}
for future in concurrent.futures.as_completed(future_to_file):
file = future_to_file[future]
try:
df = future.result()
total_urls += len(df)
table = pa.Table.from_pandas(df)
writer.write_table(table)
except Exception as exc:
print(f"{file} generated an exception: {exc}")
writer.close()
return total_urls
if __name__ == "__main__":
# Führe die Extraktion aus und gib die Gesamtanzahl der extrahierten URLs aus
total_urls = extract_urls_from_logs()
print(f"{total_urls} URLs wurden extrahiert und in das Parquet-File gespeichert.")