diff --git a/influxdb-schema-updater b/influxdb-schema-updater new file mode 100755 index 0000000..46b4b9d --- /dev/null +++ b/influxdb-schema-updater @@ -0,0 +1,659 @@ +#!/usr/bin/env perl +################################################################################ +# +# influxdb-schema-updater +# +# Script to update InfluxDB databases, retention policies and continuous queries +# according to the config files. Exits with 0 if and only if every required +# update has been executed successfully. +# +# Written by Anselme Goetschmann ang@open.ch April 2018 +# Copyright (c) 2018 Open Systems AG, Switzerland +# All Rights Reserved. +# +################################################################################ + +use 5.010; +use strict; +use warnings; + +use InfluxDB::HTTP; + +use Getopt::Long; +use Pod::Usage qw(pod2usage); +use IPC::Run qw(run); +use JSON::MaybeXS; +use File::Slurper qw(read_text); +use List::Util qw(sum0); + + +sub main { + my $show_usage; + my $dryrun = 0; + my $diff = 0; + my $force = 0; + my $schema_dir = '/etc/influxdb/schema/'; + my $url = 'http://localhost:8086'; + + GetOptions( + ("help" => \$show_usage, + "dryrun" => \$dryrun, + "diff" => \$diff, + "force" => \$force, + "config=s" => \$schema_dir, + "url=s" => \$url) + ); + + if ($show_usage) { + pod2usage(-verbose => 1) + } + + # schema configuration directory + if (! -d $schema_dir) { + pod2usage("The InfluxDB schema configuration directory $schema_dir does not exist"); + } + $schema_dir =~ s/\/$//; + + # influxdb client + $url =~ /^(?:(?\w+):\/\/)?(?\w+):(?\d+)$/; + if ($+{protocol} && $+{protocol} ne 'http') { + pod2usage("Got $+{protocol} as protocol to reach InfluxDB, but only http is supported."); + } + if (!$+{host} || !$+{port}) { + pod2usage("The host and the port could not be extracted from the url $url"); + } + my $influxdb_client = InfluxDB::HTTP->new(host => $+{host}, port => $+{port}); + my $ping = $influxdb_client->ping(); + die "Failed to reach InfluxDB at $url: $ping\n" if !$ping; + + # get the schema updates from the configuration directory compared to what is in influxdb + my $updates = extract_updates($influxdb_client, $schema_dir, $dryrun, $force); + + # only print the updates if we're in diff mode + if ($diff) { + print_diff($updates); + exit 0; + } + + # apply the updates + my $unapplied_updates_count = apply_updates($updates, $influxdb_client); + exit ($unapplied_updates_count == 0 ? 0 : 1); +} + +sub extract_updates { + my ($influxdb_client, $schema_dir, $dryrun, $force) = @_; + + my $db_schemas_in_influxdb = load_db_schemas_in_influxdb($influxdb_client); + my $db_schemas_in_config = load_db_schemas_in_config($schema_dir); + my $updates = extract_database_updates($db_schemas_in_influxdb, $db_schemas_in_config, $dryrun, $force); + + my $all_cqs_in_influxdb = load_all_cqs_in_influxdb($influxdb_client); + my $all_cqs_in_config = load_all_cqs_in_config($schema_dir); + my ($cq_deletions, $cq_updates_and_creations) = extract_continuous_query_updates($all_cqs_in_influxdb, $all_cqs_in_config, $dryrun, $force); + $updates = [@$cq_deletions, @$updates, @$cq_updates_and_creations]; # delete old stuff first + + return $updates; +} + +sub apply_updates { + my ($updates, $influxdb_client) = @_; + + my %object_str = (db => 'database', rp => 'retention policy', cq => 'continuous query'); + my %action_prefix = (delete => '[-]', update => '[~]', create => '[+]'); + + my $skipped_count = 0; + for my $update (@$updates) { + my $description = "$update->{action} $object_str{$update->{object}} $update->{name}" . ($update->{object} ne 'db' ? " on database $update->{db}" : ''); + if ($update->{skip}) { + say "[!] skipped: $description"; + $skipped_count += 1; + next; + } + say "$action_prefix{$update->{action}} $description"; + query_influxql($influxdb_client, $update->{query}) + } + + return $skipped_count; +} + +sub print_diff { + my ($updates) = @_; + + for my $update (@$updates) { + print '-- ' if $update->{skip}; + say $update->{query}; + } +} + +# Databases and Retention Policies --------------------------------------------- + +# returns an array of updates: +# [ +# { +# action => 'delete' | 'update' | 'create', +# object => 'db' | 'rp' | 'cq', +# db => , +# name => , +# query => , +# skip => 0 | 1, # whether this change should be skipped +# }, +# ... +# ] +sub extract_database_updates { + my ($db_schemas_in_influxdb, $db_schemas_in_config, $dryrun, $force) = @_; + + my ($old_dbs, $eq_dbs, $new_dbs) = get_Ldifference_intersection_Rdifference([keys %{$db_schemas_in_influxdb}], [keys %{$db_schemas_in_config}]); + + my %rp_updates; + for my $db (@$eq_dbs) { + my ($old, $updated, $new) = extract_retention_policy_updates($db, $db_schemas_in_influxdb->{$db}, $db_schemas_in_config->{$db}->{rps}, $dryrun, $force); + $rp_updates{old_rps}->{$db} = $old; + $rp_updates{updated_rps}->{$db} = $updated; + $rp_updates{new_rps}->{$db} = $new; + } + + # array of updates in the order in which they should be applied + my @updates; + # old retention policies + for my $db (reverse sort keys %{$rp_updates{old_rps}}) { + push @updates, @{$rp_updates{old_rps}->{$db}}; + } + # old databases + for my $db (reverse sort @$old_dbs){ + push @updates, { + action => 'delete', + object => 'db', + db => $db, + name => $db, + query => "DROP DATABASE $db;", + skip => $dryrun || !$force, + }; + } + # new databases + for my $db (sort @$new_dbs){ + push @updates, { + action => 'create', + object => 'db', + db => $db, + name => $db, + query => $db_schemas_in_config->{$db}->{create_query}, + skip => $dryrun, + }; + } + # new retention policies + for my $db (sort keys %{$rp_updates{new_rps}}) { + push @updates, @{$rp_updates{new_rps}->{$db}}; + } + # updated retention policies + for my $db (sort keys %{$rp_updates{updated_rps}}) { + push @updates, @{$rp_updates{updated_rps}->{$db}}; + } + + return \@updates; +} + +sub extract_retention_policy_updates { + my ($db, $rps_in_influxdb, $rps_in_config, $dryrun, $force) = @_; + + my ($old_rps, $eq_rps, $new_rps) = get_Ldifference_intersection_Rdifference([keys %{$rps_in_influxdb}], [keys %{$rps_in_config}]); + + my @old_rps; + for my $rp (reverse sort @$old_rps) { + push @old_rps, { + action => 'delete', + object => 'rp', + db => $db, + name => $rp, + query => "DROP RETENTION POLICY \"$rp\" ON $db;", + skip => $dryrun || !$force, + }; + } + my @updated_rps; + for my $rp (sort @$eq_rps) { + if (compare_rps($rps_in_influxdb->{$rp}, $rps_in_config->{$rp}) != 0) { + push @updated_rps, { + action => 'update', + object => 'rp', + db => $db, + name => $rp, + query => "ALTER RETENTION POLICY \"$rp\" ON $db DURATION $rps_in_config->{$rp}->{duration} REPLICATION 1 SHARD DURATION $rps_in_config->{$rp}->{shard_duration}" . ($rps_in_config->{$rp}->{default} ? ' DEFAULT;' : ';'), + skip => $dryrun, + }; + } + } + my @new_rps; + for my $rp (sort @$new_rps) { + push @new_rps, { + action => 'create', + object => 'rp', + db => $db, + name => $rp, + query => "CREATE RETENTION POLICY \"$rp\" ON $db DURATION $rps_in_config->{$rp}->{duration} REPLICATION 1 SHARD DURATION $rps_in_config->{$rp}->{shard_duration}" . ($rps_in_config->{$rp}->{default} ? ' DEFAULT;' : ';'), + skip => $dryrun, + }; + } + + return (\@old_rps, \@updated_rps, \@new_rps); +} + +sub compare_rps { + my ($rp1, $rp2) = @_; + + return (to_sec($rp1->{duration}) != to_sec($rp2->{duration})) + || (to_sec($rp1->{shard_duration}) != to_sec($rp2->{shard_duration})) + || ($rp1->{default} xor $rp2->{default}); +} + +# Continuous Queries ----------------------------------------------------------- + +sub extract_continuous_query_updates { + my ($all_cqs_in_influxdb, $all_cqs_in_config, $dryrun, $force) = @_; + + my %dbs_union = map { $_ => 1 } (keys %$all_cqs_in_influxdb, keys %$all_cqs_in_config); + my @dbs = keys %dbs_union; + + my @cq_deletions; + my @cq_updates_and_creations; + for my $db (sort @dbs) { + my $in_influxdb = {}; + $in_influxdb = $all_cqs_in_influxdb->{$db} if exists $$all_cqs_in_influxdb{$db}; + my $in_config = {}; + $in_config = $all_cqs_in_config->{$db} if exists $$all_cqs_in_config{$db}; + + my ($old, $eq, $new) = get_Ldifference_intersection_Rdifference([keys %$in_influxdb], [keys %$in_config]); + + for my $cq (sort @$old) { + push @cq_deletions, { + action => 'delete', + object => 'cq', + db => $db, + name => $cq, + query => "DROP CONTINUOUS QUERY $cq ON $db;", + skip => $dryrun || !$force, + }; + } + for my $cq (sort @$eq) { + if (compare_cqs($in_influxdb->{$cq}, $in_config->{$cq}) != 0) { + push @cq_updates_and_creations, { + action => 'update', + object => 'cq', + db => $db, + name => $cq, + query => "DROP CONTINUOUS QUERY $cq ON $db; $all_cqs_in_config->{$db}->{$cq};", + skip => $dryrun, + }; + } + } + for my $cq (sort @$new) { + push @cq_updates_and_creations, { + action => 'create', + object => 'cq', + db => $db, + name => $cq, + query => $all_cqs_in_config->{$db}->{$cq} . ';', + skip => $dryrun, + }; + } + } + + @cq_deletions = reverse @cq_deletions; + + return (\@cq_deletions, \@cq_updates_and_creations); +} + +sub compare_cqs { + my ($cq1, $cq2) = @_; + for my $cq ($cq1, $cq2) { + $cq =~ s/ //g; + $cq =~ s/;//g; + $cq =~ s/"//g; + $cq = lc $cq; + } + return $cq1 cmp $cq2; +} + + +# Data ------------------------------------------------------------------------- + +# { +# => { +# => { +# duration => ..., +# shard_duration => ..., +# default => ..., +# }, +# ... +# } +# } +sub load_db_schemas_in_influxdb { + my ($influxdb_client) = @_; + + my $query_result = query_influxql($influxdb_client, 'SHOW DATABASES'); + my $dbs_in_influxdb = $query_result->{'results'}->[0]->{series}->[0]->{values}; + my @dbs_in_influxdb = grep { $_ ne '_internal' } + map { $_->[0] } + @$dbs_in_influxdb; + + my %db_schemas_in_influxdb; + for my $db (@dbs_in_influxdb) { + my $rp_query_res = query_influxql($influxdb_client, "SHOW RETENTION POLICIES ON $db"); + $db_schemas_in_influxdb{$db} = { + map { $_->[0] => { + duration => $_->[1], + shard_duration => $_->[2], + default => $_->[4], + } + } + @{$rp_query_res->{results}->[0]->{series}->[0]->{values}} + }; + } + + return \%db_schemas_in_influxdb; +} + +# { +# => { +# create_query => '...', +# rps => { +# => { +# duration => ..., +# shard_duration => ..., +# default => ..., +# }, +# ... +# }, +# }, +# ... +# } +sub load_db_schemas_in_config { + my ($schema_dir) = @_; + + my %dbs_struct; + my $db_files = get_schema_files_for_dir("$schema_dir/db"); + + for my $db_file (@$db_files) { + my $create_queries = parse_create_queries("$schema_dir/db/$db_file"); + + for my $db (keys %$create_queries) { + my $create_query = $create_queries->{$db}; + my $rps = parse_retention_policies($db, $create_query); + + $dbs_struct{$db} = { + create_query => $create_query, + rps => $rps, + }; + } + } + return \%dbs_struct; +} + +# { +# => { +# => , +# ... +# }, +# } +sub load_all_cqs_in_influxdb { + my ($influxdb_client) = @_; + + my %all_cqs_in_influxdb; + my $cqs_query_res = query_influxql($influxdb_client, "SHOW CONTINUOUS QUERIES"); + %all_cqs_in_influxdb = map { $_->{name} => { map { $_->[0] => $_->[1] } @{$_->{values}} } } + @{$cqs_query_res->{results}->[0]->{series}}; + return \%all_cqs_in_influxdb; +} + +# { +# => { +# => , +# ... +# }, +# } +sub load_all_cqs_in_config { + my ($schema_dir) = @_; + + my %all_cqs_in_config; + my $cq_files = get_schema_files_for_dir("$schema_dir/cq"); + for my $cq_file (@$cq_files) { + my $cqs_in_file = parse_continuous_queries("$schema_dir/cq/$cq_file"); + %all_cqs_in_config = (%all_cqs_in_config, %$cqs_in_file); + } + return \%all_cqs_in_config; +} + + +sub get_schema_files_for_dir { + my ($dir) = @_; + my @files = grep { $_ } + map { /\/([\w.]+)$/; $1 } + grep { -f } + glob("$dir/*"); + return \@files; +} + +# Parsers ---------------------------------------------------------------------- + +# parse a file possibly containing multiple database create queries (with their respective RPs) +sub parse_create_queries { + my ($file_name) = @_; + my $file_content = read_text($file_name); + + my %create_queries; + while ($file_content =~ /(create database (\w+)[\s\S]+?)(?=(create database)|\z)/ig) { + my ($create_query, $db) = ($1, $2); + $create_query =~ s/^\s+|\s+$//g; + $create_queries{$db} = $create_query; + } + if (! %create_queries) { + die "No create query was found in $file_name, there is probably something wrong with the regex"; + } + + return \%create_queries; +} + +# parse RPs from something like: +# CREATE DATABASE test WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2; +# CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w; +sub parse_retention_policies { + my ($db, $db_create_query) = @_; + + my $default_rp; + # first rp in the config + my %rps; + if ($db_create_query =~ /create database $db with duration ((?:\d+[smhdw])|(?:inf)) [\w ]*? shard duration (\d+[smhdw]) name "?([\w.]+)"?/ig) { + $rps{$3} = { + duration => $1, + shard_duration => $2, + }; + $default_rp = $3; + } + else { # https://docs.influxdata.com/influxdb/v1.5/query_language/database_management/#retention-policy-management + $rps{'autogen'} = { + duration => 'INF', + shard_duration => '7d', + }; + $default_rp = 'autogen'; + } + # loop over the rest + my $at_least_one_rp = scalar %rps; + while ($db_create_query =~ /create retention policy "?([\w.]+)"? on $db duration ((?:\d+[smhdw])|(?:inf)) replication 1 shard duration (\d+[smhdw])( default)?/ig) { + $at_least_one_rp = 1; + $rps{$1} = { + duration => $2, + shard_duration => $3, + }; + $default_rp = $1 if $4; + } + if (!$at_least_one_rp) { + die "No retention policy was matched for $db, there is probably something wrong with the regex!"; + } + # drop rps if needed + while ($db_create_query =~ /drop retention policy "?([\w.]+)"? on "?$db"?/ig) { + delete $rps{$1}; + } + + # only take in account the last rp declared as default + if ($default_rp) { + $rps{$default_rp}->{default} = 1; + } + + return \%rps; +} + +# parse CQs from something like: +# CREATE CONTINUOUS QUERY cq1 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; +# CREATE CONTINUOUS QUERY cq2 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT MAX(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; +sub parse_continuous_queries { + my ($filename) = @_; + + my $file_content = read_text($filename); + + my %cqs; + while ($file_content =~ /(create continuous query "?([\w.]+)"? on "?(\w+)"? [\s\S]+? end)/ig) { # some cq names contain '.' + $cqs{$3}->{$2} = $1; + } + return \%cqs; +} + + +# Helpers ---------------------------------------------------------------------- + +# given two sets (arrays) of strings, returns the left difference, the intersection and the right difference +sub get_Ldifference_intersection_Rdifference { + my ($l, $r) = @_; + my @l = sort @{$l}; + my @r = sort @{$r}; + + my @ldiff; + my @inter; + my @rdiff; + while (@l || @r) { + if (!@r) { + push @ldiff, @l; + last; + } + elsif (!@l) { + push @rdiff, @r; + last; + } + + my $rel = $l[0] cmp $r[0]; + + if ($rel < 0) { + push @ldiff, shift @l; + } + elsif ($rel == 0) { + push @inter, shift @l; + shift @r; + } + else { # $rel > 0 + push @rdiff, shift @r; + } + } + + return (\@ldiff, \@inter, \@rdiff); +} + +# convert InfluxDB duration string to seconds +sub to_sec { + my ($d) = @_; + + # infinity is encoded as 0s in InfluxDB + return 0 if $d eq 'INF'; + + state $in_seconds = { + 'w' => 60 * 60 * 24 * 7, + 'd' => 60 * 60 * 24, + 'h' => 60 * 60, + 'm' => 60, + 's' => 1, + }; + my $s; + while ($d =~ /(\d+?)([smhdw])/g) { + $s += $1 * $in_seconds->{$2}; + } + + return $s; +} + +sub query_influxql { + my ($influxdb_client, $ifql) = @_; + my $query = $influxdb_client->query($ifql); + die "The query \"$ifql\" failed.\n" if ! $query; + my $data = $query->data(); + die "Error: \"$data->{results}->[0]->{error}\" when running InfluxQL query \"$ifql\"\n" if $data->{results}->[0]->{error}; + return $data; +} + +# ------------------------------------------------------------------------------ + +main(); + +# ------------------------------------------------------------------------------ + +__END__ + +=head1 NAME + +influxdb-schema-updater - Update InfluxDB databases, retention policies and continuous queries + +=head1 VERSION + +Version 0.01 + +=head1 SYNOPSIS + +influxdb-schema-updater [--help] [--dryrun] [--diff] [--force] [--config ] [--port ] + +=head1 OPTIONS + +=over 4 + +=item B<--help> + +Print a help message and exit. + +=item B<--dryrun> + +Print the changes which would be applied in normal mode. + +=item B<--diff> + +Print the InfluxQL queries instead of executing them. + +=item B<--force> + +Apply the changes which were prevented in normal mode. + +=item B<--config> + +The directory where the schema files are located. Default is /etc/influxdb/schema/. + +=item B<--url> + +The url where the InfluxDB HTTP API is reachable. Default is localhost:8086. + +=back + +=head1 DESCRIPTION + +B will read the config directory which should have the following +structure: + + db/ + # contains one or more InfluxQL create queries for a database and its RPs + .ifql + .ifql + ... + cq/ + # contains InfluxQL create queries for CQs + .ifql + .ifql + ... + +and compare the databases, retention policies (RPs) and continuous queries (CQs) +to the ones in the InfluxDB instance reachable at . If there is a +difference, InfluxDB will be updated. The exit code is 0 if and only if no +update was skipped. + +=cut diff --git a/t/data/test00/.keepme b/t/data/test00/.keepme new file mode 100644 index 0000000..e69de29 diff --git a/t/data/test01/db/db1.ifql b/t/data/test01/db/db1.ifql new file mode 100644 index 0000000..14379bd --- /dev/null +++ b/t/data/test01/db/db1.ifql @@ -0,0 +1 @@ +CREATE DATABASE test; diff --git a/t/data/test02/db/db1.ifql b/t/data/test02/db/db1.ifql new file mode 100644 index 0000000..f4ce06e --- /dev/null +++ b/t/data/test02/db/db1.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test; +CREATE RETENTION POLICY rp1 ON test DURATION 90d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/data/test03/db/db1.ifql b/t/data/test03/db/db1.ifql new file mode 100644 index 0000000..41e9df1 --- /dev/null +++ b/t/data/test03/db/db1.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test; +CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/data/test04/db/db1.ifql b/t/data/test04/db/db1.ifql new file mode 100644 index 0000000..f92da11 --- /dev/null +++ b/t/data/test04/db/db1.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2; +CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/data/test05/cq/db1.ifql b/t/data/test05/cq/db1.ifql new file mode 100644 index 0000000..9f5b156 --- /dev/null +++ b/t/data/test05/cq/db1.ifql @@ -0,0 +1,2 @@ +CREATE CONTINUOUS QUERY cq1 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; +CREATE CONTINUOUS QUERY cq2 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; \ No newline at end of file diff --git a/t/data/test05/db/db1.ifql b/t/data/test05/db/db1.ifql new file mode 100644 index 0000000..f92da11 --- /dev/null +++ b/t/data/test05/db/db1.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2; +CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/data/test06/cq/db1.ifql b/t/data/test06/cq/db1.ifql new file mode 100644 index 0000000..82ad583 --- /dev/null +++ b/t/data/test06/cq/db1.ifql @@ -0,0 +1,2 @@ +CREATE CONTINUOUS QUERY cq1 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; +CREATE CONTINUOUS QUERY cq2 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT MAX(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; \ No newline at end of file diff --git a/t/data/test06/db/db1.ifql b/t/data/test06/db/db1.ifql new file mode 100644 index 0000000..f92da11 --- /dev/null +++ b/t/data/test06/db/db1.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2; +CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/data/test07/cq/db1.ifql b/t/data/test07/cq/db1.ifql new file mode 100644 index 0000000..792ac2e --- /dev/null +++ b/t/data/test07/cq/db1.ifql @@ -0,0 +1 @@ +CREATE CONTINUOUS QUERY cq1 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END; \ No newline at end of file diff --git a/t/data/test07/db/db1.ifql b/t/data/test07/db/db1.ifql new file mode 100644 index 0000000..f92da11 --- /dev/null +++ b/t/data/test07/db/db1.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2; +CREATE RETENTION POLICY rp1 ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/data/test08/cq/db2.ifql b/t/data/test08/cq/db2.ifql new file mode 100644 index 0000000..ad9ed9c --- /dev/null +++ b/t/data/test08/cq/db2.ifql @@ -0,0 +1 @@ +CREATE CONTINUOUS QUERY cq1 ON test2 RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test2.rp2.m FROM test2.rp1.m GROUP BY time(5m) END; \ No newline at end of file diff --git a/t/data/test08/db/db2.ifql b/t/data/test08/db/db2.ifql new file mode 100644 index 0000000..c5c68f0 --- /dev/null +++ b/t/data/test08/db/db2.ifql @@ -0,0 +1,2 @@ +CREATE DATABASE test2 WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2; +CREATE RETENTION POLICY rp1 ON test2 DURATION 100d REPLICATION 1 SHARD DURATION 2w; diff --git a/t/influxdb-schema-updater.t b/t/influxdb-schema-updater.t new file mode 100755 index 0000000..50b1527 --- /dev/null +++ b/t/influxdb-schema-updater.t @@ -0,0 +1,227 @@ +#!/usr/bin/env perl + +use 5.010; +use strict; +use warnings; + +use Test::More; + +use File::Temp; +use File::Slurper qw(write_text); +use IPC::Run qw(run); +use File::Spec; +use File::Basename; + +sub test { + my $curdir = get_directory_of_this_file(); + my $schemas_dir = "$curdir/data"; + + my $tmpdir_handle = File::Temp->newdir(CLEANUP => 1); + my $tmpdir = $tmpdir_handle->dirname(); + my $port = 17755; + + my $conf = get_test_conf($tmpdir, $port); + write_text("$tmpdir/influx.conf", $conf); + + # check if influxd is found before forking + eval { + run_cmd('influxd', 'version'); + }; + plan(skip_all => 'influxd not found in PATH') if $@; + + my $pid; + defined($pid = fork()) or die "unable to fork: $!\n"; + if ($pid == 0) { + exec("influxd -config $tmpdir/influx.conf"); + warn "unable to exec 'influxd -config $tmpdir/influx.conf': $!\n"; + exit 1; + } + sleep 1; # wait for influxdb to start + + # empty config + is run_updater($curdir, "$schemas_dir/test00", $port, '--diff'), '' => 'Empty config'; + + # only database + is run_updater($curdir, "$schemas_dir/test01", $port, '--diff'), "CREATE DATABASE test;\n" + => 'New database is detected'; + is run_updater($curdir, "$schemas_dir/test01", $port, '--diff'), "CREATE DATABASE test;\n" + => '--diff mode doesn\'t update InfluxDB'; + run_updater($curdir, "$schemas_dir/test01", $port); + is run_updater($curdir, "$schemas_dir/test01", $port, '--diff'), '' => 'Database is added'; + + # add a retention policy + is run_updater($curdir, "$schemas_dir/test02", $port, '--diff'), "CREATE RETENTION POLICY \"rp1\" ON test DURATION 90d REPLICATION 1 SHARD DURATION 2w;\n" + => 'New RP is detected'; + run_updater($curdir, "$schemas_dir/test02", $port); + is run_updater($curdir, "$schemas_dir/test02", $port, '--diff'), '' => 'RP is added'; + + # change a retention policy + is run_updater($curdir, "$schemas_dir/test03", $port, '--diff'), "ALTER RETENTION POLICY \"rp1\" ON test DURATION 100d REPLICATION 1 SHARD DURATION 2w;\n" + => 'RP change is detected'; + run_updater($curdir, "$schemas_dir/test03", $port); + is run_updater($curdir, "$schemas_dir/test03", $port, '--diff'), '' => 'RP is updated'; + + # create a retention policy on the same line as the database + is run_updater($curdir, "$schemas_dir/test04", $port, '--diff'), "-- DROP RETENTION POLICY \"autogen\" ON test;\nCREATE RETENTION POLICY \"rp2\" ON test DURATION 260w REPLICATION 1 SHARD DURATION 12w DEFAULT;\n" + => 'RP on same line as create database is detected'; + run_updater($curdir, "$schemas_dir/test04", $port); + cmp_ok $? >> 8, '==', 1 => 'Exit code 1 when some changes are not applied'; + is run_updater($curdir, "$schemas_dir/test04", $port, '--diff'), "-- DROP RETENTION POLICY \"autogen\" ON test;\n" + => 'RP autogen is not deleted without --force'; + run_updater($curdir, "$schemas_dir/test04", $port, '--force'); + cmp_ok $? >> 8, '==', 0 => 'Exit code 0 when InfluxDB is up to date'; + is run_updater($curdir, "$schemas_dir/test04", $port, '--diff'), '' => 'RP deleted with --force'; + + + # add some continuous queries + is run_updater($curdir, "$schemas_dir/test05", $port, '--diff'), "CREATE CONTINUOUS QUERY cq1 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END;\nCREATE CONTINUOUS QUERY cq2 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END;\n" + => 'New CQs are detected'; + run_updater($curdir, "$schemas_dir/test05", $port); + is run_updater($curdir, "$schemas_dir/test05", $port, '--diff'), '' => 'CQs are added'; + + # change a continuous query + is run_updater($curdir, "$schemas_dir/test06", $port, '--diff'), "DROP CONTINUOUS QUERY cq2 ON test; CREATE CONTINUOUS QUERY cq2 ON test RESAMPLE EVERY 5m FOR 10m BEGIN SELECT MAX(a) AS b, c INTO test.rp2.m FROM test.rp1.m GROUP BY time(5m) END;\n" + => 'CQ change is detected'; + run_updater($curdir, "$schemas_dir/test06", $port); + is run_updater($curdir, "$schemas_dir/test06", $port, '--diff'), '' => 'CQ is updated'; + + # remove a continuous query + is run_updater($curdir, "$schemas_dir/test07", $port, '--diff'), "-- DROP CONTINUOUS QUERY cq2 ON test;\n" + => 'CQ removal is detected'; + run_updater($curdir, "$schemas_dir/test07", $port); + is run_updater($curdir, "$schemas_dir/test07", $port, '--diff'), "-- DROP CONTINUOUS QUERY cq2 ON test;\n" + => 'CQ is not deleted without --force'; + run_updater($curdir, "$schemas_dir/test07", $port, '--force'); + is run_updater($curdir, "$schemas_dir/test07", $port, '--diff'), '' => 'CQ is deleted with --force'; + + # test the order of updates + is run_updater($curdir, "$schemas_dir/test08", $port, '--diff', '--force'), "DROP CONTINUOUS QUERY cq1 ON test;\nDROP DATABASE test;\nCREATE DATABASE test2 WITH DURATION 260w REPLICATION 1 SHARD DURATION 12w NAME rp2;\nCREATE RETENTION POLICY rp1 ON test2 DURATION 100d REPLICATION 1 SHARD DURATION 2w;\nCREATE CONTINUOUS QUERY cq1 ON test2 RESAMPLE EVERY 5m FOR 10m BEGIN SELECT LAST(a) AS b, c INTO test2.rp2.m FROM test2.rp1.m GROUP BY time(5m) END;\n" + => 'Updates applied in the right order'; + + # remove database + is run_updater($curdir, "$schemas_dir/test00", $port, '--diff'), "-- DROP CONTINUOUS QUERY cq1 ON test;\n-- DROP DATABASE test;\n" + => 'Old database is detected'; + run_updater($curdir, "$schemas_dir/test00", $port); + is run_updater($curdir, "$schemas_dir/test00", $port, '--diff'), "-- DROP CONTINUOUS QUERY cq1 ON test;\n-- DROP DATABASE test;\n" + => 'Database is not deleted without --force'; + run_updater($curdir, "$schemas_dir/test00", $port, '--force'); + is run_updater($curdir, "$schemas_dir/test00", $port, '--diff'), '' => 'Database is deleted with --force'; + + + done_testing(); + + kill 'KILL', $pid; +} + +sub run_updater { + my ($curdir, $schema_dir, $port, @flags) = @_; + return run_cmd("$curdir/../influxdb-schema-updater", '--config', $schema_dir, '--url', "localhost:$port", @flags); +} + +sub run_cmd { + my @cmd = @_; + my $out_and_err; + run(\@cmd, '>&', \$out_and_err); + + return $out_and_err; +} + +sub get_directory_of_this_file { + my (undef, $filename) = caller; + return dirname(File::Spec->rel2abs( $filename )); +} + +# ------------------------------------------------------------------------------ + +sub get_test_conf { + my ($tmpdir, $port) = @_; + return <<"END"; +reporting-disabled = true + +[logging] + level = "warn" + suppress-logo = true + +[meta] + dir = "$tmpdir/meta" + retention-autocreate = true + logging-enabled = true + +[data] + dir = "$tmpdir/data" + engine = "tsm1" + wal-dir = "$tmpdir/wal" + wal-logging-enabled = true + query-log-enabled = true + cache-max-memory-size = 0 + max-points-per-block = 0 + max-series-per-database = 0 + max-values-per-tag = 0 + data-logging-enabled = true + index-version = "tsi1" + +[coordinator] + write-timeout = "10s" + max-concurrent-queries = 0 + query-timeout = "0s" + log-queries-after = "0s" + max-select-point = 0 + max-select-series = 0 + max-select-buckets = 0 + +[retention] + enabled = true + check-interval = "30m0s" + +[shard-precreation] + enabled = true + check-interval = "10m0s" + advance-period = "30m0s" + +[admin] + enabled = false + +[monitor] + store-enabled = true + store-database = "_internal" + store-interval = "10s" + +[subscriber] + enabled = true + http-timeout = "30s" + +[http] + enabled = true + bind-address = ":$port" + auth-enabled = false + log-enabled = false + write-tracing = false + https-enabled = false + max-row-limit = 0 + max-connection-limit = 0 + shared-secret = "" + realm = "InfluxDB" + +[[graphite]] + enabled = false + +[[collectd]] + enabled = false + +[[opentsdb]] + enabled = false + +[[udp]] + enabled = false + +[continuous_queries] + log-enabled = true + enabled = true + run-interval = "1s" + query-stats-enabled = true +END +} + +# ------------------------------------------------------------------------------ + +test(); \ No newline at end of file