@@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
133
133
seed_(0 ),
134
134
tmp_batch_(new WriteBatch),
135
135
bg_compaction_scheduled_(false ),
136
- manual_compaction_(NULL ),
137
- consecutive_compaction_errors_(0 ) {
136
+ manual_compaction_(NULL ) {
138
137
mem_->Ref ();
139
138
has_imm_.Release_Store (NULL );
140
139
@@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
217
216
}
218
217
219
218
void DBImpl::DeleteObsoleteFiles () {
219
+ if (!bg_error_.ok ()) {
220
+ // After a background error, we don't know whether a new version may
221
+ // or may not have been committed, so we cannot safely garbage collect.
222
+ return ;
223
+ }
224
+
220
225
// Make a set of all of the live files
221
226
std::set<uint64_t > live = pending_outputs_;
222
227
versions_->AddLiveFiles (&live);
@@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
495
500
return s;
496
501
}
497
502
498
- Status DBImpl::CompactMemTable () {
503
+ void DBImpl::CompactMemTable () {
499
504
mutex_.AssertHeld ();
500
505
assert (imm_ != NULL );
501
506
@@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() {
523
528
imm_ = NULL ;
524
529
has_imm_.Release_Store (NULL );
525
530
DeleteObsoleteFiles ();
531
+ } else {
532
+ RecordBackgroundError (s);
526
533
}
527
-
528
- return s;
529
534
}
530
535
531
536
void DBImpl::CompactRange (const Slice* begin, const Slice* end) {
@@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
568
573
}
569
574
570
575
MutexLock l (&mutex_);
571
- while (!manual.done ) {
572
- while (manual_compaction_ != NULL ) {
573
- bg_cv_.Wait ();
574
- }
575
- manual_compaction_ = &manual;
576
- MaybeScheduleCompaction ();
577
- while (manual_compaction_ == &manual) {
576
+ while (!manual.done && !shutting_down_.Acquire_Load () && bg_error_.ok ()) {
577
+ if (manual_compaction_ == NULL ) { // Idle
578
+ manual_compaction_ = &manual;
579
+ MaybeScheduleCompaction ();
580
+ } else { // Running either my compaction or another compaction.
578
581
bg_cv_.Wait ();
579
582
}
580
583
}
584
+ if (manual_compaction_ == &manual) {
585
+ // Cancel my manual compaction since we aborted early for some reason.
586
+ manual_compaction_ = NULL ;
587
+ }
581
588
}
582
589
583
590
Status DBImpl::TEST_CompactMemTable () {
@@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() {
596
603
return s;
597
604
}
598
605
606
+ void DBImpl::RecordBackgroundError (const Status& s) {
607
+ mutex_.AssertHeld ();
608
+ if (bg_error_.ok ()) {
609
+ bg_error_ = s;
610
+ bg_cv_.SignalAll ();
611
+ }
612
+ }
613
+
599
614
void DBImpl::MaybeScheduleCompaction () {
600
615
mutex_.AssertHeld ();
601
616
if (bg_compaction_scheduled_) {
602
617
// Already scheduled
603
618
} else if (shutting_down_.Acquire_Load ()) {
604
619
// DB is being deleted; no more background compactions
620
+ } else if (!bg_error_.ok ()) {
621
+ // Already got an error; no more changes
605
622
} else if (imm_ == NULL &&
606
623
manual_compaction_ == NULL &&
607
624
!versions_->NeedsCompaction ()) {
@@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) {
619
636
void DBImpl::BackgroundCall () {
620
637
MutexLock l (&mutex_);
621
638
assert (bg_compaction_scheduled_);
622
- if (!shutting_down_.Acquire_Load ()) {
623
- Status s = BackgroundCompaction ();
624
- if (s.ok ()) {
625
- // Success
626
- consecutive_compaction_errors_ = 0 ;
627
- } else if (shutting_down_.Acquire_Load ()) {
628
- // Error most likely due to shutdown; do not wait
629
- } else {
630
- // Wait a little bit before retrying background compaction in
631
- // case this is an environmental problem and we do not want to
632
- // chew up resources for failed compactions for the duration of
633
- // the problem.
634
- bg_cv_.SignalAll (); // In case a waiter can proceed despite the error
635
- Log (options_.info_log , " Waiting after background compaction error: %s" ,
636
- s.ToString ().c_str ());
637
- mutex_.Unlock ();
638
- ++consecutive_compaction_errors_;
639
- int seconds_to_sleep = 1 ;
640
- for (int i = 0 ; i < 3 && i < consecutive_compaction_errors_ - 1 ; ++i) {
641
- seconds_to_sleep *= 2 ;
642
- }
643
- env_->SleepForMicroseconds (seconds_to_sleep * 1000000 );
644
- mutex_.Lock ();
645
- }
639
+ if (shutting_down_.Acquire_Load ()) {
640
+ // No more background work when shutting down.
641
+ } else if (!bg_error_.ok ()) {
642
+ // No more background work after a background error.
643
+ } else {
644
+ BackgroundCompaction ();
646
645
}
647
646
648
647
bg_compaction_scheduled_ = false ;
@@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() {
653
652
bg_cv_.SignalAll ();
654
653
}
655
654
656
- Status DBImpl::BackgroundCompaction () {
655
+ void DBImpl::BackgroundCompaction () {
657
656
mutex_.AssertHeld ();
658
657
659
658
if (imm_ != NULL ) {
660
- return CompactMemTable ();
659
+ CompactMemTable ();
660
+ return ;
661
661
}
662
662
663
663
Compaction* c;
@@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() {
691
691
c->edit ()->AddFile (c->level () + 1 , f->number , f->file_size ,
692
692
f->smallest , f->largest );
693
693
status = versions_->LogAndApply (c->edit (), &mutex_);
694
+ if (!status.ok ()) {
695
+ RecordBackgroundError (status);
696
+ }
694
697
VersionSet::LevelSummaryStorage tmp;
695
698
Log (options_.info_log , " Moved #%lld to level-%d %lld bytes %s: %s\n " ,
696
699
static_cast <unsigned long long >(f->number ),
@@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() {
701
704
} else {
702
705
CompactionState* compact = new CompactionState (c);
703
706
status = DoCompactionWork (compact);
707
+ if (!status.ok ()) {
708
+ RecordBackgroundError (status);
709
+ }
704
710
CleanupCompaction (compact);
705
711
c->ReleaseInputs ();
706
712
DeleteObsoleteFiles ();
@@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() {
714
720
} else {
715
721
Log (options_.info_log ,
716
722
" Compaction error: %s" , status.ToString ().c_str ());
717
- if (options_.paranoid_checks && bg_error_.ok ()) {
718
- bg_error_ = status;
719
- }
720
723
}
721
724
722
725
if (is_manual) {
@@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() {
732
735
}
733
736
manual_compaction_ = NULL ;
734
737
}
735
- return status;
736
738
}
737
739
738
740
void DBImpl::CleanupCompaction (CompactionState* compact) {
@@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
1002
1004
if (status.ok ()) {
1003
1005
status = InstallCompactionResults (compact);
1004
1006
}
1007
+ if (!status.ok ()) {
1008
+ RecordBackgroundError (status);
1009
+ }
1005
1010
VersionSet::LevelSummaryStorage tmp;
1006
1011
Log (options_.info_log ,
1007
1012
" compacted to: %s" , versions_->LevelSummary (&tmp));
@@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1185
1190
{
1186
1191
mutex_.Unlock ();
1187
1192
status = log_->AddRecord (WriteBatchInternal::Contents (updates));
1193
+ bool sync_error = false ;
1188
1194
if (status.ok () && options.sync ) {
1189
1195
status = logfile_->Sync ();
1196
+ if (!status.ok ()) {
1197
+ sync_error = true ;
1198
+ }
1190
1199
}
1191
1200
if (status.ok ()) {
1192
1201
status = WriteBatchInternal::InsertInto (updates, mem_);
1193
1202
}
1194
1203
mutex_.Lock ();
1204
+ if (sync_error) {
1205
+ // The state of the log file is indeterminate: the log record we
1206
+ // just added may or may not show up when the DB is re-opened.
1207
+ // So we force the DB into a mode where all future writes fail.
1208
+ RecordBackgroundError (status);
1209
+ }
1195
1210
}
1196
1211
if (updates == tmp_batch_) tmp_batch_->Clear ();
1197
1212
0 commit comments