diff --git a/Makefile b/Makefile index 5062b47..a1c2890 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,9 @@ release: clean -a -tags netgo \ -ldflags "-X main.Version=$(VERSION)" \ -o bin/$(TARGET) . + docker build -t gettyimages/$(TARGET):$(VERSION) . publish: release - docker build -t gettyimages/$(TARGET):$(VERSION) . docker push gettyimages/$(TARGET):$(VERSION) docker tag gettyimages/$(TARGET):$(VERSION) gettyimages/$(TARGET):latest docker push gettyimages/$(TARGET):latest diff --git a/exporter.go b/exporter.go index 19a6b9a..99c48c9 100644 --- a/exporter.go +++ b/exporter.go @@ -88,7 +88,7 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) { } func (e *Exporter) exportApps(ch chan<- prometheus.Metric) (err error) { - content, err := e.scraper.Scrape("v2/apps") + content, err := e.scraper.Scrape("v2/apps?embed=apps.taskStats") if err != nil { log.Debugf("Problem scraping v2/apps endpoint: %v\n", err) return @@ -132,6 +132,7 @@ func (e *Exporter) scrapeApps(json *gabs.Container, ch chan<- prometheus.Metric) "mem_in_mb": "mem", "disk_in_mb": "disk", "gpus": "gpus", + "avg_uptime": "taskStats.startedAfterLastScaling.stats.lifeTime.averageSeconds", } for _, app := range elements { diff --git a/main.go b/main.go index 5584bbb..8d4bde0 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,7 @@ import ( "net/url" "time" - "github.com/gambol99/go-marathon" + "github.com/matt-deboer/go-marathon" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" ) diff --git a/vendor/github.com/matt-deboer/go-marathon/AUTHORS b/vendor/github.com/matt-deboer/go-marathon/AUTHORS new file mode 100644 index 0000000..ecafb50 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/AUTHORS @@ -0,0 +1,40 @@ +Anton Liparin +atheatos +Brian Knox +Conor Mongey +Daniel Bornkessel +David Bosschaert +Denis Parchenko +Diego de Oliveira +Dmitry Fedorov +eduser25 +Elliot Anderson +François Samin +Harpreet Sawhney +Ian Babrou +Israel Derdik +Ivan Babrou +Jacob Koren +Jie Zhang +Johan Haals +juliendsv +Kan Wu +kevinschoon +Luke Amdor +Maarten Dirkse +Marcelo Salazar +Marvin Hoffmann +Matthias Kadenbach +Mike Solomon +Mikhail Dyakov +Nic Grayson +ohmystack +Raffaele Di Fazio +Robert Jacob +Rohith +Timo Reimann +Tracy Livengood +Xavi Ramirez +Yang Bai +Yifa Zhang +Zhanpeng Chen diff --git a/vendor/github.com/matt-deboer/go-marathon/CHANGELOG.md b/vendor/github.com/matt-deboer/go-marathon/CHANGELOG.md new file mode 100644 index 0000000..a3fa03a --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/CHANGELOG.md @@ -0,0 +1,114 @@ +# Change Log +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + +## [Unreleased] +### Changed +- [#242][PR242] Pointerize IPAddressPerTask.Discovery. + +## [0.5.1] - 2016-11-09 +### Fixed +- [#239][PR239] Fix scheme-less endpoint with port. + +## [0.5.0] - 2016-11-07 +### Fixed +- [#231][PR231] Fixed Marathon cluster code +- [#229][PR229] Add LastFailureCause field to HealthCheckResult. + +## [0.4.0] - 2016-10-28 +### Added +- [#223][PR223] Add support for IP-per-task. +- [#220][PR220] Add external volume definition to container. +- [#211][PR211] Close event channel on event listener removal. + +### Fixed +- [#218][PR218] Remove TimeWaitPolling from marathonClient. +- [#214][PR214] Remove extra pointer layers when passing to r.api*. + +## [0.3.0] - 2016-09-28 +- [#201][PR201]: Subscribe method is now exposed on the client to allow subscription of callback URL's + +### Fixed +- [#205][PR205]: Fix memory leak by signalling goroutine termination on event listener removal. + +### Changed +- [#205][PR205]: Change AddEventsListener to return event channel instead of taking one. + +## [0.2.0] - 2016-09-23 +### Added +- [#196][PR196]: Port definitions. +- [#191][PR191]: name and labels to portMappings. + +### Changed +- [#191][PR191] ExposePort() now takes a portMapping instance. + +### Fixed +- [#202][PR202]: Timeout error in WaitOnApplication. + +## [0.1.1] - 2016-09-07 +### Fixed +- Drop question mark-only query parameter in Applications(url.Values) manually + due to changed behavior in Go 1.7's net/url.Parse. + +## [0.1.0] - 2016-08-01 +### Added +- Field `message` to the EventStatusUpdate struct. +- Method `Host()` to set host mode explicitly. +- Field `port` to HealthCheck. +- Support for launch queues. +- Convenience method `AddFetchURIs()`. +- Support for forced operations across all methods. +- Filtering method variants (`*By`-suffixed). +- Support for Marathon DCOS token. +- Basic auth and HTTP client settings. +- Marshalling of `Deployment.DeploymentStep` for Marathon v1.X. +- Field `ipAddresses` to tasks and events. +- Field `slaveId` to tasks. +- Convenience methods to populate/clear pointerized values. +- Method `ApplicationByVersion()` to retrieve version-specific apps. +- Support for fetch URIs. +- Parse API error responses on all error types for programmatic evaluation. + +### Changed +- Consider app as unhealthy in ApplicationOK if health check is missing. (Ensures result stability during all phases of deployment.) +- Various identifiers violating golint rules. +- Do not set "bridged" mode on Docker containers by default. + +### Fixed +- Flawed unmarshalling of `CurrentStep` in events. +- Missing omitempty tag modifiers on `Application.Uris`. +- Missing leading slash in path used by `Ping()`. +- Flawed `KillTask()` in case of hierarchical app ID path. +- Missing omitempty tag modifier on `PortMapping.Protocol`. +- Nil dereference on empty debug log. +- Various occasions where omitted and empty fields could not be distinguished. + +## 0.0.1 - 2016-01-27 +### Added +- Initial SemVer release. + +[Unreleased]: https://github.com/gambol99/go-marathon/compare/v0.5.1...HEAD +[0.5.1]: https://github.com/gambol99/go-marathon/compare/v0.5.0...v0.5.1 +[0.5.0]: https://github.com/gambol99/go-marathon/compare/v0.4.0...v0.5.0 +[0.4.0]: https://github.com/gambol99/go-marathon/compare/v0.3.0...v0.4.0 +[0.3.0]: https://github.com/gambol99/go-marathon/compare/v0.2.0...v0.3.0 +[0.2.0]: https://github.com/gambol99/go-marathon/compare/v0.1.1...v0.2.0 +[0.1.1]: https://github.com/gambol99/go-marathon/compare/v0.1.0...v0.1.1 +[0.1.0]: https://github.com/gambol99/go-marathon/compare/v0.0.1...v0.1.0 + +[PR242]: https://github.com/gambol99/go-marathon/pull/242 +[PR239]: https://github.com/gambol99/go-marathon/pull/239 +[PR231]: https://github.com/gambol99/go-marathon/pull/231 +[PR229]: https://github.com/gambol99/go-marathon/pull/229 +[PR223]: https://github.com/gambol99/go-marathon/pull/223 +[PR220]: https://github.com/gambol99/go-marathon/pull/220 +[PR218]: https://github.com/gambol99/go-marathon/pull/218 +[PR214]: https://github.com/gambol99/go-marathon/pull/214 +[PR211]: https://github.com/gambol99/go-marathon/pull/211 +[PR205]: https://github.com/gambol99/go-marathon/pull/205 +[PR202]: https://github.com/gambol99/go-marathon/pull/202 +[PR201]: https://github.com/gambol99/go-marathon/pull/201 +[PR196]: https://github.com/gambol99/go-marathon/pull/196 +[PR191]: https://github.com/gambol99/go-marathon/pull/191 diff --git a/vendor/github.com/matt-deboer/go-marathon/CONTRIBUTING.md b/vendor/github.com/matt-deboer/go-marathon/CONTRIBUTING.md new file mode 100644 index 0000000..823679a --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/CONTRIBUTING.md @@ -0,0 +1,15 @@ +# Contribution Guidelines + +## Pre-Development +- Look for an existing Github issue describing the bug you have found/feature request you would like to see getting implemented. +- If no issue exists and there is reason to believe that your (non-trivial) contribution might be subject to an up-front design discussion, file an issue first and propose your idea. + +## Development +- Fork the repository. +- Create a feature branch (`git checkout -b my-new-feature master`). +- Commit your changes, preferring one commit per logical unit of work. Often times, this simply means having a single commit. +- If applicable, update the documentation in the [README file](README.md). +- In the vast majority of cases, you should add/amend a (regression) test for your bug fix/feature. +- Push your branch (`git push origin my-new-feature`). +- Create a new pull request. +- Address any comments your reviewer raises, pushing additional commits onto your branch along the way. In particular, refrain from amending/force-pushing until you receive an LGTM (Looks Good To Me) from your reviewer. This will allow for a better review experience. diff --git a/vendor/github.com/matt-deboer/go-marathon/LICENSE b/vendor/github.com/matt-deboer/go-marathon/LICENSE new file mode 100644 index 0000000..5e0fd33 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/LICENSE @@ -0,0 +1,201 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, +and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by +the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all +other entities that control, are controlled by, or are under common +control with that entity. For the purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity +exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, +including but not limited to software source code, documentation +source, and configuration files. + +"Object" form shall mean any form resulting from mechanical +transformation or translation of a Source form, including but +not limited to compiled object code, generated documentation, +and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or +Object form, made available under the License, as indicated by a +copyright notice that is included in or attached to the work +(an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object +form, that is based on (or derived from) the Work and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. For the purposes +of this License, Derivative Works shall not include works that remain +separable from, or merely link (or bind by name) to the interfaces of, +the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including +the original version of the Work and any modifications or additions +to that Work or Derivative Works thereof, that is intentionally +submitted to Licensor for inclusion in the Work by the copyright owner +or by an individual or Legal Entity authorized to submit on behalf of +the copyright owner. For the purposes of this definition, "submitted" +means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, +and issue tracking systems that are managed by, or on behalf of, the +Licensor for the purpose of discussing and improving the Work, but +excluding communication that is conspicuously marked or otherwise +designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity +on behalf of whom a Contribution has been received by Licensor and +subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the +Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +(except as stated in this section) patent license to make, have made, +use, offer to sell, sell, import, and otherwise transfer the Work, +where such license applies only to those patent claims licensable +by such Contributor that are necessarily infringed by their +Contribution(s) alone or by combination of their Contribution(s) +with the Work to which such Contribution(s) was submitted. If You +institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work +or a Contribution incorporated within the Work constitutes direct +or contributory patent infringement, then any patent licenses +granted to You under this License for that Work shall terminate +as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the +Work or Derivative Works thereof in any medium, with or without +modifications, and in Source or Object form, provided that You +meet the following conditions: + +(a) You must give any other recipients of the Work or +Derivative Works a copy of this License; and + +(b) You must cause any modified files to carry prominent notices +stating that You changed the files; and + +(c) You must retain, in the Source form of any Derivative Works +that You distribute, all copyright, patent, trademark, and +attribution notices from the Source form of the Work, +excluding those notices that do not pertain to any part of +the Derivative Works; and + +(d) If the Work includes a "NOTICE" text file as part of its +distribution, then any Derivative Works that You distribute must +include a readable copy of the attribution notices contained +within such NOTICE file, excluding those notices that do not +pertain to any part of the Derivative Works, in at least one +of the following places: within a NOTICE text file distributed +as part of the Derivative Works; within the Source form or +documentation, if provided along with the Derivative Works; or, +within a display generated by the Derivative Works, if and +wherever such third-party notices normally appear. The contents +of the NOTICE file are for informational purposes only and +do not modify the License. You may add Your own attribution +notices within Derivative Works that You distribute, alongside +or as an addendum to the NOTICE text from the Work, provided +that such additional attribution notices cannot be construed +as modifying the License. + +You may add Your own copyright statement to Your modifications and +may provide additional or different license terms and conditions +for use, reproduction, or distribution of Your modifications, or +for any such Derivative Works as a whole, provided Your use, +reproduction, and distribution of the Work otherwise complies with +the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, +any Contribution intentionally submitted for inclusion in the Work +by You to the Licensor shall be under the terms and conditions of +this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify +the terms of any separate license agreement you may have executed +with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade +names, trademarks, service marks, or product names of the Licensor, +except as required for reasonable and customary use in describing the +origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or +agreed to in writing, Licensor provides the Work (and each +Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied, including, without limitation, any warranties or conditions +of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A +PARTICULAR PURPOSE. You are solely responsible for determining the +appropriateness of using or redistributing the Work and assume any +risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, +whether in tort (including negligence), contract, or otherwise, +unless required by applicable law (such as deliberate and grossly +negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, +incidental, or consequential damages of any character arising as a +result of this License or out of the use or inability to use the +Work (including but not limited to damages for loss of goodwill, +work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses), even if such Contributor +has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing +the Work or Derivative Works thereof, You may choose to offer, +and charge a fee for, acceptance of support, warranty, indemnity, +or other liability obligations and/or rights consistent with this +License. However, in accepting such obligations, You may act only +on Your own behalf and on Your sole responsibility, not on behalf +of any other Contributor, and only if You agree to indemnify, +defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason +of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + +To apply the Apache License to your work, attach the following +boilerplate notice, with the fields enclosed by brackets "{}" +replaced with your own identifying information. (Don't include +the brackets!) The text should be enclosed in the appropriate +comment syntax for the file format. We also recommend that a +file or class name and description of purpose be included on the +same "printed page" as the copyright notice for easier +identification within third-party archives. + +Copyright {yyyy} {name of copyright owner} + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/vendor/github.com/matt-deboer/go-marathon/Makefile b/vendor/github.com/matt-deboer/go-marathon/Makefile new file mode 100644 index 0000000..448e0f2 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/Makefile @@ -0,0 +1,57 @@ +# +# Author: Rohith (gambol99@gmail.com) +# Date: 2015-02-10 15:35:14 +0000 (Tue, 10 Feb 2015) +# +# vim:ts=2:sw=2:et +# +HARDWARE=$(shell uname -m) +VERSION=$(shell awk '/const Version/ { print $$4 }' version.go | sed 's/"//g') +DEPS=$(shell go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) +PACKAGES=$(shell go list ./...) +VETARGS?=-asmdecl -atomic -bool -buildtags -copylocks -methods -nilfunc -printf -rangeloops -shift -structtags -unsafeptr + +.PHONY: test examples authors changelog + +build: + go build + +authors: + git log --format='%aN <%aE>' | sort -u > AUTHORS + +deps: + @echo "--> Installing build dependencies" + @go get -d -v ./... $(DEPS) + +lint: + @echo "--> Running golint" + @which golint 2>/dev/null ; if [ $$? -eq 1 ]; then \ + go get -u github.com/golang/lint/golint; \ + fi + @golint . + +vet: + @echo "--> Running go tool vet $(VETARGS) ." + @go tool vet 2>/dev/null ; if [ $$? -eq 3 ]; then \ + go get golang.org/x/tools/cmd/vet; \ + fi + @go tool vet $(VETARGS) . + +cover: + @echo "--> Running go test --cover" + @go test --cover + +format: + @echo "--> Running go fmt" + @go fmt $(PACKAGES) + +test: deps + @echo "--> Running go tests" + @go test -v + @$(MAKE) vet + @$(MAKE) cover + +examples: + make -C examples all + +changelog: release + git log $(shell git tag | tail -n1)..HEAD --no-merges --format=%B > changelog diff --git a/vendor/github.com/matt-deboer/go-marathon/README.md b/vendor/github.com/matt-deboer/go-marathon/README.md new file mode 100644 index 0000000..3ec93f2 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/README.md @@ -0,0 +1,383 @@ +[![Build Status](https://travis-ci.org/gambol99/go-marathon.svg?branch=master)](https://travis-ci.org/gambol99/go-marathon) +[![GoDoc](http://godoc.org/github.com/gambol99/go-marathon?status.png)](http://godoc.org/github.com/gambol99/go-marathon) + +# Go-Marathon + +Go-marathon is a API library for working with [Marathon](https://mesosphere.github.io/marathon/). +It currently supports + +- Application and group deployment +- Helper filters for pulling the status, configuration and tasks +- Multiple Endpoint support for HA deployments +- Marathon Event Subscriptions and Event Streams + +Note: the library is still under active development; users should expect frequent (possibly breaking) API changes for the time being. + +It requires Go version 1.5 or higher. + +## Code Examples + +There is also an examples directory in the source which shows hints and snippets of code of how to use it — +which is probably the best place to start. + +You can use `examples/docker-compose.yml` in order to start a test cluster. + +### Creating a client + +```Go +import ( + marathon "github.com/gambol99/go-marathon" +) + +marathonURL := "http://10.241.1.71:8080" +config := marathon.NewDefaultConfig() +config.URL = marathonURL +client, err := marathon.NewClient(config) +if err != nil { + log.Fatalf("Failed to create a client for marathon, error: %s", err) +} + +applications, err := client.Applications() +... +``` + +Note, you can also specify multiple endpoint for Marathon (i.e. you have setup Marathon in HA mode and having multiple running) + +```Go +marathonURL := "http://10.241.1.71:8080,10.241.1.72:8080,10.241.1.73:8080" +``` + +The first one specified will be used, if that goes offline the member is marked as *"unavailable"* and a +background process will continue to ping the member until it's back online. + +### Custom HTTP Client + +If you wish to override the http client (by default http.DefaultClient) used by the API; use cases bypassing TLS verification, load root CA's or change the timeouts etc, you can pass a custom client in the config. + +```Go +marathonURL := "http://10.241.1.71:8080" +config := marathon.NewDefaultConfig() +config.URL = marathonURL +config.HTTPClient = &http.Client{ + Timeout: (time.Duration(10) * time.Second), + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 10 * time.Second, + }).Dial, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, +} +``` + +### Listing the applications + +```Go +applications, err := client.Applications() +if err != nil { + log.Fatalf("Failed to list applications") +} + +log.Printf("Found %d applications running", len(applications.Apps)) +for _, application := range applications.Apps { + log.Printf("Application: %s", application) + details, err := client.Application(application.ID) + assert(err) + if details.Tasks != nil && len(details.Tasks) > 0 { + for _, task := range details.Tasks { + log.Printf("task: %s", task) + } + // check the health of the application + health, err := client.ApplicationOK(details.ID) + log.Printf("Application: %s, healthy: %t", details.ID, health) + } +} +``` + +### Creating a new application + +```Go +log.Printf("Deploying a new application") +application := marathon.NewDockerApplication(). + Name(applicationName). + CPU(0.1). + Memory(64). + Storage(0.0). + Count(2). + AddArgs("/usr/sbin/apache2ctl", "-D", "FOREGROUND"). + AddEnv("NAME", "frontend_http"). + AddEnv("SERVICE_80_NAME", "test_http"). + CheckHTTP("/health", 10, 5) + +application. + Container.Docker.Container("quay.io/gambol99/apache-php:latest"). + Bridged(). + Expose(80). + Expose(443) + +if _, err := client.CreateApplication(application); err != nil { + log.Fatalf("Failed to create application: %s, error: %s", application, err) +} else { + log.Printf("Created the application: %s", application) +} +``` + +Note: Applications may also be defined by means of initializing a `marathon.Application` struct instance directly. However, go-marathon's DSL as shown above provides a more concise way to achieve the same. + +### Scaling application + +Change the number of application instances to 4 + +```Go +log.Printf("Scale to 4 instances") +if err := client.ScaleApplicationInstances(application.ID, 4); err != nil { + log.Fatalf("Failed to delete the application: %s, error: %s", application, err) +} else { + client.WaitOnApplication(application.ID, 30 * time.Second) + log.Printf("Successfully scaled the application") +} +``` + +### Subscription & Events + +Request to listen to events related to applications — namely status updates, health checks +changes and failures. There are two different event transports controlled by `EventsTransport` +setting with the following possible values: `EventsTransportSSE` and `EventsTransportCallback` (default value). +See [Event Stream](https://mesosphere.github.io/marathon/docs/rest-api.html#event-stream) and +[Event Subscriptions](https://mesosphere.github.io/marathon/docs/rest-api.html#event-subscriptions) for details. + +Event subscriptions can also be individually controlled with the `Subscribe` and `Unsubscribe` functions. See [Controlling subscriptions](#controlling-subscriptions) for more details. + +#### Event Stream + +Only available in Marathon >= 0.9.0. Does not require any special configuration or prerequisites. + +```Go +// Configure client +config := marathon.NewDefaultConfig() +config.URL = marathonURL +config.EventsTransport = marathon.EventsTransportSSE + +client, err := marathon.NewClient(config) +if err != nil { + log.Fatalf("Failed to create a client for marathon, error: %s", err) +} + +// Register for events +events, err = client.AddEventsListener(marathon.EventIDApplications) +if err != nil { + log.Fatalf("Failed to register for events, %s", err) +} + +timer := time.After(60 * time.Second) +done := false + +// Receive events from channel for 60 seconds +for { + if done { + break + } + select { + case <-timer: + log.Printf("Exiting the loop") + done = true + case event := <-events: + log.Printf("Received event: %s", event) + } +} + +// Unsubscribe from Marathon events +client.RemoveEventsListener(events) +``` + +#### Event Subscriptions + +Requires to start a built-in web server accessible by Marathon to connect and push events to. Consider the following +additional settings: + +- `EventsInterface` — the interface we should be listening on for events. Default `"eth0"`. +- `EventsPort` — built-in web server port. Default `10001`. +- `CallbackURL` — custom callback URL. Default `""`. + +```Go +// Configure client +config := marathon.NewDefaultConfig() +config.URL = marathonURL +config.EventsInterface = marathonInterface +config.EventsPort = marathonPort + +client, err := marathon.NewClient(config) +if err != nil { + log.Fatalf("Failed to create a client for marathon, error: %s", err) +} + +// Register for events +events, err = client.AddEventsListener(marathon.EventIDApplications) +if err != nil { + log.Fatalf("Failed to register for events, %s", err) +} + +timer := time.After(60 * time.Second) +done := false + +// Receive events from channel for 60 seconds +for { + if done { + break + } + select { + case <-timer: + log.Printf("Exiting the loop") + done = true + case event := <-events: + log.Printf("Received event: %s", event) + } +} + +// Unsubscribe from Marathon events +client.RemoveEventsListener(events) +``` + +See [events.go](events.go) for a full list of event IDs. + +#### Controlling subscriptions +If you simply want to (de)register event subscribers (i.e. without starting an internal web server) you can use the `Subscribe` and `Unsubscribe` methods. + +```Go +// Configure client +config := marathon.NewDefaultConfig() +config.URL = marathonURL + +client, err := marathon.NewClient(config) +if err != nil { + log.Fatalf("Failed to create a client for marathon, error: %s", err) +} + +// Register an event subscriber via a callback URL +callbackURL := "http://10.241.1.71:9494" +if err := client.Subscribe(callbackURL); err != nil { + log.Fatalf("Unable to register the callbackURL [%s], error: %s", callbackURL, err) +} + +// Deregister the same subscriber +if err := client.Unsubscribe(callbackURL); err != nil { + log.Fatalf("Unable to deregister the callbackURL [%s], error: %s", callbackURL, err) +} +``` + +## Contributing + +See the [contribution guidelines](CONTRIBUTING.md). + +## Development + +### Marathon Fake + +go-marathon employs a [fake Marathon implementation](https://github.com/gambol99/go-marathon/blob/master/testing_utils_test.go) for testing purposes. It [maintains a YML-encoded list of HTTP response messages](https://github.com/gambol99/go-marathon/blob/master/tests/rest-api/methods.yml) which are returned upon a successful match based upon a number of attributes, the so-called _message identifier_: + +- HTTP URI (without the protocol and the hostname, e.g., `/v2/apps`) +- HTTP method (e.g., `GET`) +- response content (i.e., the message returned) +- scope (see below) + +#### Response Content + +The response content can be provided in one of two forms: + +- **static:** A pure response message returned on every match, including repeated queries. +- **index:** A list of response messages associated to a particular (indexed) sequence order. A message will be returned _iff_ it matches and its zero-based index equals the current request count. + +An example for a trivial static response content is + +```yaml +- uri: /v2/apps + method: POST + content: | + { + "app": { + } + } +``` + +which would be returned for every POST request targetting `/v2/apps`. + +An indexed response content would look like: + +```yaml +- uri: /v2/apps + method: POST + contentSequence: + - index: 1 + - content: | + { + "app": { + "id": "foo" + } + } + - index: 3 + - content: | + { + "app": { + "id": "bar" + } + } +``` + +What this means is that the first POST request to `/v2/apps` would yield a 404, the second one the _foo_ app, the third one 404 again, the fourth one _bar_, and every following request thereafter a 404 again. Indexed responses enable more flexible testing required by some use cases. + +Trying to define both a static and indexed response content constitutes an error and leads to `panic`. + +#### Scope + +By default, all responses are defined globally: Every message can be queried by any request across all tests. This enables reusability and allows to keep the YML definition fairly short. For certain cases, however, it is desirable to define a set of responses that are delivered exclusively for a particular test. Scopes offer a means to do so by representing a concept similar to [namespaces](https://en.wikipedia.org/wiki/Namespace). Combined with indexed responses, they allow to return different responses for message identifiers already defined at the global level. + +Scopes do not have a particular format -- they are just strings. A scope must be defined in two places: The message specification and the server configuration. They are pure strings without any particular structure. Given the messages specification + +```yaml +- uri: /v2/apps + method: GET + # Note: no scope defined. + content: | + { + "app": { + "id": "foo" + } + } +- uri: /v2/apps + method: GET + scope: v1.1.1 # This one does have a scope. + contentSequence: + - index: 1 + - content: | + { + "app": { + "id": "bar" + } + } +``` + +and the tests + +```go +func TestFoo(t * testing.T) { + endpoint := newFakeMarathonEndpoint(t, nil) // No custom configs given. + defer endpoint.Close() + app, err := endpoint.Client.Applications() + // Do something with "foo" +} + +func TestFoo(t * testing.T) { + endpoint := newFakeMarathonEndpoint(t, &configContainer{ + server: &serverConfig{ + scope: "v1.1.1", // Matches the message spec's scope. + }, + }) + defer endpoint.Close() + app, err := endpoint.Client.Applications() + // Do something with "bar" +} +``` + +The "foo" response can be used by all tests using the default fake endpoint (such as `TestFoo`), while the "bar" response is only visible by tests that explicitly set the scope to `1.1.1` (as `TestBar` does) and query the endpoint twice. diff --git a/vendor/github.com/matt-deboer/go-marathon/application.go b/vendor/github.com/matt-deboer/go-marathon/application.go new file mode 100644 index 0000000..6481b61 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/application.go @@ -0,0 +1,844 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "time" +) + +var ( + // ErrNoApplicationContainer is thrown when a container has been specified yet + ErrNoApplicationContainer = errors.New("you have not specified a docker container yet") +) + +// Applications is a collection of applications +type Applications struct { + Apps []Application `json:"apps"` +} + +// IPAddressPerTask is used by IP-per-task functionality https://mesosphere.github.io/marathon/docs/ip-per-task.html +type IPAddressPerTask struct { + Groups *[]string `json:"groups,omitempty"` + Labels *map[string]string `json:"labels,omitempty"` + Discovery *Discovery `json:"discovery,omitempty"` + NetworkName string `json:"networkName,omitempty"` +} + +// Discovery provides info about ports expose by IP-per-task functionality +type Discovery struct { + Ports *[]Port `json:"ports,omitempty"` +} + +// Port provides info about ports used by IP-per-task +type Port struct { + Number int `json:"number,omitempty"` + Name string `json:"name,omitempty"` + Protocol string `json:"protocol,omitempty"` +} + +// Application is the definition for an application in marathon +type Application struct { + ID string `json:"id,omitempty"` + Cmd *string `json:"cmd,omitempty"` + Args *[]string `json:"args,omitempty"` + Constraints *[][]string `json:"constraints,omitempty"` + Container *Container `json:"container,omitempty"` + CPUs float64 `json:"cpus,omitempty"` + Disk *float64 `json:"disk,omitempty"` + Env *map[string]string `json:"env,omitempty"` + Executor *string `json:"executor,omitempty"` + HealthChecks *[]HealthCheck `json:"healthChecks,omitempty"` + Instances *int `json:"instances,omitempty"` + Mem *float64 `json:"mem,omitempty"` + Tasks []*Task `json:"tasks,omitempty"` + Ports []int `json:"ports"` + PortDefinitions *[]PortDefinition `json:"portDefinitions,omitempty"` + RequirePorts *bool `json:"requirePorts,omitempty"` + BackoffSeconds *float64 `json:"backoffSeconds,omitempty"` + BackoffFactor *float64 `json:"backoffFactor,omitempty"` + MaxLaunchDelaySeconds *float64 `json:"maxLaunchDelaySeconds,omitempty"` + Deployments []map[string]string `json:"deployments,omitempty"` + Dependencies []string `json:"dependencies"` + TasksRunning int `json:"tasksRunning,omitempty"` + TasksStaged int `json:"tasksStaged,omitempty"` + TasksHealthy int `json:"tasksHealthy,omitempty"` + TasksUnhealthy int `json:"tasksUnhealthy,omitempty"` + TaskStats *map[string]*TaskStats `json:"taskStats,omitempty"` + User string `json:"user,omitempty"` + UpgradeStrategy *UpgradeStrategy `json:"upgradeStrategy,omitempty"` + Uris *[]string `json:"uris,omitempty"` + Version string `json:"version,omitempty"` + VersionInfo *VersionInfo `json:"versionInfo,omitempty"` + Labels *map[string]string `json:"labels,omitempty"` + AcceptedResourceRoles []string `json:"acceptedResourceRoles,omitempty"` + LastTaskFailure *LastTaskFailure `json:"lastTaskFailure,omitempty"` + Fetch *[]Fetch `json:"fetch,omitempty"` + IPAddressPerTask *IPAddressPerTask `json:"ipAddress,omitempty"` +} + +// ApplicationVersions is a collection of application versions for a specific app in marathon +type ApplicationVersions struct { + Versions []string `json:"versions"` +} + +// ApplicationVersion is the application version response from marathon +type ApplicationVersion struct { + Version string `json:"version"` +} + +// VersionInfo is the application versioning details from marathon +type VersionInfo struct { + LastScalingAt string `json:"lastScalingAt,omitempty"` + LastConfigChangeAt string `json:"lastConfigChangeAt,omitempty"` +} + +// Fetch will download URI before task starts +type Fetch struct { + URI string `json:"uri"` + Executable bool `json:"executable"` + Extract bool `json:"extract"` + Cache bool `json:"cache"` +} + +// GetAppOpts contains a payload for Application method +// embed: Embeds nested resources that match the supplied path. +// You can specify this parameter multiple times with different values +type GetAppOpts struct { + Embed []string `url:"embed,omitempty"` +} + +// DeleteAppOpts contains a payload for DeleteApplication method +// force: overrides a currently running deployment. +type DeleteAppOpts struct { + Force bool `url:"force,omitempty"` +} + +// TaskStats is a container for Stats +type TaskStats struct { + Stats *Stats `json:"stats,omitempty"` +} + +// Stats is a collection of aggregate statistics about an application's tasks +type Stats struct { + Counts *map[string]int `json:"counts,omitempty"` + LifeTime *map[string]float64 `json:"lifetime,omitempty"` +} + +// SetIPAddressPerTask defines that the application will have a IP address defines by a external agent. +// This configuration is not allowed to be used with Port or PortDefinitions. Thus, the implementation +// clears both. +func (r *Application) SetIPAddressPerTask(ipAddressPerTask IPAddressPerTask) *Application { + r.Ports = make([]int, 0) + r.EmptyPortDefinitions() + r.IPAddressPerTask = &ipAddressPerTask + + return r +} + +// NewDockerApplication creates a default docker application +func NewDockerApplication() *Application { + application := new(Application) + application.Container = NewDockerContainer() + return application +} + +// Name sets the name / ID of the application i.e. the identifier for this application +func (r *Application) Name(id string) *Application { + r.ID = validateID(id) + return r +} + +// Command sets the cmd of the application +func (r *Application) Command(cmd string) *Application { + r.Cmd = &cmd + return r +} + +// CPU set the amount of CPU shares per instance which is assigned to the application +// cpu: the CPU shared (check Docker docs) per instance +func (r *Application) CPU(cpu float64) *Application { + r.CPUs = cpu + return r +} + +// Storage sets the amount of disk space the application is assigned, which for docker +// application I don't believe is relevant +// disk: the disk space in MB +func (r *Application) Storage(disk float64) *Application { + r.Disk = &disk + return r +} + +// AllTaskRunning checks to see if all the application tasks are running, i.e. the instances is equal +// to the number of running tasks +func (r *Application) AllTaskRunning() bool { + if r.Instances == nil || *r.Instances == 0 { + return true + } + if r.Tasks == nil { + return false + } + if r.TasksRunning == *r.Instances { + return true + } + return false +} + +// DependsOn adds one or more dependencies for this application. Note, if you want to wait for +// an application dependency to actually be UP, i.e. not just deployed, you need a health check +// on the dependant app. +// names: the application id(s) this application depends on +func (r *Application) DependsOn(names ...string) *Application { + if r.Dependencies == nil { + r.Dependencies = make([]string, 0) + } + r.Dependencies = append(r.Dependencies, names...) + + return r +} + +// Memory sets he amount of memory the application can consume per instance +// memory: the amount of MB to assign +func (r *Application) Memory(memory float64) *Application { + r.Mem = &memory + + return r +} + +// AddPortDefinition adds a port definition. Port definitions are used to define ports that +// should be considered part of a resource. They are necessary when you are using HOST +// networking and no port mappings are specified. +func (r *Application) AddPortDefinition(portDefinition PortDefinition) *Application { + if r.PortDefinitions == nil { + r.EmptyPortDefinitions() + } + + portDefinitions := *r.PortDefinitions + portDefinitions = append(portDefinitions, portDefinition) + r.PortDefinitions = &portDefinitions + return r +} + +// EmptyPortDefinitions explicitly empties port definitions -- use this if you need to empty +// port definitions of an application that already has port definitions set (setting port definitions to nil will +// keep the current value) +func (r *Application) EmptyPortDefinitions() *Application { + r.PortDefinitions = &[]PortDefinition{} + + return r +} + +// Count sets the number of instances of the application to run +// count: the number of instances to run +func (r *Application) Count(count int) *Application { + r.Instances = &count + + return r +} + +// AddArgs adds one or more arguments to the applications +// arguments: the argument(s) you are adding +func (r *Application) AddArgs(arguments ...string) *Application { + if r.Args == nil { + r.EmptyArgs() + } + + args := *r.Args + args = append(args, arguments...) + r.Args = &args + + return r +} + +// EmptyArgs explicitly empties arguments -- use this if you need to empty +// arguments of an application that already has arguments set (setting args to nil will +// keep the current value) +func (r *Application) EmptyArgs() *Application { + r.Args = &[]string{} + + return r +} + +// AddConstraint adds a new constraint +// constraints: the constraint definition, one constraint per array element +func (r *Application) AddConstraint(constraints ...string) *Application { + if r.Constraints == nil { + r.EmptyConstraints() + } + + c := *r.Constraints + c = append(c, constraints) + r.Constraints = &c + + return r +} + +// EmptyConstraints explicitly empties constraints -- use this if you need to empty +// constraints of an application that already has constraints set (setting constraints to nil will +// keep the current value) +func (r *Application) EmptyConstraints() *Application { + r.Constraints = &[][]string{} + + return r +} + +// AddLabel adds a label to the application +// name: the name of the label +// value: value for this label +func (r *Application) AddLabel(name, value string) *Application { + if r.Labels == nil { + r.EmptyLabels() + } + (*r.Labels)[name] = value + + return r +} + +// EmptyLabels explicitly empties the labels -- use this if you need to empty +// the labels of an application that already has labels set (setting labels to nil will +// keep the current value) +func (r *Application) EmptyLabels() *Application { + r.Labels = &map[string]string{} + + return r +} + +// AddEnv adds an environment variable to the application +// name: the name of the variable +// value: go figure, the value associated to the above +func (r *Application) AddEnv(name, value string) *Application { + if r.Env == nil { + r.EmptyEnvs() + } + (*r.Env)[name] = value + + return r +} + +// EmptyEnvs explicitly empties the envs -- use this if you need to empty +// the environments of an application that already has environments set (setting env to nil will +// keep the current value) +func (r *Application) EmptyEnvs() *Application { + r.Env = &map[string]string{} + + return r +} + +// SetExecutor sets the executor +func (r *Application) SetExecutor(executor string) *Application { + r.Executor = &executor + + return r +} + +// AddHealthCheck adds a health check +// healthCheck the health check that should be added +func (r *Application) AddHealthCheck(healthCheck HealthCheck) *Application { + if r.HealthChecks == nil { + r.EmptyHealthChecks() + } + + healthChecks := *r.HealthChecks + healthChecks = append(healthChecks, healthCheck) + r.HealthChecks = &healthChecks + + return r +} + +// EmptyHealthChecks explicitly empties health checks -- use this if you need to empty +// health checks of an application that already has health checks set (setting health checks to nil will +// keep the current value) +func (r *Application) EmptyHealthChecks() *Application { + r.HealthChecks = &[]HealthCheck{} + + return r +} + +// HasHealthChecks is a helper method, used to check if an application has health checks +func (r *Application) HasHealthChecks() bool { + return r.HealthChecks != nil && len(*r.HealthChecks) > 0 +} + +// DeploymentIDs retrieves the application deployments IDs +func (r *Application) DeploymentIDs() []*DeploymentID { + var deployments []*DeploymentID + + if r.Deployments == nil { + return deployments + } + + // step: extract the deployment id from the result + for _, deploy := range r.Deployments { + if id, found := deploy["id"]; found { + deployment := &DeploymentID{ + Version: r.Version, + DeploymentID: id, + } + deployments = append(deployments, deployment) + } + } + + return deployments +} + +// CheckHTTP adds a HTTP check to an application +// port: the port the check should be checking +// interval: the interval in seconds the check should be performed +func (r *Application) CheckHTTP(uri string, port, interval int) (*Application, error) { + if r.Container == nil || r.Container.Docker == nil { + return nil, ErrNoApplicationContainer + } + // step: get the port index + portIndex, err := r.Container.Docker.ServicePortIndex(port) + if err != nil { + return nil, err + } + health := NewDefaultHealthCheck() + health.IntervalSeconds = interval + *health.Path = uri + *health.PortIndex = portIndex + // step: add to the checks + r.AddHealthCheck(*health) + + return r, nil +} + +// CheckTCP adds a TCP check to an application; note the port mapping must already exist, or an +// error will thrown +// port: the port the check should, err, check +// interval: the interval in seconds the check should be performed +func (r *Application) CheckTCP(port, interval int) (*Application, error) { + if r.Container == nil || r.Container.Docker == nil { + return nil, ErrNoApplicationContainer + } + // step: get the port index + portIndex, err := r.Container.Docker.ServicePortIndex(port) + if err != nil { + return nil, err + } + health := NewDefaultHealthCheck() + health.Protocol = "TCP" + health.IntervalSeconds = interval + *health.PortIndex = portIndex + // step: add to the checks + r.AddHealthCheck(*health) + + return r, nil +} + +// AddUris adds one or more uris to the applications +// arguments: the uri(s) you are adding +func (r *Application) AddUris(newUris ...string) *Application { + if r.Uris == nil { + r.EmptyUris() + } + + uris := *r.Uris + uris = append(uris, newUris...) + r.Uris = &uris + + return r +} + +// EmptyUris explicitly empties uris -- use this if you need to empty +// uris of an application that already has uris set (setting uris to nil will +// keep the current value) +func (r *Application) EmptyUris() *Application { + r.Uris = &[]string{} + + return r +} + +// AddFetchURIs adds one or more fetch URIs to the application. +// fetchURIs: the fetch URI(s) to add. +func (r *Application) AddFetchURIs(fetchURIs ...Fetch) *Application { + if r.Fetch == nil { + r.EmptyFetchURIs() + } + + fetch := *r.Fetch + fetch = append(fetch, fetchURIs...) + r.Fetch = &fetch + + return r +} + +// EmptyFetchURIs explicitly empties fetch URIs -- use this if you need to empty +// fetch URIs of an application that already has fetch URIs set. +// Setting fetch URIs to nil will keep the current value. +func (r *Application) EmptyFetchURIs() *Application { + r.Fetch = &[]Fetch{} + + return r +} + +// String returns the json representation of this application +func (r *Application) String() string { + s, err := json.MarshalIndent(r, "", " ") + if err != nil { + return fmt.Sprintf(`{"error": "error decoding type into json: %s"}`, err) + } + + return string(s) +} + +// Applications retrieves an array of all the applications which are running in marathon +func (r *marathonClient) Applications(v url.Values) (*Applications, error) { + query := v.Encode() + if query != "" { + query = "?" + query + } + + applications := new(Applications) + err := r.apiGet(marathonAPIApps+query, nil, applications) + if err != nil { + return nil, err + } + + return applications, nil +} + +// ListApplications retrieves an array of the application names currently running in marathon +func (r *marathonClient) ListApplications(v url.Values) ([]string, error) { + applications, err := r.Applications(v) + if err != nil { + return nil, err + } + var list []string + for _, application := range applications.Apps { + list = append(list, application.ID) + } + + return list, nil +} + +// HasApplicationVersion checks to see if the application version exists in Marathon +// name: the id used to identify the application +// version: the version (normally a timestamp) your looking for +func (r *marathonClient) HasApplicationVersion(name, version string) (bool, error) { + id := trimRootPath(name) + versions, err := r.ApplicationVersions(id) + if err != nil { + return false, err + } + + return contains(versions.Versions, version), nil +} + +// ApplicationVersions is a list of versions which has been deployed with marathon for a specific application +// name: the id used to identify the application +func (r *marathonClient) ApplicationVersions(name string) (*ApplicationVersions, error) { + uri := fmt.Sprintf("%s/versions", buildURI(name)) + versions := new(ApplicationVersions) + if err := r.apiGet(uri, nil, versions); err != nil { + return nil, err + } + return versions, nil +} + +// SetApplicationVersion changes the version of the application +// name: the id used to identify the application +// version: the version (normally a timestamp) you wish to change to +func (r *marathonClient) SetApplicationVersion(name string, version *ApplicationVersion) (*DeploymentID, error) { + uri := fmt.Sprintf(buildURI(name)) + deploymentID := new(DeploymentID) + if err := r.apiPut(uri, version, deploymentID); err != nil { + return nil, err + } + + return deploymentID, nil +} + +// Application retrieves the application configuration from marathon +// name: the id used to identify the application +func (r *marathonClient) Application(name string) (*Application, error) { + var wrapper struct { + Application *Application `json:"app"` + } + + if err := r.apiGet(buildURI(name), nil, &wrapper); err != nil { + return nil, err + } + + return wrapper.Application, nil +} + +// ApplicationBy retrieves the application configuration from marathon +// name: the id used to identify the application +// opts: GetAppOpts request payload +func (r *marathonClient) ApplicationBy(name string, opts *GetAppOpts) (*Application, error) { + u, err := addOptions(buildURI(name), opts) + if err != nil { + return nil, err + } + var wrapper struct { + Application *Application `json:"app"` + } + + if err := r.apiGet(u, nil, &wrapper); err != nil { + return nil, err + } + + return wrapper.Application, nil +} + +// ApplicationByVersion retrieves the application configuration from marathon +// name: the id used to identify the application +// version: the version of the configuration you would like to receive +func (r *marathonClient) ApplicationByVersion(name, version string) (*Application, error) { + app := new(Application) + + uri := fmt.Sprintf("%s/versions/%s", buildURI(name), version) + if err := r.apiGet(uri, nil, app); err != nil { + return nil, err + } + + return app, nil +} + +// ApplicationOK validates that the application, or more appropriately it's tasks have passed all the health checks. +// If no health checks exist, we simply return true +// name: the id used to identify the application +func (r *marathonClient) ApplicationOK(name string) (bool, error) { + // step: get the application + application, err := r.Application(name) + if err != nil { + return false, err + } + + // step: check if all the tasks are running? + if !application.AllTaskRunning() { + return false, nil + } + + // step: if the application has not health checks, just return true + if application.HealthChecks == nil || len(*application.HealthChecks) == 0 { + return true, nil + } + + // step: iterate the application checks and look for false + for _, task := range application.Tasks { + // Health check results may not be available immediately. Assume + // non-healthiness if they are missing for any task. + if task.HealthCheckResults == nil { + return false, nil + } + + for _, check := range task.HealthCheckResults { + //When a task is flapping in Marathon, this is sometimes nil + if check == nil || !check.Alive { + return false, nil + } + } + } + + return true, nil +} + +// ApplicationDeployments retrieves an array of Deployment IDs for an application +// name: the id used to identify the application +func (r *marathonClient) ApplicationDeployments(name string) ([]*DeploymentID, error) { + application, err := r.Application(name) + if err != nil { + return nil, err + } + + return application.DeploymentIDs(), nil +} + +// CreateApplication creates a new application in Marathon +// application: the structure holding the application configuration +func (r *marathonClient) CreateApplication(application *Application) (*Application, error) { + result := new(Application) + if err := r.apiPost(marathonAPIApps, application, result); err != nil { + return nil, err + } + + return result, nil +} + +// WaitOnApplication waits for an application to be deployed +// name: the id of the application +// timeout: a duration of time to wait for an application to deploy +func (r *marathonClient) WaitOnApplication(name string, timeout time.Duration) error { + if r.appExistAndRunning(name) { + return nil + } + + timeoutTimer := time.After(timeout) + ticker := time.NewTicker(r.config.PollingWaitTime) + defer ticker.Stop() + + for { + select { + case <-timeoutTimer: + return ErrTimeoutError + case <-ticker.C: + if r.appExistAndRunning(name) { + return nil + } + } + } +} + +func (r *marathonClient) appExistAndRunning(name string) bool { + app, err := r.Application(name) + if apiErr, ok := err.(*APIError); ok && apiErr.ErrCode == ErrCodeNotFound { + return false + } + if err == nil && app.AllTaskRunning() { + return true + } + return false +} + +// DeleteApplication deletes an application from marathon +// name: the id used to identify the application +// force: used to force the delete operation in case of blocked deployment +func (r *marathonClient) DeleteApplication(name string, force bool) (*DeploymentID, error) { + uri := buildURIWithForceParam(name, force) + // step: check of the application already exists + deployID := new(DeploymentID) + if err := r.apiDelete(uri, nil, deployID); err != nil { + return nil, err + } + + return deployID, nil +} + +// RestartApplication performs a rolling restart of marathon application +// name: the id used to identify the application +func (r *marathonClient) RestartApplication(name string, force bool) (*DeploymentID, error) { + deployment := new(DeploymentID) + var options struct{} + uri := buildURIWithForceParam(fmt.Sprintf("%s/restart", name), force) + if err := r.apiPost(uri, &options, deployment); err != nil { + return nil, err + } + + return deployment, nil +} + +// ScaleApplicationInstances changes the number of instance an application is running +// name: the id used to identify the application +// instances: the number of instances you wish to change to +// force: used to force the scale operation in case of blocked deployment +func (r *marathonClient) ScaleApplicationInstances(name string, instances int, force bool) (*DeploymentID, error) { + changes := new(Application) + changes.ID = validateID(name) + changes.Instances = &instances + uri := buildURIWithForceParam(name, force) + deployID := new(DeploymentID) + if err := r.apiPut(uri, changes, deployID); err != nil { + return nil, err + } + + return deployID, nil +} + +// UpdateApplication updates an application in Marathon +// application: the structure holding the application configuration +func (r *marathonClient) UpdateApplication(application *Application, force bool) (*DeploymentID, error) { + result := new(DeploymentID) + uri := buildURIWithForceParam(application.ID, force) + if err := r.apiPut(uri, application, result); err != nil { + return nil, err + } + return result, nil +} + +func buildURIWithForceParam(path string, force bool) string { + uri := buildURI(path) + if force { + uri += "?force=true" + } + return uri +} + +func buildURI(path string) string { + return fmt.Sprintf("%s/%s", marathonAPIApps, trimRootPath(path)) +} + +// EmptyLabels explicitly empties labels -- use this if you need to empty +// labels of an application that already has IP per task with labels defined +func (i *IPAddressPerTask) EmptyLabels() *IPAddressPerTask { + i.Labels = &map[string]string{} + return i +} + +// AddLabel adds a label to an IPAddressPerTask +// name: The label name +// value: The label value +func (i *IPAddressPerTask) AddLabel(name, value string) *IPAddressPerTask { + if i.Labels == nil { + i.EmptyLabels() + } + (*i.Labels)[name] = value + return i +} + +// EmptyGroups explicitly empties groups -- use this if you need to empty +// groups of an application that already has IP per task with groups defined +func (i *IPAddressPerTask) EmptyGroups() *IPAddressPerTask { + i.Groups = &[]string{} + return i +} + +// AddGroup adds a group to an IPAddressPerTask +// group: The group name +func (i *IPAddressPerTask) AddGroup(group string) *IPAddressPerTask { + if i.Groups == nil { + i.EmptyGroups() + } + + groups := *i.Groups + groups = append(groups, group) + i.Groups = &groups + + return i +} + +// SetDiscovery define the discovery to an IPAddressPerTask +// discovery: The discovery struct +func (i *IPAddressPerTask) SetDiscovery(discovery Discovery) *IPAddressPerTask { + i.Discovery = &discovery + return i +} + +// EmptyPorts explicitly empties discovey port -- use this if you need to empty +// discovey port of an application that already has IP per task with discovey ports +// defined +func (d *Discovery) EmptyPorts() *Discovery { + d.Ports = &[]Port{} + return d +} + +// AddPort adds a port to the discovery info of a IP per task applicable +// port: The discovery port +func (d *Discovery) AddPort(port Port) *Discovery { + if d.Ports == nil { + d.EmptyPorts() + } + ports := *d.Ports + ports = append(ports, port) + d.Ports = &ports + return d +} diff --git a/vendor/github.com/matt-deboer/go-marathon/client.go b/vendor/github.com/matt-deboer/go-marathon/client.go new file mode 100644 index 0000000..6ad34e4 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/client.go @@ -0,0 +1,349 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "regexp" + "sync" + "time" +) + +// Marathon is the interface to the marathon API +type Marathon interface { + // -- APPLICATIONS --- + + // get a listing of the application ids + ListApplications(url.Values) ([]string, error) + // a list of application versions + ApplicationVersions(name string) (*ApplicationVersions, error) + // check a application version exists + HasApplicationVersion(name, version string) (bool, error) + // change an application to a different version + SetApplicationVersion(name string, version *ApplicationVersion) (*DeploymentID, error) + // check if an application is ok + ApplicationOK(name string) (bool, error) + // create an application in marathon + CreateApplication(application *Application) (*Application, error) + // delete an application + DeleteApplication(name string, force bool) (*DeploymentID, error) + // update an application in marathon + UpdateApplication(application *Application, force bool) (*DeploymentID, error) + // a list of deployments on a application + ApplicationDeployments(name string) ([]*DeploymentID, error) + // scale a application + ScaleApplicationInstances(name string, instances int, force bool) (*DeploymentID, error) + // restart an application + RestartApplication(name string, force bool) (*DeploymentID, error) + // get a list of applications from marathon + Applications(url.Values) (*Applications, error) + // get an application by name + Application(name string) (*Application, error) + // get an application by options + ApplicationBy(name string, opts *GetAppOpts) (*Application, error) + // get an application by name and version + ApplicationByVersion(name, version string) (*Application, error) + // wait of application + WaitOnApplication(name string, timeout time.Duration) error + + // -- TASKS --- + + // get a list of tasks for a specific application + Tasks(application string) (*Tasks, error) + // get a list of all tasks + AllTasks(opts *AllTasksOpts) (*Tasks, error) + // get the endpoints for a service on a application + TaskEndpoints(name string, port int, healthCheck bool) ([]string, error) + // kill all the tasks for any application + KillApplicationTasks(applicationID string, opts *KillApplicationTasksOpts) (*Tasks, error) + // kill a single task + KillTask(taskID string, opts *KillTaskOpts) (*Task, error) + // kill the given array of tasks + KillTasks(taskIDs []string, opts *KillTaskOpts) error + + // --- GROUPS --- + + // list all the groups in the system + Groups() (*Groups, error) + // retrieve a specific group from marathon + Group(name string) (*Group, error) + // list all groups in marathon by options + GroupsBy(opts *GetGroupOpts) (*Groups, error) + // retrieve a specific group from marathon by options + GroupBy(name string, opts *GetGroupOpts) (*Group, error) + // create a group deployment + CreateGroup(group *Group) error + // delete a group + DeleteGroup(name string, force bool) (*DeploymentID, error) + // update a groups + UpdateGroup(id string, group *Group, force bool) (*DeploymentID, error) + // check if a group exists + HasGroup(name string) (bool, error) + // wait for an group to be deployed + WaitOnGroup(name string, timeout time.Duration) error + + // --- DEPLOYMENTS --- + + // get a list of the deployments + Deployments() ([]*Deployment, error) + // delete a deployment + DeleteDeployment(id string, force bool) (*DeploymentID, error) + // check to see if a deployment exists + HasDeployment(id string) (bool, error) + // wait of a deployment to finish + WaitOnDeployment(id string, timeout time.Duration) error + + // --- SUBSCRIPTIONS --- + + // a list of current subscriptions + Subscriptions() (*Subscriptions, error) + // add a events listener + AddEventsListener(filter int) (EventsChannel, error) + // remove a events listener + RemoveEventsListener(channel EventsChannel) + // Subscribe a callback URL + Subscribe(string) error + // Unsubscribe a callback URL + Unsubscribe(string) error + + // --- QUEUE --- + // get marathon launch queue + Queue() (*Queue, error) + // resets task launch delay of the specific application + DeleteQueueDelay(appID string) error + + // --- MISC --- + + // get the marathon url + GetMarathonURL() string + // ping the marathon + Ping() (bool, error) + // grab the marathon server info + Info() (*Info, error) + // retrieve the leader info + Leader() (string, error) + // cause the current leader to abdicate + AbdicateLeader() (string, error) +} + +var ( + // ErrInvalidResponse is thrown when marathon responds with invalid or error response + ErrInvalidResponse = errors.New("invalid response from Marathon") + // ErrMarathonDown is thrown when all the marathon endpoints are down + ErrMarathonDown = errors.New("all the Marathon hosts are presently down") + // ErrTimeoutError is thrown when the operation has timed out + ErrTimeoutError = errors.New("the operation has timed out") +) + +// EventsChannelContext holds contextual data for an EventsChannel. +type EventsChannelContext struct { + filter int + done chan struct{} + completion *sync.WaitGroup +} + +type marathonClient struct { + sync.RWMutex + // the configuration for the client + config Config + // the flag used to prevent multiple SSE subscriptions + subscribedToSSE bool + // the ip address of the client + ipAddress string + // the http server + eventsHTTP *http.Server + // the http client use for making requests + httpClient *http.Client + // the marathon hosts + hosts *cluster + // a map of service you wish to listen to + listeners map[EventsChannel]EventsChannelContext + // a custom logger for debug log messages + debugLog *log.Logger +} + +// NewClient creates a new marathon client +// config: the configuration to use +func NewClient(config Config) (Marathon, error) { + // step: if no http client, set to default + if config.HTTPClient == nil { + config.HTTPClient = http.DefaultClient + } + + // step: if no polling wait time is set, default to 500 milliseconds. + if config.PollingWaitTime == 0 { + config.PollingWaitTime = defaultPollingWaitTime + } + + // step: create a new cluster + hosts, err := newCluster(config.HTTPClient, config.URL) + if err != nil { + return nil, err + } + + debugLogOutput := config.LogOutput + if debugLogOutput == nil { + debugLogOutput = ioutil.Discard + } + + return &marathonClient{ + config: config, + listeners: make(map[EventsChannel]EventsChannelContext), + hosts: hosts, + httpClient: config.HTTPClient, + debugLog: log.New(debugLogOutput, "", 0), + }, nil +} + +// GetMarathonURL retrieves the marathon url +func (r *marathonClient) GetMarathonURL() string { + return r.config.URL +} + +// Ping pings the current marathon endpoint (note, this is not a ICMP ping, but a rest api call) +func (r *marathonClient) Ping() (bool, error) { + if err := r.apiGet(marathonAPIPing, nil, nil); err != nil { + return false, err + } + return true, nil +} + +func (r *marathonClient) apiGet(uri string, post, result interface{}) error { + return r.apiCall("GET", uri, post, result) +} + +func (r *marathonClient) apiPut(uri string, post, result interface{}) error { + return r.apiCall("PUT", uri, post, result) +} + +func (r *marathonClient) apiPost(uri string, post, result interface{}) error { + return r.apiCall("POST", uri, post, result) +} + +func (r *marathonClient) apiDelete(uri string, post, result interface{}) error { + return r.apiCall("DELETE", uri, post, result) +} + +func (r *marathonClient) apiCall(method, uri string, body, result interface{}) error { + for { + // step: grab a member from the cluster and attempt to perform the request + member, err := r.hosts.getMember() + if err != nil { + return ErrMarathonDown + } + + // step: Create the endpoint url + url := fmt.Sprintf("%s/%s", member, uri) + if r.config.DCOSToken != "" { + url = fmt.Sprintf("%s/%s", member+"/marathon", uri) + } + + // step: marshall the request to json + var requestBody []byte + if body != nil { + if requestBody, err = json.Marshal(body); err != nil { + return err + } + } + + // step: create the api request + request, err := r.buildAPIRequest(method, url, bytes.NewReader(requestBody)) + if err != nil { + return err + } + response, err := r.httpClient.Do(request) + if err != nil { + r.hosts.markDown(member) + // step: attempt the request on another member + r.debugLog.Printf("apiCall(): request failed on host: %s, error: %s, trying another\n", member, err) + continue + } + defer response.Body.Close() + + // step: read the response body + respBody, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + + if len(requestBody) > 0 { + r.debugLog.Printf("apiCall(): %v %v %s returned %v %s\n", request.Method, request.URL.String(), requestBody, response.Status, oneLogLine(respBody)) + } else { + r.debugLog.Printf("apiCall(): %v %v returned %v %s\n", request.Method, request.URL.String(), response.Status, oneLogLine(respBody)) + } + + // step: check for a successfull response + if response.StatusCode >= 200 && response.StatusCode <= 299 { + if result != nil { + if err := json.Unmarshal(respBody, result); err != nil { + r.debugLog.Printf("apiCall(): failed to unmarshall the response from marathon, error: %s\n", err) + return ErrInvalidResponse + } + } + return nil + } + + // step: if the member node returns a >= 500 && <= 599 we should try another node? + if response.StatusCode >= 500 && response.StatusCode <= 599 { + // step: mark the host as down + r.hosts.markDown(member) + r.debugLog.Printf("apiCall(): request failed, host: %s, status: %d, trying another\n", member, response.StatusCode) + continue + } + + return NewAPIError(response.StatusCode, respBody) + } +} + +// buildAPIRequest creates a default API request +func (r *marathonClient) buildAPIRequest(method, url string, reader io.Reader) (*http.Request, error) { + // Make the http request to Marathon + request, err := http.NewRequest(method, url, reader) + if err != nil { + return nil, err + } + + // Add any basic auth and the content headers + if r.config.HTTPBasicAuthUser != "" && r.config.HTTPBasicPassword != "" { + request.SetBasicAuth(r.config.HTTPBasicAuthUser, r.config.HTTPBasicPassword) + } + + if r.config.DCOSToken != "" { + request.Header.Add("Authorization", "token="+r.config.DCOSToken) + } + + request.Header.Add("Content-Type", "application/json") + request.Header.Add("Accept", "application/json") + + return request, nil +} + +var oneLogLineRegex = regexp.MustCompile(`(?m)^\s*`) + +// oneLogLine removes indentation at the beginning of each line and +// escapes new line characters. +func oneLogLine(in []byte) []byte { + return bytes.Replace(oneLogLineRegex.ReplaceAll(in, nil), []byte("\n"), []byte("\\n "), -1) +} diff --git a/vendor/github.com/matt-deboer/go-marathon/cluster.go b/vendor/github.com/matt-deboer/go-marathon/cluster.go new file mode 100644 index 0000000..04b60be --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/cluster.go @@ -0,0 +1,180 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "fmt" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +const ( + memberStatusUp = 0 + memberStatusDown = 1 +) + +// the status of a member node +type memberStatus int + +// cluster is a collection of marathon nodes +type cluster struct { + sync.RWMutex + // a collection of nodes + members []*member + // the http client + client *http.Client +} + +// member represents an individual endpoint +type member struct { + // the name / ip address of the host + endpoint string + // the status of the host + status memberStatus +} + +// newCluster returns a new marathon cluster +func newCluster(client *http.Client, marathonURL string) (*cluster, error) { + // step: extract and basic validate the endpoints + var members []*member + var defaultProto string + + for _, endpoint := range strings.Split(marathonURL, ",") { + // step: check for nothing + if endpoint == "" { + return nil, newInvalidEndpointError("endpoint is blank") + } + // step: parse the url + u, err := url.Parse(endpoint) + if err != nil { + return nil, newInvalidEndpointError("endpoint: %s is invalid reason: %s", endpoint, err) + } + // step: set the default protocol schema + if defaultProto == "" { + if u.Scheme != "http" && u.Scheme != "https" { + return nil, newInvalidEndpointError("endpoint: %s protocol must be (http|https)", endpoint) + } + defaultProto = u.Scheme + } + // step: does the url have a protocol schema? if not, use the default + if u.Scheme == "" || u.Opaque != "" { + urlWithScheme := fmt.Sprintf("%s://%s", defaultProto, u.String()) + if u, err = url.Parse(urlWithScheme); err != nil { + panic(fmt.Sprintf("unexpected parsing error for URL '%s' with added default scheme: %s", urlWithScheme, err)) + } + } + + // step: check for empty hosts + if u.Host == "" { + return nil, newInvalidEndpointError("endpoint: %s must have a host", endpoint) + } + + // step: create a new node for this endpoint + members = append(members, &member{endpoint: u.String()}) + } + + return &cluster{ + client: client, + members: members, + }, nil +} + +// retrieve the current member, i.e. the current endpoint in use +func (c *cluster) getMember() (string, error) { + c.RLock() + defer c.RUnlock() + for _, n := range c.members { + if n.status == memberStatusUp { + return n.endpoint, nil + } + } + + return "", ErrMarathonDown +} + +// markDown marks down the current endpoint +func (c *cluster) markDown(endpoint string) { + c.Lock() + defer c.Unlock() + for _, n := range c.members { + // step: check if this is the node and it's marked as up - The double checking on the + // nodes status ensures the multiple calls don't create multiple checks + if n.status == memberStatusUp && n.endpoint == endpoint { + n.status = memberStatusDown + go c.healthCheckNode(n) + break + } + } +} + +// healthCheckNode performs a health check on the node and when active updates the status +func (c *cluster) healthCheckNode(node *member) { + // step: wait for the node to become active ... we are assuming a /ping is enough here + for { + res, err := c.client.Get(fmt.Sprintf("%s/ping", node.endpoint)) + if err == nil && res.StatusCode == 200 { + break + } + <-time.After(time.Duration(5 * time.Second)) + } + // step: mark the node as active again + c.Lock() + defer c.Unlock() + node.status = memberStatusUp +} + +// activeMembers returns a list of active members +func (c *cluster) activeMembers() []string { + return c.membersList(memberStatusUp) +} + +// nonActiveMembers returns a list of non-active members in the cluster +func (c *cluster) nonActiveMembers() []string { + return c.membersList(memberStatusDown) +} + +// memberList returns a list of members of a specified status +func (c *cluster) membersList(status memberStatus) []string { + c.RLock() + defer c.RUnlock() + var list []string + for _, m := range c.members { + if m.status == status { + list = append(list, m.endpoint) + } + } + + return list +} + +// size returns the size of the cluster +func (c *cluster) size() int { + return len(c.members) +} + +// String returns a string representation +func (m member) String() string { + status := "UP" + if m.status == memberStatusDown { + status = "DOWN" + } + + return fmt.Sprintf("member: %s:%s", m.endpoint, status) +} diff --git a/vendor/github.com/matt-deboer/go-marathon/config.go b/vendor/github.com/matt-deboer/go-marathon/config.go new file mode 100644 index 0000000..a0ede28 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/config.go @@ -0,0 +1,67 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "io" + "io/ioutil" + "net/http" + "time" +) + +const defaultPollingWaitTime = 500 * time.Millisecond + +// EventsTransport describes which transport should be used to deliver Marathon events +type EventsTransport int + +// Config holds the settings and options for the client +type Config struct { + // URL is the url for marathon + URL string + // EventsTransport is the events transport: EventsTransportCallback or EventsTransportSSE + EventsTransport EventsTransport + // EventsPort is the event handler port + EventsPort int + // the interface we should be listening on for events + EventsInterface string + // HTTPBasicAuthUser is the http basic auth + HTTPBasicAuthUser string + // HTTPBasicPassword is the http basic password + HTTPBasicPassword string + // CallbackURL custom callback url + CallbackURL string + // DCOSToken for DCOS environment, This will override the Authorization header + DCOSToken string + // LogOutput the output for debug log messages + LogOutput io.Writer + // HTTPClient is the http client + HTTPClient *http.Client + // wait time (in milliseconds) between repetitive requests to the API during polling + PollingWaitTime time.Duration +} + +// NewDefaultConfig create a default client config +func NewDefaultConfig() Config { + return Config{ + URL: "http://127.0.0.1:8080", + EventsTransport: EventsTransportCallback, + EventsPort: 10001, + EventsInterface: "eth0", + LogOutput: ioutil.Discard, + PollingWaitTime: defaultPollingWaitTime, + } +} diff --git a/vendor/github.com/matt-deboer/go-marathon/const.go b/vendor/github.com/matt-deboer/go-marathon/const.go new file mode 100644 index 0000000..43b1d46 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/const.go @@ -0,0 +1,42 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +const ( + defaultEventsURL = "/event" + + /* --- api related constants --- */ + marathonAPIVersion = "v2" + marathonAPIEventStream = marathonAPIVersion + "/events" + marathonAPISubscription = marathonAPIVersion + "/eventSubscriptions" + marathonAPIApps = marathonAPIVersion + "/apps" + marathonAPITasks = marathonAPIVersion + "/tasks" + marathonAPIDeployments = marathonAPIVersion + "/deployments" + marathonAPIGroups = marathonAPIVersion + "/groups" + marathonAPIQueue = marathonAPIVersion + "/queue" + marathonAPIInfo = marathonAPIVersion + "/info" + marathonAPILeader = marathonAPIVersion + "/leader" + marathonAPIPing = "ping" +) + +const ( + // EventsTransportCallback activates callback events transport + EventsTransportCallback EventsTransport = 1 << iota + + // EventsTransportSSE activates stream events transport + EventsTransportSSE +) diff --git a/vendor/github.com/matt-deboer/go-marathon/deployment.go b/vendor/github.com/matt-deboer/go-marathon/deployment.go new file mode 100644 index 0000000..e4c30ac --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/deployment.go @@ -0,0 +1,156 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "encoding/json" + "fmt" + "time" +) + +// Deployment is a marathon deployment definition +type Deployment struct { + ID string `json:"id"` + Version string `json:"version"` + CurrentStep int `json:"currentStep"` + TotalSteps int `json:"totalSteps"` + AffectedApps []string `json:"affectedApps"` + Steps [][]*DeploymentStep `json:"-"` + XXStepsRaw json.RawMessage `json:"steps"` // Holds raw steps JSON to unmarshal later + CurrentActions []*DeploymentStep `json:"currentActions"` +} + +// DeploymentID is the identifier for a application deployment +type DeploymentID struct { + DeploymentID string `json:"deploymentId"` + Version string `json:"version"` +} + +// DeploymentStep is a step in the application deployment plan +type DeploymentStep struct { + Action string `json:"action"` + App string `json:"app"` +} + +// StepActions is a series of deployment steps +type StepActions struct { + Actions []struct { + Type string `json:"type"` + App string `json:"app"` + } +} + +// DeploymentPlan is a collection of steps for application deployment +type DeploymentPlan struct { + ID string `json:"id"` + Version string `json:"version"` + Original *Group `json:"original"` + Target *Group `json:"target"` + Steps []*StepActions `json:"steps"` +} + +// Deployments retrieves a list of current deployments +func (r *marathonClient) Deployments() ([]*Deployment, error) { + var deployments []*Deployment + err := r.apiGet(marathonAPIDeployments, nil, &deployments) + if err != nil { + return nil, err + } + // Allows loading of deployment steps from the Marathon v1.X API + // Implements a fix for issue https://github.com/gambol99/go-marathon/issues/153 + for _, deployment := range deployments { + // Unmarshal pre-v1.X step + if err := json.Unmarshal(deployment.XXStepsRaw, &deployment.Steps); err != nil { + deployment.Steps = make([][]*DeploymentStep, 0) + var steps []*StepActions + // Unmarshal v1.X Marathon step + if err := json.Unmarshal(deployment.XXStepsRaw, &steps); err != nil { + return nil, err + } + for stepIndex, step := range steps { + deployment.Steps = append(deployment.Steps, make([]*DeploymentStep, len(step.Actions))) + for actionIndex, action := range step.Actions { + deployment.Steps[stepIndex][actionIndex] = &DeploymentStep{ + Action: action.Type, + App: action.App, + } + } + } + } + } + return deployments, nil +} + +// DeleteDeployment delete a current deployment from marathon +// id: the deployment id you wish to delete +// force: whether or not to force the deletion +func (r *marathonClient) DeleteDeployment(id string, force bool) (*DeploymentID, error) { + deployment := new(DeploymentID) + err := r.apiDelete(fmt.Sprintf("%s/%s", marathonAPIDeployments, id), nil, deployment) + if err != nil { + return nil, err + } + + return deployment, nil +} + +// HasDeployment checks to see if a deployment exists +// id: the deployment id you are looking for +func (r *marathonClient) HasDeployment(id string) (bool, error) { + deployments, err := r.Deployments() + if err != nil { + return false, err + } + for _, deployment := range deployments { + if deployment.ID == id { + return true, nil + } + } + return false, nil +} + +// WaitOnDeployment waits on a deployment to finish +// version: the version of the application +// timeout: the timeout to wait for the deployment to take, otherwise return an error +func (r *marathonClient) WaitOnDeployment(id string, timeout time.Duration) error { + if found, err := r.HasDeployment(id); err != nil { + return err + } else if !found { + return nil + } + + nowTime := time.Now() + stopTime := nowTime.Add(timeout) + if timeout <= 0 { + stopTime = nowTime.Add(time.Duration(900) * time.Second) + } + + // step: a somewhat naive implementation, but it will work + for { + if time.Now().After(stopTime) { + return ErrTimeoutError + } + found, err := r.HasDeployment(id) + if err != nil { + return err + } + if !found { + return nil + } + time.Sleep(r.config.PollingWaitTime) + } +} diff --git a/vendor/github.com/matt-deboer/go-marathon/docker.go b/vendor/github.com/matt-deboer/go-marathon/docker.go new file mode 100644 index 0000000..550409a --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/docker.go @@ -0,0 +1,294 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "errors" + "fmt" +) + +// Container is the definition for a container type in marathon +type Container struct { + Type string `json:"type,omitempty"` + Docker *Docker `json:"docker,omitempty"` + Volumes *[]Volume `json:"volumes,omitempty"` +} + +// PortMapping is the portmapping structure between container and mesos +type PortMapping struct { + ContainerPort int `json:"containerPort,omitempty"` + HostPort int `json:"hostPort"` + Labels *map[string]string `json:"labels,omitempty"` + Name string `json:"name,omitempty"` + ServicePort int `json:"servicePort,omitempty"` + Protocol string `json:"protocol,omitempty"` +} + +// Parameters is the parameters to pass to the docker client when creating the container +type Parameters struct { + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` +} + +// Volume is the docker volume details associated to the container +type Volume struct { + ContainerPath string `json:"containerPath,omitempty"` + HostPath string `json:"hostPath,omitempty"` + External *ExternalVolume `json:"external,omitempty"` + Mode string `json:"mode,omitempty"` +} + +// ExternalVolume is an external volume definition +type ExternalVolume struct { + Name string `json:"name,omitempty"` + Provider string `json:"provider,omitempty"` + Options *map[string]string `json:"options,omitempty"` +} + +// Docker is the docker definition from a marathon application +type Docker struct { + ForcePullImage *bool `json:"forcePullImage,omitempty"` + Image string `json:"image,omitempty"` + Network string `json:"network,omitempty"` + Parameters *[]Parameters `json:"parameters,omitempty"` + PortMappings *[]PortMapping `json:"portMappings,omitempty"` + Privileged *bool `json:"privileged,omitempty"` +} + +// Volume attachs a volume to the container +// host_path: the path on the docker host to map +// container_path: the path inside the container to map the host volume +// mode: the mode to map the container +func (container *Container) Volume(hostPath, containerPath, mode string) *Container { + if container.Volumes == nil { + container.EmptyVolumes() + } + + volumes := *container.Volumes + volumes = append(volumes, Volume{ + ContainerPath: containerPath, + HostPath: hostPath, + Mode: mode, + }) + + container.Volumes = &volumes + + return container +} + +// EmptyVolumes explicitly empties the volumes -- use this if you need to empty +// volumes of an application that already has volumes set (setting volumes to nil will +// keep the current value) +func (container *Container) EmptyVolumes() *Container { + container.Volumes = &[]Volume{} + return container +} + +// SetExternalVolume define external elements for a volume +// name: the name of the volume +// provider: the provider of the volume (e.g. dvdi) +func (v *Volume) SetExternalVolume(name, provider string) *ExternalVolume { + ev := &ExternalVolume{ + Name: name, + Provider: provider, + } + v.External = ev + return ev +} + +// EmptyExternalVolume emptys the external volume definition +func (v *Volume) EmptyExternalVolume() *Volume { + v.External = &ExternalVolume{} + return v +} + +// AddOption adds an option to an ExternalVolume +// name: the name of the option +// value: value for the option +func (ev *ExternalVolume) AddOption(name, value string) *ExternalVolume { + if ev.Options == nil { + ev.EmptyOptions() + } + (*ev.Options)[name] = value + + return ev +} + +// EmptyOptions explicitly empties the options +func (ev *ExternalVolume) EmptyOptions() *ExternalVolume { + ev.Options = &map[string]string{} + + return ev +} + +// NewDockerContainer creates a default docker container for you +func NewDockerContainer() *Container { + container := &Container{} + container.Type = "DOCKER" + container.Docker = &Docker{} + + return container +} + +// SetForcePullImage sets whether the docker image should always be force pulled before +// starting an instance +// forcePull: true / false +func (docker *Docker) SetForcePullImage(forcePull bool) *Docker { + docker.ForcePullImage = &forcePull + + return docker +} + +// SetPrivileged sets whether the docker image should be started +// with privilege turned on +// priv: true / false +func (docker *Docker) SetPrivileged(priv bool) *Docker { + docker.Privileged = &priv + + return docker +} + +// Container sets the image of the container +// image: the image name you are using +func (docker *Docker) Container(image string) *Docker { + docker.Image = image + return docker +} + +// Bridged sets the networking mode to bridged +func (docker *Docker) Bridged() *Docker { + docker.Network = "BRIDGE" + return docker +} + +// Host sets the networking mode to host +func (docker *Docker) Host() *Docker { + docker.Network = "HOST" + return docker +} + +// Expose sets the container to expose the following TCP ports +// ports: the TCP ports the container is exposing +func (docker *Docker) Expose(ports ...int) *Docker { + for _, port := range ports { + docker.ExposePort(PortMapping{ + ContainerPort: port, + HostPort: 0, + ServicePort: 0, + Protocol: "tcp"}) + } + return docker +} + +// ExposeUDP sets the container to expose the following UDP ports +// ports: the UDP ports the container is exposing +func (docker *Docker) ExposeUDP(ports ...int) *Docker { + for _, port := range ports { + docker.ExposePort(PortMapping{ + ContainerPort: port, + HostPort: 0, + ServicePort: 0, + Protocol: "udp"}) + } + return docker +} + +// ExposePort exposes an port in the container +func (docker *Docker) ExposePort(portMapping PortMapping) *Docker { + if docker.PortMappings == nil { + docker.EmptyPortMappings() + } + + portMappings := *docker.PortMappings + portMappings = append(portMappings, portMapping) + docker.PortMappings = &portMappings + + return docker +} + +// EmptyPortMappings explicitly empties the port mappings -- use this if you need to empty +// port mappings of an application that already has port mappings set (setting port mappings to nil will +// keep the current value) +func (docker *Docker) EmptyPortMappings() *Docker { + docker.PortMappings = &[]PortMapping{} + return docker +} + +// AddLabel adds a label to a PortMapping +// name: the name of the label +// value: value for this label +func (p *PortMapping) AddLabel(name, value string) *PortMapping { + if p.Labels == nil { + p.EmptyLabels() + } + (*p.Labels)[name] = value + + return p +} + +// EmptyLabels explicitly empties the labels -- use this if you need to empty +// the labels of a port mapping that already has labels set (setting labels to +// nil will keep the current value) +func (p *PortMapping) EmptyLabels() *PortMapping { + p.Labels = &map[string]string{} + + return p +} + +// AddParameter adds a parameter to the docker execution line when creating the container +// key: the name of the option to add +// value: the value of the option +func (docker *Docker) AddParameter(key string, value string) *Docker { + if docker.Parameters == nil { + docker.EmptyParameters() + } + + parameters := *docker.Parameters + parameters = append(parameters, Parameters{ + Key: key, + Value: value}) + + docker.Parameters = ¶meters + + return docker +} + +// EmptyParameters explicitly empties the parameters -- use this if you need to empty +// parameters of an application that already has parameters set (setting parameters to nil will +// keep the current value) +func (docker *Docker) EmptyParameters() *Docker { + docker.Parameters = &[]Parameters{} + return docker +} + +// ServicePortIndex finds the service port index of the exposed port +// port: the port you are looking for +func (docker *Docker) ServicePortIndex(port int) (int, error) { + if docker.PortMappings == nil || len(*docker.PortMappings) == 0 { + return 0, errors.New("The docker does not contain any port mappings to search") + } + + // step: iterate and find the port + for index, containerPort := range *docker.PortMappings { + if containerPort.ContainerPort == port { + return index, nil + } + } + + // step: we didn't find the port in the mappings + return 0, fmt.Errorf("The container port required was not found in the container port mappings") +} diff --git a/vendor/github.com/matt-deboer/go-marathon/error.go b/vendor/github.com/matt-deboer/go-marathon/error.go new file mode 100644 index 0000000..21e7311 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/error.go @@ -0,0 +1,225 @@ +/* +Copyright 2015 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" +) + +const ( + // ErrCodeBadRequest specifies a 400 Bad Request error. + ErrCodeBadRequest = iota + // ErrCodeUnauthorized specifies a 401 Unauthorized error. + ErrCodeUnauthorized + // ErrCodeForbidden specifies a 403 Forbidden error. + ErrCodeForbidden + // ErrCodeNotFound specifies a 404 Not Found error. + ErrCodeNotFound + // ErrCodeDuplicateID specifies a PUT 409 Conflict error. + ErrCodeDuplicateID + // ErrCodeAppLocked specifies a POST 409 Conflict error. + ErrCodeAppLocked + // ErrCodeInvalidBean specifies a 422 UnprocessableEntity error. + ErrCodeInvalidBean + // ErrCodeServer specifies a 500+ Server error. + ErrCodeServer + // ErrCodeUnknown specifies an unknown error. + ErrCodeUnknown +) + +// InvalidEndpointError indicates a endpoint error in the marathon urls +type InvalidEndpointError struct { + message string +} + +// Error returns the string message +func (e *InvalidEndpointError) Error() string { + return e.message +} + +// newInvalidEndpointError creates a new error +func newInvalidEndpointError(message string, args ...interface{}) error { + return &InvalidEndpointError{message: fmt.Sprintf(message, args)} +} + +// APIError represents a generic API error. +type APIError struct { + // ErrCode specifies the nature of the error. + ErrCode int + message string +} + +func (e *APIError) Error() string { + return fmt.Sprintf("Marathon API error: %s", e.message) +} + +// NewAPIError creates a new APIError instance from the given response code and content. +func NewAPIError(code int, content []byte) error { + var errDef errorDefinition + switch { + case code == http.StatusBadRequest: + errDef = &badRequestDef{} + case code == http.StatusUnauthorized: + errDef = &simpleErrDef{code: ErrCodeUnauthorized} + case code == http.StatusForbidden: + errDef = &simpleErrDef{code: ErrCodeForbidden} + case code == http.StatusNotFound: + errDef = &simpleErrDef{code: ErrCodeNotFound} + case code == http.StatusConflict: + errDef = &conflictDef{} + case code == 422: + errDef = &unprocessableEntityDef{} + case code >= http.StatusInternalServerError: + errDef = &simpleErrDef{code: ErrCodeServer} + default: + errDef = &simpleErrDef{code: ErrCodeUnknown} + } + + return parseContent(errDef, content) +} + +type errorDefinition interface { + message() string + errCode() int +} + +func parseContent(errDef errorDefinition, content []byte) error { + // If the content cannot be JSON-unmarshalled, we assume that it's not JSON + // and encode it into the APIError instance as-is. + errMessage := string(content) + if err := json.Unmarshal(content, errDef); err == nil { + errMessage = errDef.message() + } + + return &APIError{message: errMessage, ErrCode: errDef.errCode()} +} + +type simpleErrDef struct { + Message string `json:"message"` + code int +} + +func (def *simpleErrDef) message() string { + return def.Message +} + +func (def *simpleErrDef) errCode() int { + return def.code +} + +type detailDescription struct { + Path string `json:"path"` + Errors []string `json:"errors"` +} + +func (d detailDescription) String() string { + return fmt.Sprintf("path: '%s' errors: %s", d.Path, strings.Join(d.Errors, ", ")) +} + +type badRequestDef struct { + Message string `json:"message"` + Details []detailDescription `json:"details"` +} + +func (def *badRequestDef) message() string { + var details []string + for _, detail := range def.Details { + details = append(details, detail.String()) + } + + return fmt.Sprintf("%s (%s)", def.Message, strings.Join(details, "; ")) +} + +func (def *badRequestDef) errCode() int { + return ErrCodeBadRequest +} + +type conflictDef struct { + Message string `json:"message"` + Deployments []struct { + ID string `json:"id"` + } `json:"deployments"` +} + +func (def *conflictDef) message() string { + if len(def.Deployments) == 0 { + // 409 Conflict response to "POST /v2/apps". + return def.Message + } + + // 409 Conflict response to "PUT /v2/apps/{appId}". + var ids []string + for _, deployment := range def.Deployments { + ids = append(ids, deployment.ID) + } + return fmt.Sprintf("%s (locking deployment IDs: %s)", def.Message, strings.Join(ids, ", ")) +} + +func (def *conflictDef) errCode() int { + if len(def.Deployments) == 0 { + return ErrCodeDuplicateID + } + + return ErrCodeAppLocked +} + +type unprocessableEntityDetails []struct { + // Used in Marathon >= 1.0.0-RC1. + detailDescription + // Used in Marathon < 1.0.0-RC1. + Attribute string `json:"attribute"` + Error string `json:"error"` +} + +type unprocessableEntityDef struct { + Message string `json:"message"` + // Name used in Marathon >= 0.15.0. + Details unprocessableEntityDetails `json:"details"` + // Name used in Marathon < 0.15.0. + Errors unprocessableEntityDetails `json:"errors"` +} + +func (def *unprocessableEntityDef) message() string { + joinDetails := func(details unprocessableEntityDetails) []string { + var res []string + for _, detail := range details { + res = append(res, fmt.Sprintf("attribute '%s': %s", detail.Attribute, detail.Error)) + } + return res + } + + var details []string + switch { + case len(def.Errors) > 0: + details = joinDetails(def.Errors) + case len(def.Details) > 0 && len(def.Details[0].Attribute) > 0: + details = joinDetails(def.Details) + default: + for _, detail := range def.Details { + details = append(details, detail.detailDescription.String()) + } + } + + return fmt.Sprintf("%s (%s)", def.Message, strings.Join(details, "; ")) +} + +func (def *unprocessableEntityDef) errCode() int { + return ErrCodeInvalidBean +} diff --git a/vendor/github.com/matt-deboer/go-marathon/events.go b/vendor/github.com/matt-deboer/go-marathon/events.go new file mode 100644 index 0000000..5a68985 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/events.go @@ -0,0 +1,365 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import "fmt" + +// EventType is a wrapper for a marathon event +type EventType struct { + EventType string `json:"eventType"` +} + +const ( + // EventIDAPIRequest is the event listener ID for the corresponding event. + EventIDAPIRequest = 1 << iota + // EventIDStatusUpdate is the event listener ID for the corresponding event. + EventIDStatusUpdate + // EventIDFrameworkMessage is the event listener ID for the corresponding event. + EventIDFrameworkMessage + // EventIDSubscription is the event listener ID for the corresponding event. + EventIDSubscription + // EventIDUnsubscribed is the event listener ID for the corresponding event. + EventIDUnsubscribed + // EventIDStreamAttached is the event listener ID for the corresponding event. + EventIDStreamAttached + // EventIDStreamDetached is the event listener ID for the corresponding event. + EventIDStreamDetached + // EventIDAddHealthCheck is the event listener ID for the corresponding event. + EventIDAddHealthCheck + // EventIDRemoveHealthCheck is the event listener ID for the corresponding event. + EventIDRemoveHealthCheck + // EventIDFailedHealthCheck is the event listener ID for the corresponding event. + EventIDFailedHealthCheck + // EventIDChangedHealthCheck is the event listener ID for the corresponding event. + EventIDChangedHealthCheck + // EventIDGroupChangeSuccess is the event listener ID for the corresponding event. + EventIDGroupChangeSuccess + // EventIDGroupChangeFailed is the event listener ID for the corresponding event. + EventIDGroupChangeFailed + // EventIDDeploymentSuccess is the event listener ID for the corresponding event. + EventIDDeploymentSuccess + // EventIDDeploymentFailed is the event listener ID for the corresponding event. + EventIDDeploymentFailed + // EventIDDeploymentInfo is the event listener ID for the corresponding event. + EventIDDeploymentInfo + // EventIDDeploymentStepSuccess is the event listener ID for the corresponding event. + EventIDDeploymentStepSuccess + // EventIDDeploymentStepFailed is the event listener ID for the corresponding event. + EventIDDeploymentStepFailed + // EventIDAppTerminated is the event listener ID for the corresponding event. + EventIDAppTerminated + //EventIDApplications comprises all listener IDs for application events. + EventIDApplications = EventIDStatusUpdate | EventIDChangedHealthCheck | EventIDFailedHealthCheck | EventIDAppTerminated + //EventIDSubscriptions comprises all listener IDs for subscription events. + EventIDSubscriptions = EventIDSubscription | EventIDUnsubscribed | EventIDStreamAttached | EventIDStreamDetached +) + +var ( + eventTypesMap map[string]int +) + +func init() { + eventTypesMap = map[string]int{ + "api_post_event": EventIDAPIRequest, + "status_update_event": EventIDStatusUpdate, + "framework_message_event": EventIDFrameworkMessage, + "subscribe_event": EventIDSubscription, + "unsubscribe_event": EventIDUnsubscribed, + "event_stream_attached": EventIDStreamAttached, + "event_stream_detached": EventIDStreamDetached, + "add_health_check_event": EventIDAddHealthCheck, + "remove_health_check_event": EventIDRemoveHealthCheck, + "failed_health_check_event": EventIDFailedHealthCheck, + "health_status_changed_event": EventIDChangedHealthCheck, + "group_change_success": EventIDGroupChangeSuccess, + "group_change_failed": EventIDGroupChangeFailed, + "deployment_success": EventIDDeploymentSuccess, + "deployment_failed": EventIDDeploymentFailed, + "deployment_info": EventIDDeploymentInfo, + "deployment_step_success": EventIDDeploymentStepSuccess, + "deployment_step_failure": EventIDDeploymentStepFailed, + "app_terminated_event": EventIDAppTerminated, + } +} + +// +// Events taken from: https://mesosphere.github.io/marathon/docs/event-bus.html +// + +// Event is the definition for a event in marathon +type Event struct { + ID int + Name string + Event interface{} +} + +func (event *Event) String() string { + return fmt.Sprintf("type: %s, event: %s", event.Name, event.Event) +} + +// EventsChannel is a channel to receive events upon +type EventsChannel chan *Event + +/* --- API Request --- */ + +// EventAPIRequest describes an 'api_post_event' event. +type EventAPIRequest struct { + EventType string `json:"eventType"` + ClientIP string `json:"clientIp"` + Timestamp string `json:"timestamp"` + URI string `json:"uri"` + AppDefinition *Application `json:"appDefinition"` +} + +/* --- Status Update --- */ + +// EventStatusUpdate describes a 'status_update_event' event. +type EventStatusUpdate struct { + EventType string `json:"eventType"` + Timestamp string `json:"timestamp,omitempty"` + SlaveID string `json:"slaveId,omitempty"` + TaskID string `json:"taskId"` + TaskStatus string `json:"taskStatus"` + Message string `json:"message,omitempty"` + AppID string `json:"appId"` + Host string `json:"host"` + Ports []int `json:"ports,omitempty"` + IPAddresses []*IPAddress `json:"ipAddresses"` + Version string `json:"version,omitempty"` +} + +// EventAppTerminated describes an 'app_terminated_event' event. +type EventAppTerminated struct { + EventType string `json:"eventType"` + Timestamp string `json:"timestamp,omitempty"` + AppID string `json:"appId"` +} + +/* --- Framework Message --- */ + +// EventFrameworkMessage describes a 'framework_message_event' event. +type EventFrameworkMessage struct { + EventType string `json:"eventType"` + ExecutorID string `json:"executorId"` + Message string `json:"message"` + SlaveID string `json:"slaveId"` + Timestamp string `json:"timestamp"` +} + +/* --- Event Subscription --- */ + +// EventSubscription describes a 'subscribe_event' event. +type EventSubscription struct { + CallbackURL string `json:"callbackUrl"` + ClientIP string `json:"clientIp"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +// EventUnsubscription describes an 'unsubscribe_event' event. +type EventUnsubscription struct { + CallbackURL string `json:"callbackUrl"` + ClientIP string `json:"clientIp"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +// EventStreamAttached describes an 'event_stream_attached' event. +type EventStreamAttached struct { + RemoteAddress string `json:"remoteAddress"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +// EventStreamDetached describes an 'event_stream_detached' event. +type EventStreamDetached struct { + RemoteAddress string `json:"remoteAddress"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +/* --- Health Checks --- */ + +// EventAddHealthCheck describes an 'add_health_check_event' event. +type EventAddHealthCheck struct { + AppID string `json:"appId"` + EventType string `json:"eventType"` + HealthCheck struct { + GracePeriodSeconds float64 `json:"gracePeriodSeconds"` + IntervalSeconds float64 `json:"intervalSeconds"` + MaxConsecutiveFailures float64 `json:"maxConsecutiveFailures"` + Path string `json:"path"` + PortIndex float64 `json:"portIndex"` + Protocol string `json:"protocol"` + TimeoutSeconds float64 `json:"timeoutSeconds"` + } `json:"healthCheck"` + Timestamp string `json:"timestamp"` +} + +// EventRemoveHealthCheck describes a 'remove_health_check_event' event. +type EventRemoveHealthCheck struct { + AppID string `json:"appId"` + EventType string `json:"eventType"` + HealthCheck struct { + GracePeriodSeconds float64 `json:"gracePeriodSeconds"` + IntervalSeconds float64 `json:"intervalSeconds"` + MaxConsecutiveFailures float64 `json:"maxConsecutiveFailures"` + Path string `json:"path"` + PortIndex float64 `json:"portIndex"` + Protocol string `json:"protocol"` + TimeoutSeconds float64 `json:"timeoutSeconds"` + } `json:"healthCheck"` + Timestamp string `json:"timestamp"` +} + +// EventFailedHealthCheck describes a 'failed_health_check_event' event. +type EventFailedHealthCheck struct { + AppID string `json:"appId"` + EventType string `json:"eventType"` + HealthCheck struct { + GracePeriodSeconds float64 `json:"gracePeriodSeconds"` + IntervalSeconds float64 `json:"intervalSeconds"` + MaxConsecutiveFailures float64 `json:"maxConsecutiveFailures"` + Path string `json:"path"` + PortIndex float64 `json:"portIndex"` + Protocol string `json:"protocol"` + TimeoutSeconds float64 `json:"timeoutSeconds"` + } `json:"healthCheck"` + Timestamp string `json:"timestamp"` +} + +// EventHealthCheckChanged describes a 'health_status_changed_event' event. +type EventHealthCheckChanged struct { + EventType string `json:"eventType"` + Timestamp string `json:"timestamp,omitempty"` + AppID string `json:"appId"` + TaskID string `json:"taskId"` + Version string `json:"version,omitempty"` + Alive bool `json:"alive"` +} + +/* --- Deployments --- */ + +// EventGroupChangeSuccess describes a 'group_change_success' event. +type EventGroupChangeSuccess struct { + EventType string `json:"eventType"` + GroupID string `json:"groupId"` + Timestamp string `json:"timestamp"` + Version string `json:"version"` +} + +// EventGroupChangeFailed describes a 'group_change_failed' event. +type EventGroupChangeFailed struct { + EventType string `json:"eventType"` + GroupID string `json:"groupId"` + Timestamp string `json:"timestamp"` + Version string `json:"version"` + Reason string `json:"reason"` +} + +// EventDeploymentSuccess describes a 'deployment_success' event. +type EventDeploymentSuccess struct { + ID string `json:"id"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +// EventDeploymentFailed describes a 'deployment_failed' event. +type EventDeploymentFailed struct { + ID string `json:"id"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +// EventDeploymentInfo describes a 'deployment_info' event. +type EventDeploymentInfo struct { + EventType string `json:"eventType"` + CurrentStep *StepActions `json:"currentStep"` + Timestamp string `json:"timestamp"` + Plan *DeploymentPlan `json:"plan"` +} + +// EventDeploymentStepSuccess describes a 'deployment_step_success' event. +type EventDeploymentStepSuccess struct { + EventType string `json:"eventType"` + CurrentStep *StepActions `json:"currentStep"` + Timestamp string `json:"timestamp"` + Plan *DeploymentPlan `json:"plan"` +} + +// EventDeploymentStepFailure describes a 'deployment_step_failure' event. +type EventDeploymentStepFailure struct { + EventType string `json:"eventType"` + CurrentStep *StepActions `json:"currentStep"` + Timestamp string `json:"timestamp"` + Plan *DeploymentPlan `json:"plan"` +} + +// GetEvent returns allocated empty event object which corresponds to provided event type +// eventType: the type of Marathon event +func GetEvent(eventType string) (*Event, error) { + // step: check it's supported + id, found := eventTypesMap[eventType] + if found { + event := new(Event) + event.ID = id + event.Name = eventType + switch eventType { + case "api_post_event": + event.Event = new(EventAPIRequest) + case "status_update_event": + event.Event = new(EventStatusUpdate) + case "framework_message_event": + event.Event = new(EventFrameworkMessage) + case "subscribe_event": + event.Event = new(EventSubscription) + case "unsubscribe_event": + event.Event = new(EventUnsubscription) + case "event_stream_attached": + event.Event = new(EventStreamAttached) + case "event_stream_detached": + event.Event = new(EventStreamDetached) + case "add_health_check_event": + event.Event = new(EventAddHealthCheck) + case "remove_health_check_event": + event.Event = new(EventRemoveHealthCheck) + case "failed_health_check_event": + event.Event = new(EventFailedHealthCheck) + case "health_status_changed_event": + event.Event = new(EventHealthCheckChanged) + case "group_change_success": + event.Event = new(EventGroupChangeSuccess) + case "group_change_failed": + event.Event = new(EventGroupChangeFailed) + case "deployment_success": + event.Event = new(EventDeploymentSuccess) + case "deployment_failed": + event.Event = new(EventDeploymentFailed) + case "deployment_info": + event.Event = new(EventDeploymentInfo) + case "deployment_step_success": + event.Event = new(EventDeploymentStepSuccess) + case "deployment_step_failure": + event.Event = new(EventDeploymentStepFailure) + case "app_terminated_event": + event.Event = new(EventAppTerminated) + } + return event, nil + } + + return nil, fmt.Errorf("the event type: %s was not found or supported", eventType) +} diff --git a/vendor/github.com/matt-deboer/go-marathon/group.go b/vendor/github.com/matt-deboer/go-marathon/group.go new file mode 100644 index 0000000..2cdf2ac --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/group.go @@ -0,0 +1,237 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "fmt" + "time" +) + +// Group is a marathon application group +type Group struct { + ID string `json:"id"` + Apps []*Application `json:"apps"` + Dependencies []string `json:"dependencies"` + Groups []*Group `json:"groups"` +} + +// Groups is a collection of marathon application groups +type Groups struct { + ID string `json:"id"` + Apps []*Application `json:"apps"` + Dependencies []string `json:"dependencies"` + Groups []*Group `json:"groups"` +} + +// GetGroupOpts contains a payload for Group and Groups method +// embed: Embeds nested resources that match the supplied path. +// You can specify this parameter multiple times with different values +type GetGroupOpts struct { + Embed []string `url:"embed,omitempty"` +} + +// DeleteGroupOpts contains a payload for DeleteGroup method +// force: overrides a currently running deployment. +type DeleteGroupOpts struct { + Force bool `url:"force,omitempty"` +} + +// UpdateGroupOpts contains a payload for UpdateGroup method +// force: overrides a currently running deployment. +type UpdateGroupOpts struct { + Force bool `url:"force,omitempty"` +} + +// NewApplicationGroup create a new application group +// name: the name of the group +func NewApplicationGroup(name string) *Group { + return &Group{ + ID: name, + Apps: make([]*Application, 0), + Dependencies: make([]string, 0), + Groups: make([]*Group, 0), + } +} + +// Name sets the name of the group +// name: the name of the group +func (r *Group) Name(name string) *Group { + r.ID = validateID(name) + return r +} + +// App add a application to the group in question +// application: a pointer to the Application +func (r *Group) App(application *Application) *Group { + if r.Apps == nil { + r.Apps = make([]*Application, 0) + } + r.Apps = append(r.Apps, application) + return r +} + +// Groups retrieves a list of all the groups from marathon +func (r *marathonClient) Groups() (*Groups, error) { + groups := new(Groups) + if err := r.apiGet(marathonAPIGroups, "", groups); err != nil { + return nil, err + } + return groups, nil +} + +// Group retrieves the configuration of a specific group from marathon +// name: the identifier for the group +func (r *marathonClient) Group(name string) (*Group, error) { + group := new(Group) + if err := r.apiGet(fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)), nil, group); err != nil { + return nil, err + } + return group, nil +} + +// GroupsBy retrieves a list of all the groups from marathon by embed options +// opts: GetGroupOpts request payload +func (r *marathonClient) GroupsBy(opts *GetGroupOpts) (*Groups, error) { + u, err := addOptions(marathonAPIGroups, opts) + if err != nil { + return nil, err + } + groups := new(Groups) + if err := r.apiGet(u, "", groups); err != nil { + return nil, err + } + return groups, nil +} + +// GroupBy retrieves the configuration of a specific group from marathon +// name: the identifier for the group +// opts: GetGroupOpts request payload +func (r *marathonClient) GroupBy(name string, opts *GetGroupOpts) (*Group, error) { + u, err := addOptions(fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)), opts) + if err != nil { + return nil, err + } + group := new(Group) + if err := r.apiGet(u, nil, group); err != nil { + return nil, err + } + return group, nil +} + +// HasGroup checks if the group exists in marathon +// name: the identifier for the group +func (r *marathonClient) HasGroup(name string) (bool, error) { + uri := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)) + err := r.apiCall("GET", uri, "", nil) + if err != nil { + if apiErr, ok := err.(*APIError); ok && apiErr.ErrCode == ErrCodeNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// CreateGroup creates a new group in marathon +// group: a pointer the Group structure defining the group +func (r *marathonClient) CreateGroup(group *Group) error { + return r.apiPost(marathonAPIGroups, group, nil) +} + +// WaitOnGroup waits for all the applications in a group to be deployed +// group: the identifier for the group +// timeout: a duration of time to wait before considering it failed (all tasks in all apps running defined as deployed) +func (r *marathonClient) WaitOnGroup(name string, timeout time.Duration) error { + err := deadline(timeout, func(stop_channel chan bool) error { + var flick atomicSwitch + go func() { + <-stop_channel + close(stop_channel) + flick.SwitchOn() + }() + for !flick.IsSwitched() { + if group, err := r.Group(name); err != nil { + continue + } else { + allRunning := true + // for each of the application, check if the tasks and running + for _, appID := range group.Apps { + // Arrrgghhh!! .. so we can't use application instances from the Application struct like with app wait on as it + // appears the instance count is not set straight away!! .. it defaults to zero and changes probably at the + // dependencies gets deployed. Which is probably how it internally handles dependencies .. + // step: grab the application + application, err := r.Application(appID.ID) + if err != nil { + allRunning = false + break + } + + if application.Tasks == nil { + allRunning = false + } else if len(application.Tasks) != *appID.Instances { + allRunning = false + } else if application.TasksRunning != *appID.Instances { + allRunning = false + } else if len(application.DeploymentIDs()) > 0 { + allRunning = false + } + } + // has anyone toggle the flag? + if allRunning { + return nil + } + } + time.Sleep(r.config.PollingWaitTime) + } + return nil + }) + + return err +} + +// DeleteGroup deletes a group from marathon +// name: the identifier for the group +// force: used to force the delete operation in case of blocked deployment +func (r *marathonClient) DeleteGroup(name string, force bool) (*DeploymentID, error) { + version := new(DeploymentID) + uri := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)) + if force { + uri = uri + "?force=true" + } + if err := r.apiDelete(uri, nil, version); err != nil { + return nil, err + } + + return version, nil +} + +// UpdateGroup updates the parameters of a groups +// name: the identifier for the group +// group: the group structure with the new params +// force: used to force the update operation in case of blocked deployment +func (r *marathonClient) UpdateGroup(name string, group *Group, force bool) (*DeploymentID, error) { + deploymentID := new(DeploymentID) + uri := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)) + if force { + uri = uri + "?force=true" + } + if err := r.apiPut(uri, group, deploymentID); err != nil { + return nil, err + } + + return deploymentID, nil +} diff --git a/vendor/github.com/matt-deboer/go-marathon/health.go b/vendor/github.com/matt-deboer/go-marathon/health.go new file mode 100644 index 0000000..8810213 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/health.go @@ -0,0 +1,93 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +// HealthCheck is the definition for an application health check +type HealthCheck struct { + Command *Command `json:"command,omitempty"` + PortIndex *int `json:"portIndex,omitempty"` + Port *int `json:"port,omitempty"` + Path *string `json:"path,omitempty"` + MaxConsecutiveFailures *int `json:"maxConsecutiveFailures,omitempty"` + Protocol string `json:"protocol,omitempty"` + GracePeriodSeconds int `json:"gracePeriodSeconds,omitempty"` + IntervalSeconds int `json:"intervalSeconds,omitempty"` + TimeoutSeconds int `json:"timeoutSeconds,omitempty"` +} + +// SetCommand sets the given command on the health check. +func (h HealthCheck) SetCommand(c Command) HealthCheck { + h.Command = &c + return h +} + +// SetPortIndex sets the given port index on the health check. +func (h HealthCheck) SetPortIndex(i int) HealthCheck { + h.PortIndex = &i + return h +} + +// SetPort sets the given port on the health check. +func (h HealthCheck) SetPort(i int) HealthCheck { + h.Port = &i + return h +} + +// SetPath sets the given path on the health check. +func (h HealthCheck) SetPath(p string) HealthCheck { + h.Path = &p + return h +} + +// SetMaxConsecutiveFailures sets the maximum consecutive failures on the health check. +func (h HealthCheck) SetMaxConsecutiveFailures(i int) HealthCheck { + h.MaxConsecutiveFailures = &i + return h +} + +// NewDefaultHealthCheck creates a default application health check +func NewDefaultHealthCheck() *HealthCheck { + portIndex := 0 + path := "" + maxConsecutiveFailures := 3 + + return &HealthCheck{ + Protocol: "HTTP", + Path: &path, + PortIndex: &portIndex, + MaxConsecutiveFailures: &maxConsecutiveFailures, + GracePeriodSeconds: 30, + IntervalSeconds: 10, + TimeoutSeconds: 5, + } +} + +// HealthCheckResult is the health check result +type HealthCheckResult struct { + Alive bool `json:"alive"` + ConsecutiveFailures int `json:"consecutiveFailures"` + FirstSuccess string `json:"firstSuccess"` + LastFailure string `json:"lastFailure"` + LastFailureCause string `json:"lastFailureCause"` + LastSuccess string `json:"lastSuccess"` + TaskID string `json:"taskId"` +} + +// Command is the command health check type +type Command struct { + Value string `json:"value"` +} diff --git a/vendor/github.com/matt-deboer/go-marathon/info.go b/vendor/github.com/matt-deboer/go-marathon/info.go new file mode 100644 index 0000000..4757584 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/info.go @@ -0,0 +1,94 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +// Info is the detailed stats returned from marathon info +type Info struct { + EventSubscriber struct { + HTTPEndpoints []string `json:"http_endpoints"` + Type string `json:"type"` + } `json:"event_subscriber"` + FrameworkID string `json:"frameworkId"` + HTTPConfig struct { + AssetsPath interface{} `json:"assets_path"` + HTTPPort float64 `json:"http_port"` + HTTPSPort float64 `json:"https_port"` + } `json:"http_config"` + Leader string `json:"leader"` + MarathonConfig struct { + Checkpoint bool `json:"checkpoint"` + Executor string `json:"executor"` + FailoverTimeout float64 `json:"failover_timeout"` + Ha bool `json:"ha"` + Hostname string `json:"hostname"` + LocalPortMax float64 `json:"local_port_max"` + LocalPortMin float64 `json:"local_port_min"` + Master string `json:"master"` + MesosRole string `json:"mesos_role"` + MesosUser string `json:"mesos_user"` + ReconciliationInitialDelay float64 `json:"reconciliation_initial_delay"` + ReconciliationInterval float64 `json:"reconciliation_interval"` + TaskLaunchTimeout float64 `json:"task_launch_timeout"` + } `json:"marathon_config"` + Name string `json:"name"` + Version string `json:"version"` + ZookeeperConfig struct { + Zk string `json:"zk"` + ZkFutureTimeout struct { + Duration float64 `json:"duration"` + } `json:"zk_future_timeout"` + ZkHosts string `json:"zk_hosts"` + ZkPath string `json:"zk_path"` + ZkState string `json:"zk_state"` + ZkTimeout float64 `json:"zk_timeout"` + } `json:"zookeeper_config"` +} + +// Info retrieves the info stats from marathon +func (r *marathonClient) Info() (*Info, error) { + info := new(Info) + if err := r.apiGet(marathonAPIInfo, nil, info); err != nil { + return nil, err + } + + return info, nil +} + +// Leader retrieves the current marathon leader node +func (r *marathonClient) Leader() (string, error) { + var leader struct { + Leader string `json:"leader"` + } + if err := r.apiGet(marathonAPILeader, nil, &leader); err != nil { + return "", err + } + + return leader.Leader, nil +} + +// AbdicateLeader abdicates the marathon leadership +func (r *marathonClient) AbdicateLeader() (string, error) { + var message struct { + Message string `json:"message"` + } + + if err := r.apiDelete(marathonAPILeader, nil, &message); err != nil { + return "", err + } + + return message.Message, nil +} diff --git a/vendor/github.com/matt-deboer/go-marathon/last_task_failure.go b/vendor/github.com/matt-deboer/go-marathon/last_task_failure.go new file mode 100644 index 0000000..1870f28 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/last_task_failure.go @@ -0,0 +1,27 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +// LastTaskFailure provides details on the last error experienced by an application +type LastTaskFailure struct { + AppID string `json:"appId,omitempty"` + Host string `json:"host,omitempty"` + Message string `json:"message,omitempty"` + State string `json:"state,omitempty"` + TaskID string `json:"taskId,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + Version string `json:"version,omitempty"` +} diff --git a/vendor/github.com/matt-deboer/go-marathon/port_definition.go b/vendor/github.com/matt-deboer/go-marathon/port_definition.go new file mode 100644 index 0000000..4cf3c1f --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/port_definition.go @@ -0,0 +1,54 @@ +/* +Copyright 2016 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +// PortDefinition is a definition of a port that should be considered +// part of a resource. Port definitions are necessary when you are +// using HOST networking and no port mappings are specified. +type PortDefinition struct { + Port *int `json:"port,omitempty"` + Protocol string `json:"protocol,omitempty"` + Name string `json:"name,omitempty"` + Labels *map[string]string `json:"labels,omitempty"` +} + +// SetPort sets the given port for the PortDefinition +func (p PortDefinition) SetPort(port int) PortDefinition { + p.Port = &port + return p +} + +// AddLabel adds a label to the PortDefinition +// name: the name of the label +// value: value for this label +func (p PortDefinition) AddLabel(name, value string) PortDefinition { + if p.Labels == nil { + p.EmptyLabels() + } + (*p.Labels)[name] = value + + return p +} + +// EmptyLabels explicitly empties the labels -- use this if you need to empty +// the labels of a PortDefinition that already has labels set +// (setting labels to nill will keep the current value) +func (p *PortDefinition) EmptyLabels() *PortDefinition { + p.Labels = &map[string]string{} + + return p +} diff --git a/vendor/github.com/matt-deboer/go-marathon/queue.go b/vendor/github.com/matt-deboer/go-marathon/queue.go new file mode 100644 index 0000000..dfd8c7e --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/queue.go @@ -0,0 +1,60 @@ +/* +Copyright 2016 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "fmt" +) + +// Queue is the definition of marathon queue +type Queue struct { + Items []Item `json:"queue"` +} + +// Item is the definition of element in the queue +type Item struct { + Count int `json:"count"` + Delay Delay `json:"delay"` + Application Application `json:"app"` +} + +// Delay cotains the application postpone infomation +type Delay struct { + Overdue bool `json:"overdue"` + TimeLeftSeconds int `json:"timeLeftSeconds"` +} + +// Queue retrieves content of the marathon launch queue +func (r *marathonClient) Queue() (*Queue, error) { + var queue *Queue + err := r.apiGet(marathonAPIQueue, nil, &queue) + if err != nil { + return nil, err + } + return queue, nil +} + +// DeleteQueueDelay resets task launch delay of the specific application +// appID: the ID of the application +func (r *marathonClient) DeleteQueueDelay(appID string) error { + uri := fmt.Sprintf("%s/%s/delay", marathonAPIQueue, trimRootPath(appID)) + err := r.apiDelete(uri, nil, nil) + if err != nil { + return err + } + return nil +} diff --git a/vendor/github.com/matt-deboer/go-marathon/subscription.go b/vendor/github.com/matt-deboer/go-marathon/subscription.go new file mode 100644 index 0000000..c84b769 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/subscription.go @@ -0,0 +1,293 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/donovanhide/eventsource" +) + +// Subscriptions is a collection to urls that marathon is implementing a callback on +type Subscriptions struct { + CallbackURLs []string `json:"callbackUrls"` +} + +// Subscriptions retrieves a list of registered subscriptions +func (r *marathonClient) Subscriptions() (*Subscriptions, error) { + subscriptions := new(Subscriptions) + if err := r.apiGet(marathonAPISubscription, nil, subscriptions); err != nil { + return nil, err + } + + return subscriptions, nil +} + +// AddEventsListener adds your self as a listener to events from Marathon +// channel: a EventsChannel used to receive event on +func (r *marathonClient) AddEventsListener(filter int) (EventsChannel, error) { + r.Lock() + defer r.Unlock() + + // step: someone has asked to start listening to event, we need to register for events + // if we haven't done so already + if err := r.registerSubscription(); err != nil { + return nil, err + } + + channel := make(EventsChannel) + r.listeners[channel] = EventsChannelContext{ + filter: filter, + done: make(chan struct{}, 1), + completion: &sync.WaitGroup{}, + } + return channel, nil +} + +// RemoveEventsListener removes the channel from the events listeners +// channel: the channel you are removing +func (r *marathonClient) RemoveEventsListener(channel EventsChannel) { + r.Lock() + defer r.Unlock() + + if context, found := r.listeners[channel]; found { + close(context.done) + delete(r.listeners, channel) + // step: if there is no one else listening, let's remove ourselves + // from the events callback + if r.config.EventsTransport == EventsTransportCallback && len(r.listeners) == 0 { + r.Unsubscribe(r.SubscriptionURL()) + } + + // step: wait for pending goroutines to finish and close channel + go func(completion *sync.WaitGroup) { + completion.Wait() + close(channel) + }(context.completion) + } +} + +// SubscriptionURL retrieves the subscription callback URL used when registering +func (r *marathonClient) SubscriptionURL() string { + if r.config.CallbackURL != "" { + return fmt.Sprintf("%s%s", r.config.CallbackURL, defaultEventsURL) + } + + return fmt.Sprintf("http://%s:%d%s", r.ipAddress, r.config.EventsPort, defaultEventsURL) +} + +// registerSubscription registers ourselves with Marathon to receive events from configured transport facility +func (r *marathonClient) registerSubscription() error { + switch r.config.EventsTransport { + case EventsTransportCallback: + return r.registerCallbackSubscription() + case EventsTransportSSE: + return r.registerSSESubscription() + default: + return fmt.Errorf("the events transport: %d is not supported", r.config.EventsTransport) + } +} + +func (r *marathonClient) registerCallbackSubscription() error { + if r.eventsHTTP == nil { + ipAddress, err := getInterfaceAddress(r.config.EventsInterface) + if err != nil { + return fmt.Errorf("Unable to get the ip address from the interface: %s, error: %s", + r.config.EventsInterface, err) + } + + // step: set the ip address + r.ipAddress = ipAddress + binding := fmt.Sprintf("%s:%d", ipAddress, r.config.EventsPort) + // step: register the handler + http.HandleFunc(defaultEventsURL, r.handleCallbackEvent) + // step: create the http server + r.eventsHTTP = &http.Server{ + Addr: binding, + Handler: nil, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + // @todo need to add a timeout value here + listener, err := net.Listen("tcp", binding) + if err != nil { + return nil + } + + go func() { + for { + r.eventsHTTP.Serve(listener) + } + }() + } + + // step: get the callback url + callback := r.SubscriptionURL() + + // step: check if the callback is registered + found, err := r.HasSubscription(callback) + if err != nil { + return err + } + if !found { + // step: we need to register ourselves + if err := r.Subscribe(callback); err != nil { + return err + } + } + + return nil +} + +func (r *marathonClient) registerSSESubscription() error { + // Prevent multiple SSE subscriptions + if r.subscribedToSSE { + return nil + } + // Get a member from the cluster + marathon, err := r.hosts.getMember() + if err != nil { + return err + } + + request, err := r.buildAPIRequest("GET", fmt.Sprintf("%s/%s", marathon, marathonAPIEventStream), nil) + if err != nil { + return err + } + + // Try to connect to stream, reusing the http client settings + stream, err := eventsource.SubscribeWith("", r.httpClient, request) + if err != nil { + return err + } + + go func() { + for { + select { + case ev := <-stream.Events: + if err := r.handleEvent(ev.Data()); err != nil { + // TODO let the user handle this error instead of logging it here + r.debugLog.Printf("registerSSESubscription(): failed to handle event: %v\n", err) + } + case err := <-stream.Errors: + // TODO let the user handle this error instead of logging it here + r.debugLog.Printf("registerSSESubscription(): failed to receive event: %v\n", err) + } + } + }() + + r.subscribedToSSE = true + return nil +} + +// Subscribe adds a URL to Marathon's callback facility +// callback : the URL you wish to subscribe +func (r *marathonClient) Subscribe(callback string) error { + uri := fmt.Sprintf("%s?callbackUrl=%s", marathonAPISubscription, callback) + return r.apiPost(uri, "", nil) + +} + +// Unsubscribe removes a URL from Marathon's callback facility +// callback : the URL you wish to unsubscribe +func (r *marathonClient) Unsubscribe(callback string) error { + // step: remove from the list of subscriptions + return r.apiDelete(fmt.Sprintf("%s?callbackUrl=%s", marathonAPISubscription, callback), nil, nil) +} + +// HasSubscription checks to see a subscription already exists with Marathon +// callback: the url of the callback +func (r *marathonClient) HasSubscription(callback string) (bool, error) { + // step: generate our events callback + subscriptions, err := r.Subscriptions() + if err != nil { + return false, err + } + + for _, subscription := range subscriptions.CallbackURLs { + if callback == subscription { + return true, nil + } + } + + return false, nil +} + +func (r *marathonClient) handleEvent(content string) error { + // step: process and decode the event + eventType := new(EventType) + err := json.NewDecoder(strings.NewReader(content)).Decode(eventType) + if err != nil { + return fmt.Errorf("failed to decode the event type, content: %s, error: %s", content, err) + } + + // step: check whether event type is handled + event, err := GetEvent(eventType.EventType) + if err != nil { + return fmt.Errorf("unable to handle event, type: %s, error: %s", eventType.EventType, err) + } + + // step: let's decode message + err = json.NewDecoder(strings.NewReader(content)).Decode(event.Event) + if err != nil { + return fmt.Errorf("failed to decode the event, id: %d, error: %s", event.ID, err) + } + + r.RLock() + defer r.RUnlock() + + // step: check if anyone is listen for this event + for channel, context := range r.listeners { + // step: check if this listener wants this event type + if event.ID&context.filter != 0 { + context.completion.Add(1) + go func(ch EventsChannel, context EventsChannelContext, e *Event) { + defer context.completion.Done() + select { + case ch <- e: + case <-context.done: + // Terminates goroutine. + } + }(channel, context, event) + } + } + + return nil +} + +func (r *marathonClient) handleCallbackEvent(writer http.ResponseWriter, request *http.Request) { + body, err := ioutil.ReadAll(request.Body) + if err != nil { + // TODO should this return a 500? + r.debugLog.Printf("handleCallbackEvent(): failed to read request body, error: %s\n", err) + return + } + + if err := r.handleEvent(string(body[:])); err != nil { + // TODO should this return a 500? + r.debugLog.Printf("handleCallbackEvent(): failed to handle event: %v\n", err) + } +} diff --git a/vendor/github.com/matt-deboer/go-marathon/task.go b/vendor/github.com/matt-deboer/go-marathon/task.go new file mode 100644 index 0000000..37f22fa --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/task.go @@ -0,0 +1,225 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "fmt" + "strings" +) + +// Tasks is a collection of marathon tasks +type Tasks struct { + Tasks []Task `json:"tasks"` +} + +// Task is the definition for a marathon task +type Task struct { + ID string `json:"id"` + AppID string `json:"appId"` + Host string `json:"host"` + HealthCheckResults []*HealthCheckResult `json:"healthCheckResults"` + Ports []int `json:"ports"` + ServicePorts []int `json:"servicePorts"` + SlaveID string `json:"slaveId"` + StagedAt string `json:"stagedAt"` + StartedAt string `json:"startedAt"` + IPAddresses []*IPAddress `json:"ipAddresses"` + Version string `json:"version"` +} + +// IPAddress represents a task's IP address and protocol. +type IPAddress struct { + IPAddress string `json:"ipAddress"` + Protocol string `json:"protocol"` +} + +// AllTasksOpts contains a payload for AllTasks method +// status: Return only those tasks whose status matches this parameter. +// If not specified, all tasks are returned. Possible values: running, staging. Default: none. +type AllTasksOpts struct { + Status string `url:"status,omitempty"` +} + +// KillApplicationTasksOpts contains a payload for KillApplicationTasks method +// host: kill only those tasks on a specific host (optional) +// scale: Scale the app down (i.e. decrement its instances setting by the number of tasks killed) after killing the specified tasks +type KillApplicationTasksOpts struct { + Host string `url:"host,omitempty"` + Scale bool `url:"scale,omitempty"` + Force bool `url:"force,omitempty"` +} + +// KillTaskOpts contains a payload for task killing methods +// scale: Scale the app down +type KillTaskOpts struct { + Scale bool `url:"scale,omitempty"` + Force bool `url:"force,omitempty"` +} + +// HasHealthCheckResults checks if the task has any health checks +func (r *Task) HasHealthCheckResults() bool { + return r.HealthCheckResults != nil && len(r.HealthCheckResults) > 0 +} + +// AllTasks lists tasks of all applications. +// opts: AllTasksOpts request payload +func (r *marathonClient) AllTasks(opts *AllTasksOpts) (*Tasks, error) { + u, err := addOptions(marathonAPITasks, opts) + if err != nil { + return nil, err + } + + tasks := new(Tasks) + if err := r.apiGet(u, nil, tasks); err != nil { + return nil, err + } + + return tasks, nil +} + +// Tasks retrieves a list of tasks for an application +// id: the id of the application +func (r *marathonClient) Tasks(id string) (*Tasks, error) { + tasks := new(Tasks) + if err := r.apiGet(fmt.Sprintf("%s/%s/tasks", marathonAPIApps, trimRootPath(id)), nil, tasks); err != nil { + return nil, err + } + + return tasks, nil +} + +// KillApplicationTasks kills all tasks relating to an application +// id: the id of the application +// opts: KillApplicationTasksOpts request payload +func (r *marathonClient) KillApplicationTasks(id string, opts *KillApplicationTasksOpts) (*Tasks, error) { + u := fmt.Sprintf("%s/%s/tasks", marathonAPIApps, trimRootPath(id)) + u, err := addOptions(u, opts) + if err != nil { + return nil, err + } + + tasks := new(Tasks) + if err := r.apiDelete(u, nil, tasks); err != nil { + return nil, err + } + + return tasks, nil +} + +// KillTask kills the task associated with a given ID +// taskID: the id for the task +// opts: KillTaskOpts request payload +func (r *marathonClient) KillTask(taskID string, opts *KillTaskOpts) (*Task, error) { + appName := taskID[0:strings.LastIndex(taskID, ".")] + appName = strings.Replace(appName, "_", "/", -1) + taskID = strings.Replace(taskID, "/", "_", -1) + + u := fmt.Sprintf("%s/%s/tasks/%s", marathonAPIApps, appName, taskID) + u, err := addOptions(u, opts) + if err != nil { + return nil, err + } + + wrappedTask := new(struct { + Task Task `json:"task"` + }) + + if err := r.apiDelete(u, nil, wrappedTask); err != nil { + return nil, err + } + + return &wrappedTask.Task, nil +} + +// KillTasks kills tasks associated with given array of ids +// tasks: the array of task ids +// opts: KillTaskOpts request payload +func (r *marathonClient) KillTasks(tasks []string, opts *KillTaskOpts) error { + u := fmt.Sprintf("%s/delete", marathonAPITasks) + u, err := addOptions(u, opts) + if err != nil { + return nil + } + + var post struct { + IDs []string `json:"ids"` + } + post.IDs = tasks + + return r.apiPost(u, &post, nil) +} + +// TaskEndpoints gets the endpoints i.e. HOST_IP:DYNAMIC_PORT for a specific application service +// I.e. a container running apache, might have ports 80/443 (translated to X dynamic ports), but i want +// port 80 only and i only want those whom have passed the health check +// +// Note: I've NO IDEA how to associate the health_check_result to the actual port, I don't think it's +// possible at the moment, however, given marathon will fail and restart an application even if one of x ports of a task is +// down, the per port check is redundant??? .. personally, I like it anyhow, but hey +// + +// name: the identifier for the application +// port: the container port you are interested in +// health: whether to check the health or not +func (r *marathonClient) TaskEndpoints(name string, port int, healthCheck bool) ([]string, error) { + // step: get the application details + application, err := r.Application(name) + if err != nil { + return nil, err + } + + // step: we need to get the port index of the service we are interested in + portIndex, err := application.Container.Docker.ServicePortIndex(port) + if err != nil { + return nil, err + } + + // step: do we have any tasks? + if application.Tasks == nil || len(application.Tasks) == 0 { + return nil, nil + } + + // step: if we are checking health the 'service' has a health check? + healthCheck = healthCheck && application.HasHealthChecks() + + // step: iterate the tasks and extract the dynamic ports + var list []string + for _, task := range application.Tasks { + if !healthCheck || task.allHealthChecksAlive() { + endpoint := fmt.Sprintf("%s:%d", task.Host, task.Ports[portIndex]) + list = append(list, endpoint) + } + } + + return list, nil +} + +func (r *Task) allHealthChecksAlive() bool { + // check: does the task have a health check result, if NOT, it's because the + // health of the task hasn't yet been performed, hence we assume it as DOWN + if !r.HasHealthCheckResults() { + return false + } + // step: check the health results then + for _, check := range r.HealthCheckResults { + if check.Alive == false { + return false + } + } + + return true +} diff --git a/vendor/github.com/matt-deboer/go-marathon/update_strategy.go b/vendor/github.com/matt-deboer/go-marathon/update_strategy.go new file mode 100644 index 0000000..bcab6dd --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/update_strategy.go @@ -0,0 +1,23 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +// UpgradeStrategy is upgrade strategy applied to a application +type UpgradeStrategy struct { + MinimumHealthCapacity float64 `json:"minimumHealthCapacity"` + MaximumOverCapacity float64 `json:"maximumOverCapacity"` +} diff --git a/vendor/github.com/matt-deboer/go-marathon/utils.go b/vendor/github.com/matt-deboer/go-marathon/utils.go new file mode 100644 index 0000000..278f499 --- /dev/null +++ b/vendor/github.com/matt-deboer/go-marathon/utils.go @@ -0,0 +1,135 @@ +/* +Copyright 2014 Rohith All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package marathon + +import ( + "errors" + "fmt" + "net" + "net/url" + "reflect" + "strings" + "sync/atomic" + "time" + + "github.com/google/go-querystring/query" +) + +type atomicSwitch int64 + +func (r *atomicSwitch) IsSwitched() bool { + return atomic.LoadInt64((*int64)(r)) != 0 +} + +func (r *atomicSwitch) SwitchOn() { + atomic.StoreInt64((*int64)(r), 1) +} + +func (r *atomicSwitch) SwitchedOff() { + atomic.StoreInt64((*int64)(r), 0) +} + +func validateID(id string) string { + if !strings.HasPrefix(id, "/") { + return fmt.Sprintf("/%s", id) + } + return id +} + +func trimRootPath(id string) string { + if strings.HasPrefix(id, "/") { + return strings.TrimPrefix(id, "/") + } + return id +} + +func deadline(timeout time.Duration, work func(chan bool) error) error { + result := make(chan error) + timer := time.After(timeout) + stopChannel := make(chan bool, 1) + + // allow the method to attempt + go func() { + result <- work(stopChannel) + }() + for { + select { + case err := <-result: + return err + case <-timer: + stopChannel <- true + return ErrTimeoutError + } + } +} + +func getInterfaceAddress(name string) (string, error) { + interfaces, err := net.Interfaces() + if err != nil { + return "", err + } + for _, iface := range interfaces { + // step: get only the interface we're interested in + if iface.Name == name { + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + // step: return the first address + if len(addrs) > 0 { + return parseIPAddr(addrs[0]), nil + } + } + } + + return "", errors.New("Unable to determine or find the interface") +} + +func contains(elements []string, value string) bool { + for _, element := range elements { + if element == value { + return true + } + } + return false +} + +func parseIPAddr(addr net.Addr) string { + return strings.SplitN(addr.String(), "/", 2)[0] +} + +// addOptions adds the parameters in opt as URL query parameters to s. +// opt must be a struct whose fields may contain "url" tags. +func addOptions(s string, opt interface{}) (string, error) { + v := reflect.ValueOf(opt) + if v.Kind() == reflect.Ptr && v.IsNil() { + return s, nil + } + + u, err := url.Parse(s) + if err != nil { + return s, err + } + + qs, err := query.Values(opt) + if err != nil { + return s, err + } + + u.RawQuery = qs.Encode() + return u.String(), nil +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 18da4c2..02f5d03 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -46,6 +46,12 @@ "revision": "855034b6b7a3b7144977efcaefe72d2c64b0d039", "revisionTime": "2016-08-09T16:55:30Z" }, + { + "checksumSHA1": "AEmfVw8uamcLRIEe1v74vDIIPaE=", + "path": "github.com/matt-deboer/go-marathon", + "revision": "2e43425f10404af7a7db643b9f8c10edf2b0bf8f", + "revisionTime": "2016-11-16T00:53:54Z" + }, { "checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=", "path": "github.com/matttproud/golang_protobuf_extensions/pbutil",