mirror of
https://github.com/laurent22/joplin.git
synced 2025-01-14 18:27:44 +02:00
Server: Added command to migrate content to different storage
This commit is contained in:
parent
725c79d1ec
commit
005f720f7b
@ -9,7 +9,7 @@ interface Argv {
|
||||
export default class DeleteOldChangesCommand extends BaseCommand {
|
||||
|
||||
public command() {
|
||||
return 'deleteOldChanges';
|
||||
return 'delete-old-changes';
|
||||
}
|
||||
|
||||
public description() {
|
||||
|
54
packages/server/src/commands/ImportContentCommand.ts
Normal file
54
packages/server/src/commands/ImportContentCommand.ts
Normal file
@ -0,0 +1,54 @@
|
||||
import { PositionalOptions, Options } from 'yargs';
|
||||
import Logger from '@joplin/lib/Logger';
|
||||
import BaseCommand, { RunContext } from './BaseCommand';
|
||||
import parseStorageConnectionString from '../models/items/storage/parseStorageConnectionString';
|
||||
|
||||
const logger = Logger.create('ImportContentCommand');
|
||||
|
||||
interface Argv {
|
||||
toStorage: string;
|
||||
batchSize?: number;
|
||||
}
|
||||
|
||||
export default class ImportContentCommand extends BaseCommand {
|
||||
|
||||
public command() {
|
||||
return 'import-content <to-storage>';
|
||||
}
|
||||
|
||||
public description() {
|
||||
return 'import content to storage';
|
||||
}
|
||||
|
||||
public positionals(): Record<string, PositionalOptions> {
|
||||
return {
|
||||
'to-storage': {
|
||||
description: 'storage connection string',
|
||||
type: 'string',
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
public options(): Record<string, Options> {
|
||||
return {
|
||||
'batch-size': {
|
||||
type: 'number',
|
||||
description: 'Item batch size',
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
public async run(argv: Argv, runContext: RunContext): Promise<void> {
|
||||
const toStorageConfig = parseStorageConnectionString(argv.toStorage);
|
||||
const batchSize = argv.batchSize || 1000;
|
||||
|
||||
logger.info('Importing to storage:', toStorageConfig);
|
||||
logger.info(`Batch size: ${batchSize}`);
|
||||
|
||||
await runContext.models.item().importContentToStorage(toStorageConfig, {
|
||||
batchSize: batchSize || 1000,
|
||||
logger: logger as Logger,
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -3,7 +3,7 @@ import { Config, DatabaseConfig, DatabaseConfigClient, Env, MailerConfig, RouteT
|
||||
import * as pathUtils from 'path';
|
||||
import { loadStripeConfig, StripePublicConfig } from '@joplin/lib/utils/joplinCloud';
|
||||
import { EnvVariables } from './env';
|
||||
import parseStorageDriverConnectionString from './models/items/storage/parseStorageDriverConnectionString';
|
||||
import parseStorageDriverConnectionString from './models/items/storage/parseStorageConnectionString';
|
||||
|
||||
interface PackageJson {
|
||||
version: string;
|
||||
|
@ -0,0 +1,14 @@
|
||||
import { Knex } from 'knex';
|
||||
import { DbConnection } from '../db';
|
||||
|
||||
export async function up(db: DbConnection): Promise<any> {
|
||||
await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => {
|
||||
table.index('content_storage_id');
|
||||
});
|
||||
}
|
||||
|
||||
export async function down(db: DbConnection): Promise<any> {
|
||||
await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => {
|
||||
table.dropIndex('content_storage_id');
|
||||
});
|
||||
}
|
@ -1,6 +1,11 @@
|
||||
import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, createItem, createItemTree, createResource, createNote, createFolder, createItemTree3 } from '../utils/testing/testUtils';
|
||||
import { createUserAndSession, beforeAllDb, afterAllTests, beforeEachDb, models, createItem, createItemTree, createResource, createNote, createFolder, createItemTree3, db, tempDir } from '../utils/testing/testUtils';
|
||||
import { shareFolderWithUser } from '../utils/testing/shareApiUtils';
|
||||
import { resourceBlobPath } from '../utils/joplinUtils';
|
||||
import newModelFactory from './factory';
|
||||
import { StorageDriverType } from '../utils/types';
|
||||
import config from '../config';
|
||||
import { msleep } from '../utils/time';
|
||||
import loadStorageDriver from './items/storage/loadStorageDriver';
|
||||
|
||||
describe('ItemModel', function() {
|
||||
|
||||
@ -265,4 +270,71 @@ describe('ItemModel', function() {
|
||||
expect((await models().user().load(user3.id)).total_item_size).toBe(expected3);
|
||||
});
|
||||
|
||||
test('should allow importing content to item storage', async function() {
|
||||
const { user: user1 } = await createUserAndSession(1);
|
||||
|
||||
const tempDir1 = await tempDir('storage1');
|
||||
const tempDir2 = await tempDir('storage2');
|
||||
|
||||
const fromStorageConfig = {
|
||||
type: StorageDriverType.Filesystem,
|
||||
path: tempDir1,
|
||||
};
|
||||
|
||||
const models = newModelFactory(db(), {
|
||||
...config(),
|
||||
storageDriver: fromStorageConfig,
|
||||
});
|
||||
|
||||
await models.item().saveFromRawContent(user1, {
|
||||
body: Buffer.from(JSON.stringify({ 'version': 1 })),
|
||||
name: 'info.json',
|
||||
});
|
||||
|
||||
const itemBefore = (await models.item().all())[0];
|
||||
|
||||
const fromDriver = await loadStorageDriver(fromStorageConfig, db());
|
||||
const fromContent = await fromDriver.read(itemBefore.id, { models });
|
||||
|
||||
expect(fromContent.toString()).toBe('{"version":1}');
|
||||
|
||||
expect(itemBefore.content_storage_id).toBe(1);
|
||||
|
||||
await msleep(2);
|
||||
|
||||
const toStorageConfig = {
|
||||
type: StorageDriverType.Filesystem,
|
||||
path: tempDir2,
|
||||
};
|
||||
|
||||
const toModels = newModelFactory(db(), {
|
||||
...config(),
|
||||
storageDriver: toStorageConfig,
|
||||
});
|
||||
|
||||
const result = await toModels.item().saveFromRawContent(user1, {
|
||||
body: Buffer.from(JSON.stringify({ 'version': 2 })),
|
||||
name: 'info2.json',
|
||||
});
|
||||
|
||||
const itemBefore2 = result['info2.json'].item;
|
||||
|
||||
await models.item().importContentToStorage(toStorageConfig);
|
||||
|
||||
const itemAfter = (await models.item().all()).find(it => it.id === itemBefore.id);
|
||||
expect(itemAfter.content_storage_id).toBe(2);
|
||||
expect(itemAfter.updated_time).toBe(itemBefore.updated_time);
|
||||
|
||||
// Just check the second item has not been processed since it was
|
||||
// already on the right storage
|
||||
const itemAfter2 = (await models.item().all()).find(it => it.id === itemBefore2.id);
|
||||
expect(itemAfter2.content_storage_id).toBe(2);
|
||||
expect(itemAfter2.updated_time).toBe(itemBefore2.updated_time);
|
||||
|
||||
const toDriver = await loadStorageDriver(toStorageConfig, db());
|
||||
const toContent = await toDriver.read(itemAfter.id, { models });
|
||||
|
||||
expect(toContent.toString()).toBe(fromContent.toString());
|
||||
});
|
||||
|
||||
});
|
||||
|
@ -8,16 +8,23 @@ import { Knex } from 'knex';
|
||||
import { ChangePreviousItem } from './ChangeModel';
|
||||
import { unique } from '../utils/array';
|
||||
import StorageDriverBase, { Context } from './items/storage/StorageDriverBase';
|
||||
import { DbConnection } from '../db';
|
||||
import { DbConnection, returningSupported } from '../db';
|
||||
import { Config, StorageDriverConfig, StorageDriverMode } from '../utils/types';
|
||||
import { NewModelFactoryHandler } from './factory';
|
||||
import loadStorageDriver from './items/storage/loadStorageDriver';
|
||||
import { msleep } from '../utils/time';
|
||||
import Logger from '@joplin/lib/Logger';
|
||||
|
||||
const mimeUtils = require('@joplin/lib/mime-utils.js').mime;
|
||||
|
||||
// Converts "root:/myfile.txt:" to "myfile.txt"
|
||||
const extractNameRegex = /^root:\/(.*):$/;
|
||||
|
||||
export interface ImportContentToStorageOptions {
|
||||
batchSize?: number;
|
||||
logger?: Logger;
|
||||
}
|
||||
|
||||
export interface SaveFromRawContentItem {
|
||||
name: string;
|
||||
body: Buffer;
|
||||
@ -268,6 +275,73 @@ export default class ItemModel extends BaseModel<Item> {
|
||||
}
|
||||
}
|
||||
|
||||
private async atomicMoveContent(item: Item, toDriver: StorageDriverBase, drivers: Record<number, StorageDriverBase>) {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
let fromDriver: StorageDriverBase = drivers[item.content_storage_id];
|
||||
|
||||
if (!fromDriver) {
|
||||
fromDriver = await loadStorageDriver(item.content_storage_id, this.db);
|
||||
drivers[item.content_storage_id] = fromDriver;
|
||||
}
|
||||
|
||||
const content = await fromDriver.read(item.id, { models: this.models() });
|
||||
await toDriver.write(item.id, content, { models: this.models() });
|
||||
|
||||
const updatedRows = await this
|
||||
.db(this.tableName)
|
||||
.where('id', '=', item.id)
|
||||
.where('updated_time', '=', item.updated_time) // Check that the row hasn't changed while we were transferring the content
|
||||
.update({ content_storage_id: toDriver.storageId }, returningSupported(this.db) ? ['id'] : undefined);
|
||||
|
||||
if (!returningSupported(this.db) || updatedRows.length) return;
|
||||
|
||||
await msleep(1000 + 1000 * i);
|
||||
}
|
||||
|
||||
throw new Error(`Could not atomically update content for item: ${JSON.stringify(item)}`);
|
||||
}
|
||||
|
||||
// Loop throught the items in the database and import their content to the
|
||||
// target storage. Only items not already in that storage will be processed.
|
||||
public async importContentToStorage(toStorageConfig: StorageDriverConfig, options: ImportContentToStorageOptions = null) {
|
||||
options = {
|
||||
batchSize: 1000,
|
||||
logger: new Logger(),
|
||||
...options,
|
||||
};
|
||||
|
||||
const toStorageDriver = await this.loadStorageDriver(toStorageConfig);
|
||||
const fromDrivers: Record<number, StorageDriverBase> = {};
|
||||
|
||||
const itemCount = (await this.db(this.tableName)
|
||||
.count('id', { as: 'total' })
|
||||
.where('content_storage_id', '!=', toStorageDriver.storageId)
|
||||
.first())['total'];
|
||||
|
||||
let totalDone = 0;
|
||||
|
||||
while (true) {
|
||||
const items: Item[] = await this
|
||||
.db(this.tableName)
|
||||
.select(['id', 'content_storage_id', 'updated_time'])
|
||||
.where('content_storage_id', '!=', toStorageDriver.storageId)
|
||||
.limit(options.batchSize);
|
||||
|
||||
options.logger.info(`Processing items ${totalDone} / ${itemCount}`);
|
||||
|
||||
if (!items.length) {
|
||||
options.logger.info(`All items have been processed. Total: ${totalDone}`);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const item of items) {
|
||||
await this.atomicMoveContent(item, toStorageDriver, fromDrivers);
|
||||
}
|
||||
|
||||
totalDone += items.length;
|
||||
}
|
||||
}
|
||||
|
||||
public async sharedFolderChildrenItems(shareUserIds: Uuid[], folderId: string, includeResources: boolean = true): Promise<Item[]> {
|
||||
if (!shareUserIds.length) throw new Error('User IDs must be specified');
|
||||
|
||||
|
@ -15,4 +15,8 @@ export default class StorageModel extends BaseModel<Storage> {
|
||||
return this.db(this.tableName).where('connection_string', connectionString).first();
|
||||
}
|
||||
|
||||
public async byId(id: number): Promise<Storage> {
|
||||
return this.db(this.tableName).where('id', id).first();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ import globalConfig from '../../../config';
|
||||
import { clientType, DbConnection } from '../../../db';
|
||||
import { StorageDriverConfig, StorageDriverType } from '../../../utils/types';
|
||||
import newModelFactory from '../../factory';
|
||||
import parseStorageDriverConnectionString from './parseStorageConnectionString';
|
||||
import serializeStorageConfig from './serializeStorageConfig';
|
||||
import StorageDriverBase from './StorageDriverBase';
|
||||
import StorageDriverDatabase from './StorageDriverDatabase';
|
||||
@ -12,7 +13,7 @@ export interface Options {
|
||||
assignDriverId?: boolean;
|
||||
}
|
||||
|
||||
export default async function(config: StorageDriverConfig, db: DbConnection, options: Options = null): Promise<StorageDriverBase | null> {
|
||||
export default async function(config: StorageDriverConfig | number, db: DbConnection, options: Options = null): Promise<StorageDriverBase | null> {
|
||||
if (!config) return null;
|
||||
|
||||
options = {
|
||||
@ -22,20 +23,30 @@ export default async function(config: StorageDriverConfig, db: DbConnection, opt
|
||||
|
||||
let storageId: number = 0;
|
||||
|
||||
if (options.assignDriverId) {
|
||||
if (typeof config === 'number') {
|
||||
storageId = config;
|
||||
|
||||
const models = newModelFactory(db, globalConfig());
|
||||
const storage = await models.storage().byId(storageId);
|
||||
if (!storage) throw new Error(`No such storage ID: ${storageId}`);
|
||||
|
||||
const connectionString = serializeStorageConfig(config);
|
||||
let storage = await models.storage().byConnectionString(connectionString);
|
||||
config = parseStorageDriverConnectionString(storage.connection_string);
|
||||
} else {
|
||||
if (options.assignDriverId) {
|
||||
const models = newModelFactory(db, globalConfig());
|
||||
|
||||
if (!storage) {
|
||||
await models.storage().save({
|
||||
connection_string: connectionString,
|
||||
});
|
||||
storage = await models.storage().byConnectionString(connectionString);
|
||||
const connectionString = serializeStorageConfig(config);
|
||||
let storage = await models.storage().byConnectionString(connectionString);
|
||||
|
||||
if (!storage) {
|
||||
await models.storage().save({
|
||||
connection_string: connectionString,
|
||||
});
|
||||
storage = await models.storage().byConnectionString(connectionString);
|
||||
}
|
||||
|
||||
storageId = storage.id;
|
||||
}
|
||||
|
||||
storageId = storage.id;
|
||||
}
|
||||
|
||||
if (config.type === StorageDriverType.Database) {
|
||||
|
@ -1,7 +1,7 @@
|
||||
import { StorageDriverConfig, StorageDriverType } from '../../../utils/types';
|
||||
import parseStorageDriverConnectionString from './parseStorageDriverConnectionString';
|
||||
import parseStorageConnectionString from './parseStorageConnectionString';
|
||||
|
||||
describe('parseStorageDriverConnectionString', function() {
|
||||
describe('parseStorageConnectionString', function() {
|
||||
|
||||
test('should parse a connection string', async function() {
|
||||
const testCases: Record<string, StorageDriverConfig> = {
|
||||
@ -26,17 +26,17 @@ describe('parseStorageDriverConnectionString', function() {
|
||||
};
|
||||
|
||||
for (const [connectionString, config] of Object.entries(testCases)) {
|
||||
const actual = parseStorageDriverConnectionString(connectionString);
|
||||
const actual = parseStorageConnectionString(connectionString);
|
||||
expect(actual).toEqual(config);
|
||||
}
|
||||
});
|
||||
|
||||
test('should detect errors', async function() {
|
||||
expect(() => parseStorageDriverConnectionString('Path=/path/to/dir')).toThrow(); // Type is missing
|
||||
expect(() => parseStorageDriverConnectionString('Type=')).toThrow();
|
||||
expect(() => parseStorageDriverConnectionString('Type;')).toThrow();
|
||||
expect(() => parseStorageDriverConnectionString('Type=DoesntExist')).toThrow();
|
||||
expect(() => parseStorageDriverConnectionString('Type=Filesystem')).toThrow();
|
||||
expect(() => parseStorageConnectionString('Path=/path/to/dir')).toThrow(); // Type is missing
|
||||
expect(() => parseStorageConnectionString('Type=')).toThrow();
|
||||
expect(() => parseStorageConnectionString('Type;')).toThrow();
|
||||
expect(() => parseStorageConnectionString('Type=DoesntExist')).toThrow();
|
||||
expect(() => parseStorageConnectionString('Type=Filesystem')).toThrow();
|
||||
});
|
||||
|
||||
});
|
@ -6,6 +6,7 @@ const parseType = (type: string): StorageDriverType => {
|
||||
if (type === 'Database') return StorageDriverType.Database;
|
||||
if (type === 'Filesystem') return StorageDriverType.Filesystem;
|
||||
if (type === 'Memory') return StorageDriverType.Memory;
|
||||
if (type === 'S3') return StorageDriverType.S3;
|
||||
throw new Error(`Invalid type: "${type}"`);
|
||||
};
|
||||
|
@ -4,6 +4,7 @@ const serializeType = (type: StorageDriverType): string => {
|
||||
if (type === StorageDriverType.Database) return 'Database';
|
||||
if (type === StorageDriverType.Filesystem) return 'Filesystem';
|
||||
if (type === StorageDriverType.Memory) return 'Memory';
|
||||
if (type === StorageDriverType.S3) return 'S3';
|
||||
throw new Error(`Invalid type: "${type}"`);
|
||||
};
|
||||
|
||||
|
@ -2,6 +2,7 @@ import yargs = require('yargs');
|
||||
import BaseCommand from '../commands/BaseCommand';
|
||||
import DbCommand from '../commands/DbCommand';
|
||||
import DeleteOldChangesCommand from '../commands/DeleteOldChangesCommand';
|
||||
import ImportContentCommand from '../commands/ImportContentCommand';
|
||||
import MigrateCommand from '../commands/MigrateCommand';
|
||||
|
||||
export interface Commands {
|
||||
@ -16,6 +17,7 @@ export default async function setupCommands(): Promise<Commands> {
|
||||
new MigrateCommand(),
|
||||
new DbCommand(),
|
||||
new DeleteOldChangesCommand(),
|
||||
new ImportContentCommand(),
|
||||
];
|
||||
|
||||
for (const cmd of commands) {
|
||||
|
@ -42,11 +42,11 @@ export function tempDirPath(): string {
|
||||
}
|
||||
|
||||
let tempDir_: string = null;
|
||||
export async function tempDir(): Promise<string> {
|
||||
if (tempDir_) return tempDir_;
|
||||
tempDir_ = tempDirPath();
|
||||
await fs.mkdirp(tempDir_);
|
||||
return tempDir_;
|
||||
export async function tempDir(subDir: string = null): Promise<string> {
|
||||
if (!tempDir_) tempDir_ = tempDirPath();
|
||||
const fullDir = tempDir_ + (subDir ? `/${subDir}` : '');
|
||||
await fs.mkdirp(fullDir);
|
||||
return fullDir;
|
||||
}
|
||||
|
||||
export async function makeTempFileWithContent(content: string | Buffer): Promise<string> {
|
||||
|
Loading…
Reference in New Issue
Block a user