Skip to content
This repository has been archived by the owner on Aug 14, 2023. It is now read-only.

Commit

Permalink
Support batch for all operations (#11)
Browse files Browse the repository at this point in the history
Closes #7

## Added

- `Mxpanel.Operation` struct. This struct holds all the information necessary
to one API operation. It can be delivered alone or grouped in batches.
- `Mxpanel.deliver/2` function.
- `Mxpanel.deliver_later/2` function.

## Changed

- All functions were updated to build a `Mxpanel.Operation` instead of
making a API request directly. The generated operation can be piped to
`Mxpanel.deliver/2` or `Mxpanel.deliver_later/2` to provide a single interface
for delivering information to Mixpanel API. This allow all operations to be batched.
- Default `:pool_size` for `Mxpanel.Batcher` changed from `10` to `System.schedulers_online()`.
- Buffers info telemetry event metadata changed to return the buffer sizes by supported endpoint.

## Removed

- `Mxpanel.Event` struct. Now the build of the event can be made directly
in the `Mxpanel.track/4` function.
- `Mxpanel.track_later/2`. Superseded by `Mxpanel.deliver_later/2`
  • Loading branch information
thiamsantos authored Jul 9, 2021
1 parent 0bf0369 commit 52acab9
Show file tree
Hide file tree
Showing 20 changed files with 1,247 additions and 1,243 deletions.
2 changes: 1 addition & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
{Credo.Check.Readability.SeparateAliasRequire, []},
{Credo.Check.Readability.SinglePipe, []},
{Credo.Check.Readability.Specs, false},
{Credo.Check.Readability.StrictModuleLayout, []},
{Credo.Check.Readability.StrictModuleLayout, false},
{Credo.Check.Readability.WithCustomTaggedTuple, []},
{Credo.Check.Refactor.ABCSize, []},
{Credo.Check.Refactor.AppendSingleItem, []},
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Added

- `Mxpanel.Operation` struct. This struct holds all the information necessary
to one API operation. It can be delivered alone or grouped in batches.
- `Mxpanel.deliver/2` function.
- `Mxpanel.deliver_later/2` function.

## Changed

- All functions were updated to build a `Mxpanel.Operation` instead of
making a API request directly. The generated operation can be piped to
`Mxpanel.deliver/2` or `Mxpanel.deliver_later/2` to provide a single interface
for delivering information to Mixpanel API. This allow all operations to be batched.
- Default `:pool_size` for `Mxpanel.Batcher` changed from `10` to `System.schedulers_online()`.
- Buffers info telemetry event metadata changed to return the buffer sizes by supported endpoint.

## Removed

- `Mxpanel.Event` struct. Now the build of the event can be made directly
in the `Mxpanel.track/4` function.
- `Mxpanel.track_later/2`. Superseded by `Mxpanel.deliver_later/2`

## [0.4.0] - 2021-07-02

### Added
Expand Down
62 changes: 19 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,69 +34,45 @@ end
client = %Mxpanel.Client{token: "<mixpanel project token>"}

# track an event
event = Mxpanel.Event.new("signup", "billybob")
Mxpanel.track(client, event)
"signup"
|> Mxpanel.track("billybob")
|> Mxpanel.deliver(client)

# track an event with optional properties
event = Mxpanel.Event.new("signup", "billybob", %{"Favourite Color" => "Red"})
Mxpanel.track(client, event)
"signup"
|> Mxpanel.track("billybob", %{"Favourite Color" => "Red"})
|> Mxpanel.deliver(client)

# set an IP address to get automatic geolocation info
event = Mxpanel.Event.new("signup", "billybob", %{}, ip: "72.229.28.185")
Mxpanel.track(client, event)
"signup"
|> Mxpanel.track("billybob", %{}, ip: "72.229.28.185")
|> Mxpanel.deliver(client)

# track an event with a specific timestamp
event = Mxpanel.Event.new("signup", "billybob", %{}, time: System.os_time(:second) - 60)
Mxpanel.track(client, event)
"signup"
|> Mxpanel.track("billybob", %{}, time: System.os_time(:second) - 60)
|> Mxpanel.deliver(client)

# track an event in background, the event will be buffered, and later sent in batches
Mxpanel.Batcher.start_link(name: MyApp.Batcher, token: "<mixpanel project token>")
event = Mxpanel.Event.new("signup", "billybob")
Mxpanel.track_later(MyApp.MxpanelBatcher, event)

# Create an alias for an existing distinct id
Mxpanel.create_alias(client, "distinct_id", "your_alias")

# create or update a user in Mixpanel Engage
properties = %{"Address" => "1313 Mockingbird Lane", "Birthday" => "1948-01-01"}
Mxpanel.People.set(client, "billybob", properties)

# create or update a user in Mixpanel Engage without altering $last_seen
Mxpanel.People.set(client, "billybob", %{plan: "premium"}, ignore_time: true)

# set a user profile's IP address to get automatic geolocation info
Mxpanel.People.set(client, "billybob", %{plan: "premium"}, ip: "72.229.28.185")

# set properties on a user, don't override
properties = %{"First login date" => "2013-04-01T13:20:00"}
Mxpanel.People.set_once(client, "billybob", properties)

# removes the properties
Mxpanel.People.unset(client, "billybob", ["Address", "Birthday"])

# increment a numeric property
Mxpanel.People.increment(client, "billybob", "Number of Logins", 12)

# append value to a list
Mxpanel.People.append_item(client, "billybob", "Items purchased", "socks")

# remove value from a list
Mxpanel.People.remove_item(client, "billybob", "Items purchased", "t-shirt")

# delete a user
Mxpanel.People.delete(client, "billybob")
"signup"
|> Mxpanel.track("billybob")
|> Mxpanel.deliver_later(MyApp.MxpanelBatcher)

```

[Checkout the documentation](https://hexdocs.pm/mxpanel) for complete usage and available functions.

## Telemetry

Mxpanel currently exposes following Telemetry events:

* `[:mxpanel, :batcher, :buffers_info]` - Dispatched periodically by each
running batcher exposing the size of each running buffer in the pool.
running batcher exposing the size of each running buffer per endpoint in the pool.

* Measurement: `%{}`
* Metadata: `%{batcher_name: atom(), buffer_sizes: [integer()]}`
* Metadata: `%{batcher_name: atom(), buffer_sizes: %{atom() => [integer()]}}`

## Changelog

Expand Down
186 changes: 155 additions & 31 deletions lib/mxpanel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,76 +8,200 @@ defmodule Mxpanel do
alias Mxpanel.API
alias Mxpanel.Batcher
alias Mxpanel.Client
alias Mxpanel.Event
alias Mxpanel.Operation

@event_opts_schema [
time: [
type: :pos_integer,
doc: "Specific timestamp in seconds of the event. Defaults to `System.os_time(:second)`."
],
ip: [
type: :string,
doc: "IP address to get automatic geolocation info."
]
]

@doc """
Send a single event into Mixpanel.
Tracks an event.
client = %Mxpanel.Client{token: "mixpanel project token"}
event = Mxpanel.Event.new("signup", "123")
Mxpanel.track(client, event)
"signup"
|> Mxpanel.track("13793")
|> Mxpanel.deliver()
Import a batch of events into Mixpanel.
"signup"
|> Mxpanel.track("13793")
|> Mxpanel.deliver()
client = %Mxpanel.Client{token: "mixpanel project token"}
event_1 = Mxpanel.Event.new("signup", "123")
event_2 = Mxpanel.Event.new("signup", "456")
"signup"
|> Mxpanel.track("13793", %{"Favourite Color" => "Red"})
|> Mxpanel.deliver()
Mxpanel.track(client, [event_1, event_2])
"signup"
|> Mxpanel.track("13793", %{}, ip: "72.229.28.185")
|> Mxpanel.deliver()
"signup"
|> Mxpanel.track("13793", %{}, time: 1624811298)
|> Mxpanel.deliver()
## Options
#{NimbleOptions.docs(@event_opts_schema)}
"""
@spec track(Client.t(), Event.t() | [Event.t()]) :: :ok | {:error, term()}
def track(%Client{} = client, event_or_events) do
data =
event_or_events
|> List.wrap()
|> Enum.map(&Event.serialize(&1, client.token))
@spec track(String.t(), String.t(), map(), Keyword.t()) :: Operation.t()
def track(name, distinct_id, additional_properties \\ %{}, opts \\ [])
when is_binary(name) and is_binary(distinct_id) and is_map(additional_properties) do
payload = build_event(name, distinct_id, additional_properties, opts)

%Operation{endpoint: :track, payload: payload}
end

defp build_event(name, distinct_id, additional_properties, opts) do
opts = validate_options!(opts)

properties = %{
"distinct_id" => distinct_id,
"$insert_id" => unique_insert_id(),
"time" => Keyword.get(opts, :time, System.os_time(:second))
}

%{
"event" => name,
"properties" =>
additional_properties
|> Map.merge(properties)
|> maybe_put("ip", Keyword.get(opts, :ip), fn ip -> is_binary(ip) end)
}
end

defp validate_options!(opts) do
case NimbleOptions.validate(opts, @event_opts_schema) do
{:ok, opts} ->
opts

{:error, %NimbleOptions.ValidationError{message: message}} ->
raise ArgumentError, message
end
end

API.request(client, "/track", data)
defp maybe_put(map, key, value, condition) do
if condition.(value) do
Map.put(map, key, value)
else
map
end
end

defp unique_insert_id do
32
|> :crypto.strong_rand_bytes()
|> Base.encode64(padding: false)
end

@doc """
Creates an alias for an existing distinct id.
Mxpanel.create_alias(client, "distinct_id", "your_alias")
"distinct_id"
|> Mxpanel.create_alias("your_alias")
|> Mxpanel.deliver()
"""
@spec create_alias(Client.t(), String.t(), String.t()) :: :ok | {:error, term()}
def create_alias(%Client{} = client, distinct_id, alias_id)
@spec create_alias(String.t(), String.t()) :: Operation.t()
def create_alias(distinct_id, alias_id)
when is_binary(distinct_id) and is_binary(alias_id) do
data = %{
payload = %{
"event" => "$create_alias",
"properties" => %{
"distinct_id" => distinct_id,
"alias" => alias_id,
"token" => client.token
"alias" => alias_id
}
}

API.request(client, "/track", data)
%Operation{endpoint: :track, payload: payload}
end

@doc """
Delivers an operation to the mixpanel API using the configured HTTP client.
Mxpanel.deliver(operation, client)
Mxpanel.deliver([operation_1, operation_2], client)
"""
@spec deliver(Operation.t() | [Operation.t()], Client.t()) :: :ok | {:error, term()}

def deliver([], %Client{}), do: :ok

def deliver(operation_or_operations, %Client{} = client) do
path = get_path(operation_or_operations)
data = build_data(operation_or_operations, client)

API.request(client, path, data)
end

defp get_path(operation_or_operations) do
endpoints =
operation_or_operations
|> List.wrap()
|> Enum.map(& &1.endpoint)
|> Enum.uniq()

first_endpoint = List.first(endpoints)

unless Enum.all?(endpoints, fn e -> e == first_endpoint end) do
raise ArgumentError,
"expected all endpoints to be equal, got different endpoints: #{inspect(endpoints)}"
end

case first_endpoint do
:track -> "/track"
:engage -> "/engage"
:groups -> "/groups"
end
end

defp build_data(operations, client) when is_list(operations) do
Enum.map(operations, &build_data(&1, client))
end

defp build_data(%Operation{endpoint: :track, payload: payload}, client) when is_map(payload) do
put_in(payload, ["properties", "token"], client.token)
end

defp build_data(%Operation{endpoint: endpoint, payload: payload}, client)
when endpoint in [:engage, :groups] and is_map(payload) do
Map.put(payload, "$token", client.token)
end

@doc """
Enqueues the event. The event will be store in a buffer and sent in batches to mixpanel.
Enqueues an operation. The operation will be stored in a buffer and sent in batches to mixpanel.
Mxpanel.Batcher.start_link(name: MyApp.MxpanelBatcher, token: "mixpanel project token")
event = Mxpanel.Event.new("signup", "123")
Mxpanel.track_later(MyApp.MxpanelBatcher, event)
"signup"
|> Mxpanel.track("13793")
|> Mxpanel.deliver_later(MyApp.MxpanelBatcher)
Sending multiple operations:
operation_1 = Mxpanel.track("signup", "13793")
operation_2 = Mxpanel.track("first login", "13793")
Mxpanel.deliver_later([operation_1, operation_2], MyApp.MxpanelBatcher)
## Why use it?
HTTP requests to the Mixpanel API often take time and may fail. If you are
tracking events during a web request, you probably, don't want to make your
users wait the extra time for the mixpanel API call to finish.
users wait the extra time for the mixpanel API call to finish. The batcher will
enqueue the operations, send them in batches to mixpanel with automatic retries.
Checkout `Mxpanel.Batcher` for more information.
"""
@spec track_later(Batcher.name(), Event.t() | [Event.t()]) :: :ok
def track_later(batcher_name, event_or_events) when is_atom(batcher_name) do
Batcher.enqueue(batcher_name, event_or_events)
@spec deliver_later(Operation.t() | [Operation.t()], Batcher.name()) :: :ok
def deliver_later(operation_or_operations, batcher_name) when is_atom(batcher_name) do
Batcher.enqueue(batcher_name, operation_or_operations)
end

@doc """
Expand Down
Loading

0 comments on commit 52acab9

Please sign in to comment.