Rewrite s3 uploader

Update user on upload/boot
This commit is contained in:
René Preuß
2023-03-04 00:07:51 +01:00
parent df803afe2b
commit 03a92e9d85
6 changed files with 152 additions and 82 deletions

View File

@@ -1,4 +1,6 @@
import {BrowserWindow} from "electron"; import {BrowserWindow} from "electron";
import {User} from "../../shared/schema";
import axios from "axios";
export function emit(event: any, ...args: any) { export function emit(event: any, ...args: any) {
// Send a message to all windows // Send a message to all windows
@@ -6,3 +8,20 @@ export function emit(event: any, ...args: any) {
win.webContents.send(event, ...args) win.webContents.send(event, ...args)
}); });
} }
export async function resolveUser(accessToken: string, tokenType: string): Promise<User> {
const response = await axios.get('https://api.rerunmanager.com/v1/channels/me', {
headers: {
'Accept': 'application/json',
'Authorization': `${tokenType} ${accessToken}`,
}
});
return {
id: response.data.id,
name: response.data.name,
config: response.data.config,
avatar_url: response.data.avatar_url,
premium: response.data.premium,
}
}

View File

@@ -161,8 +161,8 @@ ipcMain.handle('encode', async (event: IpcMainInvokeEvent, ...args: any[]) => {
onUploadProgress: (id, progress) => event.sender.send('encode-upload-progress', id, progress), onUploadProgress: (id, progress) => event.sender.send('encode-upload-progress', id, progress),
onUploadComplete: (id, video) => event.sender.send('encode-upload-complete', id, video), onUploadComplete: (id, video) => event.sender.send('encode-upload-complete', id, video),
onError: (id, error) => event.sender.send('encode-error', id, error), onError: (id, error) => event.sender.send('encode-error', id, error),
}, settingsRepository.getSettings()); }, settingsRepository);
return await encoder.encode() return encoder.encode();
}) })
ipcMain.handle('commitSettings', async (event: IpcMainInvokeEvent, ...args: any[]) => settingsRepository.commitSettings(args[0])) ipcMain.handle('commitSettings', async (event: IpcMainInvokeEvent, ...args: any[]) => settingsRepository.commitSettings(args[0]))
settingsRepository.watch((settings: Settings) => emit('settings', settings)); settingsRepository.watch((settings: Settings) => emit('settings', settings));

View File

@@ -1,7 +1,10 @@
import {EncoderListeners, EncoderOptions, Video} from "../../shared/schema";
import {EncoderListeners, EncoderOptions, Settings, User, Video} from "../../shared/schema";
import * as fs from "fs"; import * as fs from "fs";
import axios, {AxiosInstance} from "axios"; import axios, {AxiosInstance} from "axios";
import {SettingsRepository} from "./settings-repository";
import * as path from "path";
import {path as ffmpegPath} from "@ffmpeg-installer/ffmpeg";
import {upload} from "./file-uploader";
export class Encoder { export class Encoder {
private readonly id: string; private readonly id: string;
@@ -9,7 +12,7 @@ export class Encoder {
private readonly output: string; private readonly output: string;
private readonly options: EncoderOptions; private readonly options: EncoderOptions;
private readonly listeners: EncoderListeners; private readonly listeners: EncoderListeners;
private readonly settings: Settings; private readonly settingsRepository: SettingsRepository;
private api: AxiosInstance; private api: AxiosInstance;
private s3: AxiosInstance; private s3: AxiosInstance;
@@ -18,15 +21,17 @@ export class Encoder {
input: string, input: string,
options: EncoderOptions, options: EncoderOptions,
listeners: EncoderListeners, listeners: EncoderListeners,
settings: Settings settingsRepository: SettingsRepository
) { ) {
this.id = id; this.id = id;
this.input = input; this.input = input;
this.output = this.input.replace(/\.mp4$/, '.flv') this.output = this.getOutputPath(input, 'flv');
this.options = options; this.options = options;
this.listeners = listeners; this.listeners = listeners;
this.settings = settings; this.settingsRepository = settingsRepository;
const settings = settingsRepository.getSettings();
this.api = axios.create({ this.api = axios.create({
baseURL: settings.endpoint, baseURL: settings.endpoint,
headers: { headers: {
@@ -37,45 +42,36 @@ export class Encoder {
this.s3 = axios.create({}); this.s3 = axios.create({});
} }
async encode(): Promise<void> { encode(): void {
this.listeners.onStart(this.id) this.listeners.onStart(this.id)
const ffmpegPath = require('@ffmpeg-installer/ffmpeg').path if (this.requireEncoding()) {
console.log('ffmpegPath', ffmpegPath) const ffmpegPath = require('@ffmpeg-installer/ffmpeg').path
const ffmpeg = require('fluent-ffmpeg') console.log('ffmpegPath', ffmpegPath)
ffmpeg.setFfmpegPath(ffmpegPath) const ffmpeg = require('fluent-ffmpeg')
let totalTime = 0; ffmpeg.setFfmpegPath(ffmpegPath)
ffmpeg(this.input) let totalTime = 0;
.outputOptions(this.getOutputOptions())
.output(this.output) ffmpeg(this.input)
.on('start', () => { .output(this.output)
console.log('start') .outputOptions(this.getOutputOptions())
}) .on('start', () => console.log('start'))
.on('codecData', data => { .on('codecData', data => totalTime = parseInt(data.duration.replace(/:/g, '')))
totalTime = parseInt(data.duration.replace(/:/g, '')) .on('progress', progress => {
}) const time = parseInt(progress.timemark.replace(/:/g, ''))
.on('progress', progress => { const percent = (time / totalTime) * 100
const time = parseInt(progress.timemark.replace(/:/g, '')) console.log('progress', percent)
const percent = (time / totalTime) * 100 this.listeners.onProgress(this.id, percent)
console.log('progress', percent) })
this.listeners.onProgress(this.id, percent) .on('end', async () => this.requestUpload(this.output))
}) .on('error', (error) => {
.on('end', async () => {
console.log('end')
// @ts-ignore
try {
const video = await this.requestUploadUrl(this.settings.credentials.user)
await this.upload(video)
} catch (error) {
console.log('error', error) console.log('error', error)
this.listeners.onError(this.id, error.message) this.listeners.onError(this.id, error.message)
} })
}) .run()
.on('error', (error) => { } else {
console.log('error', error) this.requestUpload(this.input)
this.listeners.onError(this.id, error.message) }
})
.run()
} }
private getOutputOptions() { private getOutputOptions() {
@@ -94,13 +90,21 @@ export class Encoder {
] ]
} }
private async requestUploadUrl(user: User): Promise<Video> { private async requestUpload(filename: string): Promise<void> {
const user = this.settingsRepository.getSettings().credentials.user;
const response = await this.api.post(`channels/${user.id}/videos`, { const response = await this.api.post(`channels/${user.id}/videos`, {
title: this.options.title, title: this.options.title,
size: fs.statSync(this.output).size, size: fs.statSync(filename).size,
}) })
return response.data as Video const video = response.data as Video
try {
await this.handleUpload(video, filename)
} catch (error) {
console.log('error', error)
this.listeners.onError(this.id, error.message)
}
} }
private async confirmUpload(video: Video): Promise<Video> { private async confirmUpload(video: Video): Promise<Video> {
@@ -117,30 +121,18 @@ export class Encoder {
await this.api.delete(`videos/${video.id}/cancel`) await this.api.delete(`videos/${video.id}/cancel`)
} }
private async upload(video: Video) { private async handleUpload(video: Video, filename: string) {
if (!video.upload_url) { if (!video.upload_url) {
return return
} }
const stream = fs.createReadStream(this.output)
stream.on('error', (error) => {
console.log('upload error', error)
this.listeners.onError(this.id, error.message)
})
const progress = (progress: any) => {
const progressCompleted = progress.loaded / fs.statSync(this.output).size * 100
this.listeners.onUploadProgress(this.id, progressCompleted)
}
try { try {
await this.s3.put(video.upload_url, stream, { await upload(video.upload_url, filename, (progress: number) => {
onUploadProgress: progress, this.listeners.onUploadProgress(this.id, progress)
headers: {
'Content-Type': 'video/x-flv',
}
}) })
this.settingsRepository.reloadUser();
this.listeners.onUploadComplete(this.id, await this.confirmUpload(video)) this.listeners.onUploadComplete(this.id, await this.confirmUpload(video))
} catch (error) { } catch (error) {
console.log('upload error', error) console.log('upload error', error)
@@ -152,4 +144,22 @@ export class Encoder {
this.listeners.onError(this.id, `Upload Error: ${error.message}`) this.listeners.onError(this.id, `Upload Error: ${error.message}`)
} }
} }
/**
* This will return the output path for the encoded file.
* It will have the same name as the input file, but with the given extension.
* Also, a suffix will be added to the file name to avoid overwriting existing files.
*/
private getOutputPath(input: string, ext: string) {
const parsed = path.parse(input)
return path.join(parsed.dir, `${parsed.name}-rerun.${ext}`)
}
/**
* You can also just upload the file as is, without encoding it.
* @private
*/
private requireEncoding() {
return path.parse(this.input).ext !== '.flv'
}
} }

View File

@@ -0,0 +1,39 @@
const fs = require('fs');
const https = require('https');
const {promisify} = require('util');
export async function upload(url, filename, onProgress): Promise<void> {
const fileStream = fs.createReadStream(filename);
const fileStats = await promisify(fs.stat)(filename);
return new Promise((resolve, reject) => {
const options = {
method: 'PUT',
headers: {
'Content-Type': 'video/x-flv',
'Content-Length': fileStats.size,
},
agent: false // Disable HTTP keep-alive
}
const req = https.request(url, options, (res) => {
if (res.statusCode >= 400) {
reject(new Error(`Failed to upload file: ${res.statusCode} ${res.statusMessage}`));
return;
}
resolve();
});
req.on('error', reject);
let uploadedBytes = 0;
fileStream.on('data', (chunk) => {
uploadedBytes += chunk.length;
onProgress(Math.round((uploadedBytes / fileStats.size) * 100));
});
fileStream.on('error', reject);
fileStream.pipe(req);
});
}

View File

@@ -6,10 +6,11 @@ import koaCors from "koa-cors";
import bodyParser from "koa-bodyparser"; import bodyParser from "koa-bodyparser";
import axios, {AxiosInstance} from "axios"; import axios, {AxiosInstance} from "axios";
import {app} from "electron" import {app} from "electron"
import {Credentials, User} from "../../shared/schema"; import {Credentials} from "../../shared/schema";
import {SettingsRepository} from "./settings-repository"; import {SettingsRepository} from "./settings-repository";
import * as fs from "fs"; import * as fs from "fs";
import {join} from "node:path"; import {join} from "node:path";
import {resolveUser} from "../main/helpers";
export class InternalServer { export class InternalServer {
private readonly app: Application; private readonly app: Application;
@@ -50,7 +51,7 @@ export class InternalServer {
expires_in: params.get('expires_in'), expires_in: params.get('expires_in'),
expires_at: this.calculateExpiresAt(params.get('expires_in')), expires_at: this.calculateExpiresAt(params.get('expires_in')),
state: params.get('state'), state: params.get('state'),
user: await this.resolveUser(params.get('access_token'), params.get('token_type')), user: await resolveUser(params.get('access_token'), params.get('token_type')),
} }
console.log('credentials', credentials); console.log('credentials', credentials);
@@ -76,23 +77,6 @@ export class InternalServer {
}); });
} }
private async resolveUser(accessToken: string, tokenType: string): Promise<User> {
const response = await this.axios.get('https://api.rerunmanager.com/v1/channels/me', {
headers: {
'Accept': 'application/json',
'Authorization': `${tokenType} ${accessToken}`,
}
});
return {
id: response.data.id,
name: response.data.name,
config: response.data.config,
avatar_url: response.data.avatar_url,
premium: response.data.premium,
}
}
private calculateExpiresAt(expiresIn: string) { private calculateExpiresAt(expiresIn: string) {
const now = new Date(); const now = new Date();
const expiresAt = new Date(now.getTime() + parseInt(expiresIn) * 1000); const expiresAt = new Date(now.getTime() + parseInt(expiresIn) * 1000);

View File

@@ -2,6 +2,7 @@ import defu from 'defu'
import {platform} from 'node:process' import {platform} from 'node:process'
import * as fs from 'fs' import * as fs from 'fs'
import {Credentials, Settings} from "../../shared/schema"; import {Credentials, Settings} from "../../shared/schema";
import {resolveUser} from "../main/helpers";
const defaults: Settings = { const defaults: Settings = {
version: '1.0.1', version: '1.0.1',
@@ -59,6 +60,8 @@ export class SettingsRepository {
// if so, set settings.credentials to null // if so, set settings.credentials to null
if (this.isExpired()) { if (this.isExpired()) {
console.log('Credentials expired!'); console.log('Credentials expired!');
} else {
this.reloadUser();
} }
await this.save() await this.save()
@@ -121,4 +124,19 @@ export class SettingsRepository {
return true; return true;
} }
reloadUser() {
try {
console.debug('Reloading user')
resolveUser(
this.settings.credentials.access_token,
this.settings.credentials.token_type
).then((user) => {
this.settings.credentials.user = user
this.save()
})
} catch (e) {
console.error('Failed to reload user', e)
}
}
} }