@@ -539,13 +539,56 @@ static inline int flb_tail_file_exists(struct stat *st,
539
539
return FLB_FALSE ;
540
540
}
541
541
542
+ /*
543
+ * Based in the configuration or database offset, set the proper 'offset' for the
544
+ * file in question.
545
+ */
546
+ static int set_file_position (struct flb_tail_config * ctx ,
547
+ struct flb_tail_file * file )
548
+ {
549
+ int64_t ret ;
550
+
551
+ #ifdef FLB_HAVE_SQLDB
552
+ /*
553
+ * If the database option is enabled, try to gather the file position. The
554
+ * database function updates the file->offset entry.
555
+ */
556
+ if (ctx -> db ) {
557
+ flb_tail_db_file_set (file , ctx );
558
+ if (file -> offset > 0 ) {
559
+ ret = lseek (file -> fd , file -> offset , SEEK_SET );
560
+ if (ret == -1 ) {
561
+ flb_errno ();
562
+ return -1 ;
563
+ }
564
+ }
565
+ /* no need to seek */
566
+ return 0 ;
567
+ }
568
+ #endif
569
+
570
+ if (ctx -> read_from_head == FLB_TRUE ) {
571
+ /* no need to seek, offset position is already zero */
572
+ return 0 ;
573
+ }
574
+
575
+ /* tail... */
576
+ ret = lseek (file -> fd , 0 , SEEK_END );
577
+ if (ret == -1 ) {
578
+ flb_errno ();
579
+ return -1 ;
580
+ }
581
+ file -> offset = ret ;
582
+
583
+ return 0 ;
584
+ }
585
+
542
586
int flb_tail_file_append (char * path , struct stat * st , int mode ,
543
587
struct flb_tail_config * ctx )
544
588
{
545
589
int fd ;
546
590
int ret ;
547
591
size_t len ;
548
- int64_t offset ;
549
592
char * tag ;
550
593
size_t tag_len ;
551
594
struct flb_tail_file * file ;
@@ -685,35 +728,23 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
685
728
}
686
729
}
687
730
688
- /*
689
- * Register or update the file entry, likely if the entry already exists
690
- * into the database, the offset may be updated.
691
- */
692
- #ifdef FLB_HAVE_SQLDB
693
- if (ctx -> db ) {
694
- flb_tail_db_file_set (file , ctx );
695
- }
696
- #endif
697
-
698
- /* Seek if required */
699
- if (file -> offset > 0 ) {
700
- flb_plg_debug (ctx -> ins , "inode=%" PRIu64 " appended file following on offset=%" PRId64 ,
701
- file -> inode , file -> offset );
702
- offset = lseek (file -> fd , file -> offset , SEEK_SET );
703
- if (offset == -1 ) {
704
- flb_errno ();
705
- flb_tail_file_remove (file );
706
- goto error ;
707
- }
731
+ /* Set the file position (database offset, head or tail) */
732
+ ret = set_file_position (ctx , file );
733
+ if (ret == -1 ) {
734
+ flb_tail_file_remove (file );
735
+ goto error ;
708
736
}
709
737
738
+ /* Remaining bytes to read */
710
739
file -> pending_bytes = file -> size - file -> offset ;
711
740
712
741
#ifdef FLB_HAVE_METRICS
713
742
flb_metrics_sum (FLB_TAIL_METRIC_F_OPENED , 1 , ctx -> ins -> metrics );
714
743
#endif
715
744
716
- flb_plg_debug (ctx -> ins , "inode=%" PRIu64 " appended as %s" , file -> inode , path );
745
+ flb_plg_debug (ctx -> ins ,
746
+ "inode=%" PRIu64 " with offset=%" PRId64 " appended as %s" ,
747
+ file -> inode , file -> offset , path );
717
748
return 0 ;
718
749
719
750
error :
@@ -798,6 +829,46 @@ int flb_tail_file_remove_all(struct flb_tail_config *ctx)
798
829
return count ;
799
830
}
800
831
832
+ static int adjust_counters (struct flb_tail_config * ctx , struct flb_tail_file * file )
833
+ {
834
+ int ret ;
835
+ int64_t offset ;
836
+ struct stat st ;
837
+
838
+ ret = fstat (file -> fd , & st );
839
+ if (ret == -1 ) {
840
+ flb_errno ();
841
+ return FLB_TAIL_ERROR ;
842
+ }
843
+
844
+ /* Check if the file was truncated */
845
+ if (file -> offset > st .st_size ) {
846
+ offset = lseek (file -> fd , 0 , SEEK_SET );
847
+ if (offset == -1 ) {
848
+ flb_errno ();
849
+ return FLB_TAIL_ERROR ;
850
+ }
851
+
852
+ flb_plg_debug (ctx -> ins , "inode=%" PRIu64 " file truncated %s" ,
853
+ file -> inode , file -> name );
854
+ file -> offset = offset ;
855
+ file -> buf_len = 0 ;
856
+
857
+ /* Update offset in the database file */
858
+ #ifdef FLB_HAVE_SQLDB
859
+ if (ctx -> db ) {
860
+ flb_tail_db_file_offset (file , ctx );
861
+ }
862
+ #endif
863
+ }
864
+ else {
865
+ file -> size = st .st_size ;
866
+ file -> pending_bytes = (st .st_size - file -> offset );
867
+ }
868
+
869
+ return FLB_TAIL_OK ;
870
+ }
871
+
801
872
int flb_tail_file_chunk (struct flb_tail_file * file )
802
873
{
803
874
int ret ;
@@ -903,23 +974,21 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
903
974
return FLB_TAIL_ERROR ;
904
975
}
905
976
else {
906
- file -> size = st . st_size ;
907
- file -> pending_bytes = ( st . st_size - file -> offset );
977
+ /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */
978
+ ret = adjust_counters ( ctx , file );
908
979
}
909
980
/* Data was consumed but likely some bytes still remain */
910
- return FLB_TAIL_OK ;
981
+ return ret ;
911
982
}
912
983
else if (bytes == 0 ) {
913
984
/* We reached the end of file, let's wait for some incoming data */
914
- ret = fstat (file -> fd , & st );
915
- if (ret == -1 ) {
916
- flb_errno ();
985
+ ret = adjust_counters (ctx , file );
986
+ if (ret == FLB_TAIL_OK ) {
987
+ return FLB_TAIL_WAIT ;
988
+ }
989
+ else {
917
990
return FLB_TAIL_ERROR ;
918
991
}
919
- file -> size = st .st_size ;
920
- file -> pending_bytes = (st .st_size - file -> offset );
921
-
922
- return FLB_TAIL_WAIT ;
923
992
}
924
993
else {
925
994
/* error */
0 commit comments