class Airbrake::TDigest
Ruby implementation of Ted Dunning's t-digest data structure.
This implementation is imported from github.com/castle/tdigest with custom modifications. Huge thanks to Castle for the implementation :beer:
The difference is that we pack with Big Endian (unlike Native Endian in Castle's version). Our backend does not permit little endian.
@see github.com/tdunning/t-digest @see github.com/castle/tdigest @api private @since v3.2.0
rubocop:disable Metrics/ClassLength
Constants
- SMALL_ENCODING
- VERBOSE_ENCODING
Attributes
Public Class Methods
rubocop:disable Metrics/PerceivedComplexity, Metrics/MethodLength rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 255 def self.from_bytes(bytes) format, compression, size = bytes.unpack('NGN') tdigest = new(1 / compression) start_idx = 16 # after header case format when VERBOSE_ENCODING array = bytes[start_idx..-1].unpack("G#{size}N#{size}") means, counts = array.each_slice(size).to_a if array.any? when SMALL_ENCODING means = bytes[start_idx..(start_idx + 4 * size)].unpack("g#{size}") # Decode delta encoding of means x = 0 means.map! do |m| m += x x = m m end counts_bytes = bytes[(start_idx + 4 * size)..-1].unpack('C*') counts = [] # Decode variable length integer bytes size.times do v = counts_bytes.shift z = 0x7f & v shift = 7 while (v & 0x80) != 0 raise 'Shift too large in decode' if shift > 28 v = counts_bytes.shift || 0 z += (v & 0x7f) << shift shift += 7 end counts << z end # This shouldn't happen raise 'Mismatch' unless counts.size == means.size else raise 'Unknown compression format' end means.zip(counts).each { |val| tdigest.push(val[0], val[1]) } if means && counts tdigest end
rubocop:enable Metrics/PerceivedComplexity, Metrics/MethodLength rubocop:enable Metrics/CyclomaticComplexity, Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 301 def self.from_json(array) tdigest = new # Handle both string and symbol keys array.each { |a| tdigest.push(a['m'] || a[:m], a['n'] || a[:n]) } tdigest end
# File lib/airbrake-ruby/tdigest.rb, line 40 def initialize(delta = 0.01, k = 25, cx = 1.1) @delta = delta @k = k @cx = cx @centroids = RBTree.new @nreset = 0 @n = 0 reset! end
Public Instance Methods
# File lib/airbrake-ruby/tdigest.rb, line 50 def +(other) # Uses delta, k and cx from the caller t = self.class.new(@delta, @k, @cx) data = centroids.values + other.centroids.values t.push_centroid(data.delete_at(rand(data.length))) while data.any? t end
# File lib/airbrake-ruby/tdigest.rb, line 58 def as_bytes # compression as defined by Java implementation size = @centroids.size output = [VERBOSE_ENCODING, compression, size] output += @centroids.map { |_, c| c.mean } output += @centroids.map { |_, c| c.n } output.pack("NGNG#{size}N#{size}") end
rubocop:enable Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 97 def as_json(_ = nil) @centroids.map { |_, c| c.as_json } end
rubocop:disable Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 68 def as_small_bytes size = @centroids.size output = [self.class::SMALL_ENCODING, compression, size] x = 0 # delta encoding allows saving 4-bytes floats mean_arr = @centroids.map do |_, c| val = c.mean - x x = c.mean val end output += mean_arr # Variable length encoding of numbers c_arr = @centroids.each_with_object([]) do |(_, c), arr| k = 0 n = c.n while n < 0 || n > 0x7f b = 0x80 | (0x7f & n) arr << b n = n >> 7 k += 1 raise 'Unreasonable large number' if k > 6 end arr << n end output += c_arr output.pack("NGNg#{size}C#{size}") end
# File lib/airbrake-ruby/tdigest.rb, line 101 def bound_mean(x) upper = @centroids.upper_bound(x) lower = @centroids.lower_bound(x) [lower[1], upper[1]] end
# File lib/airbrake-ruby/tdigest.rb, line 107 def bound_mean_cumn(cumn) last_c = nil bounds = [] @centroids.each_value do |v| if v.mean_cumn == cumn bounds << v break elsif v.mean_cumn > cumn bounds << last_c bounds << v break else last_c = v end end # If still no results, pick lagging value if any bounds << last_c if bounds.empty? && !last_c.nil? bounds end
# File lib/airbrake-ruby/tdigest.rb, line 128 def compress! points = to_a reset! push_centroid(points.shuffle) _cumulate(true, true) nil end
# File lib/airbrake-ruby/tdigest.rb, line 136 def compression 1 / @delta end
# File lib/airbrake-ruby/tdigest.rb, line 140 def find_nearest(x) return nil if size == 0 ceil = @centroids.upper_bound(x) floor = @centroids.lower_bound(x) return floor[1] if ceil.nil? return ceil[1] if floor.nil? ceil_key = ceil[0] floor_key = floor[0] if (floor_key - x).abs < (ceil_key - x).abs floor[1] else ceil[1] end end
# File lib/airbrake-ruby/tdigest.rb, line 159 def merge!(other) push_centroid(other.centroids.values.shuffle) self end
rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize rubocop:disable Metrics/CyclomaticComplexity
# File lib/airbrake-ruby/tdigest.rb, line 166 def p_rank(x) is_array = x.is_a? Array x = [x] unless is_array min = @centroids.first max = @centroids.last x.map! do |item| if size == 0 nil elsif item < min[1].mean 0.0 elsif item > max[1].mean 1.0 else _cumulate(true) bound = bound_mean(item) lower, upper = bound mean_cumn = lower.mean_cumn if lower != upper mean_cumn += (item - lower.mean) * (upper.mean_cumn - lower.mean_cumn) \ / (upper.mean - lower.mean) end mean_cumn / @n end end is_array ? x : x.first end
rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity rubocop:disable Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 199 def percentile(p) is_array = p.is_a? Array p = [p] unless is_array p.map! do |item| unless (0..1).cover?(item) raise ArgumentError, "p should be in [0,1], got #{item}" end if size == 0 nil else _cumulate(true) h = @n * item lower, upper = bound_mean_cumn(h) if lower.nil? && upper.nil? nil elsif upper == lower || lower.nil? || upper.nil? (lower || upper).mean elsif h == lower.mean_cumn lower.mean else upper.mean end end end is_array ? p : p.first end
rubocop:enable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity rubocop:enable Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 228 def push(x, n = 1) x = [x] unless x.is_a? Array x.each { |value| _digest(value, n) } end
# File lib/airbrake-ruby/tdigest.rb, line 233 def push_centroid(c) c = [c] unless c.is_a? Array c.each { |centroid| _digest(centroid.mean, centroid.n) } end
# File lib/airbrake-ruby/tdigest.rb, line 238 def reset! @centroids.clear @n = 0 @nreset += 1 @last_cumulate = 0 end
# File lib/airbrake-ruby/tdigest.rb, line 245 def size @n || 0 end
# File lib/airbrake-ruby/tdigest.rb, line 249 def to_a @centroids.map { |_, c| c } end
Private Instance Methods
# File lib/airbrake-ruby/tdigest.rb, line 310 def _add_weight(nearest, x, n) nearest.mean += n * (x - nearest.mean) / (nearest.n + n) unless x == nearest.mean _cumulate(false, true) if nearest.mean_cumn.nil? nearest.cumn += n nearest.mean_cumn += n / 2.0 nearest.n += n nil end
rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity
# File lib/airbrake-ruby/tdigest.rb, line 323 def _cumulate(exact = false, force = false) unless force factor = if @last_cumulate == 0 Float::INFINITY else (@n.to_f / @last_cumulate) end return if @n == @last_cumulate || (!exact && @cx && @cx > factor) end cumn = 0 @centroids.each do |_, c| c.mean_cumn = cumn + c.n / 2.0 cumn = c.cumn = cumn + c.n end @n = @last_cumulate = cumn nil end
rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity rubocop:disable Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 345 def _digest(x, n) # Use 'first' and 'last' instead of min/max because of performance reasons # This works because RBTree is sorted min = @centroids.first max = @centroids.last min = min.nil? ? nil : min[1] max = max.nil? ? nil : max[1] nearest = find_nearest(x) @n += n if nearest && nearest.mean == x _add_weight(nearest, x, n) elsif nearest == min _new_centroid(x, n, 0) elsif nearest == max _new_centroid(x, n, @n) else p = nearest.mean_cumn.to_f / @n max_n = (4 * @n * @delta * p * (1 - p)).floor if max_n - nearest.n >= n _add_weight(nearest, x, n) else _new_centroid(x, n, nearest.cumn) end end _cumulate(false) # If the number of centroids has grown to a very large size, # it may be due to values being inserted in sorted order. # We combat that by replaying the centroids in random order, # which is what compress! does compress! if @centroids.size > (@k / @delta) nil end
rubocop:enable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity, rubocop:enable Metrics/AbcSize
# File lib/airbrake-ruby/tdigest.rb, line 386 def _new_centroid(x, n, cumn) c = Centroid.new(x, n, cumn) @centroids[x] = c c end