A simple framework for algorithmic trading strategies

Broker API and Framework

Broker API and Framework for data streaming and order management.

1. ib_insync

Wraps the IB API’s callback functions and provides an async, event-driven framework to stream both real-time ticker data and historical data.

https://github.com/erdewit/ib_insync

2. IB API

The base API provided by Interactive Brokers. This may be useful when looking into the basics of market data and order management.

https://www.interactivebrokers.com/en/index.php?f=5041

Event-driven support for historical and real-time data

Example: request historical bar data that keeps updating in real time, and check each new bar for an insider bar pattern and breakouts.

import asyncio
import smtplib

from ib_insync import IB, Future, util


class Solution:
    def __init__(self, contract):
        # connect to a locally running TWS/IB Gateway (paper trading port 7497)
        self.ib = IB()
        self.ib.connect('127.0.0.1', 7497, clientId=2)
        self.contract = contract
        self.insiderBarFound = False
        self.previousBar = None

    def __del__(self):
        self.ib.disconnect()
        asyncio.get_event_loop().stop()

    def setInsiderPattern(self, insiderBarFound, previousBar):
        self.insiderBarFound = insiderBarFound
        self.previousBar = previousBar

    def send_sms(self, subject='', msg=''):
        server = smtplib.SMTP("smtp.gmail.com", 587)
        server.starttls()
        server.login('user', 'passwd')

        # Send text message through SMS gateway of destination number
        server.sendmail(subject, 'phonenumber@txt.att.net', "\n " + subject + str(msg))
        print("Congratulations! You've found: " + str(msg))

    def onBarUpdate(self, bars, hasNewBar):
        # when a new bar arrives, check whether the previous two completed bars form an insider bar pattern
        # (i.e. the range of bars[-2] is inside the range of bars[-3])
        if hasNewBar:
            print(bars[-1])
            if bars[-2].low > bars[-3].low and bars[-2].high < bars[-3].high:
                self.setInsiderPattern(True, bars[-3])
                print('Insider Bar Pattern Found:', bars[-3:-1])
                self.send_sms('Insider Bar Pattern Found: ', str(bars[-1]))
        if self.insiderBarFound:
            if bars[-1].close > self.previousBar.high:
                print('Insider Bar Break up Found at:', bars[-1])
                self.setInsiderPattern(False, None)
                self.send_sms('Insider Bar Break up Found at: ', str(bars[-1]))
            elif bars[-1].close < self.previousBar.low:
                print('Insider Bar Break down Found at:', bars[-1])
                self.setInsiderPattern(False, None)
                self.send_sms('Insider Bar Break down Found at: ', str(bars[-1]))

    def init_listener(self):
        bars = self.ib.reqHistoricalData(
            self.contract,
            endDateTime='',
            durationStr='10800 S',
            barSizeSetting='1 hour',
            whatToShow='TRADES',
            useRTH=False,
            formatDate=1,
            keepUpToDate=True)
        print(bars)
        bars.updateEvent += self.onBarUpdate


if __name__ == '__main__':
    util.patchAsyncio()

    contract = Future('ES', '201903', 'GLOBEX') 
    sol = Solution(contract)
    sol.init_listener()
    IB.run()

Example to subscribe to real-time tick data.

def onTickerUpdate(self, tickers):
    # called whenever new ticks arrive for any subscribed contract
    for ticker in tickers:
        print(ticker)

def init_listener(self):
    self.ib.reqMktData(self.contract, '', False, False, None)
    self.ib.pendingTickersEvent += self.onTickerUpdate

[Leetcode] 361. Bomb Enemy

class Solution {
    /*
    Brute force is O(m*n*(m+n))
    */
    public int maxKilledEnemies(char[][] grid) {
        /*
        0 E 0 0
        E 0 W E
        0 E 0 0
        */
        int m = grid.length;
        int n = m > 0 ? grid[0].length : 0;

        int rowKills = 0;
        int colKills[] = new int[n];
        int result = 0;

        for (int i = 0; i < m; i++) {
            for (int j = 0; j < n; j++) {
                // recalculate rowKills at the first column, or when the cell to the left is a wall
                // (e.g. grid[1][3] in the example above); whatever is in grid[i][j] itself doesn't matter:
                // if 'W' the count resets, otherwise 'E' (add 1) and '0' are memoized in rowKills
                if (j == 0 || grid[i][j - 1] == 'W') {
                    rowKills = 0;
                    for (int k = j; k < n && grid[i][k] != 'W'; k++) {
                        if (grid[i][k] == 'E')
                            rowKills++;
                    }
                }

                // for each row, memoize/recalculate colKills[j] (j = 0 to n-1),
                // so when looping through the next row, colKills[j] can be reused for each column j
                if (i == 0 || grid[i - 1][j] == 'W') {
                    colKills[j] = 0;
                    for (int k = i; k < m && grid[k][j] != 'W'; k++) {
                        if (grid[k][j] == 'E')
                            colKills[j] += 1;
                    }
                }

                if (grid[i][j] == '0' && result < rowKills + colKills[j]) {
                    result = rowKills + colKills[j];
                }
            }
        }
        return result;
    }
}

 

[Leetcode] Sentence Similarity / II

734. Sentence Similarity

https://leetcode.com/problems/sentence-similarity/description/
Given two sentences words1, words2 (each represented as an array of strings), and a list of similar word pairs pairs, determine if two sentences are similar.

For example, “great acting skills” and “fine drama talent” are similar, if the similar word pairs are pairs = [["great", "fine"], ["acting","drama"], ["skills","talent"]].

Note that the similarity relation is not transitive. For example, if “great” and “fine” are similar, and “fine” and “good” are similar, “great” and “good” are not necessarily similar.

However, similarity is symmetric. For example, “great” and “fine” being similar is the same as “fine” and “great” being similar.

Also, a word is always similar with itself. For example, the sentences words1 = ["great"], words2 = ["great"], pairs = [] are similar, even though there are no specified similar word pairs.

Finally, sentences can only be similar if they have the same number of words. So a sentence like words1 = ["great"] can never be similar to words2 = ["doubleplus","good"].

Note:

  • The length of words1 and words2 will not exceed 1000.
  • The length of pairs will not exceed 2000.
  • The length of each pairs[i] will be 2.
  • The length of each words[i] and pairs[i][j] will be in the range [1, 20].

class Solution {
    public boolean areSentencesSimilar(String[] words1, String[] words2, String[][] pairs) {
        // words1 = ["great"], words2 = ["great"], pairs = [] similar
        // words1 = ["great"], words2 = ["doubleplus","good"] not similar
        // words1 = ["great", "acting", "skills"], words2 = ["fine", "drama", "talent"],
        // pairs = [["great", "fine"], ["acting","drama"], ["skills","talent"]]
        // pairs[i][0], pairs[i][1]  (i = 0 to pairs.length)
        if(words1.length != words2.length)
            return false;
        
        Set<String> set = new HashSet<>();
        for(int i=0; i<pairs.length; i++) {
            set.add(pairs[i][0] + "," + pairs[i][1]);
        }
        
        for(int i=0; i<words1.length; i++) {
            String pair1 = words1[i] + "," + words2[i];
            String pair2 = words2[i] + "," + words1[i];
            
            if(!words1[i].equals(words2[i]) && 
               !set.contains(pair1) && !set.contains(pair2)){
                return false;
            }
        }

        return true;
    }
}

737. Sentence Similarity II

https://leetcode.com/problems/sentence-similarity-ii/description/

Given two sentences words1, words2 (each represented as an array of strings), and a list of similar word pairs pairs, determine if two sentences are similar.

For example, words1 = ["great", "acting", "skills"] and words2 = ["fine", "drama", "talent"] are similar, if the similar word pairs are pairs = [["great", "good"], ["fine", "good"], ["acting","drama"], ["skills","talent"]].

Note that the similarity relation is transitive. For example, if “great” and “good” are similar, and “fine” and “good” are similar, then “great” and “fine” are similar.

Similarity is also symmetric. For example, “great” and “fine” being similar is the same as “fine” and “great” being similar.

Also, a word is always similar with itself. For example, the sentences words1 = ["great"], words2 = ["great"], pairs = [] are similar, even though there are no specified similar word pairs.

Finally, sentences can only be similar if they have the same number of words. So a sentence like words1 = ["great"] can never be similar to words2 = ["doubleplus","good"].

Note:

  • The length of words1 and words2 will not exceed 1000.
  • The length of pairs will not exceed 2000.
  • The length of each pairs[i] will be 2.
  • The length of each words[i] and pairs[i][j] will be in the range [1, 20].

class Solution {
    public boolean areSentencesSimilarTwo(String[] words1, String[] words2, String[][] pairs) {
        // build graph and dfs search
        if (words1.length != words2.length)
            return false;

        // key=node, value=set of nodes connected to the node
        // for an undirected graph, <node1, [node2, ...]> and <node2, [node1, ...]> both exist in the map
        Map<String, Set<String>> g = new HashMap<>();

        for (String[] pair: pairs) {
            if (!g.containsKey(pair[0])) {
                g.put(pair[0], new HashSet<>());
            }
            if (!g.containsKey(pair[1])) {
                g.put(pair[1], new HashSet<>());
            }
            g.get(pair[0]).add(pair[1]);
            g.get(pair[1]).add(pair[0]);
        }

        for (int i = 0; i < words1.length; i++) {
            if (words1[i].equals(words2[i]))
                continue;

            // The visited set has to be unique for each search, so it has to be a local variable inside each loop!
            if (!g.containsKey(words1[i]) || !dfs(words1[i], words2[i], new HashSet<>(), g))
                return false;
        }
        return true;
    }

    public boolean dfs(String src, String dest, Set<String> visited, Map<String, Set<String>> g) {
        if (g.get(src).contains(dest))
            return true;
        visited.add(src);
        for (String node: g.get(src)) {
            if (!visited.contains(node) && dfs(node, dest, visited, g))
                return true;
        }
        return false;
    }
}

 

A scalable cloud system metrics collector

Monitoring without agents

There are various monitoring tools, such as Datadog, New Relic, and AppDynamics, that install agents on your hosts to collect metrics.

(diagram from Datadog)

But sometimes there are scenarios where you cannot install those agents on the hosts you need to monitor, especially when the agents are not supported by a host’s legacy OS and libraries. In my work environment, there are VMs with a FreeBSD OS running old versions of the Python and Java runtimes that most of the Datadog and New Relic agents do not support. Also, for a company running cloud services, these monitoring tools can easily cost $100K per month if you have thousands of VMs.

This post talks about home-brewing a cloud system metrics collector that runs outside of your hosts.

Move agents out of the host

In my environment, I have software built on FreeBSD, Ubuntu, Red Hat, and Docker, running on the AWS and Azure cloud platforms.

One type of host is the cloud version of network traffic management hardware, providing features such as load balancing, VPN, and web application firewall. Its OS is protected and tailored, e.g. the pkg installer is removed, so it is not possible to install any agent on it. (This is another reason to move agents out of the hosts, besides the cost.)

Other hosts run modern OSes – Red Hat, Ubuntu, and Docker containers – installed with various components like databases (MySQL), caches (Redis), C++ apps, etc.

The monitoring requirements for these host types are different, but we can basically categorize them into:

  • Metrics: CPU usage, process info, network stats, etc., which need to be collected periodically
  • Files: core dumps generated when the system crashes, binaries created specifically by the system, etc.
  • Events: written into logs when something happens (covered in the post “Log collecting with Syslog-ng and Splunk”)

The way to get the metrics and files is simply to SSH into the host and execute shell commands (top, netstat, scp, etc.), then capture the command output.
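
As a minimal sketch of that idea (using the paramiko library; the host address, user, key path, and the uptime command below are placeholders, not the exact values from my setup):

import paramiko


def run_remote_cmd(host, user, key_file, cmd):
    # SSH into a host, run one shell command and return its output
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, username=user, key_filename=key_file)
    try:
        stdin, stdout, stderr = client.exec_command(cmd)
        return stdout.read().decode()
    finally:
        client.close()


if __name__ == '__main__':
    # placeholder host and credentials
    print(run_remote_cmd('10.0.0.10', 'monitor', '/path/to/key.pem', 'uptime'))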

Design the Monitoring agent

At first, I thought about writing all the host IPs, SSH user names, passwords, and PEM keys into a config file, then using a Python script to read that info and collect all the metrics. But in a cloud environment there are too many changing objects, such as short-lived containers and autoscaling hosts, so it is really difficult to maintain the mapping between the config file and the cloud resources.

So I decided to keep the monitored hosts as objects in memory, and I found the abstract factory design pattern very useful for maintaining polymorphic objects.

In the case of a metrics collector that monitors anywhere from zero to thousands of hosts, use a ‘factory’ to produce and maintain ‘polymorphic’ host objects, i.e. it will dynamically create/modify/delete host objects according to the tags in AWS and Azure. This is critical in supporting various node types.

The purpose of tagging the hosts’ type on the EC2 instances (a sketch follows this list):

  • Lets the collector filter which instances to monitor (python boto3 is the lib I used to do so)
  • Lets the collector create the host object by the host’s type (an object is created for each host and stored in a dictionary)
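
Here is a rough sketch of that filtering and factory idea with boto3; the Role tag key, the host classes, and the region are hypothetical names for illustration, not the exact ones from my collector:

import boto3


class LinuxHost:
    def __init__(self, ip):
        self.ip = ip

    def collect(self):
        # e.g. run top/netstat over SSH and parse the output
        pass


class ApplianceHost(LinuxHost):
    def collect(self):
        # tailored commands for the network appliance hosts
        pass


# hypothetical mapping from the EC2 "Role" tag value to a host class
HOST_TYPES = {'linux': LinuxHost, 'appliance': ApplianceHost}


def discover_hosts(region='us-west-2'):
    # filter EC2 instances by tag and create one host object per instance
    ec2 = boto3.client('ec2', region_name=region)
    resp = ec2.describe_instances(
        Filters=[{'Name': 'tag:Role', 'Values': list(HOST_TYPES)}])
    hosts = {}
    for reservation in resp['Reservations']:
        for inst in reservation['Instances']:
            tags = {t['Key']: t['Value'] for t in inst.get('Tags', [])}
            cls = HOST_TYPES[tags['Role']]
            hosts[inst['InstanceId']] = cls(inst['PrivateIpAddress'])
    return hosts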

There are several modules, running as threads at specified intervals, that monitor the creation/modification/deletion of hosts (updating the hosts dictionary) based on the tags. Each host object then periodically calls its methods to execute commands on the host (e.g. top, netstat, etc.) to monitor its status.
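
A minimal sketch of those loops, reusing the hypothetical discover_hosts() and host objects from the previous sketch (the intervals are arbitrary):

import threading
import time

hosts = {}


def refresh_hosts(interval=60):
    # periodically re-discover hosts so the dictionary follows the cloud tags
    # discover_hosts() is the hypothetical factory function from the previous sketch
    while True:
        hosts.clear()
        hosts.update(discover_hosts())
        time.sleep(interval)


def collect_metrics(interval=30):
    # periodically ask every known host object to collect its metrics
    while True:
        for host in list(hosts.values()):
            host.collect()
        time.sleep(interval)


threading.Thread(target=refresh_hosts, daemon=True).start()
threading.Thread(target=collect_metrics, daemon=True).start()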

To ease the maintenance of credentials, a separate config file stores the AWS programmatic access credentials (access keys/roles) used to filter the EC2 instances, and the hosts’ access credentials (SSH keys/passwords).
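
For illustration only (the file name, section names, and keys below are hypothetical, not the exact ones I use), such a config file could be a small INI file read with configparser:

import configparser

# hypothetical /etc/collector/collector.ini:
# [aws]
# access_key_id = AKIA...
# secret_access_key = ...
# [ssh]
# user = monitor
# key_file = /etc/collector/monitor.pem

config = configparser.ConfigParser()
config.read('/etc/collector/collector.ini')

aws_key = config['aws']['access_key_id']
ssh_user = config['ssh']['user']
ssh_key_file = config['ssh']['key_file']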

Scalability

An Auto Scaling Group is used to launch N collectors, and each collector is assigned a tag: collector. The ASG launch configuration decides how many collectors should be running based on the CPU usage of the collectors. Here we talk about how the hosts are split evenly among the collectors for monitoring.

Each collector will assign a number to itself. This is how it works:

Inside each collector, the collector gets all collectors’ IPs, including its own. Based on the IP ordering 172.16.0.1 < 172.16.0.2 < 172.16.0.3 < ... < IP(N-1), each collector assigns itself a number 0, 1, 2, ..., N-1.

Based on this number, each collector decides which hosts to monitor (with 3 collectors as an example):

Collector 0 monitors all hosts with ip2int(ip) mod 3 == 0.

Collector 1 monitors all hosts with ip2int(ip) mod 3 == 1.

Collector 2 monitors all hosts with ip2int(ip) mod 3 == 2.

In this way, all the hosts are monitored. The above calculation runs periodically, so that when the number of collectors changes, the hosts are redistributed evenly among the collectors. A small sketch of the numbering and assignment follows.
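
A minimal sketch of this, assuming the collector already knows its own IP, its peers’ IPs, and the monitored hosts’ IPs (the function and variable names are illustrative):

import ipaddress


def ip2int(ip):
    return int(ipaddress.ip_address(ip))


def my_hosts(my_ip, collector_ips, host_ips):
    # return the subset of hosts this collector should monitor
    ordered = sorted(collector_ips, key=ip2int)
    my_index = ordered.index(my_ip)   # this collector's number, 0..N-1
    n = len(ordered)
    return [ip for ip in host_ips if ip2int(ip) % n == my_index]


# example with 3 collectors: every host lands on exactly one collector
print(my_hosts('172.16.0.2',
               ['172.16.0.1', '172.16.0.2', '172.16.0.3'],
               ['10.0.0.4', '10.0.0.5', '10.0.0.6']))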

High Availability (With K8s Leader-Elector)

To achieve high availability in Active-Standby mode, Kubernetes’ built-in leader election support can be used. K8s allows the leader-elector container to run as a sidecar to the main container (my application) within the same pod, and it provides an HTTP endpoint at http://localhost:4040 for my application to query the leader (master) status of the sidecar container. If the current pod is the leader, I run the application; otherwise the application just stays on standby.
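
A sketch of that standby/active check; I am assuming the leader-elector sidecar’s default JSON response ({"name": "<leader pod name>"}) and using the pod’s HOSTNAME environment variable as its own name:

import os
import time

import requests


def i_am_leader():
    # ask the leader-elector sidecar who the current leader is
    resp = requests.get('http://localhost:4040', timeout=2)
    return resp.json().get('name') == os.environ.get('HOSTNAME')


def run_collector():
    # placeholder for the real application's main loop
    print('running as the active collector')


while True:
    if i_am_leader():
        run_collector()
    time.sleep(10)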

Refer to https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/

Log collecting with Syslog-ng and Splunk

How does the system work?

Receiving logs – Syslog-ng listens on UDP port 514, or any other specified ports, for incoming syslog messages. Based on the UDP port and the logging facility the logs come from, syslog-ng stores the logs in folders (specified by the configuration in syslog-ng.conf) on the logging server’s filesystem.

Forwarding logs – The Splunk forwarder is configured (via inputs.conf) to monitor the log folders mentioned above and forward those logs to pre-configured indexes and sourcetypes in Splunk Cloud.

In the server-mode setup, syslog-ng acts as a central log-collecting server. It receives messages from syslog-ng clients and relays over the network, stores them locally in files, or passes them on to other applications, for example log analyzers.

@version: 3.5
@include "scl.conf"

options
{
    time-reap(30);
    mark-freq(10);
    create_dirs(yes);
    keep_hostname(no);
    use-dns(no);
};

source s_local
{
    system();
    internal();
};

# Step1: Log source: listen on UDP 514 for incoming messages
source s_network
{
    udp(ip("0.0.0.0") port(514));
};

# Step2: Log filter: different facilities are used by the syslog clients; this is how syslog-ng identifies log sources
filter f_log0 {
    facility(local0);
};
filter f_log1 {
    facility(local1);
};
filter f_log2 {
    facility(local2);
};

# Step3: Log destination: store logs in local files; the date in the file name is used to rotate the log files
destination d_log0
{
    file("/var/log/syslog-data/service1/${HOST}/${HOST}.${YEAR}${MONTH}${DAY}${HOUR}.log0");
};

destination d_log1
{
    file("/var/log/syslog-data/service1/${HOST}/${HOST}.${YEAR}${MONTH}${DAY}${HOUR}.log1");
};
destination d_log2
{
    file("/var/log/syslog-data/service1/${HOST}/${HOST}.${YEAR}${MONTH}${DAY}${HOUR}.log2");
};

# Step4: pipe the logs from source(network port) to destination(local files)
log
{
    source(s_local);
    source(s_network);
    filter(f_log0);
    destination(d_log0);
};

log
{
    source(s_local);
    source(s_network);
    filter(f_log1);
    destination(d_log1);
};

log
{
    source(s_local);
    source(s_network);
    filter(f_log2);
    destination(d_log2);
};

The configuration file /etc/syslog-ng/syslog-ng.conf describes the above data flow: logs are received on network ports from syslog clients, go through context/content-based filters, are assigned local paths, and are then stored in the destination folders/files.

Splunk Forwarder

The Splunk Universal Forwarder collects data from a data source, another forwarder, or a local file, and sends it to a forwarder or a Splunk deployment. In our case, the universal forwarder sends data to Splunk Cloud. The universal forwarder is available as a separate installation package.

While the log files/folders are generated by syslog-ng, Splunk monitors the log files; the monitors are configured in the Splunk app’s inputs.conf. We are using Splunk Cloud, so I will skip the introduction of the Splunk server side.

An example inputs.conf for the Splunk forwarder is as follows:

# monitor will match the logs with wildcards; the * is used to match different
# hostnames or dates in the log path

[monitor:///var/log/syslog-data/service1/*/*.log0]
# this is the index name configured in Splunk, for example staging_ns_trust
index = [environment]_[service1]

sourcetype = log0

# the 5th segment of the path is the hostname
host_segment = 5

High availability

The logging system must run with minimum downtime, so we set up an HA pair of logging server VMs. In practice, we noticed that UDP load balancing is not supported in the AWS environment, so we set up DNS load balancing instead (just google “simple DNS load balancing”).

Deployment

Deployment and configuration of the monitoring system are done with Ansible playbooks and Rundeck. I will talk about that in later posts.
