In [1]:
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import as_strided

def gen_series(n):
return pd.Series(np.random.randn(n)).cumsum()

In [2]:
def dd_1(ser):
# naively loop over all possible start and end points
# O(n^2)
servals = ser.values
drawdowns = []
for end in range(len(servals)):
for start in range(end):
drawdowns.append(servals[end] - servals[start])
return min(min(drawdowns), 0)

def dd_2(ser):
# only compare each point to the previous running peak
# O(N)
running_max = pd.expanding_max(ser)
cur_dd = ser - running_max
return min(0, cur_dd.min())

In [3]:
anomalies = []
for i in range(100):
ser = gen_series(1000)
max_abs_diff = diffs.abs().max()
if max_abs_diff != 0:
anomalies.append(ser)

len(anomalies)

Out[3]:
0
In [4]:
ser = gen_series(10)
print 'n = 10'
%timeit dd_1(ser)
%timeit dd_2(ser)

ser = gen_series(100)
print 'n = 100'
%timeit dd_1(ser)
%timeit dd_2(ser)

ser = gen_series(1000)
print 'n = 1000'
%timeit dd_1(ser)
%timeit dd_2(ser)

ser = gen_series(10000)
print 'n = 10000'
%timeit dd_1(ser)
%timeit dd_2(ser)

n = 10
10000 loops, best of 3: 31.9 µs per loop
10000 loops, best of 3: 125 µs per loop
n = 100
100 loops, best of 3: 3.1 ms per loop
10000 loops, best of 3: 123 µs per loop
n = 1000
1 loops, best of 3: 290 ms per loop
10000 loops, best of 3: 154 µs per loop
n = 10000
1 loops, best of 3: 33 s per loop
1000 loops, best of 3: 441 µs per loop

In [5]:
def dd_3(ser):
if isinstance(ser, pd.Series):
ser = ser.values
running_global_peak = ser[0]
min_since_global_peak = ser[0]
running_max_dd = 0
running_global_peak_id = np.nan
running_max_dd_peak_id = np.nan
running_max_dd_trough_id = np.nan

for i, val in enumerate(ser):
if val >= running_global_peak:
running_global_peak = val
running_global_peak_id = i
min_since_global_peak = val
if val < min_since_global_peak:
min_since_global_peak = val
if val - running_global_peak <= running_max_dd:
running_max_dd = val - running_global_peak
running_max_dd_peak_id = running_global_peak_id
running_max_dd_trough_id = i
return (running_max_dd, running_max_dd_peak_id, running_max_dd_trough_id, running_global_peak_id)

In [6]:
anomalies = []
for i in range(100):
ser = gen_series(1000)
max_abs_diff = diffs.abs().max()
if max_abs_diff != 0:
anomalies.append(ser)

len(anomalies)

Out[6]:
0
In [7]:
ser = gen_series(100)
print 'n = 100'
%timeit dd_2(ser)
%timeit dd_3(ser)

ser = gen_series(1000)
print 'n = 1000'
%timeit dd_2(ser)
%timeit dd_3(ser)

ser = gen_series(10000)
print 'n = 10000'
%timeit dd_2(ser)
%timeit dd_3(ser)

n = 100
10000 loops, best of 3: 118 µs per loop
10000 loops, best of 3: 41.3 µs per loop
n = 1000
10000 loops, best of 3: 159 µs per loop
1000 loops, best of 3: 351 µs per loop
n = 10000
1000 loops, best of 3: 436 µs per loop
100 loops, best of 3: 2.41 ms per loop

In [8]:
def rolling_dd_using_dd1(ser, win):
return pd.rolling_apply(ser, win, dd_2, min_periods=0)

def rolling_dd_custom(ser, window):
index = ser.index
name = ser.name
ser = ser.values

n = len(ser)
result = np.zeros((n, 4))
running_global_peak = ser[0]
min_since_global_peak = ser[0]
running_max_dd = 0
running_global_peak_id = np.nan
running_max_dd_peak_id = np.nan
running_max_dd_trough_id = np.nan
for i, val in enumerate(ser):
if i < window:
if val >= running_global_peak:
running_global_peak = val
running_global_peak_id = i
min_since_global_peak = val
if val < min_since_global_peak:
min_since_global_peak = val
if val - running_global_peak <= running_max_dd:
running_max_dd = val - running_global_peak
running_max_dd_peak_id = running_global_peak_id
running_max_dd_trough_id = i
result[i, :] = np.array(
(running_max_dd,
running_max_dd_peak_id,
running_max_dd_trough_id,
running_global_peak_id)
)
else:
# current window max index is i
# current window min index is i-window+1
# possible difficult situations
# 1) previous running_global_peak falls out of early edge of sample
prob_1 = result[i-1, 3]<=i-window
# 2) previous running_max_peak_id falls out of early edge of sample
prob_2 = result[i-1, 1]<=i-window
if prob_1 or prob_2:
rolling_window = ser[i-window+1: i+1]
intermed = dd_3(rolling_window)
result[i, :] = np.array((intermed[0],
intermed[1] + i - window + 1,
intermed[2] + i - window + 1,
intermed[3] + i - window + 1))
else:
# the new windowed global peak is straightforward:
result[i, 3] = i if ser[i]>=ser[result[i-1,3]] else result[i-1,3]
# one candidate is previous row's drawdown.  other candidate is from previous row's
# running_global_peak to the newly entered value
new_candidate_drawdown = val - ser[result[i-1, 3]]
if new_candidate_drawdown <= result[i-1, 0]:
result[i, 0] = new_candidate_drawdown
result[i, 1] = result[i-1, 3]
result[i, 2] = i
else:
result[i, 0] = result[i-1, 0]
result[i, 1] = result[i-1, 1]
result[i, 2] = result[i-1, 2]

result = result[:, 0]
result = pd.Series(result, name=name, index=index)
return result

In [9]:
def windowed_view(x, window_size):
"""Creat a 2d windowed view of a 1d array.

x must be a 1d numpy array.

numpy.lib.stride_tricks.as_strided is used to create the view.
The data is not copied.

Example:

>>> x = np.array([1, 2, 3, 4, 5, 6])
>>> windowed_view(x, 3)
array([[1, 2, 3],
[2, 3, 4],
[3, 4, 5],
[4, 5, 6]])
"""
y = as_strided(x, shape=(x.size - window_size + 1, window_size),
strides=(x.strides[0], x.strides[0]))
return y

def rolling_max_dd_so(x, window_size, min_periods=1):
"""Compute the rolling maximum drawdown of x.

x must be a 1d numpy array.
min_periods should satisfy 1 <= min_periods <= window_size.

Returns an 1d array with length len(x) - min_periods + 1.
"""
if min_periods < window_size:
y = windowed_view(x, window_size)
running_max_y = np.maximum.accumulate(y, axis=1)
dd = y - running_max_y
return dd.min(axis=1)

def rolling_max_dd_so_wrapped(ser, window_size, min_periods=1):
vals = ser.values
rmdd = rolling_max_dd_so(vals, window_size, min_periods)
return pd.Series(data=rmdd, index=ser.index, name=ser.name)

In [10]:
ser = gen_series(1000)
print 'n=1000, w=10'
%timeit rolling_dd_using_dd1(ser, 10)
%timeit rolling_dd_custom(ser, 10)
%timeit rolling_max_dd_so_wrapped(ser, 10)

ser = gen_series(1000)
print 'n=1000, w=100'
%timeit rolling_dd_using_dd1(ser, 100)
%timeit rolling_dd_custom(ser, 100)
%timeit rolling_max_dd_so_wrapped(ser, 100)

ser = gen_series(10000)
print 'n=10000, w=100'
%timeit rolling_dd_using_dd1(ser, 100)
%timeit rolling_dd_custom(ser, 100)
%timeit rolling_max_dd_so_wrapped(ser, 100)

ser = gen_series(10000)
print 'n=10000, w=1000'
%timeit rolling_dd_using_dd1(ser, 1000)
%timeit rolling_dd_custom(ser, 1000)
%timeit rolling_max_dd_so_wrapped(ser, 1000)

n=1000, w=10
10 loops, best of 3: 27.4 ms per loop
100 loops, best of 3: 12.3 ms per loop
1000 loops, best of 3: 209 µs per loop
n=1000, w=100
10 loops, best of 3: 30.3 ms per loop
100 loops, best of 3: 10.2 ms per loop
1000 loops, best of 3: 1.09 ms per loop
n=10000, w=100
1 loops, best of 3: 299 ms per loop
10 loops, best of 3: 118 ms per loop
100 loops, best of 3: 11.2 ms per loop
n=10000, w=1000
1 loops, best of 3: 421 ms per loop
10 loops, best of 3: 130 ms per loop
10 loops, best of 3: 152 ms per loop

In [11]:
anomalies = []
for i in range(100):
ser = gen_series(1000)
one = rolling_dd_using_dd1(ser, 50)
two = rolling_dd_custom(ser, 50)
three = rolling_max_dd_so_wrapped(ser, 50)
df = pd.concat([one, two, three], axis=1)
check = df.sub(df.iloc[:, 0], axis=0).abs().sum(1).max()
if check > 0:
anomalies.append(ser)

len(anomalies)

Out[11]:
0