
- Re-factor - Remove code for notifications - Remove addons - Remove travis-core gem. - Ignore logs directory only - Move core tests to spec/lib
161 lines
4.1 KiB
Ruby
161 lines
4.1 KiB
Ruby
require 'dalli'
|
|
require 'connection_pool'
|
|
require 'active_support/core_ext/module/delegation'
|
|
require 'travis/api/serialize'
|
|
|
|
# TODO compare commit history to travis-states-cache, and start using it
|
|
|
|
module Travis
|
|
class StatesCache
|
|
class CacheError < StandardError; end
|
|
|
|
include Travis::Api::Serialize::Formats
|
|
|
|
attr_reader :adapter
|
|
|
|
delegate :fetch, :to => :adapter
|
|
|
|
def initialize(options = {})
|
|
@adapter = options[:adapter] || MemcachedAdapter.new
|
|
end
|
|
|
|
def write(id, branch, data)
|
|
if data.respond_to?(:id)
|
|
data = {
|
|
'id' => data.id,
|
|
'state' => data.state.to_s
|
|
}
|
|
end
|
|
|
|
adapter.write(id, branch, data)
|
|
end
|
|
|
|
def fetch_state(id, branch)
|
|
data = fetch(id, branch)
|
|
data['state'].to_sym if data && data['state']
|
|
end
|
|
|
|
class TestAdapter
|
|
attr_reader :calls
|
|
def initialize
|
|
@calls = []
|
|
end
|
|
|
|
def fetch(id, branch)
|
|
calls << [:fetch, id, branch]
|
|
end
|
|
|
|
def write(id, branch, data)
|
|
calls << [:write, id, branch, data]
|
|
end
|
|
|
|
def clear
|
|
calls.clear
|
|
end
|
|
end
|
|
|
|
class MemcachedAdapter
|
|
attr_reader :pool
|
|
attr_accessor :jitter
|
|
attr_accessor :ttl
|
|
|
|
def initialize(options = {})
|
|
@pool = ConnectionPool.new(:size => 10, :timeout => 3) do
|
|
if options[:client]
|
|
options[:client]
|
|
else
|
|
new_dalli_connection
|
|
end
|
|
end
|
|
@jitter = 0.5
|
|
@ttl = 7.days
|
|
end
|
|
|
|
def fetch(id, branch = nil)
|
|
data = get(key(id, branch))
|
|
data ? JSON.parse(data) : nil
|
|
end
|
|
|
|
def write(id, branch, data)
|
|
build_id = data['id']
|
|
data = data.to_json
|
|
|
|
Travis.logger.info("[states-cache] Writing states cache for repo_id=#{id} branch=#{branch} build_id=#{build_id}")
|
|
set(key(id), data) if update?(id, nil, build_id)
|
|
set(key(id, branch), data) if update?(id, branch, build_id)
|
|
end
|
|
|
|
def update?(id, branch, build_id)
|
|
current_data = fetch(id, branch)
|
|
return true unless current_data
|
|
|
|
current_id = current_data['id'].to_i
|
|
new_id = build_id.to_i
|
|
|
|
update = new_id >= current_id
|
|
message = "[states-cache] Checking if cache is stale for repo_id=#{id} branch=#{branch}. "
|
|
if update
|
|
message << "The cache is going to get an update, "
|
|
else
|
|
message << "The cache is fresh, "
|
|
end
|
|
message << "last cached build id=#{current_id}, we're checking build with id=#{new_id}"
|
|
Travis.logger.info(message)
|
|
|
|
return update
|
|
end
|
|
|
|
def key(id, branch = nil)
|
|
key = "state:#{id}"
|
|
if branch
|
|
key << "-#{branch}"
|
|
end
|
|
key
|
|
end
|
|
|
|
private
|
|
|
|
def new_dalli_connection
|
|
Dalli::Client.new(Travis.config.states_cache.memcached_servers, Travis.config.states_cache.memcached_options)
|
|
end
|
|
|
|
def get(key)
|
|
retry_ringerror do
|
|
pool.with { |client| client.get(key) }
|
|
end
|
|
rescue Dalli::RingError => e
|
|
Metriks.meter("memcached.connect-errors").mark
|
|
raise CacheError, "Couldn't connect to a memcached server: #{e.message}"
|
|
end
|
|
|
|
def set(key, data)
|
|
retry_ringerror do
|
|
pool.with { |client| client.set(key, data) }
|
|
Travis.logger.info("[states-cache] Setting cache for key=#{key} data=#{data}")
|
|
end
|
|
rescue Dalli::RingError => e
|
|
Metriks.meter("memcached.connect-errors").mark
|
|
Travis.logger.info("[states-cache] Writing cache key failed key=#{key} data=#{data}")
|
|
raise CacheError, "Couldn't connect to a memcached server: #{e.message}"
|
|
end
|
|
|
|
def retry_ringerror
|
|
retries = 0
|
|
begin
|
|
yield
|
|
rescue Dalli::RingError
|
|
retries += 1
|
|
if retries <= 3
|
|
# Sleep for up to 1/2 * (2^retries - 1) seconds
|
|
# For retries <= 3, this means up to 3.5 seconds
|
|
sleep(jitter * (rand(2 ** retries - 1) + 1))
|
|
retry
|
|
else
|
|
raise
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|