Skip to content
Snippets Groups Projects
Commit 49b2c841 authored by Thomas Bär's avatar Thomas Bär
Browse files

Dinge der KI

parent 35cf2007
No related branches found
No related tags found
No related merge requests found
......@@ -3,17 +3,37 @@ import re
from urllib.parse import parse_qs, urlencode
import pandas as pd
import concurrent.futures
import threading
import pyarrow as pa
import pyarrow.parquet as pq
def clean_query(query_string):
"""
Bereinigt einen Query-String, indem bestimmte Parameter entfernt werden.
Args:
query_string (str): Der zu bereinigende Query-String.
Returns:
str: Der bereinigte Query-String.
"""
params = parse_qs(query_string)
# Entferne spezifische Parameter, die für den Lasttest nicht relevant sind
for param in ["shard", "shard.url", "isShard", "NOW", "wt"]:
params.pop(param, None)
return urlencode(params, doseq=True)
def process_file(filename):
"""
Verarbeitet eine einzelne Log-Datei und extrahiert die Solr-Abfrage-URLs.
Args:
filename (str): Der Pfad zur zu verarbeitenden Log-Datei.
Returns:
pandas.DataFrame: Ein DataFrame mit den extrahierten URLs.
"""
urls = []
with open(filename, "r") as file:
for line in file:
......@@ -23,39 +43,54 @@ def process_file(filename):
query_string = params_match.group(1)
clean_query_string = clean_query(query_string)
urls.append(f"/select?{clean_query_string}")
return urls
return pd.DataFrame(urls, columns=["url"])
def extract_urls_from_logs():
"""
Extrahiert URLs aus allen Solr-Log-Dateien im 'logs/' Verzeichnis und
speichert sie in einer Parquet-Datei mit Snappy-Kompression.
Returns:
int: Die Gesamtanzahl der extrahierten URLs.
"""
log_dir = "logs/"
log_pattern = re.compile(r"solr\.log\.\d+")
all_urls = []
output_file = "solr_queries.parquet"
# Lösche die vorhandene Parquet-Datei, falls sie existiert
if os.path.exists(output_file):
os.remove(output_file)
print(f"Vorhandene Datei {output_file} wurde gelöscht.")
files_to_process = [
os.path.join(log_dir, f) for f in os.listdir(log_dir) if log_pattern.match(f)
]
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
# Erstelle einen PyArrow Table-Writer mit Snappy-Kompression
schema = pa.schema([("url", pa.string())])
writer = pq.ParquetWriter(output_file, schema, compression="snappy")
total_urls = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_file = {
executor.submit(process_file, file): file for file in files_to_process
}
for future in concurrent.futures.as_completed(future_to_file):
file = future_to_file[future]
try:
urls = future.result()
all_urls.extend(urls)
df = future.result()
total_urls += len(df)
table = pa.Table.from_pandas(df)
writer.write_table(table)
except Exception as exc:
print(f"{file} generated an exception: {exc}")
return all_urls
def save_to_parquet(urls):
df = pd.DataFrame(urls, columns=["url"])
df.to_parquet("solr_queries.parquet", engine="pyarrow")
writer.close()
return total_urls
if __name__ == "__main__":
urls = extract_urls_from_logs()
save_to_parquet(urls)
print(f"{len(urls)} URLs wurden extrahiert und in das Parquet-File gespeichert.")
# Führe die Extraktion aus und gib die Gesamtanzahl der extrahierten URLs aus
total_urls = extract_urls_from_logs()
print(f"{total_urls} URLs wurden extrahiert und in das Parquet-File gespeichert.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment