Implemented JSON & Pickle storage's

This commit is contained in:
Alex Root Junior 2018-09-22 00:42:17 +03:00
parent a44b344835
commit e21fd05004
2 changed files with 59 additions and 0 deletions

View file

@ -0,0 +1,58 @@
import json
import pathlib
import pickle
import typing
from .memory import MemoryStorage
class _FileStorage(MemoryStorage):
def __init__(self, path: typing.Union[pathlib.Path, str]):
"""
:param path: file path
"""
super(_FileStorage, self).__init__()
path = self.path = pathlib.Path(path)
try:
self.data = self.read(path)
except FileNotFoundError:
pass
async def close(self):
self.write(self.path)
await super(_FileStorage, self).close()
def read(self, path: pathlib.Path):
raise NotImplementedError
def write(self, path: pathlib.Path):
raise NotImplementedError
class JSONStorage(_FileStorage):
"""
JSON File storage based on MemoryStorage
"""
def read(self, path: pathlib.Path):
with path.open('r') as f:
return json.load(f)
def write(self, path: pathlib.Path):
with path.open('w') as f:
return json.dump(self.data, f, indent=4)
class PickleStorage(_FileStorage):
"""
Pickle File storage based on MemoryStorage
"""
def read(self, path: pathlib.Path):
with path.open('rb') as f:
return pickle.load(f)
def write(self, path: pathlib.Path):
with path.open('wb') as f:
return pickle.dump(self.data, f, protocol=pickle.HIGHEST_PROTOCOL)

View file

@ -1,3 +1,4 @@
import pathlib
import typing
from ...dispatcher.storage import BaseStorage