421 lines
12 KiB
JavaScript
421 lines
12 KiB
JavaScript
|
|
const {
|
|
difference,
|
|
keys,
|
|
filter,
|
|
values,
|
|
propEq,
|
|
compose,
|
|
indexBy,
|
|
} = require('ramda');
|
|
|
|
const Bluebird = require('bluebird');
|
|
|
|
const PAClient = require('@futpib/paclient');
|
|
|
|
const { handleActions } = require('redux-actions');
|
|
|
|
const { pulse: pulseActions } = require('../actions');
|
|
|
|
const { things } = require('../constants/pulse');
|
|
|
|
const { getPaiByTypeAndIndex } = require('../selectors');
|
|
|
|
const { primaryPulseServer } = require('../reducers/pulse');
|
|
|
|
const { parseModuleArgs, formatModuleArgs } = require('../utils/module-args');
|
|
|
|
/**
 * Translate an entity type into the name of the client getter method that
 * fetches a single entity of that type.
 *
 * 'sink', 'card' and 'source' map to `get<Type>`; 'sinkInput',
 * 'sourceOutput', 'client' and 'module' map to `get<Type>ByIndex`.
 *
 * @param {string} type - Entity type, e.g. 'sink' or 'sinkInput'.
 * @returns {string} Getter name, e.g. 'getSink' or 'getSinkInputByIndex'.
 * @throws {Error} With a message starting with 'Unexpected type: ' for any
 *   other type.
 */
function getFnFromType(type) {
	const plainTypes = ['sink', 'card', 'source'];
	const byIndexTypes = ['sinkInput', 'sourceOutput', 'client', 'module'];

	let base;
	if (plainTypes.includes(type)) {
		base = type;
	} else if (byIndexTypes.includes(type)) {
		base = `${type}ByIndex`;
	} else {
		throw new Error('Unexpected type: ' + type);
	}

	// Capitalize the first letter so the result reads as a camelCase method name.
	const capitalized = base.charAt(0).toUpperCase() + base.slice(1);
	return 'get' + capitalized;
}
|
|
|
|
/**
 * Change the volume of a single channel of a sink.
 *
 * The remaining channel volumes are taken from the sink info looked up in
 * the store via `getPaiByTypeAndIndex('sink', index)`.
 *
 * @param {object} pa - Client exposing `setSinkVolumes(index, volumes, cb)`.
 * @param {object} store - Store whose `getState()` feeds the selector.
 * @param {number} index - Sink index.
 * @param {number} channelIndex - Position of the channel to change.
 * @param {number} volume - New volume for that channel.
 * @param {Function} cb - Callback handed through to `pa.setSinkVolumes`.
 */
function setSinkChannelVolume(pa, store, index, channelIndex, volume, cb) {
	const sinkInfo = getPaiByTypeAndIndex('sink', index)(store.getState());
	const nextVolumes = sinkInfo.channelVolumes.map((current, position) => {
		if (position === channelIndex) {
			return volume;
		}

		return current;
	});

	pa.setSinkVolumes(index, nextVolumes, cb);
}
|
|
|
|
/**
 * Change the volume of a single channel of a source.
 *
 * The remaining channel volumes are taken from the source info looked up in
 * the store via `getPaiByTypeAndIndex('source', index)`.
 *
 * @param {object} pa - Client exposing `setSourceVolumes(index, volumes, cb)`.
 * @param {object} store - Store whose `getState()` feeds the selector.
 * @param {number} index - Source index.
 * @param {number} channelIndex - Position of the channel to change.
 * @param {number} volume - New volume for that channel.
 * @param {Function} cb - Callback handed through to `pa.setSourceVolumes`.
 */
function setSourceChannelVolume(pa, store, index, channelIndex, volume, cb) {
	const sourceInfo = getPaiByTypeAndIndex('source', index)(store.getState());
	const nextVolumes = sourceInfo.channelVolumes.map((current, position) => {
		if (position === channelIndex) {
			return volume;
		}

		return current;
	});

	pa.setSourceVolumes(index, nextVolumes, cb);
}
|
|
|
|
/**
 * Change the volume of a single channel of a sink input.
 *
 * The remaining channel volumes are taken from the sink input info looked up
 * in the store via `getPaiByTypeAndIndex('sinkInput', index)`.
 *
 * @param {object} pa - Client exposing
 *   `setSinkInputVolumesByIndex(index, volumes, cb)`.
 * @param {object} store - Store whose `getState()` feeds the selector.
 * @param {number} index - Sink input index.
 * @param {number} channelIndex - Position of the channel to change.
 * @param {number} volume - New volume for that channel.
 * @param {Function} cb - Callback handed through to the client call.
 */
function setSinkInputChannelVolume(pa, store, index, channelIndex, volume, cb) {
	const sinkInputInfo = getPaiByTypeAndIndex('sinkInput', index)(store.getState());
	const nextVolumes = sinkInputInfo.channelVolumes.map((current, position) => {
		if (position === channelIndex) {
			return volume;
		}

		return current;
	});

	pa.setSinkInputVolumesByIndex(index, nextVolumes, cb);
}
|
|
|
|
/**
 * Change the volume of a single channel of a source output.
 *
 * The remaining channel volumes are taken from the source output info looked
 * up in the store via `getPaiByTypeAndIndex('sourceOutput', index)`.
 *
 * @param {object} pa - Client exposing
 *   `setSourceOutputVolumesByIndex(index, volumes, cb)`.
 * @param {object} store - Store whose `getState()` feeds the selector.
 * @param {number} index - Source output index.
 * @param {number} channelIndex - Position of the channel to change.
 * @param {number} volume - New volume for that channel.
 * @param {Function} cb - Callback handed through to the client call.
 */
function setSourceOutputChannelVolume(pa, store, index, channelIndex, volume, cb) {
	const sourceOutputInfo = getPaiByTypeAndIndex('sourceOutput', index)(store.getState());
	const nextVolumes = sourceOutputInfo.channelVolumes.map((current, position) => {
		if (position === channelIndex) {
			return volume;
		}

		return current;
	});

	pa.setSourceOutputVolumesByIndex(index, nextVolumes, cb);
}
|
|
|
|
/**
 * Create a client for one PulseAudio server and wire it to the redux store.
 *
 * Events emitted by the underlying `PAClient` are turned into `pulseActions`
 * dispatched on `store`; the returned object lets the middleware forward
 * actions and state transitions to the connection.
 *
 * @param {object} store - Redux store; `getState` and `dispatch` are used.
 * @param {string} [pulseServerId=primaryPulseServer] - Key of this server in
 *   `state.pulse`. For any id other than `primaryPulseServer` it is also
 *   passed to `pa.connect` as `serverString`.
 * @returns {{handleAction: Function, storeWillUpdate: Function}}
 *   `handleAction(action)` runs the matching client command, if any;
 *   `storeWillUpdate(previousState, nextState)` connects or disconnects when
 *   this server's `targetState` changes.
 */
const createPulseClient = (store, pulseServerId = primaryPulseServer) => {
	// Latest known store state; refreshed by storeWillUpdate.
	let state = store.getState();

	const getPulseServerState = (s = state) => s.pulse[pulseServerId] || {};

	const pa = new PAClient();

	// Fetch info about one entity and dispatch it to the store.
	const getInfo = (type, index) => {
		let method;
		try {
			method = getFnFromType(type);
		} catch (error) {
			// Types without a per-entity getter are only logged.
			if (error.message.startsWith('Unexpected type:')) {
				console.warn(error);
				return;
			}

			throw error;
		}

		pa[method](index, (err, info) => {
			if (err) {
				// The entity disappeared before its info could be fetched.
				if (err.message === 'No such entity') {
					console.warn(err.message, type, index);
					return;
				}

				throw err;
			}

			info.type = info.type || type;
			store.dispatch(pulseActions.info(info, pulseServerId));
		});
	};

	pa
		.on('ready', () => {
			store.dispatch(pulseActions.ready(pulseServerId));
			pa.subscribe('all');

			getServerInfo();

			// Load the initial list of every entity kind.
			things.forEach(({ method, type }) => {
				pa[method]((err, infos) => {
					if (err) {
						// Without this early return a failed listing would go on to
						// call `forEach` on an undefined `infos`.
						handleError(err);
						return;
					}

					infos.forEach(info => {
						const { index } = info;
						info.type = info.type || type;
						store.dispatch(pulseActions.new({ type, index }, pulseServerId));
						store.dispatch(pulseActions.info(info, pulseServerId));
					});
				});
			});
		})
		.on('close', () => {
			store.dispatch(pulseActions.close(pulseServerId));
			reconnect();
		})
		.on('new', (type, index) => {
			if (type === 'server') {
				getServerInfo();
				return;
			}

			store.dispatch(pulseActions.new({ type, index }, pulseServerId));
			getInfo(type, index);
		})
		.on('change', (type, index) => {
			if (type === 'server') {
				getServerInfo();
				return;
			}

			store.dispatch(pulseActions.change({ type, index }, pulseServerId));
			getInfo(type, index);
		})
		.on('remove', (type, index) => {
			store.dispatch(pulseActions.remove({ type, index }, pulseServerId));
		})
		.on('error', error => {
			handleError(error);
		});

	// Connect if this server's target state is 'ready'. A failure with the
	// message 'Unable to connect to PulseAudio server' is retried after 5 s;
	// any other error is rethrown.
	const reconnect = () => new Bluebird((resolve, reject) => {
		const server = getPulseServerState();
		if (server.targetState !== 'ready') {
			resolve();
			return;
		}

		// Each handler removes its counterpart so that repeated connection
		// attempts do not accumulate stale one-shot listeners on the client.
		const onReady = (...args) => {
			pa.removeListener('error', onError);
			resolve(...args);
		};

		const onError = error => {
			pa.removeListener('ready', onReady);
			reject(error);
		};

		pa.once('ready', onReady);
		pa.once('error', onError);

		if (pulseServerId === primaryPulseServer) {
			pa.connect();
		} else {
			pa.connect({
				serverString: pulseServerId,
			});
		}
	}).catch(error => {
		if (error.message === 'Unable to connect to PulseAudio server') {
			return Bluebird.delay(5000).then(reconnect);
		}

		throw error;
	});

	const getServerInfo = () => {
		pa.getServerInfo((err, info) => {
			if (err) {
				handleError(err);
			} else {
				store.dispatch(pulseActions.serverInfo(info, pulseServerId));
			}
		});
	};

	// Log an error and dispatch it to the store; a falsy argument is ignored so
	// this can be passed directly as a node-style callback.
	const handleError = error => {
		if (!error) {
			return;
		}

		console.error(error);

		store.dispatch(pulseActions.error(error, pulseServerId));
	};

	// Maps actions to client commands. The handlers are used for their side
	// effects only and always hand back the state they were given.
	const handlePulseActions = handleActions({
		[pulseActions.moveSinkInput]: (state, { payload: { sinkInputIndex, destSinkIndex } }) => {
			pa.moveSinkInput(sinkInputIndex, destSinkIndex, handleError);
			return state;
		},
		[pulseActions.moveSourceOutput]: (state, { payload: { sourceOutputIndex, destSourceIndex } }) => {
			pa.moveSourceOutput(sourceOutputIndex, destSourceIndex, handleError);
			return state;
		},

		[pulseActions.killClientByIndex]: (state, { payload: { clientIndex } }) => {
			pa.killClientByIndex(clientIndex, handleError);
			return state;
		},

		[pulseActions.killSinkInputByIndex]: (state, { payload: { sinkInputIndex } }) => {
			pa.killSinkInputByIndex(sinkInputIndex, handleError);
			return state;
		},
		[pulseActions.killSourceOutputByIndex]: (state, { payload: { sourceOutputIndex } }) => {
			pa.killSourceOutputByIndex(sourceOutputIndex, handleError);
			return state;
		},

		[pulseActions.loadModule]: (state, { payload: { name, argument } }) => {
			pa.loadModule(name, argument, handleError);
			return state;
		},
		[pulseActions.unloadModuleByIndex]: (state, { payload: { moduleIndex } }) => {
			pa.unloadModuleByIndex(moduleIndex, handleError);
			return state;
		},

		[pulseActions.setSinkVolumes]: (state, { payload: { index, channelVolumes } }) => {
			pa.setSinkVolumes(index, channelVolumes, handleError);
			return state;
		},
		[pulseActions.setSourceVolumes]: (state, { payload: { index, channelVolumes } }) => {
			pa.setSourceVolumes(index, channelVolumes, handleError);
			return state;
		},
		[pulseActions.setSinkInputVolumes]: (state, { payload: { index, channelVolumes } }) => {
			pa.setSinkInputVolumesByIndex(index, channelVolumes, handleError);
			return state;
		},
		[pulseActions.setSourceOutputVolumes]: (state, { payload: { index, channelVolumes } }) => {
			pa.setSourceOutputVolumesByIndex(index, channelVolumes, handleError);
			return state;
		},

		[pulseActions.setSinkChannelVolume]: (state, { payload: { index, channelIndex, volume } }) => {
			setSinkChannelVolume(pa, store, index, channelIndex, volume, handleError);
			return state;
		},
		[pulseActions.setSourceChannelVolume]: (state, { payload: { index, channelIndex, volume } }) => {
			setSourceChannelVolume(pa, store, index, channelIndex, volume, handleError);
			return state;
		},
		[pulseActions.setSinkInputChannelVolume]: (state, { payload: { index, channelIndex, volume } }) => {
			setSinkInputChannelVolume(pa, store, index, channelIndex, volume, handleError);
			return state;
		},
		[pulseActions.setSourceOutputChannelVolume]: (state, { payload: { index, channelIndex, volume } }) => {
			setSourceOutputChannelVolume(pa, store, index, channelIndex, volume, handleError);
			return state;
		},

		[pulseActions.setCardProfile]: (state, { payload: { index, profileName } }) => {
			pa.setCardProfile(index, profileName, handleError);
			return state;
		},

		[pulseActions.setSinkPort]: (state, { payload: { index, portName } }) => {
			pa.setSinkPort(index, portName, handleError);
			return state;
		},
		[pulseActions.setSourcePort]: (state, { payload: { index, portName } }) => {
			pa.setSourcePort(index, portName, handleError);
			return state;
		},

		[pulseActions.setSinkMute]: (state, { payload: { index, muted } }) => {
			pa.setSinkMute(index, muted, handleError);
			return state;
		},
		[pulseActions.setSourceMute]: (state, { payload: { index, muted } }) => {
			pa.setSourceMute(index, muted, handleError);
			return state;
		},
		[pulseActions.setSinkInputMuteByIndex]: (state, { payload: { index, muted } }) => {
			pa.setSinkInputMuteByIndex(index, muted, handleError);
			return state;
		},
		[pulseActions.setSourceOutputMuteByIndex]: (state, { payload: { index, muted } }) => {
			pa.setSourceOutputMuteByIndex(index, muted, handleError);
			return state;
		},

		[pulseActions.setDefaultSinkByName]: (state, { payload: { name } }) => {
			pa.setDefaultSinkByName(name, handleError);
			return state;
		},
		[pulseActions.setDefaultSourceByName]: (state, { payload: { name } }) => {
			pa.setDefaultSourceByName(name, handleError);
			return state;
		},
	}, null);

	return {
		handleAction: action => handlePulseActions(null, action),

		storeWillUpdate(previousState, nextState) {
			state = nextState;
			const previous = getPulseServerState(previousState);
			const next = getPulseServerState(nextState);

			if (previous === next) {
				return;
			}

			if (previous.targetState !== next.targetState) {
				if (next.targetState === 'ready') {
					reconnect();
				} else if (next.targetState === 'closed') {
					pa.end();
				}
			}
		},
	};
};
|
|
|
|
// Time (ms since epoch) of the last attempt to load each tunnel module, keyed
// by remote server id, tunnel kind ('sink' or 'source') and remote device name.
const tunnelAttempts = {};
// Minimum delay in milliseconds between two attempts to create the same tunnel.
const tunnelAttemptTimeout = 15000;
const isNotMonitor = s => s.monitorSourceIndex < 0;

/**
 * Make sure the primary server has a tunnel module loaded for every sink and
 * every non-monitor source of a remote server.
 *
 * For each remote device without a matching tunnel a `pulseActions.loadModule`
 * action is dispatched, at most once per `tunnelAttemptTimeout` per device.
 * A tunnel matches a device only when both its device-name argument and its
 * `server` argument match, so identically named devices on different remote
 * servers each get their own tunnel.
 *
 * @param {Function} dispatch - Store dispatch function.
 * @param {object} primaryState - Primary server state; `infos.modules` is read.
 * @param {string} remoteServerId - Id of the remote server, used as the
 *   tunnel's `server` argument.
 * @param {object} remoteState - Remote server state; `infos.sinks` and
 *   `infos.sources` are read.
 */
const updateTunnels = (dispatch, primaryState, remoteServerId, remoteState) => {
	const moduleNames = {
		sink: 'module-tunnel-sink',
		source: 'module-tunnel-source',
	};

	// Tunnel modules of the given kind that point at this remote server,
	// indexed by the remote device name they tunnel to.
	const findTunnels = kind => compose(
		indexBy(m => parseModuleArgs(m.args)[kind]),
		filter(m => parseModuleArgs(m.args).server === remoteServerId),
		filter(propEq('name', moduleNames[kind])),
		values,
	)(primaryState.infos.modules);

	const sourceTunnels = findTunnels('source');
	const sinkTunnels = findTunnels('sink');

	const remoteSources = filter(isNotMonitor, values(remoteState.infos.sources));
	const remoteSinks = values(remoteState.infos.sinks);

	// Request a tunnel for one remote device unless one exists already or an
	// attempt was made within the last `tunnelAttemptTimeout` milliseconds.
	const ensureTunnel = (kind, existingTunnels, device) => {
		// The key includes server and kind so that equal device names on
		// different servers, or a sink and a source sharing a name, do not
		// throttle each other.
		const attemptKey = JSON.stringify([remoteServerId, kind, device.name]);

		if ((tunnelAttempts[attemptKey] || 0) + tunnelAttemptTimeout > Date.now()) {
			return;
		}

		if (existingTunnels[device.name]) {
			return;
		}

		tunnelAttempts[attemptKey] = Date.now();
		dispatch(pulseActions.loadModule(moduleNames[kind], formatModuleArgs({
			server: remoteServerId,
			[kind]: device.name,
		})));
	};

	remoteSinks.forEach(sink => ensureTunnel('sink', sinkTunnels, sink));
	remoteSources.forEach(source => ensureTunnel('source', sourceTunnels, source));
};
|
|
|
|
module.exports = store => {
|
|
const clients = {
|
|
[primaryPulseServer]: createPulseClient(store, primaryPulseServer),
|
|
};
|
|
|
|
return next => action => {
|
|
const { pulseServerId = primaryPulseServer } = action.meta || {};
|
|
|
|
const previousState = store.getState();
|
|
|
|
const returnValue = next(action);
|
|
|
|
const nextState = store.getState();
|
|
|
|
const newPulseServerIds = difference(keys(nextState.pulse), keys(clients));
|
|
|
|
newPulseServerIds.forEach(pulseServerId => {
|
|
clients[pulseServerId] = createPulseClient(store, pulseServerId);
|
|
});
|
|
|
|
const client = clients[pulseServerId];
|
|
if (client) {
|
|
client.handleAction(action);
|
|
if (previousState !== nextState) {
|
|
client.storeWillUpdate(previousState, nextState);
|
|
}
|
|
}
|
|
|
|
const primaryState = nextState.pulse[primaryPulseServer];
|
|
keys(nextState.pulse).forEach(pulseServerId => {
|
|
if (pulseServerId === primaryPulseServer) {
|
|
return;
|
|
}
|
|
|
|
const remoteState = nextState.pulse[pulseServerId];
|
|
|
|
if (primaryState.state === 'ready'
|
|
&& remoteState.state === 'ready'
|
|
&& primaryState.targetState === 'ready'
|
|
&& primaryState.targetState === 'ready'
|
|
) {
|
|
updateTunnels(store.dispatch, primaryState, pulseServerId, remoteState);
|
|
}
|
|
});
|
|
|
|
return returnValue;
|
|
};
|
|
};
|