Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remember the generated stream_id across batches of events being appeneded. #289

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions lib/event_store/sql/statements/insert_events.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ WITH
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream
)
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream;

SELECT stream_id from stream;
36 changes: 20 additions & 16 deletions lib/event_store/sql/statements/insert_events_any_version.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ WITH
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
stream.initial_stream_version + new_events_indexes.index
FROM
new_events_indexes, linked_stream, stream
)
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
stream.initial_stream_version + new_events_indexes.index
FROM
new_events_indexes, linked_stream, stream;

SELECT stream_id from stream;
21 changes: 14 additions & 7 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ defmodule EventStore.Storage.Appender do
events
|> Stream.map(&encode_uuids/1)
|> Stream.chunk_every(1_000)
|> Enum.each(fn batch ->
|> Enum.reduce(stream_id, fn batch, stream_id ->
event_count = length(batch)

with :ok <- insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do
with {:ok, new_stream_id} <-
insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do
Logger.debug("Appended #{event_count} event(s) to stream #{inspect(stream_uuid)}")

:ok
new_stream_id
else
{:error, error} -> throw({:error, error})
end
end)

:ok
catch
{:error, error} = reply ->
Logger.warning(
Expand Down Expand Up @@ -110,9 +112,14 @@ defmodule EventStore.Storage.Appender do
params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events)

case Postgrex.query(conn, statement, params, opts) do
{:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found}
{:ok, %Postgrex.Result{}} -> :ok
{:error, error} -> handle_error(error)
{:ok, %Postgrex.Result{num_rows: 0}} ->
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch can no longer be hit, and whatever case it was guarding against will change. There is no test that was covering this.

{:error, :not_found}

{:ok, %Postgrex.Result{rows: [[stream_id]]}} ->
{:ok, stream_id}

{:error, error} ->
handle_error(error)
end
end

Expand Down
4 changes: 1 addition & 3 deletions lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,7 @@ defmodule EventStore.Streams.Stream do

append_to_stream(conn, stream_uuid, expected_version, events, opts)
else
# We should never get here, but just in case we break something in another
# part of the app, this will give us better output in the tests.
{:error, :already_retried_once}
{:error, {:already_retried_once, :duplicate_stream_uuid}}
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
exclude = [:ignore, :manual, :migration, :slow]
exclude = [:ignore, :manual, :migration]

ExUnit.start(exclude: exclude)
Loading