Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple processes same API #31

Open
v3nz3n opened this issue May 4, 2013 · 4 comments
Open

Multiple processes same API #31

v3nz3n opened this issue May 4, 2013 · 4 comments

Comments

@v3nz3n
Copy link

v3nz3n commented May 4, 2013

Firstly, thanks for a great tool which is a great platform for doing interesting (and profitable!) things with Python. Your code is written and laid out well - and it is great to see the active maintanance.

I would like to have multiple goxtool processes running in different terminals. For the sake of network and processor efficiency, what would be required to have multiple goxtool.py processes listen to the same API stream?

From reading the code I have formed a vague comprehension. Please confirm that I'm understanding your threading implementation correctly: the goxapi.py process starts the main thread which implements Signal locking via a nonce. All of the Signal connected slots send/receive signals seem to be regulated by the Baseclient FIFO Queue object. Additionally, goxtool communicates with the remote streaming server by emiting (via self._recv_thread?) and receiving (via self._http_thread?). I hope I have this even slightly correct so far...

Should another goxtool process start, what would be an appropriate hook for it to use to access the datastream? I am prepared to work it out and code it but a pointer will be appreciated.

Also, in the same vein, I have coded a strategy that plots price indicators (using TAlib for Python) via Matplotlib. This is working great except for frequent complaints by Tkinter (in Matplotlib) that it is not being run in the main thread - when it throws an exception this often freezes goxtool. Well, tough luck - I will either have to hardcode plotting into the API or call goxtool from a separate plotting main thread. Any sugestions would be welcome and I will be sure to share my code in a fork on Github.

@prof7bit
Copy link
Owner

prof7bit commented May 6, 2013

I have not yet experimented with inter-process-communication yet (although this idea occurred to me too already because it seems natural to implement it eventually (but its not yet on my todo since there is still some other things to implement or fix or refator that has much higher priority))

I'm now trying to explain how the different threads in goxtool and goxapi work:

(1) Main thread (goxtool.py): this thread only loops in the getch() curses main loop and waits for keyboard or terminal resize and does only UI related stuff. Gox().init() is called from here and also the Strategy.init(), keypress and unload signals for the strategy but Gox() itself will have its own thread once it is started.

(2) revc thread (WebsocketClient() or SocketIOClient() class): this thread waits (blocking) on the socket for incoming json messages and loops indefinitely until Gox() is stopped, this thread does the connect and reconnect and is firing the signal_recv signal for every message it receives from the streamng API. Consequently all signals that result from an incoming json message (trade, ticker, depth, user_order and other signals that are fired from within these signals) are all on this thread, this also means your slots may not do long running tasks and should return as soon as possible or they will block the entire streaming API.

(3) http thread (BaseClient() class): This thread executes the http trading API (order/add or order/cancel and a few other things, but this is used only if its configured to use it, otherwise trading goes over the streaming API too and this thread would do nothing at all). Since all trading commands are asynchronous on the websocket API and the http api was implemented later as a drop in replacement it behaves exactly the same, it is asynchronuos too, a call to buy() or sell() will not wait for the http result, it will only enqueue it, then return immediately and the http thread will do the request and fire a signal with the result some time later. The http thread will translate all http 200 OK results into the appropriare {op:result, ...} message and inject it into signal_recv() as if it had come directly from the streaming API and all error messages are translated into {op:remark, ...} and also injected into signal_recv(). So Gox() and its streaming API will interpret the http responses without even knowing they came from http, it always behaves as if they came from the websocket. The http thread is completely agnostic of the type of request it is procesing, it only does reformatting of messages, it does not interpret them, the entire logic of dealing with gox api is implemented only once and only for streming api.

(4) fulldepth and fullhstory (BaseClient() class): these threads are started each time after streaming api connection or reconnection is established and fetch the trade history and the full depth, fire a signal when they are done and then terminate.

Signals: Making it all thread save:
All signals are synchronous, all slots that have been called must return before the signal emitter can proceed, there is no queue! All signals go through only one global reentrant lock, application wide! This means when for example the http thread fires a signal that results in the own order list to be updated it is guaranteed that at the same time no other signal will come from any other thread until all slots that might have been called as a result of this signal have returned. This means in your slot method, although you might be called from several different threads alternately it is always guaranteed that no other signal will fire at the same time anywhere and modify variables behind your back until you return from your slot and the signal lock is released again. And because the signal lock is reentrant you may of course emit other signals from within your slot and also signals can just be connected to other signals because all this would happen on the same thread and strictly synchronous but no other threads will be able to fire a signal until this thread is entirely done with this signal, all its slots have returned and the lock is released.

For experimenting with relaying things to other processes I recommend making a strategy that hooks into the needed gox signals and relays it with some IPC mechanism (for example DBus) to another process. For experimenting with a complete slave Gox() instance (you will need a modified version of Gox() here but I already have some ideas in mind) you could relay the entire signal_recv from the master to another process and the other process would run a modified Gox() that would instead of a WebsocketClient() have for example something like a DBusClient() to receive DBus messages to fire its own signal_recv. Something along these lines. But for this to be useful some more modification need to be made, there would need to be methods to make it subscribe to more than one currency. Also additional signals signal_connect() and signal_disconnect() might be useful to find the right time to send the additional subscriptions and also a way must be found to relay trading commands from the slave back to the master in such a way that it fits nicely into the rest of the existing code.

@v3nz3n
Copy link
Author

v3nz3n commented May 7, 2013

Great, thanks for the extensive answer!

I understand the logic, app design and flow better now which will help me test some ideas for IAC.

What I have done since I wrote the message above, was to put ZeroMQ into the mix. Not for additional clients (yet) but to get some strategies to communicate and share data. With your explanation I will be able to integrate ZMQ better. Currently I have a "Data Central" strategy which collates signals and transforms history data into nympy arrays for indicator consumption and DB storage. Also, an "Indicator" strategy that requests and receives the numpy arrays via ZMQ json protocol. This strategy then uses TAlib to create and update various indicators (MACD, RSI, etc). Lastly, there is the actual "Tradebot" strategy which acts on indicator and orderbook conditions.

All of the data exchange between strategies happens via ZMQ and it works well. I guess your idea of DBus inter-strategy comms would be simpler to use and code for, but then again ZMQ makes the data exchange extendable and offers a mode whereby ZMQ simply relays (publishes) the API data stream which opens up the possibility of additional (and external) clients connecting via a single API instance (and to a single data stream).

Currently I start the ZMQ server thread from the main thread by calling the API start_thread() function from my "Data Central" strategy as follows:

self.zmqserver = goxapi.start_thread(self.zmq_server)

It works but took some hacking to work around strategy reloads (which is a great mechanism in itself!) Basically, the server thread needs to run in an infinite loop, but then needs to be interrupted, join()ed and destroyed before the strategy could successfully reload.

So much on my goxtool todo list that I am finding that often, after I had coded away for a few nights, the same functionality shows up in the repos! Probably just the same as everybody that finds goxtool useful and sees its potential, I have a vision of a terrific multi-app constellation built around the API. Having said that, I just want to point out that the curses client is really a perfect central hub and status monitor. It should remain as such and I certainly will not look to replace it with another interface. My immediate objectives are to complete indicator functionality, to get it working in a live matplotlib chart. Then to get an analysis strategy going (with BI, datamining and ML in mind!) and in and amongst all that to get ZMQ to publish the API data so additional instances of the goxtool can connect!

Briefly, the reason I want the additional client(s) is because I had altered the curses code to display and summarise orderbook and market depth data in a way that suits me. As I completed that I noticed that you had just included the same. Well same but different. I would like to see both my view and the current goxtool depth chart simultaneously in 2 different terminals. Hence the multi-client idea. I will keep working on that, intermittently.

Great platform and tool. Keep it up!

@prof7bit
Copy link
Owner

prof7bit commented May 8, 2013

Oh, dbus was just an example, i didn't use it myself yet, It just crossed my mind and then I took it as an example, of course there exist better IPC mechanisms. For inter-stragegy communication within the same goxtool process I suggest using the same signal class that is used by goxapi, this has the advantage that you also use the same RLock and won't have thread safety problems with other gox signals that would fire in the meantime and might change data behind your back. Of course you should then not do any heavy number crunching since this would block all other signals but for simple stuff I recommend it because of its simplicity, it behaves just like function calls.

if you load two strategies

--strategy=strat_a.py,strat_b.py

then it is guaranteed that stra_a loads before strat_b. If you look at the Gox() class you will find there is a dict (its actually a WeakValueDict to prevent it from holding on to references and keep it from garbage collecting after strategy unload). In strat_b you can then lookup the name of strat_a in that dict to get a reference to the instance and then just directly call its methods or connect a slot in strat_b to a signal in strat_a.

strat_a

  def__init__(self, gox):
      strategy.Strategy.__init__(self, gox)
      ...
      self.name = "history"
      self.signal_newbar = goxapi.Signal()

strat_b

  def __init__(self, gox):
      strategy.Strategy.__init__(self, gox)
      ...
      if "history" in gox.strategies:
          self.history = gox.strategies["history"]
          self.history.signal_newbar.connect(self.slot_newbar)
      else:
          self.history = None

  def slot_newbar(self, history, data):
      """will be called when history emits its newbar signal"""
      ...

Any strategy can now look for "history" in gox.strategies and just connect to its signals, you just have to make sure you pass them in the correct order on the command line.

So for example you could make a strategy that runs a web server to display buy and sell buttons and order management in a nice web UI and if that webserver strategy also detects the precence of the history strategy it could optionally also display a chart and if it also detect the presence of the talib strategy then it would also allow to plot indicators in the chart and since you can get a direct reference to the other strategy instance you can just directly iterate over its arrays or call its methods and if you do it all withn goxapi signals/slots you can access all the data in a thread safe manner because you automatically own that applicaton wide RLock within every slot.

@v3nz3n
Copy link
Author

v3nz3n commented May 10, 2013

Thanks for your time on this issue.

I implemented your suggestion (above) tonight and it works smoothly as with the other Signals().

Good idea to use the same API Signal class and maintain use of the RLock mechanism. ZMQ has the objective of being lock independent and this is exactly what we don't want with interdependent modules. It also makes strategy (module) reloads messy. I will retain ZMQ because it is useful for passing heavy number crunching tasks to an external worker thereby avoiding lock-hogging.

I will commit the client/server code over the weekend in case anyone is interested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants