Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix logging #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/backends/cr_kvs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
-compile(export_all).

dispatch({prepare,_,_,Tx}, {state,Name,_,_,_}) ->
kvs:info(?MODULE,"KVS PUT ~p:~p~n",[element(1,Tx),element(2,Tx)]),
cr:info(?MODULE,"KVS PUT ~p:~p~n",[element(1,Tx),element(2,Tx)]),
kvs:put(Tx);

dispatch({commit,_,_,Tx}, {state,Name,_,_,_}) ->
kvs:info(?MODULE,"KVS LINK ~p:~p~n",[element(1,Tx),element(2,Tx)]),
cr:info(?MODULE,"KVS LINK ~p:~p~n",[element(1,Tx),element(2,Tx)]),
kvs:link(Tx);

dispatch({rollback,_,_,Tx}, {state,Name,_,_,_}) ->
kvs:info(?MODULE,"KVS REMOVE ~p:~p~n",[element(1,Tx),element(2,Tx)]),
cr:info(?MODULE,"KVS REMOVE ~p:~p~n",[element(1,Tx),element(2,Tx)]),
kvs:remove(Tx);

dispatch(_,_) -> ok.
44 changes: 22 additions & 22 deletions src/consensus/cr_rafter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

start_link({Index,Node}, Opts) ->
Name = list_to_atom(lists:concat([Index,':',Node])),
kvs:info(?MODULE,"RAFTER start_link ~p~n",[{Index,Node}]),
cr:info(?MODULE,"RAFTER start_link ~p~n",[{Index,Node}]),
gen_fsm:start_link({local,Node},?MODULE, [Node, Opts], []).

raftname(Name) -> list_to_atom(lists:concat(["rafter:",Name])).
Expand All @@ -26,7 +26,7 @@ init([Me, #rafter_opts{state_machine=StateMachine,cluster=Nodes}]) ->
Timer = gen_fsm:send_event_after(election_timeout(), timeout),
#meta{voted_for=VotedFor, term=Term} = cr_log:get_metadata(Me),
BackendState = StateMachine:init(Me),
kvs:info(?MODULE,"RAFTER INIT Me: ~p~n",[Me]),
cr:info(?MODULE,"RAFTER INIT Me: ~p~n",[Me]),
State = #state{term=Term,
voted_for=VotedFor,
me=Me,
Expand Down Expand Up @@ -103,7 +103,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->

%% Election timeout has expired. Go to candidate state iff we are a voter.
follower(timeout, #state{config=Config, me=Me}=State0) ->
kvs:info(?MODULE,"RAFTER FOLLOWER timeout~n",[]),
cr:info(?MODULE,"RAFTER FOLLOWER timeout~n",[]),
case cr_config:has_vote(Me, Config) of
false ->
State = reset_timer(election_timeout(), State0),
Expand All @@ -116,20 +116,20 @@ follower(timeout, #state{config=Config, me=Me}=State0) ->

%% Ignore stale messages.
follower(#vote{}, State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER #vote~n",[]),
cr:info(?MODULE,"RAFTER FOLLOWER #vote~n",[]),
{next_state, follower, State};
follower(#append_entries_rpy{}, State) ->
{next_state, follower, State}.

%% Vote for this candidate
follower(#request_vote{}=RequestVote, _From, State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER #req_vote~n",[]),
cr:info(?MODULE,"RAFTER FOLLOWER #req_vote~n",[]),
handle_request_vote(RequestVote, State);

follower(#append_entries{term=Term}, _From,
#state{term=CurrentTerm, me=Me}=State) when CurrentTerm > Term ->
Rpy = #append_entries_rpy{from=Me, term=CurrentTerm, success=false},
kvs:info(?MODULE,"RAFTER FOLLOWER #append Me: ~p success: false~n",[Me]),
cr:info(?MODULE,"RAFTER FOLLOWER #append Me: ~p success: false~n",[Me]),
{reply, Rpy, follower, State};

follower(#append_entries{term=Term, from=From, prev_log_index=PrevLogIndex,
Expand Down Expand Up @@ -158,48 +158,48 @@ follower(#append_entries{term=Term, from=From, prev_log_index=PrevLogIndex,
end;

follower({set_config, _}, _From, #state{leader=undefined, me=Me, config=C}=State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER set_config ~p~n",[Me]),
cr:info(?MODULE,"RAFTER FOLLOWER set_config ~p~n",[Me]),
Error = no_leader_error(Me, C),
{reply, {error, Error}, follower, State};

follower({set_config, _}, _From, #state{leader=Leader}=State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER set_config ~p~n",[Leader]),
cr:info(?MODULE,"RAFTER FOLLOWER set_config ~p~n",[Leader]),
Reply = {error, {redirect, Leader}},
{reply, Reply, follower, State};

follower({read_op, _}, _From, #state{me=Me, config=Config, leader=undefined}=State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER read_op ~p~n",[Me]),
cr:info(?MODULE,"RAFTER FOLLOWER read_op ~p~n",[Me]),
Error = no_leader_error(Me, Config),
{reply, {error, Error}, follower, State};

follower({read_op, _}, _From, #state{leader=Leader}=State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER read_op~n",[]),
cr:info(?MODULE,"RAFTER FOLLOWER read_op~n",[]),
Reply = {error, {redirect, Leader}},
{reply, Reply, follower, State};

follower({op, _Command}, _From, #state{me=Me, config=Config, leader=undefined}=State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER read_op~n",[]),
cr:info(?MODULE,"RAFTER FOLLOWER read_op~n",[]),
Error = no_leader_error(Me, Config),
{reply, {error, Error}, follower, State};

follower({op, _Command}, _From, #state{leader=Leader}=State) ->
kvs:info(?MODULE,"RAFTER FOLLOWER read_op~n",[]),
cr:info(?MODULE,"RAFTER FOLLOWER read_op~n",[]),
Reply = {error, {redirect, Leader}},
{reply, Reply, follower, State}.

%% This is the initial election to set the initial config. We did not
%% get a quorum for our votes, so just reply to the user here and keep trying
%% until the other nodes come up.
candidate(timeout, #state{term=1, init_config=[_Id, From]}=S) ->
kvs:info(?MODULE,"RAFTER CANDIDATE timeout ~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE timeout ~n",[]),
State0 = reset_timer(election_timeout(), S),
gen_fsm:reply(From, {error, peers_not_responding}),
State = State0#state{init_config=no_client},
{next_state, candidate, State};

%% The election timeout has elapsed so start an election
candidate(timeout, State) ->
kvs:info(?MODULE,"RAFTER CANDIDATE timeout~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE timeout~n",[]),
NewState = become_candidate(State),
{next_state, candidate, NewState};

Expand All @@ -215,7 +215,7 @@ candidate(timeout, State) ->
candidate(#vote{term=VoteTerm, success=false},
#state{term=Term, init_config=[_Id, From]}=State)
when VoteTerm > Term ->
kvs:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
gen_fsm:reply(From, {error, invalid_initial_config}),
State2 = State#state{init_config=undefined, config=#config{state=blank}},
NewState = step_down(VoteTerm, State2),
Expand All @@ -224,26 +224,26 @@ candidate(#vote{term=VoteTerm, success=false},
%% We are out of date. Go back to follower state.
candidate(#vote{term=VoteTerm, success=false}, #state{term=Term}=State)
when VoteTerm > Term ->
kvs:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
NewState = step_down(VoteTerm, State),
{next_state, follower, NewState};

%% This is a stale vote from an old request. Ignore it.
candidate(#vote{term=VoteTerm}, #state{term=CurrentTerm}=State)
when VoteTerm < CurrentTerm ->
kvs:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
{next_state, candidate, State};

candidate(#vote{success=false, from=From}, #state{responses=Responses}=State) ->
NewResponses = dict:store(From, false, Responses),
NewState = State#state{responses=NewResponses},
kvs:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE #vote~n",[]),
{next_state, candidate, NewState};

%% Sweet, someone likes us! Do we have enough votes to get elected?
candidate(#vote{success=true, from=From}, #state{responses=Responses, me=Me,
config=Config}=State) ->
kvs:info(?MODULE,"RAFTER CANDIDATE #vote ~p~n",[Config]),
cr:info(?MODULE,"RAFTER CANDIDATE #vote ~p~n",[Config]),
NewResponses = dict:store(From, true, Responses),
case cr_config:quorum(Me, Config, NewResponses) of
true ->
Expand All @@ -255,7 +255,7 @@ candidate(#vote{success=true, from=From}, #state{responses=Responses, me=Me,
end.

candidate({set_config, _}, _From, State) ->
kvs:info(?MODULE,"RAFTER CANDIDATE set_config~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE set_config~n",[]),
Reply = {error, election_in_progress},
{reply, Reply, follower, State};

Expand All @@ -264,11 +264,11 @@ candidate({set_config, _}, _From, State) ->
candidate(#request_vote{term=RequestTerm}=RequestVote, _From,
#state{term=Term}=State) when RequestTerm > Term ->
NewState = step_down(RequestTerm, State),
kvs:info(?MODULE,"RAFTER CANDIDATE #req_vote~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE #req_vote~n",[]),
handle_request_vote(RequestVote, NewState);
candidate(#request_vote{}, _From, #state{term=CurrentTerm, me=Me}=State) ->
Vote = #vote{term=CurrentTerm, success=false, from=Me},
kvs:info(?MODULE,"RAFTER CANDIDATE #req_vote~n",[]),
cr:info(?MODULE,"RAFTER CANDIDATE #req_vote~n",[]),
{reply, Vote, candidate, State};

%% Another peer is asserting itself as leader, and it must be correct because
Expand Down
22 changes: 19 additions & 3 deletions src/cr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ring(C) -> {Nodes,[{0,1}|Rest]} = cr_hash:fresh(length(peers())*C,1),
lists:zip(Rest,lists:seq(1,length(Rest))))]}.

chain(Object) ->
cr:info(?MODULE,"Object ~p~n",[Object]),
{N,_} = cr:ring(),
lists:map(fun(X) -> lists:nth((X-1)*4+1,cr:seq(Object)) end,
cr:roll(element(2,cr:hash(Object)))).
Expand Down Expand Up @@ -123,7 +124,24 @@ rpc(Value) -> Value.

clean() -> kvs:destroy(), kvs:join().

log_modules() -> [cr,cr_log,cr_rafter,cr_heart].
config(Key) -> config(cr, Key, "").
config(App,Key) -> config(App,Key, "").
config(App, Key, Default) -> case application:get_env(App,Key) of
undefined -> Default;
{ok,V} -> V end.

log_modules() -> [cr,cr_log,cr_rafter,cr_heart,cr_vnode].
-define(ALLOWED, (config(cr,log_modules,cr))).

log(Module, String, Args, Fun) ->
case lists:member(Module,?ALLOWED:log_modules()) of
true -> error_logger:Fun("~p:"++String, [Module|Args]);
false -> skip end.

info(Module, String, Args) -> log(Module, String, Args, info_msg).
warning(Module,String, Args) -> log(Module, String, Args, warning_msg).
error(Module, String, Args) -> log(Module, String, Args, error_msg).


sup() -> [{T,Pid}||{T,Pid,_,_}<-supervisor:which_children(cr_sup)].
heart() -> [{_,P,_,_}]=supervisor:which_children(heart_sup), gen_server:call(P,{heart}).
Expand Down Expand Up @@ -165,5 +183,3 @@ operation_log() ->

cluster_status() -> {ok,_} = consensus_log(),
{ok,_} = operation_log().


16 changes: 8 additions & 8 deletions src/cr_heart.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ init([Name,Nodes]) ->
{Node,Timer}
end || {Node,_,P2,_}<-Nodes, Node /= cr:node()],

kvs:info(?MODULE,"HEART PROTOCOL: started: ~p~n"
cr:info(?MODULE,"HEART PROTOCOL: started: ~p~n"
"Nodes: ~p~n",[Name,Timers]),

{ok,#state{name=Name,nodes=Nodes,timers=Timers}}.
Expand All @@ -38,11 +38,11 @@ setkey(Name,Pos,List,New) ->
_Element -> lists:keyreplace(Name,Pos,List,New) end.

handle_info({'EXIT', Pid,_}, #state{} = State) ->
kvs:info(?MODULE,"HEART: EXIT~n",[]),
cr:info(?MODULE,"HEART: EXIT~n",[]),
{noreply, State};

handle_info({carrier,lost,N}, State=#state{timers=Timer}) ->
kvs:info(?MODULE,"HOST CARRIER LOST ~p~n",[N]),
cr:info(?MODULE,"HOST CARRIER LOST ~p~n",[N]),
{noreply,State};

handle_info({timer,ping,{A,P},N,S}, State=#state{timers=Timers}) ->
Expand Down Expand Up @@ -75,28 +75,28 @@ handle_info({timer,ping,{A,P},N,S}, State=#state{timers=Timers}) ->
try
case cr_rafter:set_config(cr:node(),{N,Operation}) of
{error,_} -> skip;
_ -> kvs:info(?MODULE,"Server Config Changed S/T ~p~n",
_ -> cr:info(?MODULE,"Server Config Changed S/T ~p~n",
[{N,Operation}]) end
catch
_:Err -> kvs:info(?MODULE,"CONFIG ERROR ~p~n",[Err]) end,
_:Err -> cr:info(?MODULE,"CONFIG ERROR ~p~n",[Err]) end,
ok;
false -> skip end,

{noreply,State#state{timers=setkey(N,1,Timers,{N,T})}};

handle_info(_Info, State) ->
kvs:info(?MODULE,"HEART: Info ~p~n",[_Info]),
cr:info(?MODULE,"HEART: Info ~p~n",[_Info]),
{noreply, State}.

handle_call({heart},_,Proc) ->
{reply,Proc,Proc};

handle_call(Request,_,Proc) ->
kvs:info(?MODULE,"HEART: Call ~p~n",[Request]),
cr:info(?MODULE,"HEART: Call ~p~n",[Request]),
{reply,ok,Proc}.

handle_cast(Msg, State) ->
kvs:info(?MODULE,"HEART: Cast ~p", [Msg]),
cr:info(?MODULE,"HEART: Cast ~p", [Msg]),
{stop, {error, {unknown_cast, Msg}}, State}.

terminate(_Reason, #state{}) -> ok.
Expand Down
17 changes: 9 additions & 8 deletions src/cr_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ start_link(Name,Storage) ->

init([Name,Storage]) ->
[ gen_server:cast(Name,O) || O <- kvs:entries(kvs:get(log,{pending,Name}),operation,-1) ],
kvs:info(?MODULE,"VNODE PROTOCOL: started: ~p.~n",[Name]),
cr:info(?MODULE,"VNODE PROTOCOL: started: ~p.~n",[Name]),
{ok,#state{name=Name,storage=Storage}}.

handle_info({'EXIT', Pid,_}, #state{} = State) ->
kvs:info(?MODULE,"VNODE: EXIT~n",[]),
cr:info(?MODULE,"VNODE: EXIT~n",[]),
{noreply, State};

handle_info(_Info, State) ->
kvs:info(?MODULE,"VNODE: Info ~p~n",[_Info]),
cr:info(?MODULE,"VNODE: Info ~p~n",[_Info]),
{noreply, State}.

kvs_log({Cmd,Self,[{I,N}|T],Tx}=Message, #state{name=Name}=State) ->
Id = element(2,Tx),
kvs:info(?MODULE,"XA RECEIVE: ~p~n",[{Id,Message,Name}]),
cr:info(?MODULE,"XA RECEIVE: ~p~n",[{Id,Message,Name}]),
Operation = #operation{name=Cmd,body=Message,feed_id=Name,status=pending},
{ok,Saved} = %kvs:add(Operation#operation{id=kvs:next_id(operation,1)}),
cr_log:kvs_log(cr:node(),Operation),
Expand All @@ -43,7 +43,7 @@ continuation(Next,{C,S,[{I,N}|T],Tx}=Command,State) ->
Peer = cr:peer({I,N}),
Vpid = cr:vpid({I,Peer}),
case gen_server:cast(Vpid,{pending,Command}) of
ok -> kvs:info("XA SENT OK from ~p to ~p~n",[cr:node(),Peer]), {noreply,State};
ok -> cr:info("XA SENT OK from ~p to ~p~n",[cr:node(),Peer]), {noreply,State};
Error -> timer:sleep(1000),
continuation(Next,Command,State) end.

Expand All @@ -57,11 +57,12 @@ handle_call({latency},_,#state{latency={Min,Max,Avg,N}}=State) ->
{reply,L,State};

handle_call(Request,_,Proc) ->
kvs:info(?MODULE,"VNODE: Call ~p~n",[Request]),
cr:info(?MODULE,"VNODE: Call ~p~n",[Request]),
{reply,ok,Proc}.

handle_cast({client,Client,Chain,Record}, #state{name=Name,storage=Storage}=State) ->
{I,N} = hd(Chain),
cr:info(?MODULE,"hd(Chain) ~p",[{I,N}]),
Self = cr:node(),
gen_server:cast(case cr:peer({I,N}) of
Self -> cr:local(Record);
Expand All @@ -76,14 +77,14 @@ handle_cast({pending,{Cmd,Self,[{I,N}|T],Tx}=Message}, #state{name=Name,storage=
handle_cast(#operation{name=Command,body=Message}=Operation, #state{name=Name,storage=Storage}=State) ->
{Command,Sender,[H|T]=Chain,Tx} = Message,
Replay = try cr_log:kvs_replay(cr:node(),Operation,State,status(Command))
catch E:R -> kvs:info(?MODULE,"~p REPLAY ~p~n",[code(Command),cr:stack(E,R)]),
catch E:R -> cr:info(?MODULE,"~p REPLAY ~p~n",[code(Command),cr:stack(E,R)]),
{rollback, {E,R}, Chain, Tx} end,
{Forward,Latency} = case [Chain,Replay] of
[_,A={rollback,_,_,_}] -> {A,State#state.latency};
[[Name],_] -> last(Operation,State);
[[H|T],_] -> {{Command,Sender,T,Tx},State#state.latency} end,
try continuation(H,Forward,State)
catch X:Y -> kvs:info(?MODULE,"~p SEND ~p~n",[code(Command),cr:stack(X,Y)]) end,
catch X:Y -> cr:info(?MODULE,"~p SEND ~p~n",[code(Command),cr:stack(X,Y)]) end,
{noreply,State#state{latency=Latency}}.

terminate(_Reason, #state{}) -> ok.
Expand Down
Loading