Skip to content

Commit eb39f53

Browse files
committed
wip ra_kv
1 parent ae8cbf2 commit eb39f53

9 files changed

+400
-174
lines changed

src/ra.erl

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
consistent_query/3,
3636
ping/2,
3737
% cluster operations
38-
start_cluster/1,
3938
start_cluster/2,
4039
start_cluster/3,
4140
start_cluster/4,
@@ -350,16 +349,6 @@ start_cluster(System, ClusterName, Machine, ServerIds, Timeout)
350349
end || Id <- ServerIds],
351350
start_cluster(System, Configs, Timeout).
352351

353-
%% @doc Same as `start_cluster/2' but uses the default Ra system.
354-
%% @param ServerConfigs a list of initial server configurations
355-
%% DEPRECATED: use start_cluster/2
356-
%% @end
357-
-spec start_cluster([ra_server:ra_server_config()]) ->
358-
{ok, [ra_server_id()], [ra_server_id()]} |
359-
{error, cluster_not_formed}.
360-
start_cluster(ServerConfigs) ->
361-
start_cluster(default, ServerConfigs).
362-
363352
%% @doc Starts a new distributed ra cluster.
364353
%% @param System the system name
365354
%% @param ServerConfigs a list of initial server configurations

src/ra.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
%% represent a unique entry in the ra log
8484
-type log_entry() :: {ra_index(), ra_term(), term()}.
8585

86-
-type chunk_flag() :: next | last.
86+
-type chunk_flag() :: pre | next | last.
8787

8888
-type consistent_query_ref() :: {From :: term(), Query :: ra:query_fun(), ConmmitIndex :: ra_index()}.
8989

@@ -169,7 +169,7 @@
169169
{term :: ra_term(), % the leader's term
170170
leader_id :: ra_server_id(),
171171
meta :: snapshot_meta(),
172-
chunk_state :: {pos_integer(), chunk_flag()} | undefined,
172+
chunk_state = {0, pre} :: {pos_integer(), chunk_flag()},
173173
data :: term()
174174
}).
175175

src/ra_kv.erl

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
-module(ra_kv).
2+
-behaviour(ra_machine).
3+
-include("src/ra.hrl").
4+
5+
-include_lib("eunit/include/eunit.hrl").
6+
7+
-export([
8+
init/1,
9+
apply/3,
10+
% state_enter/2,
11+
% tick/2,
12+
init_aux/1,
13+
handle_aux/5,
14+
live_indexes/1,
15+
overview/1
16+
]).
17+
18+
-export([
19+
start_cluster/3,
20+
add_member/3,
21+
22+
put/4
23+
]).
24+
25+
26+
-define(STATE, ?MODULE).
27+
28+
-type key() :: binary().
29+
-type value() :: term().
30+
31+
-record(?STATE, {keys = #{} ::
32+
#{key() := [ra:index() | Hash :: non_neg_integer()]}}).
33+
34+
35+
-record(put, {key :: key(),
36+
value :: term(),
37+
meta :: #{size := non_neg_integer(),
38+
hash := integer()}}).
39+
40+
-type command() :: #put{}.
41+
-opaque state() :: #?STATE{}.
42+
43+
-export_type([state/0,
44+
command/0]).
45+
46+
%% mgmt
47+
-spec start_cluster(atom(), atom(), map()) ->
48+
{ok, [ra_server_id()], [ra_server_id()]} |
49+
{error, cluster_not_formed}.
50+
start_cluster(System, Name, #{members := ServerIds})
51+
when is_atom(Name) andalso
52+
is_atom(System) ->
53+
Machine = {module, ?MODULE, #{}},
54+
Configs = [begin
55+
UId = ra:new_uid(ra_lib:to_binary(Name)),
56+
#{id => Id,
57+
uid => UId,
58+
cluster_name => Name,
59+
log_init_args => #{uid => UId},
60+
initial_members => ServerIds,
61+
machine => Machine}
62+
end || Id <- ServerIds],
63+
ra:start_cluster(System, Configs).
64+
65+
add_member(System, {Name, _} = Id, LeaderId) ->
66+
{ok, Members, _} = ra:members(LeaderId),
67+
UId = ra:new_uid(ra_lib:to_binary(Name)),
68+
Machine = {module, ?MODULE, #{}},
69+
Config = #{id => Id,
70+
uid => UId,
71+
cluster_name => Name,
72+
log_init_args => #{uid => UId},
73+
initial_members => Members,
74+
machine => Machine},
75+
ok = ra:start_server(System, Config),
76+
{ok, _, _} = ra:add_member(LeaderId, Id),
77+
ok.
78+
79+
80+
%% client
81+
-spec put(ra:server_id(), key(), value(), non_neg_integer()) ->
82+
{ok, map()} | {error, term()} | {timeout, ra:server_id()}.
83+
put(ServerId, Key, Value, Timeout) ->
84+
Hash = erlang:phash2(Value),
85+
Put = #put{key = Key,
86+
value = Value,
87+
meta = #{size => erlang:external_size(Value),
88+
hash => Hash}},
89+
case ra:process_command(ServerId, Put, Timeout) of
90+
{ok, {ok, Meta}, LeaderId} ->
91+
{ok, Meta#{leader => LeaderId}};
92+
Err ->
93+
Err
94+
end.
95+
96+
97+
%% get performs a consistent query that returns the index, hash and member set
98+
%% then perform an aux query to actually get the data for a given index.
99+
%% if addressing a follower (say there is a local one) then the read may need
100+
%% to wait if the index isn't yet available locally (term also need to be checked)
101+
%% or check that the machien state has the right index for a given key before
102+
%% reading the value from the log
103+
104+
105+
%% state machine
106+
107+
init(_) ->
108+
#?MODULE{}.
109+
110+
apply(#{index := Idx} = Meta,
111+
#put{key = Key,
112+
meta = #{hash := Hash}},
113+
#?STATE{keys = Keys} = State0) ->
114+
State = State0#?STATE{keys = maps:put(Key, [Idx | Hash], Keys)},
115+
{State, {ok, Meta}, []}.
116+
117+
live_indexes(#?STATE{keys = Keys}) ->
118+
maps:fold(fun (_K, [Idx | _], Acc) ->
119+
[Idx | Acc]
120+
end, [], Keys).
121+
122+
-record(aux, {}).
123+
init_aux(_) ->
124+
#aux{}.
125+
126+
handle_aux(_RaState, {call, _From}, take_snapshot, Aux, Internal) ->
127+
MacState = ra_aux:machine_state(Internal),
128+
LastAppliedIdx = ra_aux:last_applied(Internal),
129+
%% TODO: replace release cursor with simpler snapshot effect that is always
130+
%% attempted?
131+
{reply, ok, Aux, Internal,
132+
[{release_cursor, LastAppliedIdx, MacState}]};
133+
handle_aux(_RaState, _, _, Aux, Internal) ->
134+
{no_reply, Aux, Internal}.
135+
136+
overview(#?STATE{keys = Keys} = State) ->
137+
#{num_keys => maps:size(Keys),
138+
live_indexes => live_indexes(State)}.

0 commit comments

Comments
 (0)