-
Notifications
You must be signed in to change notification settings - Fork 0
/
dirnotes-cli
executable file
·465 lines (418 loc) · 18.9 KB
/
dirnotes-cli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
#!/usr/bin/python3
# TODO: option to print out full path name; most useful in the .json output format
VERSION = "0.4"
import sys, argparse
# global mutables
answer_json = []
verbose = debug = 0
db = None
xattr_comment = "user.xdg.comment"
xattr_author = "user.xdg.comment.author"
xattr_date = "user.xdg.comment.date"
mode = "db"
#======= debugging/verbose ===========
def print_d(*a):
if debug:
print('>>', *a)
def errorBox(*a):
print(*a)
# >>> snip here <<<
#============ the DnDataBase, UiHelper and FileObj code is shared with other dirnotes programs
import getpass, time, stat, shutil, sqlite3, json, os, math
DEFAULT_CONFIG_FILE = "~/.config/dirnotes/dirnotes.conf" # or /etc/dirnotes.conf
# config
# we could store the config in the database, in a second table
# or in a .json file
DEFAULT_CONFIG = {"xattr_tag":"user.xdg.comment",
"database":"~/.local/share/dirnotes/dirnotes.db",
"start_mode":"xattr",
"options for database":("~/.local/share/dirnotes/dirnotes.db","~/.dirnotes.db","/etc/dirnotes.db"),
"options for start_mode":("db","xattr")
}
class ConfigLoader: # singleton
def __init__(self, configFile):
configFile = os.path.expanduser(configFile)
try:
with open(configFile,"r") as f:
config = json.load(f)
except json.JSONDecodeError:
errorBox(f"problem reading config file {configFile}; check the JSON syntax")
config = DEFAULT_CONFIG
except FileNotFoundError:
errorBox(f"config file {configFile} not found; using the default settings")
config = DEFAULT_CONFIG
try:
os.makedirs(os.path.dirname(configFile),exist_ok = True)
with open(configFile,"w") as f:
json.dump(config,f,indent=4)
except:
errorBox(f"problem creating the config file {configFile}")
self.dbName = os.path.expanduser(config["database"])
self.mode = config["start_mode"] # can get over-ruled by the command line options
self.xattr_comment = config["xattr_tag"]
class DnDataBase:
''' the database is flat
fileName: fully qualified name
st_mtime: a float
size: a long
comment: a string
comment_time: a float, the time of the comment save
author: the username that created the comment
this object: 1) finds or creates the database
2) determine if it's readonly
TODO: the database is usually associated with a user, in $XDG_DATA_HOME (~/.local/share/)
TODO: if the database is not found, create it in $XDG_DATA_DIRS (/usr/local/share/)
make it 0666 permissions (rw-rw-rw-)
'''
def __init__(self,dbFile):
'''try to open the database; if not found, create it'''
try:
self.db = sqlite3.connect(dbFile)
except sqlite3.OperationalError:
print_d(f"Database {dbFile} not found")
try:
os.makedirs(os.path.dirname(dbFile), exist_ok = True)
self.db = sqlite3.connect(dbFile)
except (sqlite3.OperationalError, PermissionError):
printd(f"Failed to create {dbFile}, aborting")
raise
# create new table if it doesn't exist
try:
self.db.execute("select * from dirnotes")
except sqlite3.OperationalError:
self.db.execute("create table dirnotes (name TEXT, date DATETIME, size INTEGER, comment TEXT, comment_date DATETIME, author TEXT)")
self.db.execute("create index dirnotes_i on dirnotes(name)")
print_d(f"Table dirnotes created")
# at this point, if a shared database is required, somebody needs to set perms to 0o666
self.writable = True
try:
self.db.execute("pragma user_verson=0")
except sqlite3.OperationalError:
self.writable = False
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
class UiHelper:
@staticmethod
def epochToDb(epoch):
return time.strftime(DATE_FORMAT,time.localtime(epoch))
@staticmethod
def DbToEpoch(dbTime):
return time.mktime(time.strptime(dbTime,DATE_FORMAT))
@staticmethod
def getShortDate(longDate):
now = time.time()
diff = now - longDate
if diff > YEAR:
fmt = "%b %e %Y"
else:
fmt = "%b %e %H:%M"
return time.strftime(fmt, time.localtime(longDate))
@staticmethod
def getShortSize(fo):
if fo.isDir():
return " <DIR> "
elif fo.isLink():
return " <LINK>"
size = fo.getSize()
log = int((math.log10(size+1)-2)/3)
s = " KMGTE"[log]
base = int(size/math.pow(10,log*3))
return f"{base}{s}".strip().rjust(7)
## one for each file
## and a special one for ".." parent directory
class FileObj:
""" The FileObj knows about both kinds of comments. """
def __init__(self, fileName, db):
self.fileName = os.path.abspath(fileName) # full path; dirs end WITHOUT a terminal /
self.stat = os.lstat(self.fileName)
self.displayName = os.path.split(fileName)[1] # base name; dirs end with a /
if self.isDir():
if not self.displayName.endswith('/'):
self.displayName += '/'
self.date = self.stat.st_mtime
self.size = self.stat.st_size
self.db = db
def getName(self):
""" returns the absolute pathname """
return self.fileName
def getDisplayName(self):
""" returns just the basename of the file; dirs end in / """
return self.displayName
def getDbData(self):
""" returns (comment, author, comment_date) """
if not hasattr(self,'dbCommentAuthorDate'):
cad = self.db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(self.fileName,)).fetchone()
self.dbCommentAuthorDate = cad if cad else (None, None, None)
return self.dbCommentAuthorDate
def getDbComment(self):
return self.getDbData()[0]
def getXattrData(self):
""" returns (comment, author, comment_date) """
if not hasattr(self,'xattrCommentAuthorDate'):
c = a = d = None
try:
c = os.getxattr(self.fileName, xattr_comment, follow_symlinks=False).decode()
a = os.getxattr(self.fileName, xattr_author, follow_symlinks=False).decode()
d = os.getxattr(self.fileName, xattr_date, follow_symlinks=False).decode()
except: # no xattr comment
pass
self.xattrCommentAuthorDate = c,a,d
return self.xattrCommentAuthorDate
def getXattrComment(self):
return self.getXattrData()[0]
def setDbComment(self,newComment):
# how are we going to hook this?
#if not self.db.writable:
# errorBox("The database is readonly; you cannot add or edit comments")
# return
s = os.lstat(self.fileName)
try:
print_d(f"setDbComment db {self.db}, file: {self.fileName}")
self.db.execute("insert into dirnotes (name,date,size,comment,comment_date,author) values (?,datetime(?,'unixepoch','localtime'),?,?,datetime(?,'unixepoch','localtime'),?)",
(self.fileName, s.st_mtime, s.st_size,
str(newComment), time.time(), getpass.getuser()))
self.db.commit()
self.dbCommentAuthorDate = newComment, getpass.getuser(), UiHelper.epochToDb(time.time())
except sqlite3.OperationalError:
print_d("database is locked or unwritable")
errorBox("the database that stores comments is locked or unwritable")
def setXattrComment(self,newComment):
print_d(f"set comment {newComment} on file {self.fileName}")
try:
os.setxattr(self.fileName,xattr_comment,bytes(newComment,'utf8'),follow_symlinks=False)
os.setxattr(self.fileName,xattr_author,bytes(getpass.getuser(),'utf8'),follow_symlinks=False)
os.setxattr(self.fileName,xattr_date,bytes(time.strftime(DATE_FORMAT),'utf8'),follow_symlinks=False)
self.xattrCommentAuthorDate = newComment, getpass.getuser(), time.strftime(DATE_FORMAT)
return True
# we need to move these cases out to a handler
except Exception as e:
if self.isLink():
errorBox("Linux does not allow xattr comments on symlinks; comment is stored in database")
elif self.isSock():
errorBox("Linux does not allow comments on sockets; comment is stored in database")
elif os.access(self.fileName, os.W_OK)!=True:
errorBox(f"you don't appear to have write permissions on this file: {self.fileName}")
# change the listbox background to yellow
elif "Errno 95" in str(e):
errorBox("is this a VFAT or EXFAT volume? these don't allow comments")
return False
def getComment(self,mode):
""" returns the comment for the given mode """
return self.getDbComment() if mode == "db" else self.getXattrComment()
def getOtherComment(self,mode):
return self.getDbComment() if mode == "xattr" else self.getXattrComment()
def getData(self,mode):
""" returns (comment, author, comment_date) for the given mode """
return self.getDbData() if mode == "db" else self.getXattrData()
def getOtherData(self,mode):
""" returns (comment, author, comment_date) for the 'other' mode """
return self.getDbData() if mode == "xattr" else self.getXattrData()
def getDate(self):
return self.date
def getSize(self):
return self.size
def isDir(self):
return stat.S_ISDIR(self.stat.st_mode)
def isLink(self):
return stat.S_ISLNK(self.stat.st_mode)
def isSock(self):
return stat.S_ISSOCK(self.stat.st_mode)
def copyFile(self, dest, doMove = False):
""" dest is either a FQ filename or a FQ directory, to be expanded with same basename """
# NOTE: this method copies the xattr (comment + old author + old date)
# but creates new db (comment + this author + new date)
if os.path.isdir(dest):
dest = os.path.join(destDir,self.displayName)
try:
print_d("try copy from",self.fileName,"to",dest)
# shutil methods preserve dates & chmod/chown & xattr
if doMove:
shutil.move(self.fileName, dest)
else:
shutil.copy2(self.fileName, dest)
# can raise FileNotFoundError, Permission Error, shutil.SameFileError, IsADirectoryError
except:
errorBox(f"file copy/move to <{dest}> failed; check permissions")
return
# and copy the database record
f = FileObj(dest, self.db)
f.setDbComment(self.getDbComment())
def moveFile(self, dest):
""" dest is either a FQ filename or a FQ directory, to be expanded with same basename """
self.copyFile(dest, doMove = True)
# >>> snip here <<<
#============= the functions that are called from the main.loop ===============
def file_copy(f,target,target_is_dir,is_copy,force):
print_d(f"call file_copy/move with args={target},{target_is_dir} and {force}")
dest = target if not target_is_dir else os.path.join(target,f.getDisplayName())
if os.path.exists(dest) and not force:
go = input("The copy/move target <<" + dest + ">> exists. Overwrite? (y or n) ")
if go != 'y' and go != 'Y':
return
print_d(f"copy/move from {f} to {dest}")
if is_copy:
f.copyFile(dest)
else:
f.moveFile(dest)
def file_zap(f,all_flag):
db = f.db
print_d(f"zapping the comment history of {f.getName()}")
if all_flag:
confirm = input("You requested a complete flush of the comment database history. Please hit 'Y' to confirm")
if confirm == 'Y':
print_d("zapping the entire database")
db.execute("delete from dirnotes where comment_date < (select max(comment_date) from dirnotes d2 where d2.name = dirnotes.name)")
else:
db.execute("delete from dirnotes where name=? and comment_date < (select max(comment_date) from dirnotes where name=?)",(f.getName(),f.getName()))
db.commit()
def file_modify_comment(f, create, append, erase):
print_d(f"modify the comment on file {f} with extra={(create,append,erase)}")
if not os.path.exists(f.getName()):
print(f"the target file does not exist; please check the spelling of the file: {f}")
sys.exit(10)
if create:
f.setXattrComment(create)
f.setDbComment(create)
elif append:
c = f.getComment(mode)
f.setXattrComment(f"{c}; {append}")
f.setDbComment(f"{c}; {append}")
elif erase:
f.setXattrComment('')
f.setDbComment('')
def file_display(f, listall, json, minimal):
fn = f.getDisplayName()
print_d(f"list file details {fn}")
c,a,d = f.getData(mode)
c1,a1,d1 = f.getOtherData(mode)
diffFlag = '*' if c and (c != c1) else ''
if c or listall:
if not json:
if minimal:
print(f"{c}{diffFlag}")
elif verbose:
print(f"{f.getName()}: {repr(c)}{diffFlag}, {repr(a)}, {repr(d)}")
else:
print(f"{fn}: {repr(c)}{diffFlag}")
else:
entry = {"file":fn, "comment":c}
if verbose:
entry.update({"file":f.getName(),"author":a, "date": d})
if diffFlag:
entry["diffFlag"] = True
answer_json.append(entry)
def file_history(f,json):
db = f.db
c = db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(f.getName(),))
if not json:
print(f"file: \t\t{f.getName()}\n")
else:
answer_json.append ( {"file":f.getName()} )
for a in c.fetchall():
if not json:
print(f"comment: \t{a[0]}\nauthor: \t{a[1]}\t\tdate: \t\t{a[2]}\n")
else:
answer_json.append( {"comment":a[0],"author":a[1],"date":a[2]} )
def main(args):
parser = argparse.ArgumentParser(description="Display or add comments to files",
epilog="Some options conflict. Use only one of: -l -c -a -H -e -z -Z and one of -d -x")
parser.add_argument('-V',"--version", action="version", version=f"dncli ver:{VERSION}")
parser.add_argument('-v',"--verbose", action='count', help="verbose output (include comment author & date)",default=0)
parser.add_argument('-D',"--debug", action='store_true',help="include debugging output; do not use in scripts",default=0)
parser.add_argument('-j',"--json", action="store_true",help="output in JSON format")
pars_m = parser.add_mutually_exclusive_group()
pars_m.add_argument('-l',"--listall", action="store_true",help="list all files, including those without comments")
parser.add_argument('-d',"--db", action="store_true",help="list comments from database")
parser.add_argument('-x',"--xattr", action="store_true",help="list comments from xattr")
parser.add_argument('-n',"--minimal", action="store_true",help="output only comments; useful in scripting")
parser.add_argument('-H',"--history", action="store_true",help="output the history of database comments for a file")
pars_m.add_argument('-c',"--create", metavar="comment", help="add a comment to a file")
pars_m.add_argument('-a',"--append", metavar="comment", help="append to a comment on a file, separator=';'")
pars_m.add_argument('-C',"--copy", action="store_true",help="copy a file with its comments")
pars_m.add_argument('-M',"--move", action="store_true",help="move a file with its comments")
parser.add_argument('-y',"--cp_force",action="store_true",help="copy over existing files")
pars_m.add_argument('-e',"--erase", action="store_true",help="erase the comment on a file")
pars_m.add_argument('-z',"--zap", action="store_true",help="clear the database comment history on a file")
pars_m.add_argument('-Z',"--zapall", action="store_true",help="clear the comment history in the entire database")
parser.add_argument( "--config", dest="config_file", help="use config file (default ~/.config/dirnotes/dirnotes.conf)")
parser.add_argument('file_list',nargs='*',help="file(s); list commands may omit this")
args = parser.parse_args()
# default is to display all files that have comments
# major modes are: display (<none> -l -H), add-comment (-a -c -e), clear-history(-z -Z), copy (-C)
# determine the major mode, then apply an appropriate function over the file_list
args.display = not (args.create or args.append or args.copy or args.erase or args.zap or args.zapall)
if args.cp_force and not args.copy:
print("the -y/--cp_force options can only be used with the -C/--copy command")
sys.exit(3)
if args.json and not args.display:
print("the -j/--json option can only be used with the display modes")
sys.exit(4)
if args.minimal and not args.display:
print("the -n/--minimal option only applies to the display modes")
sys.exit(5)
if args.history and not args.display:
print("the -H/--history option only applies to the display modes")
sys.exit(5)
if args.xattr and (args.zap or args.zapall):
print("the -x/--xattr option doesn't apply to the -z/--zap and -Z/--zapall commands")
sys.exit(7)
global verbose, debug
verbose = args.verbose
debug = args.debug
config = ConfigLoader(args.config_file or DEFAULT_CONFIG_FILE)
global mode
mode = config.mode
mode = "xattr" if args.xattr else ("db" if args.db else mode)
db = DnDataBase(config.dbName).db
#====== 1) build the file list =============
files = args.file_list
# for the list commands, auto-fill the file list with the current directory
if not files and args.display:
files = os.listdir(".")
files.sort()
# other command require explicity file lists
if not files:
print("please specify a file or files to use")
sys.exit(10)
print_d("got the files:", files)
#======= 2) build the function
if args.create or args.append or args.erase:
print_d(f"create/append/erase: {args.create} . {args.append} . {args.erase}")
func = file_modify_comment
loop_args = (args.create, args.append, args.erase)
elif args.zap or args.zapall:
print_d(f"got a zap command {args.zap} . {args.zapall}")
func = file_zap
loop_args = (args.zapall,)
if args.zapall:
files = ('.',)
elif args.copy or args.move:
print_d(f"got a copy/move command to copy={args.copy}, move={args.move}")
# the last item on the file list is the target
n_files = len(files)
if n_files < 2:
print("the copy/move command requires at least two arguments, the last one is the destination")
sys.exit(1)
files, target = files[:-1], files[-1]
target_is_directory = os.path.isdir(target)
print_d(f"copy/move from {files} to {target}")
if n_files > 2 and not target_is_directory:
print("multiple copy/move files must go to a target directory")
sys.exit(3)
func = file_copy
loop_args = (target, target_is_directory, args.copy, args.cp_force)
elif args.history:
func = file_history
loop_args = (args.json,)
else:
assert args.display
print_d(f"list files using {mode} priority")
print_d(f"display command with option: {args.listall} and {args.history} and {args.json} and {args.minimal}")
loop_args = (args.listall, args.json, args.minimal)
func = file_display
#====== 3) loop on the list, execute the function =============
for f in files:
func(FileObj(f,db),*loop_args)
if answer_json:
print(json.dumps(answer_json))
if __name__ == "__main__":
main(sys.argv)