Skip to content
Snippets Groups Projects
Commit 67075607 authored by Andreas Romeyke's avatar Andreas Romeyke
Browse files

- added bloomfilter to speedup file checks if shared between multiple IEs, see...

- added a Bloom filter to speed up file checks when files are shared between multiple IEs, see issue #1
parent 6ccdb02e
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
use strict; use strict;
use warnings; use warnings;
use utf8;
use feature qw(say); use feature qw(say);
use Carp; use Carp;
use Path::Tiny; use Path::Tiny;
...@@ -30,6 +31,7 @@ use Getopt::Long::Complete qw(GetOptionsWithCompletion); ...@@ -30,6 +31,7 @@ use Getopt::Long::Complete qw(GetOptionsWithCompletion);
use Digest::CRC; use Digest::CRC;
use Digest::MD5; use Digest::MD5;
use Digest::SHA; use Digest::SHA;
use Bloom::Filter;
use Pod::Usage; use Pod::Usage;
use IO::Handle; use IO::Handle;
use POSIX qw(strftime); use POSIX qw(strftime);
...@@ -39,6 +41,7 @@ STDOUT->autoflush(1); ...@@ -39,6 +41,7 @@ STDOUT->autoflush(1);
binmode(STDOUT, ":encoding(UTF-8)"); binmode(STDOUT, ":encoding(UTF-8)");
my @algorithms = qw(CRC32 MD5 SHA1 SHA256 SHA512); # used fixity algorithm names in Rosetta my @algorithms = qw(CRC32 MD5 SHA1 SHA256 SHA512); # used fixity algorithm names in Rosetta
my $capacity = 500; # bloomfilter capacity (n last seen files)
# scans a given dir for IEs and stores in temporary file # scans a given dir for IEs and stores in temporary file
# There are two parts: # There are two parts:
...@@ -293,6 +296,19 @@ sub check_file_fixities($$) { ...@@ -293,6 +296,19 @@ sub check_file_fixities($$) {
return $result; return $result;
} }
# Narrow a list of file records down to those the Bloom filter does not
# report as already seen.
#
# Arguments:
#   $bf        - filter object; its check() method is called once, in list
#                context, with all file ids of the given records
#   $files_ref - array reference of hash references, each read via its
#                'fileid' key
#
# Returns a new array reference holding the entries of $files_ref (the same
# hash references, in their original order) whose check() result was false.
# The filter itself is not modified here.
#
# NOTE(review): a Bloom filter can answer "seen" for an id that was never
# added (false positive), so a record may be dropped here without having
# been checked before -- confirm this is acceptable for the caller.
sub bloomfilter_to_unseen {
    my ($bf, $files_ref) = @_;

    my @ids = map { $_->{fileid} } @{ $files_ref };

    # One batched lookup; the answers line up positionally with @ids.
    my @hits = $bf->check(@ids);

    # Keep the positions whose lookup came back false (or missing).
    my @unseen_positions = grep { !$hits[$_] } 0 .. $#ids;

    return [ @{ $files_ref }[@unseen_positions] ];
}
sub stage1 ($$$) { sub stage1 ($$$) {
my $tmp_ies_unsorted_path = shift; my $tmp_ies_unsorted_path = shift;
my $search_dir = shift; my $search_dir = shift;
...@@ -315,7 +331,6 @@ sub stage2 ($$$$) { ...@@ -315,7 +331,6 @@ sub stage2 ($$$$) {
my $report_path = shift; my $report_path = shift;
my $map_path = shift; my $map_path = shift;
my $recovery = shift; my $recovery = shift;
say "checking IEs";
my $fh_unsorted_file = $tmp_ies_unsorted_path->openr(); my $fh_unsorted_file = $tmp_ies_unsorted_path->openr();
my $cnt_unsorted_files = 0; my $cnt_unsorted_files = 0;
while (<$fh_unsorted_file>) { while (<$fh_unsorted_file>) {
...@@ -323,6 +338,12 @@ sub stage2 ($$$$) { ...@@ -323,6 +338,12 @@ sub stage2 ($$$$) {
} }
seek $fh_unsorted_file, 0, 0; # seek to first byte seek $fh_unsorted_file, 0, 0; # seek to first byte
my $count = 0; my $count = 0;
my $bf = Bloom::Filter->new(
capacity => $capacity, # we cache n last seen files
error_rate => 0.0001,
);
say "using bloomfilter of length ".int($bf->length/8)." bytes";
say "checking IEs";
my $progressbar = Time::Progress->new(min => 0, max => $cnt_unsorted_files, smoothing => 1); my $progressbar = Time::Progress->new(min => 0, max => $cnt_unsorted_files, smoothing => 1);
my $stat; my $stat;
$stat->{IEs} = 0; $stat->{IEs} = 0;
...@@ -330,16 +351,31 @@ sub stage2 ($$$$) { ...@@ -330,16 +351,31 @@ sub stage2 ($$$$) {
$stat->{errors} = 0; $stat->{errors} = 0;
$stat->{scansize} = 0; $stat->{scansize} = 0;
$stat->{begin} = time; $stat->{begin} = time;
my $bfreset="";
my $prev_ie = "";
while (<$fh_unsorted_file>) { while (<$fh_unsorted_file>) {
# scan each IE # scan each IE
$stat->{IEs}++; $stat->{IEs}++;
chomp; chomp;
my $transferrate_in_MBs = sprintf("%0.2f", $stat->{scansize} / (time - $stat->{begin} + 1) / 1024 / 1024); my $transferrate_in_MBs = sprintf("%0.2f", $stat->{scansize} / (time - $stat->{begin} + 1) / 1024 / 1024);
print $progressbar->report("parse IE files: %40b running: %L ETA: %E ($count/$cnt_unsorted_files IEs, tfr=$transferrate_in_MBs MB/s) \r", ++$count); my $bfusage = int($bf->key_count()*100/$capacity );
print $progressbar->report("parse IE files: %40b $bfreset running: %L ETA: %E ($count/$cnt_unsorted_files IEs, tfr=$transferrate_in_MBs MB/s, bfu=$bfusage%) \r", ++$count);
my $timestamp = strftime("%Y-%m-%d %H:%M:%S %z (%Z)", localtime(time)); my $timestamp = strftime("%Y-%m-%d %H:%M:%S %z (%Z)", localtime(time));
$report_path->append_utf8("$timestamp, IE $_ with following errors:\n"); $report_path->append_utf8("$timestamp, IE $_ with following errors:\n");
my $ret = parse_iexml($_, $recovery); my $ret = parse_iexml($_, $recovery);
foreach my $fileobj (@{$ret->{files}}) { my $unseen = bloomfilter_to_unseen($bf, $ret->{files} );
if (scalar @{$unseen} == 0 ) { $report_path->append_utf8("skipped because files already checked using IE $prev_ie\n"); }
$prev_ie = $_;
foreach my $fileobj (@{ $unseen }) {
if ($bf->key_count() >= (0.8 * $capacity)) { # reset Bloom filter if 80% filled
#print "reset bloomfilter\n";
$bf = Bloom::Filter->new(
capacity => $capacity, # we cache n last seen files
error_rate => 0.0001,
);
$bfreset = "";
} else { $bfreset = "";}
$bf->add( $fileobj->{fileid} );
$fileobj->{file_mounted} = map_file($map_path, $fileobj->{filepath}); $fileobj->{file_mounted} = map_file($map_path, $fileobj->{filepath});
$stat->{files}++; $stat->{files}++;
my $result; my $result;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment