Skip to content

Commit 70e33fa

Browse files
committed
in_tail: read from tail (duh!) (#1667 #1761 #474 #1645 #1330)
From now on, the Tail plugin implements the following behavior: 1. If a file is already registered in the database and contains an offset, the file is consumed from that offset position. 2. Upon start, if a file is not known by the database, it is read from its tail. 3. If the new 'read_from_head' property (default: false) is enabled, newly discovered files are read from the beginning. This flag doesn't override the behavior of a file that already exists in the database. Additional fix: when a file is being monitored in 'static mode', truncation is handled properly. Signed-off-by: Eduardo Silva <eduardo@treasure-data.com>
1 parent 0f6b4a7 commit 70e33fa

File tree

3 files changed

+109
-33
lines changed

3 files changed

+109
-33
lines changed

plugins/in_tail/tail.c

+7-1
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,12 @@ static struct flb_config_map config_map[] = {
471471
"as a string under the key name log. This option allows to define an "
472472
"alternative name for that key."
473473
},
474+
{
475+
FLB_CONFIG_MAP_BOOL, "read_from_head", "false",
476+
0, FLB_TRUE, offsetof(struct flb_tail_config, read_from_head),
477+
"For new discovered files on start (without a database offset/position), read the "
478+
"content from the head of the file, not tail."
479+
},
474480
{
475481
FLB_CONFIG_MAP_STR, "refresh_interval", "60",
476482
0, FLB_FALSE, 0,
@@ -481,7 +487,7 @@ static struct flb_config_map config_map[] = {
481487
0, FLB_TRUE, offsetof(struct flb_tail_config, watcher_interval),
482488
},
483489
{
484-
FLB_CONFIG_MAP_INT, "rotate_wait", FLB_TAIL_ROTATE_WAIT,
490+
FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT,
485491
0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait),
486492
"specify the number of extra time in seconds to monitor a file once is "
487493
"rotated in case some pending data is flushed."

plugins/in_tail/tail_config.h

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ struct flb_tail_config {
6868
#endif
6969
int refresh_interval_sec; /* seconds to re-scan */
7070
long refresh_interval_nsec;/* nanoseconds to re-scan */
71+
int read_from_head; /* read new files from head */
7172
int rotate_wait; /* sec to wait on rotated files */
7273
int watcher_interval; /* watcher interval */
7374
int ignore_older; /* ignore fields older than X seconds */

plugins/in_tail/tail_file.c

+101-32
Original file line numberDiff line numberDiff line change
@@ -539,13 +539,56 @@ static inline int flb_tail_file_exists(struct stat *st,
539539
return FLB_FALSE;
540540
}
541541

542+
/*
543+
* Based in the configuration or database offset, set the proper 'offset' for the
544+
* file in question.
545+
*/
546+
static int set_file_position(struct flb_tail_config *ctx,
547+
struct flb_tail_file *file)
548+
{
549+
int64_t ret;
550+
551+
#ifdef FLB_HAVE_SQLDB
552+
/*
553+
* If the database option is enabled, try to gather the file position. The
554+
* database function updates the file->offset entry.
555+
*/
556+
if (ctx->db) {
557+
flb_tail_db_file_set(file, ctx);
558+
if (file->offset > 0) {
559+
ret = lseek(file->fd, file->offset, SEEK_SET);
560+
if (ret == -1) {
561+
flb_errno();
562+
return -1;
563+
}
564+
}
565+
/* no need to seek */
566+
return 0;
567+
}
568+
#endif
569+
570+
if (ctx->read_from_head == FLB_TRUE) {
571+
/* no need to seek, offset position is already zero */
572+
return 0;
573+
}
574+
575+
/* tail... */
576+
ret = lseek(file->fd, 0, SEEK_END);
577+
if (ret == -1) {
578+
flb_errno();
579+
return -1;
580+
}
581+
file->offset = ret;
582+
583+
return 0;
584+
}
585+
542586
int flb_tail_file_append(char *path, struct stat *st, int mode,
543587
struct flb_tail_config *ctx)
544588
{
545589
int fd;
546590
int ret;
547591
size_t len;
548-
int64_t offset;
549592
char *tag;
550593
size_t tag_len;
551594
struct flb_tail_file *file;
@@ -685,35 +728,23 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
685728
}
686729
}
687730

688-
/*
689-
* Register or update the file entry, likely if the entry already exists
690-
* into the database, the offset may be updated.
691-
*/
692-
#ifdef FLB_HAVE_SQLDB
693-
if (ctx->db) {
694-
flb_tail_db_file_set(file, ctx);
695-
}
696-
#endif
697-
698-
/* Seek if required */
699-
if (file->offset > 0) {
700-
flb_plg_debug(ctx->ins, "inode=%"PRIu64" appended file following on offset=%"PRId64,
701-
file->inode, file->offset);
702-
offset = lseek(file->fd, file->offset, SEEK_SET);
703-
if (offset == -1) {
704-
flb_errno();
705-
flb_tail_file_remove(file);
706-
goto error;
707-
}
731+
/* Set the file position (database offset, head or tail) */
732+
ret = set_file_position(ctx, file);
733+
if (ret == -1) {
734+
flb_tail_file_remove(file);
735+
goto error;
708736
}
709737

738+
/* Remaining bytes to read */
710739
file->pending_bytes = file->size - file->offset;
711740

712741
#ifdef FLB_HAVE_METRICS
713742
flb_metrics_sum(FLB_TAIL_METRIC_F_OPENED, 1, ctx->ins->metrics);
714743
#endif
715744

716-
flb_plg_debug(ctx->ins, "inode=%"PRIu64" appended as %s", file->inode, path);
745+
flb_plg_debug(ctx->ins,
746+
"inode=%"PRIu64" with offset=%"PRId64" appended as %s",
747+
file->inode, file->offset, path);
717748
return 0;
718749

719750
error:
@@ -798,6 +829,46 @@ int flb_tail_file_remove_all(struct flb_tail_config *ctx)
798829
return count;
799830
}
800831

832+
static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file)
833+
{
834+
int ret;
835+
int64_t offset;
836+
struct stat st;
837+
838+
ret = fstat(file->fd, &st);
839+
if (ret == -1) {
840+
flb_errno();
841+
return FLB_TAIL_ERROR;
842+
}
843+
844+
/* Check if the file was truncated */
845+
if (file->offset > st.st_size) {
846+
offset = lseek(file->fd, 0, SEEK_SET);
847+
if (offset == -1) {
848+
flb_errno();
849+
return FLB_TAIL_ERROR;
850+
}
851+
852+
flb_plg_debug(ctx->ins, "inode=%"PRIu64" file truncated %s",
853+
file->inode, file->name);
854+
file->offset = offset;
855+
file->buf_len = 0;
856+
857+
/* Update offset in the database file */
858+
#ifdef FLB_HAVE_SQLDB
859+
if (ctx->db) {
860+
flb_tail_db_file_offset(file, ctx);
861+
}
862+
#endif
863+
}
864+
else {
865+
file->size = st.st_size;
866+
file->pending_bytes = (st.st_size - file->offset);
867+
}
868+
869+
return FLB_TAIL_OK;
870+
}
871+
801872
int flb_tail_file_chunk(struct flb_tail_file *file)
802873
{
803874
int ret;
@@ -903,23 +974,21 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
903974
return FLB_TAIL_ERROR;
904975
}
905976
else {
906-
file->size = st.st_size;
907-
file->pending_bytes = (st.st_size - file->offset);
977+
/* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */
978+
ret = adjust_counters(ctx, file);
908979
}
909980
/* Data was consumed but likely some bytes still remain */
910-
return FLB_TAIL_OK;
981+
return ret;
911982
}
912983
else if (bytes == 0) {
913984
/* We reached the end of file, let's wait for some incoming data */
914-
ret = fstat(file->fd, &st);
915-
if (ret == -1) {
916-
flb_errno();
985+
ret = adjust_counters(ctx, file);
986+
if (ret == FLB_TAIL_OK) {
987+
return FLB_TAIL_WAIT;
988+
}
989+
else {
917990
return FLB_TAIL_ERROR;
918991
}
919-
file->size = st.st_size;
920-
file->pending_bytes = (st.st_size - file->offset);
921-
922-
return FLB_TAIL_WAIT;
923992
}
924993
else {
925994
/* error */

0 commit comments

Comments
 (0)